// varpulis_runtime/engine/types.rs

1//! Type definitions for the Varpulis engine
2//!
3//! This module contains all the structs, enums and type definitions used by the engine.
4
5use crate::aggregation::Aggregator;
6use crate::event::SharedEvent;
7use crate::join::JoinBuffer;
8use crate::sase::SaseEngine;
9use crate::window::{
10    CountWindow, PartitionedSessionWindow, PartitionedSlidingWindow, PartitionedTumblingWindow,
11    SessionWindow, SlidingCountWindow, SlidingWindow, TumblingWindow,
12};
13use indexmap::IndexMap;
14use rustc_hash::FxHashMap;
15use std::collections::HashMap;
16use std::sync::Arc;
17use varpulis_core::ast::{ConfigValue, Expr, SasePatternExpr};
18use varpulis_core::Value;
19
20// =============================================================================
21// Public Types (exported from crate)
22// =============================================================================
23
/// Configuration block parsed from VPL
#[derive(Debug, Clone)]
pub struct EngineConfig {
    /// Name of the configuration block.
    pub name: String,
    /// Key/value settings declared inside the block.
    pub values: HashMap<String, ConfigValue>,
}
30
/// User-defined function
#[derive(Debug, Clone)]
pub struct UserFunction {
    /// Function name as declared in VPL.
    pub name: String,
    /// Parameter list: (name, type).
    pub params: Vec<(String, varpulis_core::Type)>, // (name, type)
    /// Declared return type, if any.
    pub return_type: Option<varpulis_core::Type>,
    /// Body statements, each carrying its source span.
    pub body: Vec<varpulis_core::span::Spanned<varpulis_core::Stmt>>,
}
39
/// Engine performance metrics
#[derive(Debug, Clone)]
pub struct EngineMetrics {
    /// Total number of events processed by the engine.
    pub events_processed: u64,
    /// Total number of output events emitted.
    pub output_events_emitted: u64,
    /// Number of streams currently registered.
    pub streams_count: usize,
}
47
/// Report from a hot reload operation.
///
/// Each field lists the names of the streams affected in that way.
#[derive(Debug, Clone, Default)]
pub struct ReloadReport {
    /// Streams that were added
    pub streams_added: Vec<String>,
    /// Streams that were removed
    pub streams_removed: Vec<String>,
    /// Streams that were updated (definition changed)
    pub streams_updated: Vec<String>,
    /// Streams whose state was preserved (windows, aggregators)
    pub state_preserved: Vec<String>,
    /// Streams whose state had to be reset
    pub state_reset: Vec<String>,
    /// Non-fatal warnings during reload
    pub warnings: Vec<String>,
}
64
impl ReloadReport {
    /// Returns `true` when the reload changed no streams (nothing added,
    /// removed, or updated).
    ///
    /// NOTE(review): `state_preserved`, `state_reset`, and `warnings` are not
    /// consulted — an "empty" report may still carry warnings. Confirm this
    /// is the intended contract for callers.
    pub const fn is_empty(&self) -> bool {
        self.streams_added.is_empty()
            && self.streams_removed.is_empty()
            && self.streams_updated.is_empty()
    }
}
72
/// Source connector binding from .from() declarations
#[derive(Debug, Clone)]
pub struct SourceBinding {
    /// Name of the connector providing events.
    pub connector_name: String,
    /// Event type produced by this binding.
    pub event_type: String,
    /// Topic override from .from() params, if any.
    pub topic_override: Option<String>,
    /// Extra parameters from .from() (e.g., client_id, qos) — excludes topic
    pub extra_params: HashMap<String, String>,
}
82
/// Named SASE+ pattern definition
#[derive(Debug, Clone)]
pub struct NamedPattern {
    /// Pattern name
    pub name: String,
    /// SASE+ pattern expression (SEQ, AND, OR, NOT)
    pub expr: SasePatternExpr,
    /// Optional time constraint limiting how long a match may span
    pub within: Option<Expr>,
    /// Optional partition key expression
    pub partition_by: Option<Expr>,
}
95
96// =============================================================================
97// Internal Types (crate-visible)
98// =============================================================================
99
/// Runtime stream definition
pub struct StreamDefinition {
    /// Stream name as declared in VPL.
    pub name: String,
    /// Cached `Arc<str>` of the stream name — avoids repeated `String→Arc<str>` conversions
    /// in the hot path (output event renaming, emit/sequence event construction).
    pub name_arc: Arc<str>,
    /// Where this stream's input events come from.
    pub source: RuntimeSource,
    /// Pipeline of operations applied to each input event, in order.
    pub operations: Vec<RuntimeOp>,
    /// SASE+ pattern matching engine (NFA-based, primary engine for sequences)
    pub sase_engine: Option<SaseEngine>,
    /// Join buffer for correlating events from multiple sources
    pub join_buffer: Option<JoinBuffer>,
    /// Mapping from event type to join source name (for join streams)
    pub event_type_to_source: FxHashMap<String, String>,
    /// Hamlet aggregator for trend aggregation mode
    pub hamlet_aggregator: Option<crate::hamlet::HamletAggregator>,
    /// Shared Hamlet aggregator reference (when multi-query sharing is active)
    pub shared_hamlet_ref: Option<Arc<std::sync::Mutex<crate::hamlet::HamletAggregator>>>,
    /// PST-based pattern forecaster (when .forecast() is used)
    pub pst_forecaster: Option<crate::pst::PatternMarkovChain>,
    /// Last raw event that entered the pipeline (used by Forecast op to learn
    /// from every event even when the Sequence op clears current_events).
    pub last_raw_event: Option<crate::event::SharedEvent>,
    /// Enrichment provider + cache (when .enrich() is used)
    pub enrichment: Option<(
        Arc<dyn crate::enrichment::EnrichmentProvider>,
        Arc<crate::enrichment::EnrichmentCache>,
    )>,
    /// Optional backpressure buffer configuration for this stream's input channel
    #[allow(dead_code)]
    pub buffer_config: Option<crate::backpressure::StageBufferConfig>,
}
132
/// Source of events for a stream
pub enum RuntimeSource {
    /// Events of a single named event type
    EventType(String),
    /// Output of another stream
    Stream(String),
    /// Correlated events from multiple named sources
    Join(Vec<String>),
    /// Merge multiple event types with optional filters
    Merge(Vec<MergeSource>),
    /// Periodic timer source
    Timer(TimerConfig),
}
143
144impl RuntimeSource {
145    /// Get a description of this source for logging
146    pub fn describe(&self) -> String {
147        match self {
148            Self::EventType(t) => format!("event:{t}"),
149            Self::Stream(s) => format!("stream:{s}"),
150            Self::Join(sources) => format!("join:{}", sources.join(",")),
151            Self::Merge(sources) => format!("merge:{} sources", sources.len()),
152            Self::Timer(config) => {
153                format!("timer:{}ms", config.interval_ns / 1_000_000)
154            }
155        }
156    }
157}
158
/// Configuration for a periodic timer source
pub struct TimerConfig {
    /// Interval between timer fires in nanoseconds
    pub interval_ns: u64,
    /// Optional initial delay before first fire in nanoseconds
    pub initial_delay_ns: Option<u64>,
    /// Event type name for timer events (e.g., "Timer_MyStream")
    pub timer_event_type: String,
}
168
/// A source in a merge construct with optional filter
pub struct MergeSource {
    /// Name of the merge source.
    pub name: String,
    /// Event type accepted from this source.
    pub event_type: String,
    /// Optional filter expression applied to events from this source.
    pub filter: Option<varpulis_core::ast::Expr>,
}
175
/// Runtime operations that can be applied to a stream
pub enum RuntimeOp {
    /// Filter with closure (for sequence filters with context)
    WhereClosure(Box<dyn Fn(&SharedEvent) -> bool + Send + Sync>),
    /// Filter with expression (evaluated at runtime with user functions)
    WhereExpr(varpulis_core::ast::Expr),
    /// Window with optional partition support
    Window(WindowType),
    /// Partitioned window - maintains separate windows per partition key (tumbling)
    PartitionedWindow(PartitionedWindowState),
    /// Partitioned sliding count window - maintains separate sliding windows per partition key
    PartitionedSlidingCountWindow(PartitionedSlidingCountWindowState),
    /// Aggregate window contents into computed values
    Aggregate(Aggregator),
    /// Partitioned aggregate - maintains separate aggregators per partition key
    PartitionedAggregate(PartitionedAggregatorState),
    /// Having filter - filter aggregation results (post-aggregate filtering)
    Having(varpulis_core::ast::Expr),
    /// Select/projection with computed fields
    Select(SelectConfig),
    /// Emit output events with simple field mappings
    Emit(EmitConfig),
    /// Emit with expression evaluation for computed fields
    EmitExpr(EmitExprConfig),
    /// Print to stdout
    Print(PrintConfig),
    /// Log with level
    Log(LogConfig),
    /// Sequence operation - index into sequence_tracker steps
    Sequence,
    /// Pattern matching with lambda expression
    Pattern(PatternConfig),
    /// Process with expression: `.process(expr)` - evaluates for side effects (emit)
    Process(varpulis_core::ast::Expr),
    /// Send to connector: `.to(ConnectorName)`
    To(ToConfig),
    /// Trend aggregation via Hamlet engine (replaces Sequence for this stream)
    TrendAggregate(TrendAggregateConfig),
    /// ONNX model scoring: extract input fields, run inference, add output fields
    #[allow(dead_code)]
    Score(ScoreConfig),
    /// PST-based pattern forecasting: predict pattern completion probability and time
    Forecast(ForecastConfig),
    /// External connector enrichment: lookup reference data and inject fields
    Enrich(EnrichConfig),
    /// Deduplicate events by expression value (or entire event if None)
    Distinct(DistinctState),
    /// Pass at most N events, then stop the stream
    Limit(LimitState),
    /// Parallel processing: partition events across a Rayon thread pool
    Concurrent(ConcurrentConfig),
}
226
227impl RuntimeOp {
228    /// Return a short human-readable name for this operation.
229    ///
230    /// Used by the topology builder to create operation summaries.
231    pub const fn summary_name(&self) -> &'static str {
232        match self {
233            Self::WhereClosure(_) => "Filter",
234            Self::WhereExpr(_) => "Filter",
235            Self::Window(_) => "Window",
236            Self::PartitionedWindow(_) => "PartitionedWindow",
237            Self::PartitionedSlidingCountWindow(_) => "PartitionedSlidingCountWindow",
238            Self::Aggregate(_) => "Aggregate",
239            Self::PartitionedAggregate(_) => "PartitionedAggregate",
240            Self::Having(_) => "Having",
241            Self::Select(_) => "Select",
242            Self::Emit(_) => "Emit",
243            Self::EmitExpr(_) => "EmitExpr",
244            Self::Print(_) => "Print",
245            Self::Log(_) => "Log",
246            Self::Sequence => "Sequence",
247            Self::Pattern(_) => "Pattern",
248            Self::Process(_) => "Process",
249            Self::To(_) => "Sink",
250            Self::TrendAggregate(_) => "TrendAggregate",
251            Self::Score(_) => "Score",
252            Self::Forecast(_) => "Forecast",
253            Self::Enrich(_) => "Enrich",
254            Self::Distinct(_) => "Distinct",
255            Self::Limit(_) => "Limit",
256            Self::Concurrent(_) => "Concurrent",
257        }
258    }
259}
260
/// Configuration for `.concurrent()` parallel processing
pub struct ConcurrentConfig {
    /// Number of worker threads.
    pub workers: usize,
    /// Optional field name used to partition events across workers.
    pub partition_key: Option<String>,
    /// Shared Rayon thread pool used to run the workers.
    pub thread_pool: std::sync::Arc<rayon::ThreadPool>,
}
267
/// Configuration for trend aggregation via Hamlet engine
pub struct TrendAggregateConfig {
    /// Fields to compute: (output_alias, aggregate_function)
    pub fields: Vec<(String, crate::greta::GretaAggregate)>,
    /// Query ID in the Hamlet aggregator
    pub query_id: crate::greta::QueryId,
}
275
/// Configuration for PST-based pattern forecasting
#[allow(dead_code)]
pub struct ForecastConfig {
    /// Minimum probability to emit forecast events.
    pub confidence_threshold: f64,
    /// Forecast horizon in nanoseconds.
    pub horizon_ns: u64,
    /// Events before forecasting starts.
    pub warmup_events: u64,
    /// Maximum PST context depth.
    pub max_depth: usize,
    /// Whether Hawkes intensity modulation is enabled.
    pub hawkes: bool,
    /// Whether conformal prediction intervals are enabled.
    pub conformal: bool,
}
292
/// Configuration for external connector enrichment
pub struct EnrichConfig {
    /// Name of the connector to use for lookups
    pub connector_name: String,
    /// Expression to evaluate as the lookup key
    pub key_expr: varpulis_core::ast::Expr,
    /// Fields to extract from the enrichment response
    pub fields: Vec<String>,
    /// Cache TTL in nanoseconds (None = no caching)
    pub cache_ttl_ns: Option<u64>,
    /// Timeout in nanoseconds (default 5s)
    pub timeout_ns: u64,
    /// Fallback value when lookup fails (None = skip event)
    pub fallback: Option<varpulis_core::Value>,
}
308
/// Configuration for ONNX model scoring
#[allow(dead_code)]
pub struct ScoreConfig {
    /// Loaded ONNX model (only present when the `onnx` feature is enabled).
    #[cfg(feature = "onnx")]
    pub model: std::sync::Arc<crate::scoring::OnnxModel>,
    /// Event fields fed to the model as inputs.
    pub input_fields: Vec<String>,
    /// Field names under which model outputs are stored.
    pub output_fields: Vec<String>,
    /// Number of events scored per inference batch.
    pub batch_size: usize,
}
318
/// Configuration for .to() connector routing
pub struct ToConfig {
    /// Name of the connector to send events to.
    pub connector_name: String,
    /// Topic override from .to() params (e.g., `.to(Conn, topic: "my-topic")`)
    pub topic_override: Option<String>,
    /// Cache key for sink lookup (connector_name or connector_name::topic)
    pub sink_key: String,
    /// Extra parameters from .to() (e.g., client_id, qos) — excludes topic
    #[allow(dead_code)]
    pub extra_params: HashMap<String, String>,
}
330
/// Default capacity for distinct state LRU cache
pub const DISTINCT_LRU_CAPACITY: usize = 100_000;

