varpulis-runtime 0.6.4

Runtime execution engine for Varpulis
Documentation

Varpulis Runtime

High-performance execution engine for VPL programs.

This crate is the heart of Varpulis, providing:

  • Stream Processing: Real-time event processing with filtering and transformation
  • SASE+ Pattern Matching: Complex event detection with sequences, Kleene closures, and negation
  • Windowed Aggregations: Time and count-based windows with SIMD-optimized aggregations
  • Connectors: MQTT, HTTP, and file-based event sources/sinks

Features

Feature Description
mqtt MQTT connector support (rumqttc)
kafka Kafka connector support (rdkafka)
persistence RocksDB state persistence
database SQL database connectors (PostgreSQL, MySQL, SQLite)
redis Redis connector support
all-connectors Enable all connector features

Modules

Core Processing

  • [engine]: Main execution engine, compiles and runs VPL programs
  • [event]: Event structure and field access
  • [stream]: Stream abstraction for event flows

Pattern Matching

  • [sase]: SASE+ pattern matching (SEQ, AND, OR, NOT, Kleene+/*)
  • [sequence]: Sequence pattern tracking

Windowing & Aggregation

  • [window]: Tumbling, sliding, and count-based windows
  • [aggregation]: Aggregation functions (sum, avg, min, max, stddev, percentile)
  • [simd]: SIMD-optimized operations using AVX2

Advanced Features

//! - [join]: Multi-stream join operations

Multi-Query Trend Aggregation

  • [greta]: GRETA baseline aggregation (VLDB 2017)
  • [hamlet]: Hamlet shared aggregation with graphlets (SIGMOD 2021) - recommended
  • [zdd_unified]: ZDD-based aggregation (experimental, for research)

I/O & Connectors

  • [connector]: Source and sink connectors (MQTT, HTTP, Kafka)
  • [sink]: Output sinks (console, file, HTTP webhook)
  • [event_file]: Event file parsing and streaming

Infrastructure

  • [worker_pool]: Parallel processing with backpressure
  • [persistence]: State checkpointing (RocksDB, memory)
  • [metrics]: Prometheus metrics
  • [timer]: Timer management for timeouts
  • [simulator]: Event simulation for demos

Quick Start

use varpulis_runtime::{Engine, Event};
use varpulis_parser::parse;
use tokio::sync::mpsc;

#[tokio::main]
async fn main() {
    // Parse a VPL program
    let program = parse(r#"
        stream HighTemp = SensorReading
            .where(temperature > 100)
            .emit(sensor: sensor_id, temp: temperature)
    "#).unwrap();

    // Create engine with output channel
    let (output_tx, mut output_rx) = mpsc::channel(100);
    let mut engine = Engine::new(output_tx);
    engine.load(&program).unwrap();

    // Process an event
    let event = Event::new("SensorReading")
        .with_field("temperature", 105.5)
        .with_field("sensor_id", "S1");
    engine.process(event).await.unwrap();

    // Receive output event
    if let Some(output) = output_rx.recv().await {
        println!("Output: {} {:?}", output.event_type, output.data);
    }
}

Performance

  • SIMD-optimized aggregations (4x speedup with AVX2)
  • Incremental aggregation for sliding windows
  • Zero-copy event sharing via Arc<Event>
  • Parallel worker pools with backpressure

See Also