// varpulis_runtime/engine/mod.rs
1//! Main execution engine for Varpulis
2//!
3//! This module provides the core engine that processes events and executes
4//! stream definitions written in VPL.
5
6#[cfg(feature = "async-runtime")]
7mod builder;
8mod compilation;
9pub mod compiler;
10mod dispatch;
11pub mod error;
12pub mod evaluator;
13pub mod graph;
14mod pattern_analyzer;
15pub mod physical_plan;
16mod pipeline;
17pub mod planner;
18mod router;
19#[cfg(feature = "async-runtime")]
20mod sink_factory;
21pub mod topology;
22pub mod topology_builder;
23pub mod trace;
24mod types;
25
26// Re-export public types
27use std::sync::Arc;
28
29#[cfg(feature = "async-runtime")]
30pub use builder::EngineBuilder;
31use chrono::{DateTime, Duration, Utc};
32// Re-export evaluator for use by other modules (e.g., SASE+)
33pub use evaluator::eval_filter_expr;
34use rustc_hash::{FxHashMap, FxHashSet};
35#[cfg(feature = "async-runtime")]
36pub use sink_factory::SinkConnectorAdapter;
37#[cfg(feature = "async-runtime")]
38use tokio::sync::mpsc;
39use tracing::{info, warn};
40// Re-export NamedPattern from types
41pub use types::NamedPattern;
42pub use types::{EngineConfig, EngineMetrics, ReloadReport, SourceBinding, UserFunction};
43// Re-export internal types for use within the engine module
44use types::{RuntimeOp, RuntimeSource, StreamDefinition, WindowType};
45use varpulis_core::ast::{ConfigItem, Program, Stmt};
46use varpulis_core::Value;
47
48#[cfg(feature = "async-runtime")]
49use crate::connector;
50#[cfg(feature = "async-runtime")]
51use crate::context::ContextMap;
52use crate::event::{Event, SharedEvent};
53#[cfg(feature = "async-runtime")]
54use crate::metrics::Metrics;
55use crate::sase_persistence::SaseCheckpointExt;
56use crate::sequence::SequenceContext;
57use crate::udf::UdfRegistry;
58use crate::watermark::PerSourceWatermarkTracker;
59use crate::window::CountWindow;
60
/// Output channel type enumeration for zero-copy or owned event sending.
/// Only available with async-runtime (requires tokio mpsc channels).
///
/// See `send_output` / `send_output_shared` for how each variant is used:
/// `Shared` lets the engine forward an `Arc<Event>` without deep-cloning.
#[cfg(feature = "async-runtime")]
#[derive(Debug)]
pub(super) enum OutputChannel {
    /// Legacy channel that requires cloning (for backwards compatibility)
    Owned(mpsc::Sender<Event>),
    /// Zero-copy channel using SharedEvent (Arc<Event>); sending only bumps
    /// the Arc refcount instead of cloning the whole `Event`.
    Shared(mpsc::Sender<SharedEvent>),
}
71
/// The main Varpulis engine
///
/// Holds all state produced by loading a VPL [`Program`]: stream
/// definitions, the event router, user functions, named patterns,
/// variables, connector/sink registries and runtime counters. Fields
/// gated on the `async-runtime` feature exist only in tokio-enabled
/// builds; sync/WASM builds collect outputs in `collected_outputs`.
pub struct Engine {
    /// Registered stream definitions
    pub(super) streams: FxHashMap<String, StreamDefinition>,
    /// Event routing: maps event types to stream names
    pub(super) router: router::EventRouter,
    /// User-defined functions
    pub(super) functions: FxHashMap<String, UserFunction>,
    /// Named patterns for reuse
    pub(super) patterns: FxHashMap<String, NamedPattern>,
    /// Configuration blocks (e.g., mqtt, kafka) — legacy `config` syntax
    pub(super) configs: FxHashMap<String, EngineConfig>,
    /// Mutable variables accessible across events
    pub(super) variables: FxHashMap<String, Value>,
    /// Tracks which variables are declared as mutable (var vs let)
    pub(super) mutable_vars: FxHashSet<String>,
    /// Declared connectors from VPL (async-runtime only)
    #[cfg(feature = "async-runtime")]
    pub(super) connectors: FxHashMap<String, connector::ConnectorConfig>,
    /// Source connector bindings from .from() declarations
    pub(super) source_bindings: Vec<SourceBinding>,
    /// Sink registry for .to() operations (async-runtime only)
    #[cfg(feature = "async-runtime")]
    pub(super) sinks: sink_factory::SinkRegistry,
    /// Output event sender (None for benchmark/quiet mode - skips cloning overhead).
    /// Only available with async-runtime (requires tokio mpsc channels).
    #[cfg(feature = "async-runtime")]
    pub(super) output_channel: Option<OutputChannel>,
    /// Collected output events buffer for sync/WASM mode (used by process_batch_sync_collect).
    pub(super) collected_outputs: Vec<Event>,
    /// Metrics: total input events processed
    pub(super) events_processed: u64,
    /// Metrics: total output events emitted
    pub(super) output_events_emitted: u64,
    /// Prometheus metrics (async-runtime only)
    #[cfg(feature = "async-runtime")]
    pub(super) metrics: Option<Metrics>,
    /// Context assignments for multi-threaded execution (async-runtime only)
    #[cfg(feature = "async-runtime")]
    pub(super) context_map: ContextMap,
    /// Per-source watermark tracker for event-time processing
    pub(super) watermark_tracker: Option<PerSourceWatermarkTracker>,
    /// Last applied watermark (for detecting advances)
    pub(super) last_applied_watermark: Option<DateTime<Utc>>,
    /// Late data configurations per stream
    pub(super) late_data_configs: FxHashMap<String, types::LateDataConfig>,
    /// Context name when running inside a context thread (used for unique connector IDs)
    pub(super) context_name: Option<String>,
    /// Topic prefix for multi-tenant Kafka/MQTT isolation (prepended to all topic names)
    pub(super) topic_prefix: Option<String>,
    /// Shared Hamlet aggregators for multi-query optimization
    pub(super) shared_hamlet_aggregators:
        Vec<std::sync::Arc<std::sync::Mutex<crate::hamlet::HamletAggregator>>>,
    /// Auto-checkpointing manager (None = checkpointing disabled)
    pub(super) checkpoint_manager: Option<crate::persistence::CheckpointManager>,
    /// Connector credentials store for secure profile resolution (async-runtime only)
    #[cfg(feature = "async-runtime")]
    pub(super) credentials_store: Option<Arc<connector::credentials::CredentialsStore>>,
    /// Custom DLQ file path (defaults to "varpulis-dlq.jsonl")
    pub(super) dlq_path: Option<std::path::PathBuf>,
    /// DLQ configuration (rotation limits etc.)
    pub(super) dlq_config: crate::dead_letter::DlqConfig,
    /// Shared DLQ instance (created during load())
    pub(super) dlq: Option<Arc<crate::dead_letter::DeadLetterQueue>>,
    /// Physical plan snapshot for introspection (built during load_program)
    pub(super) physical_plan: Option<physical_plan::PhysicalPlan>,
    /// Registry for native Rust UDFs (scalar + aggregate)
    pub(super) udf_registry: UdfRegistry,
    /// Pipeline trace collector (zero-cost when disabled)
    pub(super) trace_collector: trace::TraceCollector,
}
142
143impl std::fmt::Debug for Engine {
144    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
145        let mut s = f.debug_struct("Engine");
146        s.field("streams", &self.streams.keys().collect::<Vec<_>>())
147            .field("functions", &self.functions.keys().collect::<Vec<_>>())
148            .field("patterns", &self.patterns.keys().collect::<Vec<_>>())
149            .field("configs", &self.configs.keys().collect::<Vec<_>>());
150        #[cfg(feature = "async-runtime")]
151        s.field("connectors", &self.connectors.keys().collect::<Vec<_>>())
152            .field("context_map", &self.context_map);
153        s.field("events_processed", &self.events_processed)
154            .field("output_events_emitted", &self.output_events_emitted)
155            .field("context_name", &self.context_name)
156            .field("topic_prefix", &self.topic_prefix)
157            .finish_non_exhaustive()
158    }
159}
160
161impl Engine {
    /// Create an [`EngineBuilder`] for fluent engine construction (async-runtime only).
    #[cfg(feature = "async-runtime")]
    pub fn builder() -> EngineBuilder {
        EngineBuilder::new()
    }

    /// Create engine with an owned output channel (async-runtime only).
    ///
    /// Owned channels require a deep clone of each `Event` on the
    /// shared-event output path (see `send_output_shared`); prefer
    /// [`Self::new_shared`] for zero-copy delivery.
    #[cfg(feature = "async-runtime")]
    pub fn new(output_tx: mpsc::Sender<Event>) -> Self {
        Self::new_internal(Some(OutputChannel::Owned(output_tx)))
    }

    /// Create engine without output channel (for benchmarking - skips Event cloning overhead).
    /// Available in both async and WASM builds.
    pub fn new_benchmark() -> Self {
        #[cfg(feature = "async-runtime")]
        {
            Self::new_internal(None)
        }
        #[cfg(not(feature = "async-runtime"))]
        {
            Self::new_sync()
        }
    }

    /// Create engine with optional output channel (async-runtime only, legacy API).
    /// `None` behaves like benchmark mode: outputs are buffered internally.
    #[cfg(feature = "async-runtime")]
    pub fn new_with_optional_output(output_tx: Option<mpsc::Sender<Event>>) -> Self {
        Self::new_internal(output_tx.map(OutputChannel::Owned))
    }

    /// Create engine with zero-copy SharedEvent output channel (async-runtime only).
    #[cfg(feature = "async-runtime")]
    pub fn new_shared(output_tx: mpsc::Sender<SharedEvent>) -> Self {
        Self::new_internal(Some(OutputChannel::Shared(output_tx)))
    }

    /// Set the connector credentials store for profile resolution (async-runtime only).
    /// Must be called before `load()` for connector `profile` keys to resolve.
    #[cfg(feature = "async-runtime")]
    pub fn set_credentials_store(&mut self, store: Arc<connector::credentials::CredentialsStore>) {
        self.credentials_store = Some(store);
    }
204
    /// Internal constructor for async-runtime builds with output channel support.
    ///
    /// All registries start empty and all optional subsystems (metrics,
    /// watermarks, checkpointing, DLQ, credentials) start disabled; they are
    /// enabled later via setters or during `load()`.
    #[cfg(feature = "async-runtime")]
    fn new_internal(output_channel: Option<OutputChannel>) -> Self {
        Self {
            streams: FxHashMap::default(),
            router: router::EventRouter::new(),
            functions: FxHashMap::default(),
            patterns: FxHashMap::default(),
            configs: FxHashMap::default(),
            variables: FxHashMap::default(),
            mutable_vars: FxHashSet::default(),
            connectors: FxHashMap::default(),
            source_bindings: Vec::new(),
            sinks: sink_factory::SinkRegistry::new(),
            output_channel,
            events_processed: 0,
            output_events_emitted: 0,
            metrics: None,
            context_map: ContextMap::new(),
            watermark_tracker: None,
            last_applied_watermark: None,
            late_data_configs: FxHashMap::default(),
            context_name: None,
            topic_prefix: None,
            shared_hamlet_aggregators: Vec::new(),
            checkpoint_manager: None,
            credentials_store: None,
            dlq_path: None,
            dlq_config: crate::dead_letter::DlqConfig::default(),
            dlq: None,
            physical_plan: None,
            udf_registry: UdfRegistry::new(),
            trace_collector: trace::TraceCollector::new(),
            collected_outputs: Vec::new(),
        }
    }
241
242    /// Constructor for sync-only / WASM builds (no tokio, no output channel).
243    /// Outputs are collected internally and returned via `process_batch_sync_collect()`.
244    pub fn new_sync() -> Self {
245        #[cfg(feature = "async-runtime")]
246        {
247            let mut engine = Self::new_internal(None);
248            engine.collected_outputs = Vec::new();
249            engine
250        }
251        #[cfg(not(feature = "async-runtime"))]
252        {
253            Self {
254                streams: FxHashMap::default(),
255                router: router::EventRouter::new(),
256                functions: FxHashMap::default(),
257                patterns: FxHashMap::default(),
258                configs: FxHashMap::default(),
259                variables: FxHashMap::default(),
260                mutable_vars: FxHashSet::default(),
261                source_bindings: Vec::new(),
262                collected_outputs: Vec::new(),
263                events_processed: 0,
264                output_events_emitted: 0,
265                watermark_tracker: None,
266                last_applied_watermark: None,
267                late_data_configs: FxHashMap::default(),
268                context_name: None,
269                topic_prefix: None,
270                shared_hamlet_aggregators: Vec::new(),
271                checkpoint_manager: None,
272                dlq_path: None,
273                dlq_config: crate::dead_letter::DlqConfig::default(),
274                dlq: None,
275                physical_plan: None,
276                udf_registry: UdfRegistry::new(),
277                trace_collector: trace::TraceCollector::new(),
278            }
279        }
280    }
281
282    /// Clone the output channel for use in engine reload (async-runtime only).
283    #[cfg(feature = "async-runtime")]
284    fn clone_output_channel(&self) -> Option<OutputChannel> {
285        match &self.output_channel {
286            Some(OutputChannel::Owned(tx)) => Some(OutputChannel::Owned(tx.clone())),
287            Some(OutputChannel::Shared(tx)) => Some(OutputChannel::Shared(tx.clone())),
288            None => None,
289        }
290    }
291
    /// Send an output event to the output channel (if configured).
    /// In benchmark mode (no output channel), this is a no-op to avoid cloning overhead.
    /// PERF: Uses zero-copy for SharedEvent channels, clones only for legacy Owned channels.
    ///
    /// Uses non-blocking `try_send`: if the channel is full the event is
    /// dropped with a warning rather than blocking event processing.
    #[cfg(feature = "async-runtime")]
    #[inline]
    pub(super) fn send_output_shared(&mut self, event: &SharedEvent) {
        match &self.output_channel {
            Some(OutputChannel::Shared(tx)) => {
                // PERF: Zero-copy - just increment Arc refcount
                if let Err(e) = tx.try_send(Arc::clone(event)) {
                    warn!("Failed to send output event: {}", e);
                }
            }
            Some(OutputChannel::Owned(tx)) => {
                // Legacy path: must clone the Event out of the Arc
                let owned = (**event).clone();
                if let Err(e) = tx.try_send(owned) {
                    warn!("Failed to send output event: {}", e);
                }
            }
            None => {
                // No channel: collect into buffer for sync_collect
                self.collected_outputs.push((**event).clone());
            }
        }
    }

    /// Collect output event into internal buffer (WASM/sync-only mode).
    #[cfg(not(feature = "async-runtime"))]
    #[inline]
    pub(super) fn send_output_shared(&mut self, event: &SharedEvent) {
        self.collected_outputs.push((**event).clone());
    }

    /// Send an output event to the output channel (if configured).
    /// When no channel (benchmark/sync mode), collects into internal buffer.
    ///
    /// Takes the event by value, so no clone is needed for `Owned` channels;
    /// `try_send` failures (e.g. full channel) drop the event with a warning.
    #[cfg(feature = "async-runtime")]
    #[inline]
    pub(super) fn send_output(&mut self, event: Event) {
        match &self.output_channel {
            Some(OutputChannel::Shared(tx)) => {
                // Wrap in Arc for shared channel
                if let Err(e) = tx.try_send(Arc::new(event)) {
                    warn!("Failed to send output event: {}", e);
                }
            }
            Some(OutputChannel::Owned(tx)) => {
                if let Err(e) = tx.try_send(event) {
                    warn!("Failed to send output event: {}", e);
                }
            }
            None => {
                // No channel: collect into buffer for sync_collect
                self.collected_outputs.push(event);
            }
        }
    }

    /// Collect output event into internal buffer (WASM/sync-only mode).
    #[cfg(not(feature = "async-runtime"))]
    #[inline]
    #[allow(dead_code)]
    pub(super) fn send_output(&mut self, event: Event) {
        self.collected_outputs.push(event);
    }
357
    /// Process events synchronously and return collected outputs directly.
    /// Used by WASM builds and tests where mpsc channels are not needed.
    ///
    /// Clears any previously buffered outputs first, then returns everything
    /// `send_output`/`send_output_shared` buffered while processing `events`.
    /// The internal buffer is handed back via `mem::take`, leaving it empty.
    pub fn process_batch_sync_collect(
        &mut self,
        events: Vec<Event>,
    ) -> Result<Vec<Event>, error::EngineError> {
        self.collected_outputs.clear();
        self.process_batch_sync(events)?;
        Ok(std::mem::take(&mut self.collected_outputs))
    }
368
    /// Set the context name for this engine instance.
    /// Used to derive unique connector IDs when running inside a context thread.
    pub fn set_context_name(&mut self, name: &str) {
        self.context_name = Some(name.to_string());
    }

    /// Set a topic prefix for multi-tenant isolation (call before `load()`).
    ///
    /// When set, all Kafka and MQTT topic names will be prefixed with
    /// `{prefix}.` to enforce per-tenant topic isolation.
    pub fn set_topic_prefix(&mut self, prefix: &str) {
        self.topic_prefix = Some(prefix.to_string());
    }
381
    /// Enable or disable pipeline trace collection.
    ///
    /// When enabled, the engine records which streams each event is routed to,
    /// which operators pass or block the event, SASE pattern state, and emitted
    /// output events. Use [`drain_trace`](Self::drain_trace) to retrieve entries.
    pub fn set_trace_enabled(&mut self, enabled: bool) {
        self.trace_collector.set_enabled(enabled);
    }

    /// Whether pipeline trace collection is currently enabled.
    #[inline]
    pub fn is_trace_enabled(&self) -> bool {
        self.trace_collector.is_enabled()
    }

    /// Drain all collected trace entries since the last drain.
    /// Subsequent calls return only entries recorded after this one.
    pub fn drain_trace(&mut self) -> Vec<trace::TraceEntry> {
        self.trace_collector.drain()
    }
401
    /// Set a custom DLQ file path (call before `load()`).
    pub fn set_dlq_path(&mut self, path: std::path::PathBuf) {
        self.dlq_path = Some(path);
    }

    /// Set custom DLQ configuration (call before `load()`).
    pub const fn set_dlq_config(&mut self, config: crate::dead_letter::DlqConfig) {
        self.dlq_config = config;
    }

    /// Access the shared DLQ instance (created during `load()`).
    /// Returns `None` if `load()` has not run yet or no DLQ was created.
    pub const fn dlq(&self) -> Option<&Arc<crate::dead_letter::DeadLetterQueue>> {
        self.dlq.as_ref()
    }
416
    /// Get a named pattern by name.
    /// Returns `None` if no pattern with that name was registered during load.
    pub fn get_pattern(&self, name: &str) -> Option<&NamedPattern> {
        self.patterns.get(name)
    }

    /// Get all registered patterns
    pub const fn patterns(&self) -> &FxHashMap<String, NamedPattern> {
        &self.patterns
    }

    /// Get a configuration block by name (legacy `config` syntax).
    pub fn get_config(&self, name: &str) -> Option<&EngineConfig> {
        self.configs.get(name)
    }
431
    /// Get a declared connector by name (async-runtime only).
    /// Connectors are registered from `connector` declarations during load.
    #[cfg(feature = "async-runtime")]
    pub fn get_connector(&self, name: &str) -> Option<&connector::ConnectorConfig> {
        self.connectors.get(name)
    }

    /// Get all declared connector configs (async-runtime only).
    #[cfg(feature = "async-runtime")]
    pub const fn connector_configs(&self) -> &FxHashMap<String, connector::ConnectorConfig> {
        &self.connectors
    }

    /// Get source connector bindings from .from() declarations
    pub fn source_bindings(&self) -> &[SourceBinding] {
        &self.source_bindings
    }
448
    /// Get a variable value by name.
    /// Covers both `let` (immutable) and `var` (mutable) declarations.
    pub fn get_variable(&self, name: &str) -> Option<&Value> {
        self.variables.get(name)
    }
453
454    /// Set a variable value (must be mutable or new)
455    pub fn set_variable(&mut self, name: &str, value: Value) -> Result<(), error::EngineError> {
456        if self.variables.contains_key(name) && !self.mutable_vars.contains(name) {
457            return Err(error::EngineError::Compilation(format!(
458                "Cannot assign to immutable variable '{name}'. Use 'var' instead of 'let' to declare mutable variables."
459            )));
460        }
461        self.variables.insert(name.to_string(), value);
462        Ok(())
463    }
464
    /// Get all variables (for debugging/testing)
    pub const fn variables(&self) -> &FxHashMap<String, Value> {
        &self.variables
    }

    /// Get the context map (for orchestrator setup, async-runtime only).
    #[cfg(feature = "async-runtime")]
    pub const fn context_map(&self) -> &ContextMap {
        &self.context_map
    }

    /// Check if the loaded program declares any contexts (async-runtime only).
    #[cfg(feature = "async-runtime")]
    pub fn has_contexts(&self) -> bool {
        self.context_map.has_contexts()
    }

    /// Enable Prometheus metrics (async-runtime only).
    /// Builder-style: consumes and returns `self` for chaining.
    #[cfg(feature = "async-runtime")]
    pub fn with_metrics(mut self, metrics: Metrics) -> Self {
        self.metrics = Some(metrics);
        self
    }
488
489    /// Add a programmatic filter to a stream using a closure
490    pub fn add_filter<F>(&mut self, stream_name: &str, filter: F) -> Result<(), error::EngineError>
491    where
492        F: Fn(&Event) -> bool + Send + Sync + 'static,
493    {
494        if let Some(stream) = self.streams.get_mut(stream_name) {
495            let wrapped = move |e: &SharedEvent| filter(e.as_ref());
496            stream
497                .operations
498                .insert(0, RuntimeOp::WhereClosure(Box::new(wrapped)));
499            Ok(())
500        } else {
501            Err(error::EngineError::StreamNotFound(stream_name.to_string()))
502        }
503    }
504
505    /// Load a program into the engine with semantic validation.
506    pub fn load_with_source(
507        &mut self,
508        source: &str,
509        program: &Program,
510    ) -> Result<(), error::EngineError> {
511        let validation = varpulis_core::validate::validate(source, program);
512        if validation.has_errors() {
513            return Err(error::EngineError::Compilation(validation.format(source)));
514        }
515        for warning in validation
516            .diagnostics
517            .iter()
518            .filter(|d| d.severity == varpulis_core::validate::Severity::Warning)
519        {
520            warn!("{}", warning.message);
521        }
522        self.load_program(program)
523    }
524
    /// Load a program into the engine (no semantic validation).
    /// Prefer [`Self::load_with_source`] when the VPL source text is available.
    pub fn load(&mut self, program: &Program) -> Result<(), error::EngineError> {
        self.load_program(program)
    }
529
530    fn load_program(&mut self, program: &Program) -> Result<(), error::EngineError> {
531        for stmt in &program.statements {
532            match &stmt.node {
533                Stmt::StreamDecl {
534                    name, source, ops, ..
535                } => {
536                    self.register_stream(name, source, ops)?;
537                }
538                Stmt::EventDecl { name, fields, .. } => {
539                    info!(
540                        "Registered event type: {} with {} fields",
541                        name,
542                        fields.len()
543                    );
544                }
545                Stmt::FnDecl {
546                    name,
547                    params,
548                    ret,
549                    body,
550                } => {
551                    let user_fn = UserFunction {
552                        name: name.clone(),
553                        params: params
554                            .iter()
555                            .map(|p| (p.name.clone(), p.ty.clone()))
556                            .collect(),
557                        return_type: ret.clone(),
558                        body: body.clone(),
559                    };
560                    info!(
561                        "Registered function: {}({} params)",
562                        name,
563                        user_fn.params.len()
564                    );
565                    self.functions.insert(name.clone(), user_fn);
566                }
567                Stmt::Config { name, items } => {
568                    warn!(
569                        "DEPRECATED: 'config {}' block syntax is deprecated. \
570                         Use 'connector' declarations instead: \
571                         connector MyConn = {} (...)",
572                        name, name
573                    );
574                    let mut values = std::collections::HashMap::new();
575                    for item in items {
576                        if let ConfigItem::Value(key, val) = item {
577                            values.insert(key.clone(), val.clone());
578                        }
579                    }
580                    info!(
581                        "Registered config block: {} with {} items",
582                        name,
583                        values.len()
584                    );
585                    self.configs.insert(
586                        name.clone(),
587                        EngineConfig {
588                            name: name.clone(),
589                            values,
590                        },
591                    );
592                }
593                Stmt::PatternDecl {
594                    name,
595                    expr,
596                    within,
597                    partition_by,
598                } => {
599                    let named_pattern = NamedPattern {
600                        name: name.clone(),
601                        expr: expr.clone(),
602                        within: within.clone(),
603                        partition_by: partition_by.clone(),
604                    };
605                    info!(
606                        "Registered SASE+ pattern: {} (within: {}, partition: {})",
607                        name,
608                        within.is_some(),
609                        partition_by.is_some()
610                    );
611                    self.patterns.insert(name.clone(), named_pattern);
612                }
613                Stmt::Import { path, alias } => {
614                    warn!(
615                        "Unresolved import '{}' (alias: {:?}) — imports must be resolved before engine.load()",
616                        path, alias
617                    );
618                }
619                Stmt::VarDecl {
620                    mutable,
621                    name,
622                    value,
623                    ..
624                } => {
625                    let dummy_event = Event::new("__init__");
626                    let empty_ctx = SequenceContext::new();
627                    let initial_value = evaluator::eval_expr_with_functions(
628                        value,
629                        &dummy_event,
630                        &empty_ctx,
631                        &self.functions,
632                        &self.variables,
633                    )
634                    .ok_or_else(|| {
635                        error::EngineError::Compilation(format!(
636                            "Failed to evaluate initial value for variable '{name}'"
637                        ))
638                    })?;
639
640                    info!(
641                        "Registered {} variable: {} = {:?}",
642                        if *mutable { "mutable" } else { "immutable" },
643                        name,
644                        initial_value
645                    );
646
647                    self.variables.insert(name.clone(), initial_value);
648                    if *mutable {
649                        self.mutable_vars.insert(name.clone());
650                    }
651                }
652                Stmt::Assignment { name, value } => {
653                    let dummy_event = Event::new("__assign__");
654                    let empty_ctx = SequenceContext::new();
655                    let new_value = evaluator::eval_expr_with_functions(
656                        value,
657                        &dummy_event,
658                        &empty_ctx,
659                        &self.functions,
660                        &self.variables,
661                    )
662                    .ok_or_else(|| {
663                        error::EngineError::Compilation(format!(
664                            "Failed to evaluate assignment value for '{name}'"
665                        ))
666                    })?;
667
668                    if self.variables.contains_key(name) && !self.mutable_vars.contains(name) {
669                        return Err(error::EngineError::Compilation(format!(
670                            "Cannot assign to immutable variable '{name}'. Use 'var' instead of 'let'."
671                        )));
672                    }
673
674                    if !self.variables.contains_key(name) {
675                        self.mutable_vars.insert(name.clone());
676                    }
677
678                    info!("Assigned variable: {} = {:?}", name, new_value);
679                    self.variables.insert(name.clone(), new_value);
680                }
681                #[cfg(feature = "async-runtime")]
682                Stmt::ContextDecl { name, cores } => {
683                    use crate::context::ContextConfig;
684                    info!("Registered context: {} (cores: {:?})", name, cores);
685                    self.context_map.register_context(ContextConfig {
686                        name: name.clone(),
687                        cores: cores.clone(),
688                    });
689                }
690                #[cfg(not(feature = "async-runtime"))]
691                Stmt::ContextDecl { .. } => {
692                    warn!("Context declarations require async-runtime feature; ignoring");
693                }
694                #[cfg(feature = "async-runtime")]
695                Stmt::ConnectorDecl {
696                    name,
697                    connector_type,
698                    params,
699                } => {
700                    let mut config =
701                        sink_factory::connector_params_to_config(connector_type, params);
702
703                    // Resolve credentials profile if specified
704                    if let Some(profile_name) = config.properties.swap_remove("profile") {
705                        if let Some(ref store) = self.credentials_store {
706                            store
707                                .merge_profile(&profile_name, &mut config)
708                                .map_err(|e| {
709                                    error::EngineError::Compilation(format!(
710                                        "Failed to resolve credentials profile '{}' for connector '{}': {}",
711                                        profile_name, name, e
712                                    ))
713                                })?;
714                            info!(
715                                "Registered connector: {} (type: {}, profile: {})",
716                                name, connector_type, profile_name
717                            );
718                        } else {
719                            return Err(error::EngineError::Compilation(format!(
720                                "Connector '{}' references profile '{}' but no credentials file was provided. \
721                                 Use --credentials or set VARPULIS_CREDENTIALS.",
722                                name, profile_name
723                            )));
724                        }
725                    } else {
726                        info!("Registered connector: {} (type: {})", name, connector_type);
727                    }
728
729                    self.connectors.insert(name.clone(), config);
730                }
731                #[cfg(not(feature = "async-runtime"))]
732                Stmt::ConnectorDecl { name, .. } => {
733                    warn!(
734                        "Connector '{}' declarations require async-runtime feature; ignoring",
735                        name
736                    );
737                }
738                _ => {
739                    tracing::debug!("Skipping statement: {:?}", stmt.node);
740                }
741            }
742        }
743
744        // Build sinks and connectors (async-runtime only — requires tokio for async I/O)
745        #[cfg(feature = "async-runtime")]
746        {
747            // Collect all sink_keys actually referenced by stream operations
748            let mut referenced_sink_keys: FxHashSet<String> = FxHashSet::default();
749            let mut topic_overrides: Vec<(String, String, String)> = Vec::new();
750            for stream in self.streams.values() {
751                for op in &stream.operations {
752                    if let RuntimeOp::To(to_config) = op {
753                        referenced_sink_keys.insert(to_config.sink_key.clone());
754                        match &to_config.topic {
755                            Some(types::TopicSpec::Static(topic)) => {
756                                topic_overrides.push((
757                                    to_config.sink_key.clone(),
758                                    to_config.connector_name.clone(),
759                                    topic.clone(),
760                                ));
761                            }
762                            Some(types::TopicSpec::Dynamic(_)) => {
763                                // Dynamic topics use the base connector — ensure it's registered
764                                referenced_sink_keys.insert(to_config.connector_name.clone());
765                            }
766                            None => {}
767                        }
768                    }
769                }
770            }
771
772            // Build sinks using the registry
773            self.sinks.build_from_connectors(
774                &self.connectors,
775                &referenced_sink_keys,
776                &topic_overrides,
777                self.context_name.as_deref(),
778                self.topic_prefix.as_deref(),
779            );
780
781            // Wrap sinks with circuit breaker + DLQ when sink operations exist
782            if !self.sinks.cache().is_empty() {
783                let dlq_path = self
784                    .dlq_path
785                    .clone()
786                    .unwrap_or_else(|| std::path::PathBuf::from("varpulis-dlq.jsonl"));
787                let dlq = crate::dead_letter::DeadLetterQueue::open_with_config(
788                    &dlq_path,
789                    self.dlq_config.clone(),
790                )
791                .map(Arc::new)
792                .ok();
793                if dlq.is_some() {
794                    tracing::info!(
795                        "Dead letter queue enabled at {} for {} sink(s)",
796                        dlq_path.display(),
797                        self.sinks.cache().len()
798                    );
799                }
800                self.dlq = dlq.clone();
801                self.sinks.wrap_with_resilience(
802                    crate::circuit_breaker::CircuitBreakerConfig::default(),
803                    dlq,
804                    self.metrics.clone(),
805                );
806            }
807        }
808
809        // Phase 2: Detect multi-query Hamlet sharing opportunities
810        self.setup_hamlet_sharing();
811
812        // Build physical plan snapshot for introspection
813        let mut plan = physical_plan::PhysicalPlan::new();
814        let mut stream_event_types: FxHashMap<String, Vec<String>> = FxHashMap::default();
815        for (event_type, targets) in self.router.all_routes() {
816            for target in targets.iter() {
817                stream_event_types
818                    .entry(target.clone())
819                    .or_default()
820                    .push(event_type.clone());
821            }
822        }
823        for (name, stream_def) in &self.streams {
824            let op_summary = stream_def
825                .operations
826                .iter()
827                .map(|op| op.summary_name())
828                .collect::<Vec<_>>()
829                .join(" → ");
830            plan.add_stream(physical_plan::PhysicalStream {
831                name: name.clone(),
832                operation_count: stream_def.operations.len(),
833                operation_summary: op_summary,
834                logical_id: plan.stream_count() as u32,
835                registered_event_types: stream_event_types
836                    .remove(name.as_str())
837                    .unwrap_or_default(),
838            });
839        }
840        self.physical_plan = Some(plan);
841
842        Ok(())
843    }
844
    /// Detect overlapping Kleene patterns across streams using `.trend_aggregate()`
    /// and replace per-stream aggregators with shared ones.
    ///
    /// Streams are grouped by the sorted set of event-type names their Kleene
    /// patterns match. Each group of two or more streams gets a single shared
    /// `HamletAggregator`; the private per-stream aggregators in that group are
    /// dropped and each stream's `TrendAggregate` op is rewritten to reference
    /// its renumbered query id within the shared aggregator.
    fn setup_hamlet_sharing(&mut self) {
        use crate::hamlet::template::TemplateBuilder;
        use crate::hamlet::{HamletAggregator, HamletConfig, QueryRegistration};

        // Candidate streams: only those that compiled a private Hamlet aggregator.
        let hamlet_streams: Vec<String> = self
            .streams
            .iter()
            .filter(|(_, s)| s.hamlet_aggregator.is_some())
            .map(|(name, _)| name.clone())
            .collect();

        // Sharing only pays off with at least two participating streams.
        if hamlet_streams.len() < 2 {
            return;
        }

        // Map: sorted Kleene event-type names -> streams using exactly that set.
        let mut kleene_groups: FxHashMap<Vec<String>, Vec<String>> = FxHashMap::default();

        for stream_name in &hamlet_streams {
            if let Some(stream) = self.streams.get(stream_name) {
                let mut kleene_types = Vec::new();
                for op in &stream.operations {
                    if let RuntimeOp::TrendAggregate(_) = op {
                        if let Some(ref agg) = stream.hamlet_aggregator {
                            let tmpl = agg.merged_template();
                            // Resolve each registered query's Kleene type indices
                            // to names via the aggregator's merged template.
                            for query in agg.registered_queries() {
                                for &kt in &query.kleene_types {
                                    let type_name =
                                        tmpl.type_name(kt).unwrap_or("unknown").to_string();
                                    if !kleene_types.contains(&type_name) {
                                        kleene_types.push(type_name);
                                    }
                                }
                            }
                        }
                    }
                }
                // Sort so the Vec acts as a canonical grouping key.
                kleene_types.sort();
                kleene_groups
                    .entry(kleene_types)
                    .or_default()
                    .push(stream_name.clone());
            }
        }

        for (kleene_key, group_streams) in &kleene_groups {
            // Skip singleton groups and streams with no Kleene types at all.
            if group_streams.len() < 2 || kleene_key.is_empty() {
                continue;
            }

            info!(
                "Hamlet sharing detected: {} streams share Kleene patterns {:?}: {:?}",
                group_streams.len(),
                kleene_key,
                group_streams,
            );

            // Build one merged template covering every query in the group.
            let mut builder = TemplateBuilder::new();
            // (owning stream name, re-registered query, trend-aggregate output fields)
            type SharingEntry = (
                String,
                QueryRegistration,
                Vec<(String, crate::greta::GretaAggregate)>,
            );
            let mut all_registrations: Vec<SharingEntry> = Vec::new();
            // Query ids are renumbered densely within the shared aggregator.
            let mut next_query_id: crate::greta::QueryId = 0;

            for stream_name in group_streams {
                if let Some(stream) = self.streams.get(stream_name) {
                    if let Some(ref agg) = stream.hamlet_aggregator {
                        let tmpl = agg.merged_template();
                        for query in agg.registered_queries() {
                            let new_id = next_query_id;
                            next_query_id += 1;

                            // Resolve type indices to actual event type names
                            let event_names: Vec<String> = query
                                .event_types
                                .iter()
                                .map(|&idx| tmpl.type_name(idx).unwrap_or("unknown").to_string())
                                .collect();
                            let name_strs: Vec<&str> =
                                event_names.iter().map(String::as_str).collect();

                            // Record base state before adding states for this query
                            let base_state = builder.num_states() as u16;
                            builder.add_sequence(new_id, &name_strs);

                            for &kt in &query.kleene_types {
                                let type_name = tmpl.type_name(kt).unwrap_or("unknown").to_string();
                                let position = event_names
                                    .iter()
                                    .position(|n| *n == type_name)
                                    .unwrap_or(0);
                                // Self-loop at target state (base + position + 1)
                                // since add_sequence creates states starting at base_state
                                let kleene_state = base_state + position as u16 + 1;
                                builder.add_kleene(new_id, &type_name, kleene_state);
                            }

                            // Carry over the stream's trend-aggregate output
                            // fields (first TrendAggregate op wins; empty if none).
                            let fields: Vec<(String, crate::greta::GretaAggregate)> = stream
                                .operations
                                .iter()
                                .find_map(|op| {
                                    if let RuntimeOp::TrendAggregate(config) = op {
                                        Some(config.fields.clone())
                                    } else {
                                        None
                                    }
                                })
                                .unwrap_or_default();

                            all_registrations.push((
                                stream_name.clone(),
                                QueryRegistration {
                                    id: new_id,
                                    event_types: query.event_types.clone(),
                                    kleene_types: query.kleene_types.clone(),
                                    aggregate: query.aggregate,
                                },
                                fields,
                            ));
                        }
                    }
                }
            }

            let template = builder.build();
            // NOTE(review): the shared aggregator uses a fixed 60s window,
            // regardless of each stream's own settings — confirm intended.
            let config = HamletConfig {
                window_ms: 60_000,
                incremental: true,
                ..Default::default()
            };
            let mut shared_agg = HamletAggregator::new(config, template);

            for (_, registration, _) in &all_registrations {
                shared_agg.register_query(registration.clone());
            }

            let shared_ref = std::sync::Arc::new(std::sync::Mutex::new(shared_agg));
            self.shared_hamlet_aggregators.push(shared_ref.clone());

            // Re-point every participating stream at the shared aggregator and
            // rewrite its TrendAggregate op with the renumbered query id.
            for (stream_name, registration, fields) in &all_registrations {
                if let Some(stream) = self.streams.get_mut(stream_name) {
                    stream.hamlet_aggregator = None;
                    stream.shared_hamlet_ref = Some(shared_ref.clone());

                    for op in &mut stream.operations {
                        if let RuntimeOp::TrendAggregate(config) = op {
                            config.query_id = registration.id;
                            config.fields = fields.clone();
                        }
                    }
                }
            }

            info!(
                "Created shared Hamlet aggregator with {} queries",
                next_query_id,
            );
        }
    }
1007
1008    /// Connect all sinks that require explicit connection (async-runtime only).
1009    #[cfg(feature = "async-runtime")]
1010    #[tracing::instrument(skip(self))]
1011    pub async fn connect_sinks(&self) -> Result<(), error::EngineError> {
1012        self.sinks
1013            .connect_all()
1014            .await
1015            .map_err(error::EngineError::Pipeline)
1016    }
1017
1018    /// Inject a pre-built sink into the engine's registry (async-runtime only).
1019    #[cfg(feature = "async-runtime")]
1020    pub fn inject_sink(&mut self, key: &str, sink: Arc<dyn crate::sink::Sink>) {
1021        self.sinks.insert(key.to_string(), sink);
1022    }
1023
1024    /// Check whether a given key has a registered sink (async-runtime only).
1025    #[cfg(feature = "async-runtime")]
1026    pub fn has_sink(&self, key: &str) -> bool {
1027        self.sinks.cache().contains_key(key)
1028    }
1029
1030    /// Return all sink keys that belong to a given connector name (async-runtime only).
1031    #[cfg(feature = "async-runtime")]
1032    pub fn sink_keys_for_connector(&self, connector_name: &str) -> Vec<String> {
1033        let prefix = format!("{connector_name}::");
1034        self.sinks
1035            .cache()
1036            .keys()
1037            .filter(|k| *k == connector_name || k.starts_with(&prefix))
1038            .cloned()
1039            .collect()
1040    }
1041
1042    // =========================================================================
1043    // Query / Introspection
1044    // =========================================================================
1045
1046    /// Check if any registered stream uses `.to()` or `.enrich()` operations.
1047    pub fn has_sink_operations(&self) -> bool {
1048        self.streams.values().any(|s| {
1049            s.operations
1050                .iter()
1051                .any(|op| matches!(op, RuntimeOp::To(_) | RuntimeOp::Enrich(_)))
1052        })
1053    }
1054
1055    /// Returns (events_in, events_out) counters for this engine.
1056    pub const fn event_counters(&self) -> (u64, u64) {
1057        (self.events_processed, self.output_events_emitted)
1058    }
1059
1060    /// Check if all streams are stateless (safe for round-robin distribution).
1061    pub fn is_stateless(&self) -> bool {
1062        self.streams.values().all(|s| {
1063            s.sase_engine.is_none()
1064                && s.join_buffer.is_none()
1065                && s.hamlet_aggregator.is_none()
1066                && s.shared_hamlet_ref.is_none()
1067                && s.operations.iter().all(|op| {
1068                    matches!(
1069                        op,
1070                        RuntimeOp::WhereExpr(_)
1071                            | RuntimeOp::WhereClosure(_)
1072                            | RuntimeOp::Select(_)
1073                            | RuntimeOp::Emit(_)
1074                            | RuntimeOp::EmitExpr(_)
1075                            | RuntimeOp::Print(_)
1076                            | RuntimeOp::Log(_)
1077                            | RuntimeOp::Having(_)
1078                            | RuntimeOp::Process(_)
1079                            | RuntimeOp::Pattern(_)
1080                            | RuntimeOp::To(_)
1081                    )
1082                })
1083        })
1084    }
1085
1086    /// Return the partition key used by partitioned operations, if any.
1087    pub fn partition_key(&self) -> Option<String> {
1088        for stream in self.streams.values() {
1089            if let Some(ref sase) = stream.sase_engine {
1090                if let Some(key) = sase.partition_by() {
1091                    return Some(key.to_string());
1092                }
1093            }
1094            for op in &stream.operations {
1095                match op {
1096                    RuntimeOp::PartitionedWindow(pw) => return Some(pw.partition_key.clone()),
1097                    RuntimeOp::PartitionedSlidingCountWindow(pw) => {
1098                        return Some(pw.partition_key.clone())
1099                    }
1100                    RuntimeOp::PartitionedAggregate(pa) => return Some(pa.partition_key.clone()),
1101                    _ => {}
1102                }
1103            }
1104            if stream.sase_engine.is_some() {
1105                for op in &stream.operations {
1106                    if let RuntimeOp::WhereExpr(expr) = op {
1107                        if let Some(key) = compilation::extract_equality_join_key(expr) {
1108                            return Some(key);
1109                        }
1110                    }
1111                }
1112            }
1113        }
1114        None
1115    }
1116
1117    /// Check if any registered stream has session windows.
1118    pub fn has_session_windows(&self) -> bool {
1119        self.streams.values().any(|s| {
1120            s.operations.iter().any(|op| {
1121                matches!(
1122                    op,
1123                    RuntimeOp::Window(WindowType::Session(_))
1124                        | RuntimeOp::Window(WindowType::PartitionedSession(_))
1125                )
1126            })
1127        })
1128    }
1129
1130    /// Return the smallest session gap across all streams (used as sweep interval).
1131    pub fn min_session_gap(&self) -> Option<chrono::Duration> {
1132        let mut min_gap: Option<chrono::Duration> = None;
1133        for stream in self.streams.values() {
1134            for op in &stream.operations {
1135                if let RuntimeOp::Window(window) = op {
1136                    let gap = match window {
1137                        WindowType::Session(w) => Some(w.gap()),
1138                        WindowType::PartitionedSession(w) => Some(w.gap()),
1139                        _ => None,
1140                    };
1141                    if let Some(g) = gap {
1142                        min_gap = Some(match min_gap {
1143                            Some(current) if g < current => g,
1144                            Some(current) => current,
1145                            None => g,
1146                        });
1147                    }
1148                }
1149            }
1150        }
1151        min_gap
1152    }
1153
1154    /// Get metrics
1155    pub fn metrics(&self) -> EngineMetrics {
1156        EngineMetrics {
1157            events_processed: self.events_processed,
1158            output_events_emitted: self.output_events_emitted,
1159            streams_count: self.streams.len(),
1160        }
1161    }
1162
1163    /// Get the names of all loaded streams.
1164    pub fn stream_names(&self) -> Vec<&str> {
1165        self.streams.keys().map(|s| s.as_str()).collect()
1166    }
1167
1168    /// Get the physical plan summary (available after `load()`).
1169    pub fn physical_plan_summary(&self) -> Option<String> {
1170        self.physical_plan.as_ref().map(|p| p.summary())
1171    }
1172
1173    pub fn explain(&self, program: &Program) -> Result<String, error::EngineError> {
1174        let logical = planner::logical_plan(program).map_err(error::EngineError::Compilation)?;
1175        let optimized = varpulis_parser::optimize_plan(logical);
1176        Ok(optimized.explain())
1177    }
1178
1179    /// Build a topology snapshot of the currently loaded streams and routes.
1180    pub fn topology(&self) -> topology::Topology {
1181        let mut builder = topology_builder::TopologyBuilder::new();
1182        for (name, stream_def) in &self.streams {
1183            builder = builder.add_stream(name, stream_def);
1184        }
1185        builder = builder.add_routes(self.router.all_routes());
1186        builder.build()
1187    }
1188
1189    /// Get a user-defined function by name
1190    pub fn get_function(&self, name: &str) -> Option<&UserFunction> {
1191        self.functions.get(name)
1192    }
1193
1194    /// Get all registered function names
1195    pub fn function_names(&self) -> Vec<&str> {
1196        self.functions.keys().map(|s| s.as_str()).collect()
1197    }
1198
1199    /// Get all timer configurations for spawning timer tasks
1200    pub fn get_timers(&self) -> Vec<(u64, Option<u64>, String)> {
1201        let mut timers = Vec::new();
1202        for stream in self.streams.values() {
1203            if let RuntimeSource::Timer(config) = &stream.source {
1204                timers.push((
1205                    config.interval_ns,
1206                    config.initial_delay_ns,
1207                    config.timer_event_type.clone(),
1208                ));
1209            }
1210        }
1211        timers
1212    }
1213
1214    /// Register a native scalar UDF.
1215    pub fn register_scalar_udf(&mut self, udf: std::sync::Arc<dyn crate::udf::ScalarUDF>) {
1216        self.udf_registry.register_scalar(udf);
1217    }
1218
1219    /// Register a native aggregate UDF.
1220    pub fn register_aggregate_udf(&mut self, udf: std::sync::Arc<dyn crate::udf::AggregateUDF>) {
1221        self.udf_registry.register_aggregate(udf);
1222    }
1223
1224    /// Get a reference to the UDF registry.
1225    pub const fn udf_registry(&self) -> &UdfRegistry {
1226        &self.udf_registry
1227    }
1228
1229    // =========================================================================
1230    // Hot Reload
1231    // =========================================================================
1232
    /// Reload program without losing state where possible (async-runtime only).
    ///
    /// Strategy: compile the new program into a throwaway engine, diff its
    /// stream set against the current one, then splice added/changed streams
    /// into `self` while keeping unchanged streams (and their state) in place.
    /// Routes are rebuilt from scratch; functions, patterns, configs,
    /// contexts, connectors, bindings, and sinks are replaced wholesale.
    #[cfg(feature = "async-runtime")]
    pub fn reload(&mut self, program: &Program) -> Result<ReloadReport, error::EngineError> {
        let mut report = ReloadReport::default();

        let old_streams: FxHashSet<String> = self.streams.keys().cloned().collect();

        // Compile the new program in isolation; a failed load leaves `self` untouched.
        let mut new_engine = Self::new_internal(self.clone_output_channel());
        new_engine.credentials_store = self.credentials_store.clone();
        new_engine.load(program)?;

        let new_streams: FxHashSet<String> = new_engine.streams.keys().cloned().collect();

        for name in new_streams.difference(&old_streams) {
            report.streams_added.push(name.clone());
        }

        for name in old_streams.difference(&new_streams) {
            report.streams_removed.push(name.clone());
        }

        // Classify surviving streams: state is preserved only when both the
        // source is compatible and the operation pipeline looks unchanged.
        for name in old_streams.intersection(&new_streams) {
            let old_stream = self.streams.get(name).unwrap();
            let new_stream = new_engine.streams.get(name).unwrap();

            let source_changed = !Self::sources_compatible(&old_stream.source, &new_stream.source);
            // NOTE(review): pipelines are compared by operation COUNT only —
            // an equal-length but different pipeline is treated as unchanged
            // and keeps old state. Confirm this approximation is intended.
            let ops_changed = old_stream.operations.len() != new_stream.operations.len();

            if source_changed || ops_changed {
                report.streams_updated.push(name.clone());
                report.state_reset.push(name.clone());
            } else {
                report.state_preserved.push(name.clone());
            }
        }

        for name in &report.streams_removed {
            self.streams.remove(name);
        }

        // All routes are rebuilt below from the merged stream set.
        self.router.clear();

        for name in &report.streams_added {
            if let Some(stream) = new_engine.streams.remove(name) {
                self.streams.insert(name.clone(), stream);
            }
        }

        // Updated streams take the freshly compiled definition (state reset).
        for name in &report.streams_updated {
            if let Some(stream) = new_engine.streams.remove(name) {
                self.streams.insert(name.clone(), stream);
            }
        }

        // Recompute (event_type -> stream) route pairs from each stream's source.
        let registrations: Vec<(String, String)> = self
            .streams
            .iter()
            .flat_map(|(name, stream)| {
                let mut pairs = Vec::new();
                match &stream.source {
                    RuntimeSource::EventType(et) => {
                        pairs.push((et.clone(), name.clone()));
                    }
                    RuntimeSource::Stream(s) => {
                        pairs.push((s.clone(), name.clone()));
                    }
                    RuntimeSource::Merge(sources) => {
                        for ms in sources {
                            pairs.push((ms.event_type.clone(), name.clone()));
                        }
                    }
                    // Join streams are fed by their inputs, not direct routes.
                    RuntimeSource::Join(_) => {}
                    RuntimeSource::Timer(config) => {
                        pairs.push((config.timer_event_type.clone(), name.clone()));
                    }
                }
                pairs
            })
            .collect();

        for (event_type, stream_name) in registrations {
            self.router.add_route(&event_type, &stream_name);
        }

        // Non-stream state is replaced wholesale by the new program's versions.
        self.functions = new_engine.functions;
        self.patterns = new_engine.patterns;
        self.configs = new_engine.configs;
        self.context_map = new_engine.context_map;
        self.connectors = new_engine.connectors;
        self.source_bindings = new_engine.source_bindings;
        *self.sinks.cache_mut() = std::mem::take(new_engine.sinks.cache_mut());

        // Merge in only NEW variables, so existing variable values survive.
        // NOTE(review): the mutable_vars extend copies the ENTIRE new set on
        // every newly-added variable — idempotent for a set, but presumably
        // belongs outside the `if` (or outside the loop). Verify.
        for (name, value) in new_engine.variables {
            if !self.variables.contains_key(&name) {
                self.variables.insert(name.clone(), value);
                self.mutable_vars
                    .extend(new_engine.mutable_vars.iter().cloned());
            }
        }

        info!(
            "Hot reload complete: +{} -{} ~{} streams",
            report.streams_added.len(),
            report.streams_removed.len(),
            report.streams_updated.len()
        );

        Ok(report)
    }
1342
1343    // =========================================================================
1344    // Checkpointing
1345    // =========================================================================
1346
    /// Create a checkpoint of the engine state (windows, SASE engines, joins, variables).
    ///
    /// Captures, per stream: window contents, partitioned-window partitions,
    /// distinct-key sets, and limit counters, plus SASE engine state and join
    /// buffers; globally: variables, event counters, and watermark state.
    /// The resulting snapshot is consumed by `restore_checkpoint`.
    pub fn create_checkpoint(&self) -> crate::persistence::EngineCheckpoint {
        use crate::persistence::{EngineCheckpoint, WindowCheckpoint};

        // Keyed by stream name throughout.
        let mut window_states = std::collections::HashMap::new();
        let mut sase_states = std::collections::HashMap::new();
        let mut join_states = std::collections::HashMap::new();
        let mut distinct_states = std::collections::HashMap::new();
        let mut limit_states = std::collections::HashMap::new();

        for (name, stream) in &self.streams {
            for op in &stream.operations {
                match op {
                    RuntimeOp::Window(wt) => {
                        // Every window flavor exposes the same checkpoint() shape.
                        let cp = match wt {
                            WindowType::Tumbling(w) => w.checkpoint(),
                            WindowType::Sliding(w) => w.checkpoint(),
                            WindowType::Count(w) => w.checkpoint(),
                            WindowType::SlidingCount(w) => w.checkpoint(),
                            WindowType::Session(w) => w.checkpoint(),
                            WindowType::PartitionedSession(w) => w.checkpoint(),
                            WindowType::PartitionedTumbling(w) => w.checkpoint(),
                            WindowType::PartitionedSliding(w) => w.checkpoint(),
                        };
                        window_states.insert(name.clone(), cp);
                    }
                    RuntimeOp::PartitionedWindow(pw) => {
                        // Per-partition count windows fold into one WindowCheckpoint
                        // whose top-level fields stay empty; partitions carry the data.
                        let mut partitions = std::collections::HashMap::new();
                        for (key, cw) in &pw.windows {
                            let sub_cp = cw.checkpoint();
                            partitions.insert(
                                key.clone(),
                                crate::persistence::PartitionedWindowCheckpoint {
                                    events: sub_cp.events,
                                    window_start_ms: sub_cp.window_start_ms,
                                },
                            );
                        }
                        window_states.insert(
                            name.clone(),
                            WindowCheckpoint {
                                events: Vec::new(),
                                window_start_ms: None,
                                last_emit_ms: None,
                                partitions,
                            },
                        );
                    }
                    RuntimeOp::Distinct(state) => {
                        // NOTE(review): the reversed iteration suggests `seen`
                        // iterates newest-first (LRU-style) and the checkpoint
                        // wants oldest-first insertion order — confirm against
                        // the restore path.
                        let keys: Vec<String> =
                            state.seen.iter().rev().map(|(k, ())| k.clone()).collect();
                        distinct_states.insert(
                            name.clone(),
                            crate::persistence::DistinctCheckpoint { keys },
                        );
                    }
                    RuntimeOp::Limit(state) => {
                        limit_states.insert(
                            name.clone(),
                            crate::persistence::LimitCheckpoint {
                                max: state.max,
                                count: state.count,
                            },
                        );
                    }
                    _ => {}
                }
            }

            // SASE checkpoint comes via the SaseCheckpointExt extension trait.
            if let Some(ref sase) = stream.sase_engine {
                sase_states.insert(name.clone(), sase.checkpoint());
            }

            if let Some(ref jb) = stream.join_buffer {
                join_states.insert(name.clone(), jb.checkpoint());
            }
        }

        // Variables are converted to their serializable representation.
        let variables = self
            .variables
            .iter()
            .map(|(k, v)| (k.clone(), crate::persistence::value_to_ser(v)))
            .collect();

        let watermark_state = self.watermark_tracker.as_ref().map(|t| t.checkpoint());

        EngineCheckpoint {
            version: crate::persistence::CHECKPOINT_VERSION,
            window_states,
            sase_states,
            join_states,
            variables,
            events_processed: self.events_processed,
            output_events_emitted: self.output_events_emitted,
            watermark_state,
            distinct_states,
            limit_states,
        }
    }
1446
1447    /// Restore engine state from a checkpoint.
1448    pub fn restore_checkpoint(
1449        &mut self,
1450        cp: &crate::persistence::EngineCheckpoint,
1451    ) -> Result<(), crate::persistence::StoreError> {
1452        let mut migrated = cp.clone();
1453        migrated.validate_and_migrate()?;
1454
1455        self.events_processed = cp.events_processed;
1456        self.output_events_emitted = cp.output_events_emitted;
1457
1458        for (k, sv) in &cp.variables {
1459            self.variables
1460                .insert(k.clone(), crate::persistence::ser_to_value(sv.clone()));
1461        }
1462
1463        for (name, stream) in &mut self.streams {
1464            if let Some(wcp) = cp.window_states.get(name) {
1465                for op in &mut stream.operations {
1466                    match op {
1467                        RuntimeOp::Window(wt) => match wt {
1468                            WindowType::Tumbling(w) => w.restore(wcp),
1469                            WindowType::Sliding(w) => w.restore(wcp),
1470                            WindowType::Count(w) => w.restore(wcp),
1471                            WindowType::SlidingCount(w) => w.restore(wcp),
1472                            WindowType::Session(w) => w.restore(wcp),
1473                            WindowType::PartitionedSession(w) => w.restore(wcp),
1474                            WindowType::PartitionedTumbling(w) => w.restore(wcp),
1475                            WindowType::PartitionedSliding(w) => w.restore(wcp),
1476                        },
1477                        RuntimeOp::PartitionedWindow(pw) => {
1478                            for (key, pcp) in &wcp.partitions {
1479                                let sub_wcp = crate::persistence::WindowCheckpoint {
1480                                    events: pcp.events.clone(),
1481                                    window_start_ms: pcp.window_start_ms,
1482                                    last_emit_ms: None,
1483                                    partitions: std::collections::HashMap::new(),
1484                                };
1485                                let window = pw
1486                                    .windows
1487                                    .entry(key.clone())
1488                                    .or_insert_with(|| CountWindow::new(pw.window_size));
1489                                window.restore(&sub_wcp);
1490                            }
1491                        }
1492                        _ => {}
1493                    }
1494                }
1495            }
1496
1497            if let Some(scp) = cp.sase_states.get(name) {
1498                if let Some(ref mut sase) = stream.sase_engine {
1499                    sase.restore(scp);
1500                }
1501            }
1502
1503            if let Some(jcp) = cp.join_states.get(name) {
1504                if let Some(ref mut jb) = stream.join_buffer {
1505                    jb.restore(jcp);
1506                }
1507            }
1508
1509            if let Some(dcp) = cp.distinct_states.get(name) {
1510                for op in &mut stream.operations {
1511                    if let RuntimeOp::Distinct(state) = op {
1512                        state.seen.clear();
1513                        for key in dcp.keys.iter().rev() {
1514                            state.seen.insert(key.clone(), ());
1515                        }
1516                    }
1517                }
1518            }
1519
1520            if let Some(lcp) = cp.limit_states.get(name) {
1521                for op in &mut stream.operations {
1522                    if let RuntimeOp::Limit(state) = op {
1523                        state.max = lcp.max;
1524                        state.count = lcp.count;
1525                    }
1526                }
1527            }
1528        }
1529
1530        if let Some(ref wcp) = cp.watermark_state {
1531            if self.watermark_tracker.is_none() {
1532                self.watermark_tracker = Some(PerSourceWatermarkTracker::new());
1533            }
1534            if let Some(ref mut tracker) = self.watermark_tracker {
1535                tracker.restore(wcp);
1536                self.last_applied_watermark = wcp
1537                    .effective_watermark_ms
1538                    .and_then(DateTime::from_timestamp_millis);
1539            }
1540        }
1541
1542        info!(
1543            "Engine restored: {} events processed, {} streams with state (schema v{})",
1544            cp.events_processed,
1545            cp.window_states.len() + cp.sase_states.len() + cp.join_states.len(),
1546            cp.version
1547        );
1548
1549        Ok(())
1550    }
1551
1552    /// Enable auto-checkpointing with the given store and config.
1553    pub fn enable_checkpointing(
1554        &mut self,
1555        store: std::sync::Arc<dyn crate::persistence::StateStore>,
1556        config: crate::persistence::CheckpointConfig,
1557    ) -> Result<(), crate::persistence::StoreError> {
1558        let manager = crate::persistence::CheckpointManager::new(store, config)?;
1559
1560        if let Some(cp) = manager.recover()? {
1561            if let Some(engine_cp) = cp.context_states.get("main") {
1562                self.restore_checkpoint(engine_cp)?;
1563                info!(
1564                    "Auto-restored from checkpoint {} ({} events)",
1565                    cp.id, cp.events_processed
1566                );
1567            }
1568        }
1569
1570        self.checkpoint_manager = Some(manager);
1571        Ok(())
1572    }
1573
1574    /// Check if a checkpoint is due and create one if needed.
1575    pub fn checkpoint_tick(&mut self) -> Result<(), crate::persistence::StoreError> {
1576        let should = self
1577            .checkpoint_manager
1578            .as_ref()
1579            .is_some_and(|m| m.should_checkpoint());
1580
1581        if should {
1582            self.force_checkpoint()?;
1583        }
1584        Ok(())
1585    }
1586
1587    /// Force an immediate checkpoint regardless of the interval.
1588    pub fn force_checkpoint(&mut self) -> Result<(), crate::persistence::StoreError> {
1589        if self.checkpoint_manager.is_none() {
1590            return Ok(());
1591        }
1592
1593        let engine_cp = self.create_checkpoint();
1594        let events_processed = self.events_processed;
1595
1596        let mut context_states = std::collections::HashMap::new();
1597        context_states.insert("main".to_string(), engine_cp);
1598
1599        let cp = crate::persistence::Checkpoint {
1600            id: 0,
1601            timestamp_ms: 0,
1602            events_processed,
1603            window_states: std::collections::HashMap::new(),
1604            pattern_states: std::collections::HashMap::new(),
1605            metadata: std::collections::HashMap::new(),
1606            context_states,
1607        };
1608
1609        self.checkpoint_manager.as_mut().unwrap().checkpoint(cp)?;
1610        Ok(())
1611    }
1612
1613    /// Returns true if auto-checkpointing is enabled.
1614    pub const fn has_checkpointing(&self) -> bool {
1615        self.checkpoint_manager.is_some()
1616    }
1617
1618    // =========================================================================
1619    // Watermark Management
1620    // =========================================================================
1621
1622    /// Enable per-source watermark tracking for this engine.
1623    pub fn enable_watermark_tracking(&mut self) {
1624        if self.watermark_tracker.is_none() {
1625            self.watermark_tracker = Some(PerSourceWatermarkTracker::new());
1626        }
1627    }
1628
1629    /// Register a source for watermark tracking with its max out-of-orderness.
1630    pub fn register_watermark_source(&mut self, source: &str, max_ooo: Duration) {
1631        if let Some(ref mut tracker) = self.watermark_tracker {
1632            tracker.register_source(source, max_ooo);
1633        }
1634    }
1635
1636    /// Advance the watermark from an external source (async-runtime only).
1637    #[cfg(feature = "async-runtime")]
1638    #[tracing::instrument(skip(self))]
1639    pub async fn advance_external_watermark(
1640        &mut self,
1641        source_context: &str,
1642        watermark_ms: i64,
1643    ) -> Result<(), error::EngineError> {
1644        if let Some(ref mut tracker) = self.watermark_tracker {
1645            if let Some(wm) = DateTime::from_timestamp_millis(watermark_ms) {
1646                tracker.advance_source_watermark(source_context, wm);
1647
1648                if let Some(new_wm) = tracker.effective_watermark() {
1649                    if self.last_applied_watermark.is_none_or(|last| new_wm > last) {
1650                        self.apply_watermark_to_windows(new_wm).await?;
1651                        self.last_applied_watermark = Some(new_wm);
1652                    }
1653                }
1654            }
1655        }
1656        Ok(())
1657    }
1658
1659    /// Check if two runtime sources are compatible for state preservation (used by reload).
1660    #[cfg(feature = "async-runtime")]
1661    fn sources_compatible(a: &RuntimeSource, b: &RuntimeSource) -> bool {
1662        match (a, b) {
1663            (RuntimeSource::EventType(a), RuntimeSource::EventType(b)) => a == b,
1664            (RuntimeSource::Stream(a), RuntimeSource::Stream(b)) => a == b,
1665            (RuntimeSource::Timer(a), RuntimeSource::Timer(b)) => {
1666                a.interval_ns == b.interval_ns && a.timer_event_type == b.timer_event_type
1667            }
1668            (RuntimeSource::Merge(a), RuntimeSource::Merge(b)) => {
1669                a.len() == b.len()
1670                    && a.iter()
1671                        .zip(b.iter())
1672                        .all(|(x, y)| x.event_type == y.event_type)
1673            }
1674            (RuntimeSource::Join(a), RuntimeSource::Join(b)) => a == b,
1675            _ => false,
1676        }
1677    }
1678}