Skip to main content

Crate varpulis_runtime

Crate varpulis_runtime 

Source
Expand description

§Varpulis Runtime

High-performance execution engine for VPL programs.

This crate is the heart of Varpulis, providing:

  • Stream Processing: Real-time event processing with filtering and transformation
  • SASE+ Pattern Matching: Complex event detection with sequences, Kleene closures, and negation
  • Windowed Aggregations: Time and count-based windows with SIMD-optimized aggregations
  • Connectors: MQTT, HTTP, and file-based event sources/sinks

§Features

FeatureDescription
mqttMQTT connector support (rumqttc)
kafkaKafka connector support (rdkafka)
persistenceRocksDB state persistence
databaseSQL database connectors (PostgreSQL, MySQL, SQLite)
redisRedis connector support
all-connectorsEnable all connector features

§Modules

§Core Processing

  • engine: Main execution engine, compiles and runs VPL programs
  • event: Event structure and field access
  • stream: Stream abstraction for event flows

§Pattern Matching

  • sase: SASE+ pattern matching (SEQ, AND, OR, NOT, Kleene+/*)
  • sequence: Sequence pattern tracking

§Windowing & Aggregation

  • window: Tumbling, sliding, and count-based windows
  • aggregation: Aggregation functions (sum, avg, min, max, stddev, percentile)
  • simd: SIMD-optimized operations using AVX2

§Advanced Features

//! - join: Multi-stream join operations

§Multi-Query Trend Aggregation

  • greta: GRETA baseline aggregation (VLDB 2017)
  • hamlet: Hamlet shared aggregation with graphlets (SIGMOD 2021) - recommended
  • zdd_unified: ZDD-based aggregation (experimental, for research)

§I/O & Connectors

  • connector: Source and sink connectors (MQTT, HTTP, Kafka)
  • sink: Output sinks (console, file, HTTP webhook)
  • event_file: Event file parsing and streaming

§Infrastructure

§Quick Start

use varpulis_runtime::{Engine, Event};
use varpulis_parser::parse;
use tokio::sync::mpsc;

#[tokio::main]
async fn main() {
    // Parse a VPL program
    let program = parse(r#"
        stream HighTemp = SensorReading
            .where(temperature > 100)
            .emit(sensor: sensor_id, temp: temperature)
    "#).unwrap();

    // Create engine with output channel
    let (output_tx, mut output_rx) = mpsc::channel(100);
    let mut engine = Engine::new(output_tx);
    engine.load(&program).unwrap();

    // Process an event
    let event = Event::new("SensorReading")
        .with_field("temperature", 105.5)
        .with_field("sensor_id", "S1");
    engine.process(event).await.unwrap();

    // Receive output event
    if let Some(output) = output_rx.recv().await {
        println!("Output: {} {:?}", output.event_type, output.data);
    }
}

§Performance

  • SIMD-optimized aggregations (4x speedup with AVX2)
  • Incremental aggregation for sliding windows
  • Zero-copy event sharing via Arc<Event>
  • Parallel worker pools with backpressure

§See Also

Re-exports§

pub use columnar::Column;
pub use columnar::ColumnarAccess;
pub use columnar::ColumnarBuffer;
pub use columnar::ColumnarCheckpoint;
pub use context::CheckpointAck;
pub use context::CheckpointBarrier;
pub use context::CheckpointCoordinator;
pub use context::ContextConfig;
pub use context::ContextMap;
pub use context::ContextMessage;
pub use context::ContextOrchestrator;
pub use context::ContextRuntime;
pub use context::DispatchError;
pub use context::EventTypeRouter;
pub use engine::error::EngineError;
pub use engine::EngineBuilder;
pub use engine::Engine;
pub use engine::ReloadReport;
pub use engine::SourceBinding;
pub use event_file::StreamingEventReader;
pub use metrics::Metrics;
pub use persistence::Checkpoint;
pub use persistence::CheckpointConfig;
pub use persistence::CheckpointManager;
pub use persistence::FileStore;
pub use persistence::MemoryStore;
pub use persistence::StateStore;
pub use persistence::StoreError;
pub use sink::ConsoleSink;
pub use sink::FileSink;
pub use sink::HttpSink;
pub use sink::MultiSink;
pub use stream::Stream;
pub use tenant::hash_api_key;
pub use tenant::shared_tenant_manager;
pub use tenant::shared_tenant_manager_with_store;
pub use tenant::Pipeline;
pub use tenant::PipelineSnapshot;
pub use tenant::PipelineStatus;
pub use tenant::SharedTenantManager;
pub use tenant::Tenant;
pub use tenant::TenantError;
pub use tenant::TenantId;
pub use tenant::TenantManager;
pub use tenant::TenantQuota;
pub use tenant::TenantSnapshot;
pub use tenant::TenantUsage;
pub use timer::spawn_timer;
pub use timer::TimerManager;
pub use window::CountWindow;
pub use window::DelayBuffer;
pub use window::IncrementalAggregates;
pub use window::IncrementalSlidingWindow;
pub use window::PartitionedDelayBuffer;
pub use window::PartitionedPreviousValueTracker;
pub use window::PartitionedSessionWindow;
pub use window::PartitionedSlidingWindow;
pub use window::PartitionedTumblingWindow;
pub use window::PreviousValueTracker;
pub use window::SessionWindow;
pub use window::SlidingCountWindow;
pub use window::SlidingWindow;
pub use window::TumblingWindow;
pub use worker_pool::BackpressureStrategy;
pub use worker_pool::PoolBackpressureError;
pub use worker_pool::WorkerPool;
pub use worker_pool::WorkerPoolConfig;
pub use worker_pool::WorkerPoolMetrics;
pub use worker_pool::WorkerState;
pub use worker_pool::WorkerStatus;
pub use varpulis_dead_letter as dead_letter;
pub use varpulis_hamlet as hamlet;
pub use varpulis_pst as pst;
pub use varpulis_sase as sase;
pub use varpulis_simd as simd;
pub use varpulis_enrichment as enrichment;
pub use varpulis_connectors as connector;

Modules§

aggregation
Aggregation functions for stream processing.
backpressure
Backpressure strategies for per-stage buffer management
circuit_breaker
Circuit breaker for sink connectors.
codec
Checkpoint serialization codec.
columnar
Columnar Event Storage for SIMD-Optimized Aggregations
context
Context-based multi-threaded execution architecture.
converter
Converter trait for standardized serialization/deserialization of events.
engine
Main execution engine for Varpulis
event
Event types for the runtime
event_file
Event file parser for VPL
greta
GRETA - Graph-based Real-time Event Trend Aggregation
health
Health monitoring for Varpulis components
interactive
Interactive session core for Varpulis.
join
Join buffer for correlating events from multiple streams
limits
Resource limits for event parsing to prevent denial-of-service attacks.
metrics
Prometheus metrics for Varpulis
persistence
State Persistence for Varpulis Engine
sase_persistence
Checkpointing and restore for SASE engine state.
scoring
ONNX model scoring for .score() operator
sequence
Sequence tracking for temporal event correlations
simulator
HVAC Building Simulator for demo purposes
sink
Sink implementations for outputting processed events
stream
Stream abstraction for the runtime
tenant
Multi-tenant isolation for SaaS deployment
testing
Time-accelerated testing infrastructure
timer
Timer module for periodic event generation
udf
User-defined functions with type signatures
vpl_test
VPL Test DSL — declarative test format for Varpulis programs.
watermark
Per-source watermark tracking for multi-source event-time processing.
window
Window implementations for stream processing
worker_pool
Worker Pool for Controlled Parallelism
zdd_unified
ZDD Unified - ZDD as Unified Representation for Multi-Query Aggregation

Structs§

Event
A runtime event

Enums§

SinkError
Errors produced by sink operations.

Traits§

Sink
Trait for event sinks

Type Aliases§

SharedEvent
A shared reference to an Event for efficient passing through pipelines. Using Arc avoids expensive deep clones when events are processed by multiple streams, windows, or pattern matchers.