/// State for .distinct() — tracks seen values to deduplicate events
pub struct DistinctState {
    /// Optional expression to evaluate for distinct key; None = entire event
    pub expr: Option<varpulis_core::ast::Expr>,
    /// LRU cache of seen value representations (bounded to prevent unbounded growth)
    pub seen: hashlink::LruCache<String, ()>,
}
341
/// State for .limit(n) — passes at most `max` events
pub struct LimitState {
    /// Maximum number of events to pass through.
    pub max: usize,
    /// Number of events passed so far.
    pub count: usize,
}
347
/// Configuration for select/projection operation
pub struct SelectConfig {
    /// Fields to include: (output_name, expression)
    pub fields: Vec<(String, varpulis_core::ast::Expr)>,
}
353
/// Result of processing a stream
pub struct StreamProcessResult {
    /// Events produced by .emit() — sent to output channel AND downstream
    pub emitted_events: Vec<SharedEvent>,
    /// Output events to feed to dependent streams (with stream name as event_type)
    pub output_events: Vec<SharedEvent>,
    /// Number of events sent to connector sinks via .to() operations
    pub sink_events_sent: u64,
}
363
/// State for partitioned windows - maintains separate windows per partition key
pub struct PartitionedWindowState {
    /// Event field used as the partition key.
    pub partition_key: String,
    /// Capacity of each per-partition count window.
    pub window_size: usize, // For count-based windows
    /// One count window per observed partition key value.
    pub windows: FxHashMap<String, CountWindow>,
}
370
371impl PartitionedWindowState {
372    pub fn new(partition_key: String, window_size: usize) -> Self {
373        Self {
374            partition_key,
375            window_size,
376            windows: FxHashMap::default(),
377        }
378    }
379
380    pub fn add(&mut self, event: SharedEvent) -> Option<Vec<SharedEvent>> {
381        let key = event.get(&self.partition_key).map_or_else(
382            || "default".to_string(),
383            |v| v.to_partition_key().into_owned(),
384        );
385
386        let window = self
387            .windows
388            .entry(key)
389            .or_insert_with(|| CountWindow::new(self.window_size));
390
391        window.add_shared(event)
392    }
393}
394
/// State for partitioned sliding count windows - maintains separate sliding windows per partition key
pub struct PartitionedSlidingCountWindowState {
    /// Event field used as the partition key.
    pub partition_key: String,
    /// Capacity of each per-partition sliding window.
    pub window_size: usize,
    /// Slide step for each per-partition sliding window.
    pub slide_size: usize,
    /// One sliding count window per observed partition key value.
    pub windows: FxHashMap<String, SlidingCountWindow>,
}
402
403impl PartitionedSlidingCountWindowState {
404    pub fn new(partition_key: String, window_size: usize, slide_size: usize) -> Self {
405        Self {
406            partition_key,
407            window_size,
408            slide_size,
409            windows: FxHashMap::default(),
410        }
411    }
412
413    pub fn add(&mut self, event: SharedEvent) -> Option<Vec<SharedEvent>> {
414        let key = event.get(&self.partition_key).map_or_else(
415            || "default".to_string(),
416            |v| v.to_partition_key().into_owned(),
417        );
418
419        let window = self
420            .windows
421            .entry(key)
422            .or_insert_with(|| SlidingCountWindow::new(self.window_size, self.slide_size));
423
424        window.add_shared(event)
425    }
426}
427
/// State for partitioned aggregators
pub struct PartitionedAggregatorState {
    /// Event field used as the partition key.
    pub partition_key: String,
    /// Aggregator applied per partition.
    pub aggregator_template: Aggregator,
}
433
434impl PartitionedAggregatorState {
435    pub const fn new(partition_key: String, aggregator: Aggregator) -> Self {
436        Self {
437            partition_key,
438            aggregator_template: aggregator,
439        }
440    }
441
442    pub fn apply(&mut self, events: &[SharedEvent]) -> Vec<(String, IndexMap<String, Value>)> {
443        // Group events by partition key - use Arc::clone to avoid deep clones
444        let mut partitions: FxHashMap<String, Vec<SharedEvent>> = FxHashMap::default();
445
446        for event in events {
447            let key = event.get(&self.partition_key).map_or_else(
448                || "default".to_string(),
449                |v| v.to_partition_key().into_owned(),
450            );
451            partitions.entry(key).or_default().push(Arc::clone(event));
452        }
453
454        // Apply aggregator to each partition using apply_shared
455        let mut results = Vec::new();
456        for (key, partition_events) in partitions {
457            let result = self.aggregator_template.apply_shared(&partition_events);
458            results.push((key, result));
459        }
460
461        results
462    }
463}
464
/// Configuration for pattern matching
#[allow(dead_code)]
pub struct PatternConfig {
    /// Pattern name.
    pub name: String,
    /// Matcher expression evaluated against events.
    pub matcher: varpulis_core::ast::Expr,
}
471
/// Configuration for print operation
pub struct PrintConfig {
    /// Expressions evaluated and printed for each event.
    pub exprs: Vec<varpulis_core::ast::Expr>,
}
476
/// Configuration for log operation
pub struct LogConfig {
    /// Log level (stored as a string, e.g. from the VPL declaration).
    pub level: String,
    /// Optional static message to log.
    pub message: Option<String>,
    /// Optional event field whose value is included in the log output.
    pub data_field: Option<String>,
}
483
/// Types of windows supported
pub enum WindowType {
    /// Time-based tumbling window
    Tumbling(TumblingWindow),
    /// Time-based sliding window
    Sliding(SlidingWindow),
    /// Count-based tumbling window
    Count(CountWindow),
    /// Count-based sliding window
    SlidingCount(SlidingCountWindow),
    /// Tumbling window maintained per partition key
    PartitionedTumbling(PartitionedTumblingWindow),
    /// Sliding window maintained per partition key
    PartitionedSliding(PartitionedSlidingWindow),
    /// Session window (gap-based)
    Session(SessionWindow),
    /// Session window maintained per partition key
    PartitionedSession(PartitionedSessionWindow),
}
495
/// Configuration for simple emit operation
#[allow(dead_code)]
pub struct EmitConfig {
    /// Output fields: (output_name, source_field or literal).
    pub fields: Vec<(String, String)>, // (output_name, source_field or literal)
    /// Optional target context name for the emitted events.
    pub target_context: Option<String>,
}
502
/// Configuration for emit with expressions
#[allow(dead_code)]
pub struct EmitExprConfig {
    /// Output fields: (output_name, expression evaluated per event).
    pub fields: Vec<(String, varpulis_core::ast::Expr)>, // (output_name, expression)
    /// Optional target context name for the emitted events.
    pub target_context: Option<String>,
}
509
/// Configuration for late data handling in watermark-based windowing.
pub struct LateDataConfig {
    /// How much lateness to tolerate beyond the watermark.
    /// Events arriving within this window after watermark advancement are still processed.
    pub allowed_lateness: chrono::Duration,
    /// Optional stream name to route late events that exceed allowed_lateness.
    /// If None, late events are dropped.
    pub side_output_stream: Option<String>,
}