// varpulis_runtime/engine/types.rs
1//! Type definitions for the Varpulis engine
2//!
3//! This module contains all the structs, enums and type definitions used by the engine.
4
5use std::collections::HashMap;
6use std::sync::Arc;
7
8use indexmap::IndexMap;
9use rustc_hash::FxHashMap;
10use varpulis_core::ast::{ConfigValue, Expr, SasePatternExpr};
11use varpulis_core::Value;
12
13use crate::aggregation::Aggregator;
14use crate::event::SharedEvent;
15use crate::join::JoinBuffer;
16use crate::sase::SaseEngine;
17use crate::window::{
18    CountWindow, PartitionedSessionWindow, PartitionedSlidingWindow, PartitionedTumblingWindow,
19    SessionWindow, SlidingCountWindow, SlidingWindow, TumblingWindow,
20};
21
22// =============================================================================
23// Public Types (exported from crate)
24// =============================================================================
25
/// Configuration block parsed from VPL
#[derive(Debug, Clone)]
pub struct EngineConfig {
    /// Name of the config block as written in the VPL source.
    pub name: String,
    /// Key → value pairs declared inside the block.
    pub values: HashMap<String, ConfigValue>,
}
32
/// User-defined function
#[derive(Debug, Clone)]
pub struct UserFunction {
    /// Function name as declared in VPL.
    pub name: String,
    /// Ordered parameter list as (name, declared type) pairs.
    pub params: Vec<(String, varpulis_core::Type)>, // (name, type)
    /// Declared return type; None when no return type was specified.
    pub return_type: Option<varpulis_core::Type>,
    /// Function body as a list of spanned statements.
    pub body: Vec<varpulis_core::span::Spanned<varpulis_core::Stmt>>,
}
41
/// Engine performance metrics
#[derive(Debug, Clone)]
pub struct EngineMetrics {
    /// Total number of input events processed.
    pub events_processed: u64,
    /// Total number of output events emitted.
    pub output_events_emitted: u64,
    /// Number of streams currently registered in the engine.
    pub streams_count: usize,
}
49
/// Report from a hot reload operation.
///
/// All fields hold stream names, except `warnings`, which holds
/// human-readable warning messages.
#[derive(Debug, Clone, Default)]
pub struct ReloadReport {
    /// Streams that were added
    pub streams_added: Vec<String>,
    /// Streams that were removed
    pub streams_removed: Vec<String>,
    /// Streams that were updated (definition changed)
    pub streams_updated: Vec<String>,
    /// Streams whose state was preserved (windows, aggregators)
    pub state_preserved: Vec<String>,
    /// Streams whose state had to be reset
    pub state_reset: Vec<String>,
    /// Non-fatal warnings during reload
    pub warnings: Vec<String>,
}
66
67impl ReloadReport {
68    pub const fn is_empty(&self) -> bool {
69        self.streams_added.is_empty()
70            && self.streams_removed.is_empty()
71            && self.streams_updated.is_empty()
72    }
73}
74
/// Source connector binding from .from() declarations
#[derive(Debug, Clone)]
pub struct SourceBinding {
    /// Name of the connector this binding refers to.
    pub connector_name: String,
    /// Event type produced by this source.
    pub event_type: String,
    /// Topic override from .from(); None means use the connector default.
    pub topic_override: Option<String>,
    /// Extra parameters from .from() (e.g., client_id, qos) — excludes topic
    pub extra_params: HashMap<String, String>,
}
84
/// Named SASE+ pattern definition
#[derive(Debug, Clone)]
pub struct NamedPattern {
    /// Pattern name
    pub name: String,
    /// SASE+ pattern expression (SEQ, AND, OR, NOT)
    pub expr: SasePatternExpr,
    /// Optional time constraint (evaluated at runtime from this expression)
    pub within: Option<Expr>,
    /// Optional partition key expression
    pub partition_by: Option<Expr>,
}
97
98// =============================================================================
99// Internal Types (crate-visible)
100// =============================================================================
101
/// Runtime stream definition: a source plus an ordered pipeline of operations,
/// together with the per-stream engines/state those operations need.
#[allow(dead_code)]
pub struct StreamDefinition {
    /// Stream name as declared in VPL.
    pub name: String,
    /// Cached `Arc<str>` of the stream name — avoids repeated `String→Arc<str>` conversions
    /// in the hot path (output event renaming, emit/sequence event construction).
    pub name_arc: Arc<str>,
    /// Where this stream's input events come from.
    pub source: RuntimeSource,
    /// Ordered pipeline of operations applied to each input event.
    pub operations: Vec<RuntimeOp>,
    /// SASE+ pattern matching engine (NFA-based, primary engine for sequences)
    pub sase_engine: Option<SaseEngine>,
    /// Join buffer for correlating events from multiple sources
    pub join_buffer: Option<JoinBuffer>,
    /// Mapping from event type to join source name (for join streams)
    pub event_type_to_source: FxHashMap<String, String>,
    /// Hamlet aggregator for trend aggregation mode
    pub hamlet_aggregator: Option<crate::hamlet::HamletAggregator>,
    /// Shared Hamlet aggregator reference (when multi-query sharing is active)
    pub shared_hamlet_ref: Option<Arc<std::sync::Mutex<crate::hamlet::HamletAggregator>>>,
    /// PST-based pattern forecaster (when .forecast() is used)
    pub pst_forecaster: Option<crate::pst::PatternMarkovChain>,
    /// Last raw event that entered the pipeline (used by Forecast op to learn
    /// from every event even when the Sequence op clears current_events).
    pub last_raw_event: Option<crate::event::SharedEvent>,
    /// Enrichment provider + cache (when .enrich() is used, async-runtime only)
    #[cfg(feature = "async-runtime")]
    pub enrichment: Option<(
        Arc<dyn crate::enrichment::EnrichmentProvider>,
        Arc<crate::enrichment::EnrichmentCache>,
    )>,
    /// Optional backpressure buffer configuration for this stream's input channel (async-runtime only)
    #[cfg(feature = "async-runtime")]
    #[allow(dead_code)]
    pub buffer_config: Option<crate::backpressure::StageBufferConfig>,
}
137
/// Source of events for a stream
pub enum RuntimeSource {
    /// Raw events of a single event type.
    EventType(String),
    /// Output of another stream, referenced by stream name.
    Stream(String),
    /// Join of multiple sources, referenced by name.
    Join(Vec<String>),
    /// Merge multiple event types with optional filters
    Merge(Vec<MergeSource>),
    /// Periodic timer source
    Timer(TimerConfig),
}
148
149impl RuntimeSource {
150    /// Get a description of this source for logging
151    pub fn describe(&self) -> String {
152        match self {
153            Self::EventType(t) => format!("event:{t}"),
154            Self::Stream(s) => format!("stream:{s}"),
155            Self::Join(sources) => format!("join:{}", sources.join(",")),
156            Self::Merge(sources) => format!("merge:{} sources", sources.len()),
157            Self::Timer(config) => {
158                format!("timer:{}ms", config.interval_ns / 1_000_000)
159            }
160        }
161    }
162}
163
/// Configuration for a periodic timer source
pub struct TimerConfig {
    /// Interval between timer fires in nanoseconds
    pub interval_ns: u64,
    /// Optional initial delay before first fire in nanoseconds.
    /// None presumably means fire on the first interval — confirm in the scheduler.
    pub initial_delay_ns: Option<u64>,
    /// Event type name for timer events (e.g., "Timer_MyStream")
    pub timer_event_type: String,
}
173
/// A source in a merge construct with optional filter
#[allow(dead_code)]
pub struct MergeSource {
    /// Alias of this source within the merge.
    pub name: String,
    /// Event type accepted by this source.
    pub event_type: String,
    /// Optional filter expression; events failing it are excluded from the merge.
    pub filter: Option<varpulis_core::ast::Expr>,
}
181
/// Runtime operations that can be applied to a stream
#[allow(dead_code)]
pub enum RuntimeOp {
    /// Filter with closure (for sequence filters with context)
    WhereClosure(Box<dyn Fn(&SharedEvent) -> bool + Send + Sync>),
    /// Filter with expression (evaluated at runtime with user functions)
    WhereExpr(varpulis_core::ast::Expr),
    /// Window with optional partition support
    Window(WindowType),
    /// Partitioned window - maintains separate windows per partition key (tumbling)
    PartitionedWindow(PartitionedWindowState),
    /// Partitioned sliding count window - maintains separate sliding windows per partition key
    PartitionedSlidingCountWindow(PartitionedSlidingCountWindowState),
    /// Aggregation over the current window/batch of events
    Aggregate(Aggregator),
    /// Partitioned aggregate - maintains separate aggregators per partition key
    PartitionedAggregate(PartitionedAggregatorState),
    /// Having filter - filter aggregation results (post-aggregate filtering)
    Having(varpulis_core::ast::Expr),
    /// Select/projection with computed fields
    Select(SelectConfig),
    /// Emit output events with simple field mappings
    Emit(EmitConfig),
    /// Emit with expression evaluation for computed fields
    EmitExpr(EmitExprConfig),
    /// Print to stdout
    Print(PrintConfig),
    /// Log with level
    Log(LogConfig),
    /// Sequence operation - index into sequence_tracker steps
    Sequence,
    /// Pattern matching with lambda expression
    Pattern(PatternConfig),
    /// Process with expression: `.process(expr)` - evaluates for side effects (emit)
    Process(varpulis_core::ast::Expr),
    /// Send to connector: `.to(ConnectorName)`
    To(ToConfig),
    /// Trend aggregation via Hamlet engine (replaces Sequence for this stream)
    TrendAggregate(TrendAggregateConfig),
    /// ONNX model scoring: extract input fields, run inference, add output fields
    #[allow(dead_code)]
    Score(ScoreConfig),
    /// PST-based pattern forecasting: predict pattern completion probability and time
    Forecast(ForecastConfig),
    /// External connector enrichment: lookup reference data and inject fields
    Enrich(EnrichConfig),
    /// Alert notification: send webhook with event data as side-effect
    Alert(AlertConfig),
    /// Deduplicate events by expression value (or entire event if None)
    Distinct(DistinctState),
    /// Pass at most N events, then stop the stream
    Limit(LimitState),
    /// Parallel processing: partition events across a Rayon thread pool (async-runtime only)
    #[cfg(feature = "async-runtime")]
    Concurrent(ConcurrentConfig),
}
236
237impl RuntimeOp {
238    /// Return a short human-readable name for this operation.
239    ///
240    /// Used by the topology builder to create operation summaries.
241    pub const fn summary_name(&self) -> &'static str {
242        match self {
243            Self::WhereClosure(_) => "Filter",
244            Self::WhereExpr(_) => "Filter",
245            Self::Window(_) => "Window",
246            Self::PartitionedWindow(_) => "PartitionedWindow",
247            Self::PartitionedSlidingCountWindow(_) => "PartitionedSlidingCountWindow",
248            Self::Aggregate(_) => "Aggregate",
249            Self::PartitionedAggregate(_) => "PartitionedAggregate",
250            Self::Having(_) => "Having",
251            Self::Select(_) => "Select",
252            Self::Emit(_) => "Emit",
253            Self::EmitExpr(_) => "EmitExpr",
254            Self::Print(_) => "Print",
255            Self::Log(_) => "Log",
256            Self::Sequence => "Sequence",
257            Self::Pattern(_) => "Pattern",
258            Self::Process(_) => "Process",
259            Self::To(_) => "Sink",
260            Self::TrendAggregate(_) => "TrendAggregate",
261            Self::Score(_) => "Score",
262            Self::Forecast(_) => "Forecast",
263            Self::Enrich(_) => "Enrich",
264            Self::Alert(_) => "Alert",
265            Self::Distinct(_) => "Distinct",
266            Self::Limit(_) => "Limit",
267            #[cfg(feature = "async-runtime")]
268            Self::Concurrent(_) => "Concurrent",
269        }
270    }
271}
272
/// Configuration for `.concurrent()` parallel processing.
///
/// Production-ready, opt-in via `.concurrent()` in VPL.  Creates a rayon
/// thread pool that partitions events across workers by key or round-robin.
/// Only available with async-runtime (requires rayon).
#[cfg(feature = "async-runtime")]
pub struct ConcurrentConfig {
    /// Number of worker threads in the pool.
    pub workers: usize,
    /// Field name to partition by; None presumably means round-robin (see doc above).
    pub partition_key: Option<String>,
    /// Shared rayon thread pool executing the partitions.
    pub thread_pool: std::sync::Arc<rayon::ThreadPool>,
}
284
/// Configuration for trend aggregation via Hamlet engine
pub struct TrendAggregateConfig {
    /// Fields to compute: (output_alias, aggregate_function)
    pub fields: Vec<(String, crate::greta::GretaAggregate)>,
    /// Query ID in the Hamlet aggregator
    pub query_id: crate::greta::QueryId,
    /// Field-based aggregate info for runtime computation (sum/avg/min/max)
    pub field_aggregates: Vec<FieldAggregateInfo>,
    /// Maps type index (u16) to event type name (for count_events resolution)
    pub type_index_to_name: Vec<String>,
    /// Accumulated events across invocations (persists between execute_op calls).
    /// NOTE(review): growth appears unbounded here — confirm it is drained elsewhere.
    pub accumulated: Vec<SharedEvent>,
}
298
/// Info for computing field-based aggregates at runtime.
/// Populated at compile time from `sum_trends(alias.field)` etc.
pub struct FieldAggregateInfo {
    /// Output alias in the emitted event (e.g., "total")
    pub output_alias: String,
    /// Aggregate function name: "sum", "avg", "min", "max"
    pub func: String,
    /// Event type to filter on (e.g., "sensor_reading")
    pub event_type: String,
    /// Field name to aggregate (e.g., "temperature")
    pub field_name: String,
}
311
/// Configuration for PST-based pattern forecasting.
///
/// Consumed by the `RuntimeOp::Forecast` operation.
#[allow(dead_code)]
pub struct ForecastConfig {
    /// Minimum probability to emit forecast events.
    pub confidence_threshold: f64,
    /// Forecast horizon in nanoseconds.
    pub horizon_ns: u64,
    /// Events before forecasting starts.
    pub warmup_events: u64,
    /// Maximum PST context depth.
    pub max_depth: usize,
    /// Whether Hawkes intensity modulation is enabled.
    pub hawkes: bool,
    /// Whether conformal prediction intervals are enabled.
    pub conformal: bool,
}
328
/// Configuration for `.alert()` webhook notifications
pub struct AlertConfig {
    /// Webhook URL to POST alert payload.
    /// NOTE(review): behavior when None is decided by the executor — confirm.
    pub webhook_url: Option<String>,
    /// Optional message template with `{field}` interpolation
    pub message_template: Option<String>,
}
336
/// Configuration for external connector enrichment
#[allow(dead_code)]
pub struct EnrichConfig {
    /// Name of the connector to use for lookups
    pub connector_name: String,
    /// Expression to evaluate as the lookup key
    pub key_expr: varpulis_core::ast::Expr,
    /// Fields to extract from the enrichment response
    pub fields: Vec<String>,
    /// Cache TTL in nanoseconds (None = no caching)
    pub cache_ttl_ns: Option<u64>,
    /// Timeout in nanoseconds (default 5s)
    pub timeout_ns: u64,
    /// Fallback value when lookup fails (None = skip event)
    pub fallback: Option<varpulis_core::Value>,
}
353
/// Configuration for ONNX model scoring
#[allow(dead_code)]
pub struct ScoreConfig {
    /// Shared handle to the loaded ONNX model (only with the "onnx" feature).
    #[cfg(feature = "onnx")]
    pub model: std::sync::Arc<crate::scoring::OnnxModel>,
    /// Event fields fed to the model as inputs, in order.
    pub input_fields: Vec<String>,
    /// Field names under which model outputs are added to the event.
    pub output_fields: Vec<String>,
    /// Number of events scored per inference call.
    pub batch_size: usize,
}
363
/// Topic specification for `.to()` operations — static or dynamic
#[allow(dead_code)]
pub enum TopicSpec {
    /// Static topic string resolved at compile time (current behavior)
    Static(String),
    /// Dynamic topic resolved per event from an expression
    Dynamic(varpulis_core::ast::Expr),
}
372
/// Configuration for .to() connector routing
#[allow(dead_code)]
pub struct ToConfig {
    /// Target connector name.
    pub connector_name: String,
    /// Topic specification: static string, dynamic expression, or None (use connector default)
    pub topic: Option<TopicSpec>,
    /// Cache key for sink lookup (connector_name or connector_name::topic)
    pub sink_key: String,
    /// Extra parameters from .to() (e.g., client_id, qos) — excludes topic
    // NOTE(review): this field-level allow is redundant — the struct-level
    // #[allow(dead_code)] above already covers it.
    #[allow(dead_code)]
    pub extra_params: HashMap<String, String>,
}
385
/// Default capacity for distinct state LRU cache
pub const DISTINCT_LRU_CAPACITY: usize = 100_000;

