Skip to main content

varpulis_runtime/engine/
types.rs

//! Type definitions for the Varpulis engine.
//!
//! This module contains the structs, enums, and related type definitions used by the engine.
4
5use std::collections::HashMap;
6use std::sync::Arc;
7
8use indexmap::IndexMap;
9use rustc_hash::FxHashMap;
10use varpulis_core::ast::{ConfigValue, Expr, SasePatternExpr};
11use varpulis_core::Value;
12
13use crate::aggregation::Aggregator;
14use crate::event::SharedEvent;
15use crate::join::JoinBuffer;
16use crate::sase::SaseEngine;
17use crate::window::{
18    CountWindow, PartitionedSessionWindow, PartitionedSlidingWindow, PartitionedTumblingWindow,
19    SessionWindow, SlidingCountWindow, SlidingWindow, TumblingWindow,
20};
21
// =============================================================================
// Public Types (exported from crate)
// =============================================================================
25
26/// Configuration block parsed from VPL
27#[derive(Debug, Clone)]
28pub struct EngineConfig {
29    pub name: String,
30    pub values: HashMap<String, ConfigValue>,
31}
32
33/// User-defined function
34#[derive(Debug, Clone)]
35pub struct UserFunction {
36    pub name: String,
37    pub params: Vec<(String, varpulis_core::Type)>, // (name, type)
38    pub return_type: Option<varpulis_core::Type>,
39    pub body: Vec<varpulis_core::span::Spanned<varpulis_core::Stmt>>,
40}
41
/// Throughput counters reported by the engine.
#[derive(Debug, Clone)]
pub struct EngineMetrics {
    /// Number of input events the engine has processed.
    pub events_processed: u64,
    /// Number of output events the engine has emitted.
    pub output_events_emitted: u64,
    /// Number of streams defined in the engine.
    pub streams_count: usize,
}
49
/// Summary of what changed during a hot reload.
///
/// The first five lists hold stream names; `warnings` holds free-form messages.
#[derive(Debug, Clone, Default)]
pub struct ReloadReport {
    /// Streams introduced by the reload.
    pub streams_added: Vec<String>,
    /// Streams no longer present after the reload.
    pub streams_removed: Vec<String>,
    /// Streams whose definition changed.
    pub streams_updated: Vec<String>,
    /// Streams that kept their runtime state (windows, aggregators).
    pub state_preserved: Vec<String>,
    /// Streams whose runtime state had to be reset.
    pub state_reset: Vec<String>,
    /// Non-fatal warnings raised during the reload.
    pub warnings: Vec<String>,
}

