§Varpulis Runtime
High-performance execution engine for VPL programs.
This crate is the heart of Varpulis, providing:
- Stream Processing: Real-time event processing with filtering and transformation
- SASE+ Pattern Matching: Complex event detection with sequences, Kleene closures, and negation
- Windowed Aggregations: Time and count-based windows with SIMD-optimized aggregations
- Connectors: MQTT, HTTP, and file-based event sources/sinks
§Features
| Feature | Description |
|---|---|
| mqtt | MQTT connector support (rumqttc) |
| kafka | Kafka connector support (rdkafka) |
| persistence | RocksDB state persistence |
| database | SQL database connectors (PostgreSQL, MySQL, SQLite) |
| redis | Redis connector support |
| all-connectors | Enable all connector features |
§Modules
§Core Processing
- engine: Main execution engine, compiles and runs VPL programs
- event: Event structure and field access
- stream: Stream abstraction for event flows
§Pattern Matching
§Windowing & Aggregation
- window: Tumbling, sliding, and count-based windows
- aggregation: Aggregation functions (sum, avg, min, max, stddev, percentile)
- simd: SIMD-optimized operations using AVX2
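To make the tumbling-window idea concrete, here is a minimal sketch of time-based window assignment with a per-window sum. It does not use the `window` or `aggregation` modules; the function name `tumbling_sums` and the `(timestamp_ms, value)` tuple layout are assumptions made purely for illustration.

```rust
use std::collections::BTreeMap;

/// Hypothetical helper (not part of varpulis_runtime): assign each
/// `(timestamp_ms, value)` pair to a tumbling window of `size_ms`
/// and return the sum per window, keyed by the window start time.
fn tumbling_sums(events: &[(u64, f64)], size_ms: u64) -> BTreeMap<u64, f64> {
    assert!(size_ms > 0, "window size must be positive");
    let mut sums = BTreeMap::new();
    for &(ts, value) in events {
        // Tumbling windows do not overlap, so every timestamp maps to
        // exactly one window: the one starting at the previous multiple
        // of `size_ms`.
        let window_start = ts - ts % size_ms;
        *sums.entry(window_start).or_insert(0.0) += value;
    }
    sums
}

fn main() {
    let events = [(0, 1.0), (999, 2.0), (1000, 3.0), (2500, 4.0)];
    for (start, sum) in tumbling_sums(&events, 1000) {
        println!("window starting at {start} ms: sum = {sum}");
    }
}
```

A sliding window differs in that one event can belong to several overlapping windows, which is why it benefits from incremental aggregation.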
§Advanced Features
- join: Multi-stream join operations
§Multi-Query Trend Aggregation
- greta: GRETA baseline aggregation (VLDB 2017)
- hamlet: Hamlet shared aggregation with graphlets (SIGMOD 2021) - recommended
- zdd_unified: ZDD-based aggregation (experimental, for research)
§I/O & Connectors
- connector: Source and sink connectors (MQTT, HTTP, Kafka)
- sink: Output sinks (console, file, HTTP webhook)
- event_file: Event file parsing and streaming
§Infrastructure
- worker_pool: Parallel processing with backpressure
- persistence: State checkpointing (RocksDB, memory)
- metrics: Prometheus metrics
- timer: Timer management for timeouts
- simulator: Event simulation for demos
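As a rough illustration of what backpressure means for a worker, the sketch below uses only the standard library: a bounded queue rejects events once it is full, and a single worker thread drains whatever was accepted. Dropping on overflow is just one possible strategy, and the function `run` is hypothetical; it says nothing about how `worker_pool` or `BackpressureStrategy` actually behave.

```rust
use std::sync::mpsc::{sync_channel, TrySendError};
use std::thread;

/// Hypothetical demo (not the varpulis_runtime worker pool): offer every
/// event to a bounded queue, then let one worker sum what was accepted.
/// Returns `(dropped_count, processed_sum)`.
fn run(events: &[u64], capacity: usize) -> (usize, u64) {
    let (tx, rx) = sync_channel::<u64>(capacity);
    let mut dropped = 0;
    for &event in events {
        match tx.try_send(event) {
            Ok(()) => {}
            // Queue is full: apply a "drop newest" policy instead of blocking.
            Err(TrySendError::Full(_)) => dropped += 1,
            Err(TrySendError::Disconnected(_)) => break,
        }
    }
    // Close the sending side so the worker's iterator terminates.
    drop(tx);
    // The worker starts only after all offers, which keeps the demo
    // deterministic; a real pool would consume concurrently.
    let worker = thread::spawn(move || rx.iter().sum::<u64>());
    let processed = worker.join().expect("worker thread panicked");
    (dropped, processed)
}

fn main() {
    let (dropped, processed) = run(&[1, 2, 3, 4, 5], 3);
    println!("dropped = {dropped}, processed sum = {processed}");
}
```

Replacing `try_send` with the blocking `send` would turn the same queue into a "block the producer" policy, trading dropped events for producer latency.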
§Quick Start
use varpulis_runtime::{Engine, Event};
use varpulis_parser::parse;
use tokio::sync::mpsc;

#[tokio::main]
async fn main() {
    // Parse a VPL program
    let program = parse(r#"
        stream HighTemp = SensorReading
            .where(temperature > 100)
            .emit(sensor: sensor_id, temp: temperature)
    "#).unwrap();

    // Create engine with output channel
    let (output_tx, mut output_rx) = mpsc::channel(100);
    let mut engine = Engine::new(output_tx);
    engine.load(&program).unwrap();

    // Process an event
    let event = Event::new("SensorReading")
        .with_field("temperature", 105.5)
        .with_field("sensor_id", "S1");
    engine.process(event).await.unwrap();

    // Receive output event
    if let Some(output) = output_rx.recv().await {
        println!("Output: {} {:?}", output.event_type, output.data);
    }
}
§Performance
- SIMD-optimized aggregations (4x speedup with AVX2)
- Incremental aggregation for sliding windows
- Zero-copy event sharing via Arc<Event>
- Parallel worker pools with backpressure
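The incremental-aggregation point can be sketched as follows: rather than re-summing the whole window for each event, the window adds the incoming value and subtracts the evicted one. The `SlidingSum` type is a hypothetical stand-in written for this example, not the crate's `IncrementalSlidingWindow`, and it is count-based for simplicity.

```rust
use std::collections::VecDeque;

/// Hypothetical count-based sliding window that keeps a running sum.
/// Illustration only; not the varpulis_runtime window API.
struct SlidingSum {
    capacity: usize,
    values: VecDeque<f64>,
    sum: f64,
}

impl SlidingSum {
    fn new(capacity: usize) -> Self {
        assert!(capacity > 0, "window capacity must be positive");
        Self { capacity, values: VecDeque::with_capacity(capacity), sum: 0.0 }
    }

    /// Add a value, evicting the oldest one when the window is full.
    /// Each push is O(1) instead of O(window size).
    fn push(&mut self, value: f64) {
        if self.values.len() == self.capacity {
            if let Some(oldest) = self.values.pop_front() {
                self.sum -= oldest;
            }
        }
        self.values.push_back(value);
        self.sum += value;
    }

    fn sum(&self) -> f64 {
        self.sum
    }

    /// Average over the current contents, or `None` if the window is empty.
    fn avg(&self) -> Option<f64> {
        if self.values.is_empty() {
            None
        } else {
            Some(self.sum / self.values.len() as f64)
        }
    }
}

fn main() {
    let mut window = SlidingSum::new(3);
    for v in [1.0, 2.0, 3.0, 4.0] {
        window.push(v);
    }
    // The window now holds 2.0, 3.0, 4.0.
    println!("sum = {}, avg = {:?}", window.sum(), window.avg());
}
```

One caveat with this approach is that repeated floating-point add/subtract can accumulate rounding error over long streams, so a production implementation may periodically recompute the sum from the stored values.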
§See Also
- varpulis_core: Core types and AST
- varpulis_parser: Parsing VPL
- varpulis_cli: Command-line interface
Re-exports§
- pub use columnar::Column;
- pub use columnar::ColumnarAccess;
- pub use columnar::ColumnarBuffer;
- pub use columnar::ColumnarCheckpoint;
- pub use context::CheckpointAck;
- pub use context::CheckpointBarrier;
- pub use context::CheckpointCoordinator;
- pub use context::ContextConfig;
- pub use context::ContextMap;
- pub use context::ContextMessage;
- pub use context::ContextOrchestrator;
- pub use context::ContextRuntime;
- pub use context::DispatchError;
- pub use context::EventTypeRouter;
- pub use engine::error::EngineError;
- pub use engine::EngineBuilder;
- pub use engine::Engine;
- pub use engine::ReloadReport;
- pub use engine::SourceBinding;
- pub use event_file::StreamingEventReader;
- pub use metrics::Metrics;
- pub use persistence::Checkpoint;
- pub use persistence::CheckpointConfig;
- pub use persistence::CheckpointManager;
- pub use persistence::FileStore;
- pub use persistence::MemoryStore;
- pub use persistence::StateStore;
- pub use persistence::StoreError;
- pub use sink::ConsoleSink;
- pub use sink::FileSink;
- pub use sink::HttpSink;
- pub use sink::MultiSink;
- pub use stream::Stream;
- pub use tenant::hash_api_key;
- pub use tenant::Pipeline;
- pub use tenant::PipelineSnapshot;
- pub use tenant::PipelineStatus;
- pub use tenant::Tenant;
- pub use tenant::TenantError;
- pub use tenant::TenantId;
- pub use tenant::TenantManager;
- pub use tenant::TenantQuota;
- pub use tenant::TenantSnapshot;
- pub use tenant::TenantUsage;
- pub use timer::spawn_timer;
- pub use timer::TimerManager;
- pub use window::CountWindow;
- pub use window::DelayBuffer;
- pub use window::IncrementalAggregates;
- pub use window::IncrementalSlidingWindow;
- pub use window::PartitionedDelayBuffer;
- pub use window::PartitionedPreviousValueTracker;
- pub use window::PartitionedSessionWindow;
- pub use window::PartitionedSlidingWindow;
- pub use window::PartitionedTumblingWindow;
- pub use window::PreviousValueTracker;
- pub use window::SessionWindow;
- pub use window::SlidingCountWindow;
- pub use window::SlidingWindow;
- pub use window::TumblingWindow;
- pub use worker_pool::BackpressureStrategy;
- pub use worker_pool::PoolBackpressureError;
- pub use worker_pool::WorkerPool;
- pub use worker_pool::WorkerPoolConfig;
- pub use worker_pool::WorkerPoolMetrics;
- pub use worker_pool::WorkerState;
- pub use worker_pool::WorkerStatus;
- pub use varpulis_dead_letter as dead_letter;
- pub use varpulis_hamlet as hamlet;
- pub use varpulis_pst as pst;
- pub use varpulis_sase as sase;
- pub use varpulis_simd as simd;
- pub use varpulis_enrichment as enrichment;
- pub use varpulis_connectors as connector;
Modules§
- aggregation
- Aggregation functions for stream processing.
- backpressure
- Backpressure strategies for per-stage buffer management
- circuit_breaker
- Circuit breaker for sink connectors.
- codec
- Checkpoint serialization codec.
- columnar
- Columnar Event Storage for SIMD-Optimized Aggregations
- context
- Context-based multi-threaded execution architecture.
- converter
- Converter trait for standardized serialization/deserialization of events.
- engine
- Main execution engine for Varpulis
- event
- Event types for the runtime
- event_file
- Event file parser for VPL
- greta
- GRETA - Graph-based Real-time Event Trend Aggregation
- health
- Health monitoring for Varpulis components
- interactive
- Interactive session core for Varpulis.
- join
- Join buffer for correlating events from multiple streams
- limits
- Resource limits for event parsing to prevent denial-of-service attacks.
- metrics
- Prometheus metrics for Varpulis
- persistence
- State Persistence for Varpulis Engine
- sase_persistence
- Checkpointing and restore for SASE engine state.
- scoring
- ONNX model scoring for the .score() operator
- sequence
- Sequence tracking for temporal event correlations
- simulator
- HVAC Building Simulator for demo purposes
- sink
- Sink implementations for outputting processed events
- stream
- Stream abstraction for the runtime
- tenant
- Multi-tenant isolation for SaaS deployment
- testing
- Time-accelerated testing infrastructure
- timer
- Timer module for periodic event generation
- udf
- User-defined functions with type signatures
- vpl_test
- VPL Test DSL: declarative test format for Varpulis programs.
- watermark
- Per-source watermark tracking for multi-source event-time processing.
- window
- Window implementations for stream processing
- worker_pool
- Worker Pool for Controlled Parallelism
- zdd_unified
- ZDD Unified - ZDD as Unified Representation for Multi-Query Aggregation
Structs§
- Event
- A runtime event
Enums§
- SinkError
- Errors produced by sink operations.
Traits§
- Sink
- Trait for event sinks
Type Aliases§
- SharedEvent
- A shared reference to an Event for efficient passing through pipelines. Using Arc avoids expensive deep clones when events are processed by multiple streams, windows, or pattern matchers.