/// State for .distinct() — tracks seen values to deduplicate events
pub struct DistinctState {
    /// Optional expression to evaluate for distinct key; None = entire event
    pub expr: Option<varpulis_core::ast::Expr>,
    /// LRU cache of seen value representations (bounded to prevent unbounded growth).
    /// Keys are string renderings of the distinct value; the unit value carries no data.
    pub seen: hashlink::LruCache<String, ()>,
}
396
/// State for .limit(n) — passes at most `max` events
pub struct LimitState {
    /// Maximum number of events to pass through.
    pub max: usize,
    /// Number of events passed so far.
    pub count: usize,
}
402
/// Configuration for select/projection operation
pub struct SelectConfig {
    /// Fields to include: (output_name, expression)
    pub fields: Vec<(String, varpulis_core::ast::Expr)>,
}
408
/// Result of processing a stream
pub struct StreamProcessResult {
    /// Events produced by .emit() — sent to output channel AND downstream
    pub emitted_events: Vec<SharedEvent>,
    /// Output events to feed to dependent streams (with stream name as event_type)
    pub output_events: Vec<SharedEvent>,
    /// Number of events sent to connector sinks via .to() operations
    pub sink_events_sent: u64,
}
418
/// State for partitioned windows - maintains separate windows per partition key
pub struct PartitionedWindowState {
    /// Event field used as the partition key.
    pub partition_key: String,
    pub window_size: usize, // For count-based windows
    /// One count window per observed partition key value.
    pub windows: FxHashMap<String, CountWindow>,
}
425
426impl PartitionedWindowState {
427    pub fn new(partition_key: String, window_size: usize) -> Self {
428        Self {
429            partition_key,
430            window_size,
431            windows: FxHashMap::default(),
432        }
433    }
434
435    pub fn add(&mut self, event: SharedEvent) -> Option<Vec<SharedEvent>> {
436        let key = event.get(&self.partition_key).map_or_else(
437            || "default".to_string(),
438            |v| v.to_partition_key().into_owned(),
439        );
440
441        let window = self
442            .windows
443            .entry(key)
444            .or_insert_with(|| CountWindow::new(self.window_size));
445
446        window.add_shared(event)
447    }
448}
449
/// State for partitioned sliding count windows - maintains separate sliding windows per partition key
pub struct PartitionedSlidingCountWindowState {
    /// Event field used as the partition key.
    pub partition_key: String,
    /// Number of events per window.
    pub window_size: usize,
    /// Number of events between successive window emissions.
    pub slide_size: usize,
    /// One sliding count window per observed partition key value.
    pub windows: FxHashMap<String, SlidingCountWindow>,
}
457
458impl PartitionedSlidingCountWindowState {
459    pub fn new(partition_key: String, window_size: usize, slide_size: usize) -> Self {
460        Self {
461            partition_key,
462            window_size,
463            slide_size,
464            windows: FxHashMap::default(),
465        }
466    }
467
468    pub fn add(&mut self, event: SharedEvent) -> Option<Vec<SharedEvent>> {
469        let key = event.get(&self.partition_key).map_or_else(
470            || "default".to_string(),
471            |v| v.to_partition_key().into_owned(),
472        );
473
474        let window = self
475            .windows
476            .entry(key)
477            .or_insert_with(|| SlidingCountWindow::new(self.window_size, self.slide_size));
478
479        window.add_shared(event)
480    }
481}
482
/// State for partitioned aggregators
pub struct PartitionedAggregatorState {
    /// Event field used as the partition key.
    pub partition_key: String,
    /// Aggregator applied to each partition's events.
    pub aggregator_template: Aggregator,
}
488
489impl PartitionedAggregatorState {
490    pub const fn new(partition_key: String, aggregator: Aggregator) -> Self {
491        Self {
492            partition_key,
493            aggregator_template: aggregator,
494        }
495    }
496
497    pub fn apply(&mut self, events: &[SharedEvent]) -> Vec<(String, IndexMap<String, Value>)> {
498        // Group events by partition key - use Arc::clone to avoid deep clones
499        let mut partitions: FxHashMap<String, Vec<SharedEvent>> = FxHashMap::default();
500
501        for event in events {
502            let key = event.get(&self.partition_key).map_or_else(
503                || "default".to_string(),
504                |v| v.to_partition_key().into_owned(),
505            );
506            partitions.entry(key).or_default().push(Arc::clone(event));
507        }
508
509        // Apply aggregator to each partition using apply_shared
510        let mut results = Vec::new();
511        for (key, partition_events) in partitions {
512            let result = self.aggregator_template.apply_shared(&partition_events);
513            results.push((key, result));
514        }
515
516        results
517    }
518}
519
/// Configuration for pattern matching
#[allow(dead_code)]
pub struct PatternConfig {
    /// Pattern name.
    pub name: String,
    /// Matcher expression evaluated against events.
    pub matcher: varpulis_core::ast::Expr,
}
526
/// Configuration for print operation
pub struct PrintConfig {
    /// Expressions evaluated and printed to stdout.
    pub exprs: Vec<varpulis_core::ast::Expr>,
}
531
/// Configuration for log operation
pub struct LogConfig {
    /// Log level name (stringly-typed as parsed from VPL).
    pub level: String,
    /// Optional log message.
    pub message: Option<String>,
    /// Optional event field whose value is included in the log output.
    pub data_field: Option<String>,
}
538
/// Types of windows supported.
///
/// Each variant wraps the corresponding window implementation from
/// `crate::window`; "Partitioned" variants keep separate state per key.
pub enum WindowType {
    Tumbling(TumblingWindow),
    Sliding(SlidingWindow),
    Count(CountWindow),
    SlidingCount(SlidingCountWindow),
    PartitionedTumbling(PartitionedTumblingWindow),
    PartitionedSliding(PartitionedSlidingWindow),
    Session(SessionWindow),
    PartitionedSession(PartitionedSessionWindow),
}
550
/// Configuration for simple emit operation
#[allow(dead_code)]
pub struct EmitConfig {
    pub fields: Vec<(String, String)>, // (output_name, source_field or literal)
    /// Optional target context name — TODO confirm semantics with the executor.
    pub target_context: Option<String>,
}
557
/// Configuration for emit with expressions
#[allow(dead_code)]
pub struct EmitExprConfig {
    pub fields: Vec<(String, varpulis_core::ast::Expr)>, // (output_name, expression)
    /// Optional target context name — TODO confirm semantics with the executor.
    pub target_context: Option<String>,
}
564
/// Configuration for late data handling in watermark-based windowing.
#[allow(dead_code)]
pub struct LateDataConfig {
    /// How much lateness to tolerate beyond the watermark.
    /// Events arriving within this window after watermark advancement are still processed.
    pub allowed_lateness: chrono::Duration,
    /// Optional stream name to route late events that exceed allowed_lateness.
    /// If None, late events are dropped.
    pub side_output_stream: Option<String>,
}