impl ReloadReport {
    /// Returns `true` when the reload added, removed, and updated no streams.
    ///
    /// Only `streams_added`, `streams_removed`, and `streams_updated` are consulted.
    /// Entries in `state_preserved`, `state_reset`, or `warnings` on their own do
    /// not make the report non-empty.
    pub const fn is_empty(&self) -> bool {
        let nothing_added = self.streams_added.is_empty();
        let nothing_removed = self.streams_removed.is_empty();
        let nothing_updated = self.streams_updated.is_empty();
        nothing_added && nothing_removed && nothing_updated
    }
}
74
/// Binding between a source connector and an event type, declared via `.from()`.
#[derive(Debug, Clone)]
pub struct SourceBinding {
    /// Name of the connector supplying the events.
    pub connector_name: String,
    /// Event type the connector's events are bound to.
    pub event_type: String,
    /// Topic given explicitly in `.from()`, if any.
    pub topic_override: Option<String>,
    /// Any other `.from()` parameters (e.g. `client_id`, `qos`); `topic` is not included.
    pub extra_params: HashMap<String, String>,
}
84
85/// Named SASE+ pattern definition
86#[derive(Debug, Clone)]
87pub struct NamedPattern {
88    /// Pattern name
89    pub name: String,
90    /// SASE+ pattern expression (SEQ, AND, OR, NOT)
91    pub expr: SasePatternExpr,
92    /// Optional time constraint
93    pub within: Option<Expr>,
94    /// Optional partition key expression
95    pub partition_by: Option<Expr>,
96}
97
// =============================================================================
// Internal Types (crate-visible)
// =============================================================================
101
102/// Runtime stream definition
103pub struct StreamDefinition {
104    pub name: String,
105    /// Cached `Arc<str>` of the stream name — avoids repeated `String→Arc<str>` conversions
106    /// in the hot path (output event renaming, emit/sequence event construction).
107    pub name_arc: Arc<str>,
108    pub source: RuntimeSource,
109    pub operations: Vec<RuntimeOp>,
110    /// SASE+ pattern matching engine (NFA-based, primary engine for sequences)
111    pub sase_engine: Option<SaseEngine>,
112    /// Join buffer for correlating events from multiple sources
113    pub join_buffer: Option<JoinBuffer>,
114    /// Mapping from event type to join source name (for join streams)
115    pub event_type_to_source: FxHashMap<String, String>,
116    /// Hamlet aggregator for trend aggregation mode
117    pub hamlet_aggregator: Option<crate::hamlet::HamletAggregator>,
118    /// Shared Hamlet aggregator reference (when multi-query sharing is active)
119    pub shared_hamlet_ref: Option<Arc<std::sync::Mutex<crate::hamlet::HamletAggregator>>>,
120    /// PST-based pattern forecaster (when .forecast() is used)
121    pub pst_forecaster: Option<crate::pst::PatternMarkovChain>,
122    /// Last raw event that entered the pipeline (used by Forecast op to learn
123    /// from every event even when the Sequence op clears current_events).
124    pub last_raw_event: Option<crate::event::SharedEvent>,
125    /// Enrichment provider + cache (when .enrich() is used)
126    pub enrichment: Option<(
127        Arc<dyn crate::enrichment::EnrichmentProvider>,
128        Arc<crate::enrichment::EnrichmentCache>,
129    )>,
130    /// Optional backpressure buffer configuration for this stream's input channel
131    #[allow(dead_code)]
132    pub buffer_config: Option<crate::backpressure::StageBufferConfig>,
133}
134
135/// Source of events for a stream
136pub enum RuntimeSource {
137    EventType(String),
138    Stream(String),
139    Join(Vec<String>),
140    /// Merge multiple event types with optional filters
141    Merge(Vec<MergeSource>),
142    /// Periodic timer source
143    Timer(TimerConfig),
144}
145
146impl RuntimeSource {
147    /// Get a description of this source for logging
148    pub fn describe(&self) -> String {
149        match self {
150            Self::EventType(t) => format!("event:{t}"),
151            Self::Stream(s) => format!("stream:{s}"),
152            Self::Join(sources) => format!("join:{}", sources.join(",")),
153            Self::Merge(sources) => format!("merge:{} sources", sources.len()),
154            Self::Timer(config) => {
155                format!("timer:{}ms", config.interval_ns / 1_000_000)
156            }
157        }
158    }
159}
160
/// Settings for a periodic timer source.
pub struct TimerConfig {
    /// Time between consecutive timer fires, in nanoseconds.
    pub interval_ns: u64,
    /// Delay before the first fire, in nanoseconds, if one is configured.
    pub initial_delay_ns: Option<u64>,
    /// Event type name given to generated timer events (e.g. `"Timer_MyStream"`).
    pub timer_event_type: String,
}
170
171/// A source in a merge construct with optional filter
172pub struct MergeSource {
173    pub name: String,
174    pub event_type: String,
175    pub filter: Option<varpulis_core::ast::Expr>,
176}
177
178/// Runtime operations that can be applied to a stream
179pub enum RuntimeOp {
180    /// Filter with closure (for sequence filters with context)
181    WhereClosure(Box<dyn Fn(&SharedEvent) -> bool + Send + Sync>),
182    /// Filter with expression (evaluated at runtime with user functions)
183    WhereExpr(varpulis_core::ast::Expr),
184    /// Window with optional partition support
185    Window(WindowType),
186    /// Partitioned window - maintains separate windows per partition key (tumbling)
187    PartitionedWindow(PartitionedWindowState),
188    /// Partitioned sliding count window - maintains separate sliding windows per partition key
189    PartitionedSlidingCountWindow(PartitionedSlidingCountWindowState),
190    Aggregate(Aggregator),
191    /// Partitioned aggregate - maintains separate aggregators per partition key
192    PartitionedAggregate(PartitionedAggregatorState),
193    /// Having filter - filter aggregation results (post-aggregate filtering)
194    Having(varpulis_core::ast::Expr),
195    /// Select/projection with computed fields
196    Select(SelectConfig),
197    Emit(EmitConfig),
198    /// Emit with expression evaluation for computed fields
199    EmitExpr(EmitExprConfig),
200    /// Print to stdout
201    Print(PrintConfig),
202    /// Log with level
203    Log(LogConfig),
204    /// Sequence operation - index into sequence_tracker steps
205    Sequence,
206    /// Pattern matching with lambda expression
207    Pattern(PatternConfig),
208    /// Process with expression: `.process(expr)` - evaluates for side effects (emit)
209    Process(varpulis_core::ast::Expr),
210    /// Send to connector: `.to(ConnectorName)`
211    To(ToConfig),
212    /// Trend aggregation via Hamlet engine (replaces Sequence for this stream)
213    TrendAggregate(TrendAggregateConfig),
214    /// ONNX model scoring: extract input fields, run inference, add output fields
215    #[allow(dead_code)]
216    Score(ScoreConfig),
217    /// PST-based pattern forecasting: predict pattern completion probability and time
218    Forecast(ForecastConfig),
219    /// External connector enrichment: lookup reference data and inject fields
220    Enrich(EnrichConfig),
221    /// Deduplicate events by expression value (or entire event if None)
222    Distinct(DistinctState),
223    /// Pass at most N events, then stop the stream
224    Limit(LimitState),
225    /// Parallel processing: partition events across a Rayon thread pool
226    Concurrent(ConcurrentConfig),
227}
228
229impl RuntimeOp {
230    /// Return a short human-readable name for this operation.
231    ///
232    /// Used by the topology builder to create operation summaries.
233    pub const fn summary_name(&self) -> &'static str {
234        match self {
235            Self::WhereClosure(_) => "Filter",
236            Self::WhereExpr(_) => "Filter",
237            Self::Window(_) => "Window",
238            Self::PartitionedWindow(_) => "PartitionedWindow",
239            Self::PartitionedSlidingCountWindow(_) => "PartitionedSlidingCountWindow",
240            Self::Aggregate(_) => "Aggregate",
241            Self::PartitionedAggregate(_) => "PartitionedAggregate",
242            Self::Having(_) => "Having",
243            Self::Select(_) => "Select",
244            Self::Emit(_) => "Emit",
245            Self::EmitExpr(_) => "EmitExpr",
246            Self::Print(_) => "Print",
247            Self::Log(_) => "Log",
248            Self::Sequence => "Sequence",
249            Self::Pattern(_) => "Pattern",
250            Self::Process(_) => "Process",
251            Self::To(_) => "Sink",
252            Self::TrendAggregate(_) => "TrendAggregate",
253            Self::Score(_) => "Score",
254            Self::Forecast(_) => "Forecast",
255            Self::Enrich(_) => "Enrich",
256            Self::Distinct(_) => "Distinct",
257            Self::Limit(_) => "Limit",
258            Self::Concurrent(_) => "Concurrent",
259        }
260    }
261}
262
263/// Configuration for `.concurrent()` parallel processing
264pub struct ConcurrentConfig {
265    pub workers: usize,
266    pub partition_key: Option<String>,
267    pub thread_pool: std::sync::Arc<rayon::ThreadPool>,
268}
269
270/// Configuration for trend aggregation via Hamlet engine
271pub struct TrendAggregateConfig {
272    /// Fields to compute: (output_alias, aggregate_function)
273    pub fields: Vec<(String, crate::greta::GretaAggregate)>,
274    /// Query ID in the Hamlet aggregator
275    pub query_id: crate::greta::QueryId,
276    /// Field-based aggregate info for runtime computation (sum/avg/min/max)
277    pub field_aggregates: Vec<FieldAggregateInfo>,
278    /// Maps type index (u16) to event type name (for count_events resolution)
279    pub type_index_to_name: Vec<String>,
280    /// Accumulated events across invocations (persists between execute_op calls)
281    pub accumulated: Vec<SharedEvent>,
282}
283
/// Runtime description of one field-based aggregate.
///
/// Filled in at compile time from expressions such as `sum_trends(alias.field)`.
pub struct FieldAggregateInfo {
    /// Name of the output field in the emitted event (e.g. `"total"`).
    pub output_alias: String,
    /// Aggregate function name: `"sum"`, `"avg"`, `"min"`, or `"max"`.
    pub func: String,
    /// Event type to filter on (e.g. `"sensor_reading"`).
    pub event_type: String,
    /// Event field whose values are aggregated (e.g. `"temperature"`).
    pub field_name: String,
}
296
/// Settings for PST-based pattern forecasting.
#[allow(dead_code)]
pub struct ForecastConfig {
    /// Forecast events are emitted only at or above this probability.
    pub confidence_threshold: f64,
    /// How far ahead to forecast, in nanoseconds.
    pub horizon_ns: u64,
    /// Number of events to observe before forecasting begins.
    pub warmup_events: u64,
    /// Maximum context depth of the prediction suffix tree.
    pub max_depth: usize,
    /// Enables Hawkes intensity modulation.
    pub hawkes: bool,
    /// Enables conformal prediction intervals.
    pub conformal: bool,
}
313
314/// Configuration for external connector enrichment
315pub struct EnrichConfig {
316    /// Name of the connector to use for lookups
317    pub connector_name: String,
318    /// Expression to evaluate as the lookup key
319    pub key_expr: varpulis_core::ast::Expr,
320    /// Fields to extract from the enrichment response
321    pub fields: Vec<String>,
322    /// Cache TTL in nanoseconds (None = no caching)
323    pub cache_ttl_ns: Option<u64>,
324    /// Timeout in nanoseconds (default 5s)
325    pub timeout_ns: u64,
326    /// Fallback value when lookup fails (None = skip event)
327    pub fallback: Option<varpulis_core::Value>,
328}
329
/// Settings for ONNX model scoring.
#[allow(dead_code)]
pub struct ScoreConfig {
    /// Loaded ONNX model; only present when the `onnx` feature is enabled.
    #[cfg(feature = "onnx")]
    pub model: std::sync::Arc<crate::scoring::OnnxModel>,
    /// Event fields fed to the model as inputs.
    pub input_fields: Vec<String>,
    /// Field names the model outputs are written to.
    pub output_fields: Vec<String>,
    /// Batch size used for inference.
    pub batch_size: usize,
}
339
340/// Topic specification for `.to()` operations — static or dynamic
341pub enum TopicSpec {
342    /// Static topic string resolved at compile time (current behavior)
343    Static(String),
344    /// Dynamic topic resolved per event from an expression
345    Dynamic(varpulis_core::ast::Expr),
346}
347
348/// Configuration for .to() connector routing
349pub struct ToConfig {
350    pub connector_name: String,
351    /// Topic specification: static string, dynamic expression, or None (use connector default)
352    pub topic: Option<TopicSpec>,
353    /// Cache key for sink lookup (connector_name or connector_name::topic)
354    pub sink_key: String,
355    /// Extra parameters from .to() (e.g., client_id, qos) — excludes topic
356    #[allow(dead_code)]
357    pub extra_params: HashMap<String, String>,
358}
359
360/// Default capacity for distinct state LRU cache
361pub const DISTINCT_LRU_CAPACITY: usize = 100_000;
362
363/// State for .distinct() — tracks seen values to deduplicate events
364pub struct DistinctState {
365    /// Optional expression to evaluate for distinct key; None = entire event
366    pub expr: Option<varpulis_core::ast::Expr>,
367    /// LRU cache of seen value representations (bounded to prevent unbounded growth)
368    pub seen: hashlink::LruCache<String, ()>,
369}
370
/// State for `.limit(n)`: lets at most `max` events through.
pub struct LimitState {
    /// Upper bound on the number of events passed.
    pub max: usize,
    /// Number of events counted so far against `max`.
    pub count: usize,
}
376
377/// Configuration for select/projection operation
378pub struct SelectConfig {
379    /// Fields to include: (output_name, expression)
380    pub fields: Vec<(String, varpulis_core::ast::Expr)>,
381}
382
383/// Result of processing a stream
384pub struct StreamProcessResult {
385    /// Events produced by .emit() — sent to output channel AND downstream
386    pub emitted_events: Vec<SharedEvent>,
387    /// Output events to feed to dependent streams (with stream name as event_type)
388    pub output_events: Vec<SharedEvent>,
389    /// Number of events sent to connector sinks via .to() operations
390    pub sink_events_sent: u64,
391}
392
393/// State for partitioned windows - maintains separate windows per partition key
394pub struct PartitionedWindowState {
395    pub partition_key: String,
396    pub window_size: usize, // For count-based windows
397    pub windows: FxHashMap<String, CountWindow>,
398}
399
400impl PartitionedWindowState {
401    pub fn new(partition_key: String, window_size: usize) -> Self {
402        Self {
403            partition_key,
404            window_size,
405            windows: FxHashMap::default(),
406        }
407    }
408
409    pub fn add(&mut self, event: SharedEvent) -> Option<Vec<SharedEvent>> {
410        let key = event.get(&self.partition_key).map_or_else(
411            || "default".to_string(),
412            |v| v.to_partition_key().into_owned(),
413        );
414
415        let window = self
416            .windows
417            .entry(key)
418            .or_insert_with(|| CountWindow::new(self.window_size));
419
420        window.add_shared(event)
421    }
422}
423
424/// State for partitioned sliding count windows - maintains separate sliding windows per partition key
425pub struct PartitionedSlidingCountWindowState {
426    pub partition_key: String,
427    pub window_size: usize,
428    pub slide_size: usize,
429    pub windows: FxHashMap<String, SlidingCountWindow>,
430}
431
432impl PartitionedSlidingCountWindowState {
433    pub fn new(partition_key: String, window_size: usize, slide_size: usize) -> Self {
434        Self {
435            partition_key,
436            window_size,
437            slide_size,
438            windows: FxHashMap::default(),
439        }
440    }
441
442    pub fn add(&mut self, event: SharedEvent) -> Option<Vec<SharedEvent>> {
443        let key = event.get(&self.partition_key).map_or_else(
444            || "default".to_string(),
445            |v| v.to_partition_key().into_owned(),
446        );
447
448        let window = self
449            .windows
450            .entry(key)
451            .or_insert_with(|| SlidingCountWindow::new(self.window_size, self.slide_size));
452
453        window.add_shared(event)
454    }
455}
456
457/// State for partitioned aggregators
458pub struct PartitionedAggregatorState {
459    pub partition_key: String,
460    pub aggregator_template: Aggregator,
461}
462
463impl PartitionedAggregatorState {
464    pub const fn new(partition_key: String, aggregator: Aggregator) -> Self {
465        Self {
466            partition_key,
467            aggregator_template: aggregator,
468        }
469    }
470
471    pub fn apply(&mut self, events: &[SharedEvent]) -> Vec<(String, IndexMap<String, Value>)> {
472        // Group events by partition key - use Arc::clone to avoid deep clones
473        let mut partitions: FxHashMap<String, Vec<SharedEvent>> = FxHashMap::default();
474
475        for event in events {
476            let key = event.get(&self.partition_key).map_or_else(
477                || "default".to_string(),
478                |v| v.to_partition_key().into_owned(),
479            );
480            partitions.entry(key).or_default().push(Arc::clone(event));
481        }
482
483        // Apply aggregator to each partition using apply_shared
484        let mut results = Vec::new();
485        for (key, partition_events) in partitions {
486            let result = self.aggregator_template.apply_shared(&partition_events);
487            results.push((key, result));
488        }
489
490        results
491    }
492}
493
494/// Configuration for pattern matching
495#[allow(dead_code)]
496pub struct PatternConfig {
497    pub name: String,
498    pub matcher: varpulis_core::ast::Expr,
499}
500
501/// Configuration for print operation
502pub struct PrintConfig {
503    pub exprs: Vec<varpulis_core::ast::Expr>,
504}
505
/// Settings for a log operation.
pub struct LogConfig {
    /// Log level name.
    pub level: String,
    /// Optional message text.
    pub message: Option<String>,
    /// Optional name of an event field to include with the log entry.
    pub data_field: Option<String>,
}
512
513/// Types of windows supported
514pub enum WindowType {
515    Tumbling(TumblingWindow),
516    Sliding(SlidingWindow),
517    Count(CountWindow),
518    SlidingCount(SlidingCountWindow),
519    PartitionedTumbling(PartitionedTumblingWindow),
520    PartitionedSliding(PartitionedSlidingWindow),
521    Session(SessionWindow),
522    PartitionedSession(PartitionedSessionWindow),
523}
524
/// Settings for a simple emit operation.
#[allow(dead_code)]
pub struct EmitConfig {
    /// Output fields as `(output_name, source_field_or_literal)` pairs.
    pub fields: Vec<(String, String)>,
    /// Optional target context for the emitted events.
    pub target_context: Option<String>,
}
531
532/// Configuration for emit with expressions
533#[allow(dead_code)]
534pub struct EmitExprConfig {
535    pub fields: Vec<(String, varpulis_core::ast::Expr)>, // (output_name, expression)
536    pub target_context: Option<String>,
537}
538
539/// Configuration for late data handling in watermark-based windowing.
540pub struct LateDataConfig {
541    /// How much lateness to tolerate beyond the watermark.
542    /// Events arriving within this window after watermark advancement are still processed.
543    pub allowed_lateness: chrono::Duration,
544    /// Optional stream name to route late events that exceed allowed_lateness.
545    /// If None, late events are dropped.
546    pub side_output_stream: Option<String>,
547}