// varpulis_runtime/engine/mod.rs
1//! Main execution engine for Varpulis
2//!
3//! This module provides the core engine that processes events and executes
4//! stream definitions written in VPL.
5
6mod builder;
7mod compilation;
8pub mod compiler;
9mod dispatch;
10pub mod error;
11pub mod evaluator;
12mod pattern_analyzer;
13pub mod physical_plan;
14mod pipeline;
15pub mod planner;
16mod router;
17mod sink_factory;
18pub mod topology;
19pub mod topology_builder;
20mod types;
21
22// Re-export public types
23pub use builder::EngineBuilder;
24pub use sink_factory::SinkConnectorAdapter;
25pub use types::{EngineConfig, EngineMetrics, ReloadReport, SourceBinding, UserFunction};
26
27// Re-export evaluator for use by other modules (e.g., SASE+)
28pub use evaluator::eval_filter_expr;
29
30// Re-export internal types for use within the engine module
31use types::{RuntimeOp, RuntimeSource, StreamDefinition, WindowType};
32
33use crate::connector;
34use crate::context::ContextMap;
35use crate::event::{Event, SharedEvent};
36use crate::metrics::Metrics;
37use crate::sase_persistence::SaseCheckpointExt;
38use crate::sequence::SequenceContext;
39use crate::udf::UdfRegistry;
40use crate::watermark::PerSourceWatermarkTracker;
41use crate::window::CountWindow;
42use chrono::Duration;
43use chrono::{DateTime, Utc};
44use rustc_hash::{FxHashMap, FxHashSet};
45use std::sync::Arc;
46use tokio::sync::mpsc;
47use tracing::{info, warn};
48use varpulis_core::ast::{ConfigItem, Program, Stmt};
49use varpulis_core::Value;
50
51// Re-export NamedPattern from types
52pub use types::NamedPattern;
53
/// Output channel type enumeration for zero-copy or owned event sending.
///
/// `Shared` sends an `Arc<Event>` so emitting costs only a refcount bump;
/// `Owned` is retained for callers that still consume plain `Event` values
/// and therefore forces a deep clone per emitted event.
#[derive(Debug)]
pub(super) enum OutputChannel {
    /// Legacy channel that requires cloning the event (for backwards compatibility).
    Owned(mpsc::Sender<Event>),
    /// Zero-copy channel using `SharedEvent` (`Arc<Event>`).
    Shared(mpsc::Sender<SharedEvent>),
}
62
/// The main Varpulis engine.
///
/// Holds everything a loaded VPL program registers (streams, functions,
/// patterns, configs, connectors, variables) together with runtime state:
/// event routing, sinks, output channel, watermarks, DLQ, and metrics.
/// Populate it via [`Engine::load`] / [`Engine::load_with_source`] or
/// construct it fluently with [`Engine::builder`].
pub struct Engine {
    /// Registered stream definitions, keyed by stream name.
    pub(super) streams: FxHashMap<String, StreamDefinition>,
    /// Event routing: maps event types to stream names.
    pub(super) router: router::EventRouter,
    /// User-defined functions declared in VPL.
    pub(super) functions: FxHashMap<String, UserFunction>,
    /// Named SASE+ patterns for reuse across streams.
    pub(super) patterns: FxHashMap<String, NamedPattern>,
    /// Configuration blocks (e.g., mqtt, kafka) — deprecated in favor of connectors.
    pub(super) configs: FxHashMap<String, EngineConfig>,
    /// Mutable variables accessible across events.
    pub(super) variables: FxHashMap<String, Value>,
    /// Tracks which variables are declared as mutable (`var` vs `let`).
    pub(super) mutable_vars: FxHashSet<String>,
    /// Declared connectors from VPL.
    pub(super) connectors: FxHashMap<String, connector::ConnectorConfig>,
    /// Source connector bindings from `.from()` declarations.
    pub(super) source_bindings: Vec<SourceBinding>,
    /// Sink registry for `.to()` operations.
    pub(super) sinks: sink_factory::SinkRegistry,
    /// Output event sender (None for benchmark/quiet mode - skips cloning overhead).
    pub(super) output_channel: Option<OutputChannel>,
    /// Count of events ingested by the engine.
    pub(super) events_processed: u64,
    /// Count of events emitted on the output channel.
    pub(super) output_events_emitted: u64,
    /// Prometheus metrics (None when metrics are disabled).
    pub(super) metrics: Option<Metrics>,
    /// Context assignments for multi-threaded execution.
    pub(super) context_map: ContextMap,
    /// Per-source watermark tracker for event-time processing.
    pub(super) watermark_tracker: Option<PerSourceWatermarkTracker>,
    /// Last applied watermark (for detecting advances).
    pub(super) last_applied_watermark: Option<DateTime<Utc>>,
    /// Late data configurations per stream.
    pub(super) late_data_configs: FxHashMap<String, types::LateDataConfig>,
    /// Context name when running inside a context thread (used for unique connector IDs).
    pub(super) context_name: Option<String>,
    /// Shared Hamlet aggregators for multi-query optimization.
    pub(super) shared_hamlet_aggregators:
        Vec<std::sync::Arc<std::sync::Mutex<crate::hamlet::HamletAggregator>>>,
    /// Auto-checkpointing manager (None = checkpointing disabled).
    pub(super) checkpoint_manager: Option<crate::persistence::CheckpointManager>,
    /// Custom DLQ file path (defaults to "varpulis-dlq.jsonl" — see `load_program`).
    pub(super) dlq_path: Option<std::path::PathBuf>,
    /// DLQ configuration (rotation limits etc.).
    pub(super) dlq_config: crate::dead_letter::DlqConfig,
    /// Shared DLQ instance (created during `load()` when sinks exist).
    pub(super) dlq: Option<Arc<crate::dead_letter::DeadLetterQueue>>,
    /// Physical plan snapshot for introspection (built during `load_program`).
    pub(super) physical_plan: Option<physical_plan::PhysicalPlan>,
    /// Registry for native Rust UDFs (scalar + aggregate).
    pub(super) udf_registry: UdfRegistry,
}
118
119impl std::fmt::Debug for Engine {
120    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
121        f.debug_struct("Engine")
122            .field("streams", &self.streams.keys().collect::<Vec<_>>())
123            .field("functions", &self.functions.keys().collect::<Vec<_>>())
124            .field("patterns", &self.patterns.keys().collect::<Vec<_>>())
125            .field("configs", &self.configs.keys().collect::<Vec<_>>())
126            .field("connectors", &self.connectors.keys().collect::<Vec<_>>())
127            .field("events_processed", &self.events_processed)
128            .field("output_events_emitted", &self.output_events_emitted)
129            .field("context_map", &self.context_map)
130            .field("context_name", &self.context_name)
131            .finish_non_exhaustive()
132    }
133}
134
135impl Engine {
    /// Create an [`EngineBuilder`] for fluent engine construction.
    ///
    /// This is the preferred entry point for configuring an engine with
    /// optional pieces (output channel, metrics, etc.).
    ///
    /// # Examples
    ///
    /// ```rust,no_run
    /// use varpulis_runtime::Engine;
    /// use tokio::sync::mpsc;
    ///
    /// let (tx, _rx) = mpsc::channel(100);
    /// let mut engine = Engine::builder()
    ///     .output(tx)
    ///     .build();
    /// ```
    pub fn builder() -> EngineBuilder {
        EngineBuilder::new()
    }
152
153    pub fn new(output_tx: mpsc::Sender<Event>) -> Self {
154        Self {
155            streams: FxHashMap::default(),
156            router: router::EventRouter::new(),
157            functions: FxHashMap::default(),
158            patterns: FxHashMap::default(),
159            configs: FxHashMap::default(),
160            variables: FxHashMap::default(),
161            mutable_vars: FxHashSet::default(),
162            connectors: FxHashMap::default(),
163            source_bindings: Vec::new(),
164            sinks: sink_factory::SinkRegistry::new(),
165            output_channel: Some(OutputChannel::Owned(output_tx)),
166            events_processed: 0,
167            output_events_emitted: 0,
168            metrics: None,
169            context_map: ContextMap::new(),
170            watermark_tracker: None,
171            last_applied_watermark: None,
172            late_data_configs: FxHashMap::default(),
173            context_name: None,
174            shared_hamlet_aggregators: Vec::new(),
175            checkpoint_manager: None,
176            dlq_path: None,
177            dlq_config: crate::dead_letter::DlqConfig::default(),
178            dlq: None,
179            physical_plan: None,
180            udf_registry: UdfRegistry::new(),
181        }
182    }
183
184    /// Create engine without output channel (for benchmarking - skips Event cloning overhead)
185    pub fn new_benchmark() -> Self {
186        Self::new_internal(None)
187    }
188
189    /// Create engine with optional output channel (legacy API, requires cloning)
190    pub fn new_with_optional_output(output_tx: Option<mpsc::Sender<Event>>) -> Self {
191        Self::new_internal(output_tx.map(OutputChannel::Owned))
192    }
193
194    /// Create engine with zero-copy SharedEvent output channel (PERF: avoids cloning)
195    pub fn new_shared(output_tx: mpsc::Sender<SharedEvent>) -> Self {
196        Self::new_internal(Some(OutputChannel::Shared(output_tx)))
197    }
198
199    /// Internal constructor
200    fn new_internal(output_channel: Option<OutputChannel>) -> Self {
201        Self {
202            streams: FxHashMap::default(),
203            router: router::EventRouter::new(),
204            functions: FxHashMap::default(),
205            patterns: FxHashMap::default(),
206            configs: FxHashMap::default(),
207            variables: FxHashMap::default(),
208            mutable_vars: FxHashSet::default(),
209            connectors: FxHashMap::default(),
210            source_bindings: Vec::new(),
211            sinks: sink_factory::SinkRegistry::new(),
212            output_channel,
213            events_processed: 0,
214            output_events_emitted: 0,
215            metrics: None,
216            context_map: ContextMap::new(),
217            watermark_tracker: None,
218            last_applied_watermark: None,
219            late_data_configs: FxHashMap::default(),
220            context_name: None,
221            shared_hamlet_aggregators: Vec::new(),
222            checkpoint_manager: None,
223            dlq_path: None,
224            dlq_config: crate::dead_letter::DlqConfig::default(),
225            dlq: None,
226            physical_plan: None,
227            udf_registry: UdfRegistry::new(),
228        }
229    }
230
231    /// Clone the output channel for use in engine reload
232    fn clone_output_channel(&self) -> Option<OutputChannel> {
233        match &self.output_channel {
234            Some(OutputChannel::Owned(tx)) => Some(OutputChannel::Owned(tx.clone())),
235            Some(OutputChannel::Shared(tx)) => Some(OutputChannel::Shared(tx.clone())),
236            None => None,
237        }
238    }
239
240    /// Send an output event to the output channel (if configured).
241    /// In benchmark mode (no output channel), this is a no-op to avoid cloning overhead.
242    /// PERF: Uses zero-copy for SharedEvent channels, clones only for legacy Owned channels.
243    #[inline]
244    pub(super) fn send_output_shared(&mut self, event: &SharedEvent) {
245        match &self.output_channel {
246            Some(OutputChannel::Shared(tx)) => {
247                // PERF: Zero-copy - just increment Arc refcount
248                if let Err(e) = tx.try_send(Arc::clone(event)) {
249                    warn!("Failed to send output event: {}", e);
250                }
251            }
252            Some(OutputChannel::Owned(tx)) => {
253                // Legacy path: must clone the Event
254                let owned = (**event).clone();
255                if let Err(e) = tx.try_send(owned) {
256                    warn!("Failed to send output event: {}", e);
257                }
258            }
259            None => {
260                // Benchmark mode: skip sending entirely - no clone!
261            }
262        }
263    }
264
265    /// Send an output event to the output channel (if configured).
266    /// In benchmark mode (no output channel), this is a no-op to avoid cloning overhead.
267    #[inline]
268    pub(super) fn send_output(&mut self, event: Event) {
269        match &self.output_channel {
270            Some(OutputChannel::Shared(tx)) => {
271                // Wrap in Arc for shared channel
272                if let Err(e) = tx.try_send(Arc::new(event)) {
273                    warn!("Failed to send output event: {}", e);
274                }
275            }
276            Some(OutputChannel::Owned(tx)) => {
277                if let Err(e) = tx.try_send(event) {
278                    warn!("Failed to send output event: {}", e);
279                }
280            }
281            None => {
282                // Benchmark mode: skip sending entirely
283            }
284        }
285    }
286
287    /// Set the context name for this engine instance.
288    pub fn set_context_name(&mut self, name: &str) {
289        self.context_name = Some(name.to_string());
290    }
291
    /// Set a custom DLQ file path (call before `load()`).
    ///
    /// If never set, `load()` falls back to "varpulis-dlq.jsonl".
    pub fn set_dlq_path(&mut self, path: std::path::PathBuf) {
        self.dlq_path = Some(path);
    }
296
    /// Set custom DLQ configuration (call before `load()`).
    ///
    /// Configuration set after `load()` has no effect, since the DLQ is
    /// opened during `load()`.
    pub const fn set_dlq_config(&mut self, config: crate::dead_letter::DlqConfig) {
        self.dlq_config = config;
    }
301
    /// Access the shared DLQ instance (created during `load()`).
    ///
    /// Returns `None` before `load()`, or when no sinks required a DLQ.
    pub const fn dlq(&self) -> Option<&Arc<crate::dead_letter::DeadLetterQueue>> {
        self.dlq.as_ref()
    }
306
    /// Look up a named SASE+ pattern registered via a `pattern` declaration.
    pub fn get_pattern(&self, name: &str) -> Option<&NamedPattern> {
        self.patterns.get(name)
    }
311
    /// Get all registered patterns, keyed by pattern name.
    pub const fn patterns(&self) -> &FxHashMap<String, NamedPattern> {
        &self.patterns
    }
316
    /// Get a configuration block by name (deprecated `config` syntax).
    pub fn get_config(&self, name: &str) -> Option<&EngineConfig> {
        self.configs.get(name)
    }
321
    /// Get a declared connector by name.
    pub fn get_connector(&self, name: &str) -> Option<&connector::ConnectorConfig> {
        self.connectors.get(name)
    }
326
    /// Get all declared connector configs (for building a ManagedConnectorRegistry).
    pub const fn connector_configs(&self) -> &FxHashMap<String, connector::ConnectorConfig> {
        &self.connectors
    }
331
    /// Get source connector bindings from `.from()` declarations.
    pub fn source_bindings(&self) -> &[SourceBinding] {
        &self.source_bindings
    }
336
    /// Get a variable value by name.
    pub fn get_variable(&self, name: &str) -> Option<&Value> {
        self.variables.get(name)
    }
341
342    /// Set a variable value (must be mutable or new)
343    pub fn set_variable(&mut self, name: &str, value: Value) -> Result<(), error::EngineError> {
344        if self.variables.contains_key(name) && !self.mutable_vars.contains(name) {
345            return Err(error::EngineError::Compilation(format!(
346                "Cannot assign to immutable variable '{name}'. Use 'var' instead of 'let' to declare mutable variables."
347            )));
348        }
349        self.variables.insert(name.to_string(), value);
350        Ok(())
351    }
352
    /// Get all variables (for debugging/testing).
    pub const fn variables(&self) -> &FxHashMap<String, Value> {
        &self.variables
    }
357
    /// Get the context map (for orchestrator setup).
    pub const fn context_map(&self) -> &ContextMap {
        &self.context_map
    }
362
    /// Check if the loaded program declares any contexts.
    pub fn has_contexts(&self) -> bool {
        self.context_map.has_contexts()
    }
367
368    /// Enable Prometheus metrics
369    pub fn with_metrics(mut self, metrics: Metrics) -> Self {
370        self.metrics = Some(metrics);
371        self
372    }
373
374    /// Add a programmatic filter to a stream using a closure
375    pub fn add_filter<F>(&mut self, stream_name: &str, filter: F) -> Result<(), error::EngineError>
376    where
377        F: Fn(&Event) -> bool + Send + Sync + 'static,
378    {
379        if let Some(stream) = self.streams.get_mut(stream_name) {
380            let wrapped = move |e: &SharedEvent| filter(e.as_ref());
381            stream
382                .operations
383                .insert(0, RuntimeOp::WhereClosure(Box::new(wrapped)));
384            Ok(())
385        } else {
386            Err(error::EngineError::StreamNotFound(stream_name.to_string()))
387        }
388    }
389
390    /// Load a program into the engine with semantic validation.
391    pub fn load_with_source(
392        &mut self,
393        source: &str,
394        program: &Program,
395    ) -> Result<(), error::EngineError> {
396        let validation = varpulis_core::validate::validate(source, program);
397        if validation.has_errors() {
398            return Err(error::EngineError::Compilation(validation.format(source)));
399        }
400        for warning in validation
401            .diagnostics
402            .iter()
403            .filter(|d| d.severity == varpulis_core::validate::Severity::Warning)
404        {
405            warn!("{}", warning.message);
406        }
407        self.load_program(program)
408    }
409
    /// Load a program into the engine (no semantic validation).
    ///
    /// Prefer [`Engine::load_with_source`] when the source text is available,
    /// so semantic validation can run first.
    pub fn load(&mut self, program: &Program) -> Result<(), error::EngineError> {
        self.load_program(program)
    }
414
    /// Core loading pipeline shared by `load` and `load_with_source`.
    ///
    /// Runs in a fixed order that later phases depend on:
    /// 1. register every statement (streams, events, functions, configs,
    ///    patterns, variables, contexts, connectors);
    /// 2. scan registered streams for `.to()` sink references;
    /// 3. build sinks from connectors and wrap them with circuit breaker + DLQ;
    /// 4. detect Hamlet multi-query sharing;
    /// 5. snapshot a physical plan for introspection.
    ///
    /// # Errors
    /// Fails on stream registration errors, un-evaluable variable
    /// initializers, or assignment to an immutable variable.
    fn load_program(&mut self, program: &Program) -> Result<(), error::EngineError> {
        // Phase 1: register each top-level statement.
        for stmt in &program.statements {
            match &stmt.node {
                Stmt::StreamDecl {
                    name, source, ops, ..
                } => {
                    self.register_stream(name, source, ops)?;
                }
                Stmt::EventDecl { name, fields, .. } => {
                    // Event declarations are informational here; routing is
                    // derived from stream sources, not from this arm.
                    info!(
                        "Registered event type: {} with {} fields",
                        name,
                        fields.len()
                    );
                }
                Stmt::FnDecl {
                    name,
                    params,
                    ret,
                    body,
                } => {
                    let user_fn = UserFunction {
                        name: name.clone(),
                        params: params
                            .iter()
                            .map(|p| (p.name.clone(), p.ty.clone()))
                            .collect(),
                        return_type: ret.clone(),
                        body: body.clone(),
                    };
                    info!(
                        "Registered function: {}({} params)",
                        name,
                        user_fn.params.len()
                    );
                    self.functions.insert(name.clone(), user_fn);
                }
                Stmt::Config { name, items } => {
                    // Legacy syntax: still honored, but nudges users toward
                    // `connector` declarations.
                    warn!(
                        "DEPRECATED: 'config {}' block syntax is deprecated. \
                         Use 'connector' declarations instead: \
                         connector MyConn = {} (...)",
                        name, name
                    );
                    let mut values = std::collections::HashMap::new();
                    for item in items {
                        if let ConfigItem::Value(key, val) = item {
                            values.insert(key.clone(), val.clone());
                        }
                    }
                    info!(
                        "Registered config block: {} with {} items",
                        name,
                        values.len()
                    );
                    self.configs.insert(
                        name.clone(),
                        EngineConfig {
                            name: name.clone(),
                            values,
                        },
                    );
                }
                Stmt::PatternDecl {
                    name,
                    expr,
                    within,
                    partition_by,
                } => {
                    let named_pattern = NamedPattern {
                        name: name.clone(),
                        expr: expr.clone(),
                        within: within.clone(),
                        partition_by: partition_by.clone(),
                    };
                    info!(
                        "Registered SASE+ pattern: {} (within: {}, partition: {})",
                        name,
                        within.is_some(),
                        partition_by.is_some()
                    );
                    self.patterns.insert(name.clone(), named_pattern);
                }
                Stmt::Import { path, alias } => {
                    // Imports should have been flattened by a resolver pass
                    // before the program reached the engine.
                    warn!(
                        "Unresolved import '{}' (alias: {:?}) — imports must be resolved before engine.load()",
                        path, alias
                    );
                }
                Stmt::VarDecl {
                    mutable,
                    name,
                    value,
                    ..
                } => {
                    // Evaluate the initializer against a placeholder event and
                    // empty sequence context (it may still reference functions
                    // and previously declared variables).
                    let dummy_event = Event::new("__init__");
                    let empty_ctx = SequenceContext::new();
                    let initial_value = evaluator::eval_expr_with_functions(
                        value,
                        &dummy_event,
                        &empty_ctx,
                        &self.functions,
                        &self.variables,
                    )
                    .ok_or_else(|| {
                        error::EngineError::Compilation(format!(
                            "Failed to evaluate initial value for variable '{name}'"
                        ))
                    })?;

                    info!(
                        "Registered {} variable: {} = {:?}",
                        if *mutable { "mutable" } else { "immutable" },
                        name,
                        initial_value
                    );

                    self.variables.insert(name.clone(), initial_value);
                    if *mutable {
                        self.mutable_vars.insert(name.clone());
                    }
                }
                Stmt::Assignment { name, value } => {
                    // Same evaluation scheme as VarDecl, with a distinct
                    // placeholder event name for diagnostics.
                    let dummy_event = Event::new("__assign__");
                    let empty_ctx = SequenceContext::new();
                    let new_value = evaluator::eval_expr_with_functions(
                        value,
                        &dummy_event,
                        &empty_ctx,
                        &self.functions,
                        &self.variables,
                    )
                    .ok_or_else(|| {
                        error::EngineError::Compilation(format!(
                            "Failed to evaluate assignment value for '{name}'"
                        ))
                    })?;

                    // Reject writes to existing immutable (`let`) variables.
                    if self.variables.contains_key(name) && !self.mutable_vars.contains(name) {
                        return Err(error::EngineError::Compilation(format!(
                            "Cannot assign to immutable variable '{name}'. Use 'var' instead of 'let'."
                        )));
                    }

                    // A top-level assignment to a new name implicitly makes it
                    // mutable.
                    if !self.variables.contains_key(name) {
                        self.mutable_vars.insert(name.clone());
                    }

                    info!("Assigned variable: {} = {:?}", name, new_value);
                    self.variables.insert(name.clone(), new_value);
                }
                Stmt::ContextDecl { name, cores } => {
                    use crate::context::ContextConfig;
                    info!("Registered context: {} (cores: {:?})", name, cores);
                    self.context_map.register_context(ContextConfig {
                        name: name.clone(),
                        cores: cores.clone(),
                    });
                }
                Stmt::ConnectorDecl {
                    name,
                    connector_type,
                    params,
                } => {
                    let config = sink_factory::connector_params_to_config(connector_type, params);
                    info!("Registered connector: {} (type: {})", name, connector_type);
                    self.connectors.insert(name.clone(), config);
                }
                _ => {
                    tracing::debug!("Skipping statement: {:?}", stmt.node);
                }
            }
        }

        // Phase 2: collect all sink_keys actually referenced by stream
        // operations, plus any per-sink topic overrides, so only used sinks
        // get built below.
        let mut referenced_sink_keys: FxHashSet<String> = FxHashSet::default();
        let mut topic_overrides: Vec<(String, String, String)> = Vec::new();
        for stream in self.streams.values() {
            for op in &stream.operations {
                if let RuntimeOp::To(to_config) = op {
                    referenced_sink_keys.insert(to_config.sink_key.clone());
                    if let Some(ref topic) = to_config.topic_override {
                        topic_overrides.push((
                            to_config.sink_key.clone(),
                            to_config.connector_name.clone(),
                            topic.clone(),
                        ));
                    }
                }
            }
        }

        // Phase 3a: build sinks using the registry. The context name (if any)
        // is passed so connector IDs stay unique per context thread.
        self.sinks.build_from_connectors(
            &self.connectors,
            &referenced_sink_keys,
            &topic_overrides,
            self.context_name.as_deref(),
        );

        // Phase 3b: wrap sinks with circuit breaker + DLQ when sink
        // operations exist. DLQ open failure is tolerated (`.ok()`): sinks
        // then run with a circuit breaker but no dead-lettering.
        if !self.sinks.cache().is_empty() {
            let dlq_path = self
                .dlq_path
                .clone()
                .unwrap_or_else(|| std::path::PathBuf::from("varpulis-dlq.jsonl"));
            let dlq = crate::dead_letter::DeadLetterQueue::open_with_config(
                &dlq_path,
                self.dlq_config.clone(),
            )
            .map(Arc::new)
            .ok();
            if dlq.is_some() {
                tracing::info!(
                    "Dead letter queue enabled at {} for {} sink(s)",
                    dlq_path.display(),
                    self.sinks.cache().len()
                );
            }
            self.dlq = dlq.clone();
            self.sinks.wrap_with_resilience(
                crate::circuit_breaker::CircuitBreakerConfig::default(),
                dlq,
                self.metrics.clone(),
            );
        }

        // Phase 4: detect multi-query Hamlet sharing opportunities.
        self.setup_hamlet_sharing();

        // Phase 5: build physical plan snapshot for introspection.
        // First invert the router (event type -> streams) into
        // stream -> event types.
        let mut plan = physical_plan::PhysicalPlan::new();
        let mut stream_event_types: FxHashMap<String, Vec<String>> = FxHashMap::default();
        for (event_type, targets) in self.router.all_routes() {
            for target in targets.iter() {
                stream_event_types
                    .entry(target.clone())
                    .or_default()
                    .push(event_type.clone());
            }
        }
        for (name, stream_def) in &self.streams {
            let op_summary = stream_def
                .operations
                .iter()
                .map(|op| op.summary_name())
                .collect::<Vec<_>>()
                .join(" → ");
            plan.add_stream(physical_plan::PhysicalStream {
                name: name.clone(),
                operation_count: stream_def.operations.len(),
                operation_summary: op_summary,
                // Sequential id: current stream count before this insert.
                logical_id: plan.stream_count() as u32,
                registered_event_types: stream_event_types
                    .remove(name.as_str())
                    .unwrap_or_default(),
            });
        }
        self.physical_plan = Some(plan);

        Ok(())
    }
677
678    /// Detect overlapping Kleene patterns across streams using `.trend_aggregate()`
679    /// and replace per-stream aggregators with shared ones.
680    fn setup_hamlet_sharing(&mut self) {
681        use crate::hamlet::template::TemplateBuilder;
682        use crate::hamlet::{HamletAggregator, HamletConfig, QueryRegistration};
683
684        let hamlet_streams: Vec<String> = self
685            .streams
686            .iter()
687            .filter(|(_, s)| s.hamlet_aggregator.is_some())
688            .map(|(name, _)| name.clone())
689            .collect();
690
691        if hamlet_streams.len() < 2 {
692            return;
693        }
694
695        let mut kleene_groups: FxHashMap<Vec<String>, Vec<String>> = FxHashMap::default();
696
697        for stream_name in &hamlet_streams {
698            if let Some(stream) = self.streams.get(stream_name) {
699                let mut kleene_types = Vec::new();
700                for op in &stream.operations {
701                    if let RuntimeOp::TrendAggregate(_) = op {
702                        if let Some(ref agg) = stream.hamlet_aggregator {
703                            for query in agg.registered_queries() {
704                                for &kt in &query.kleene_types {
705                                    let type_name = format!("type_{kt}");
706                                    if !kleene_types.contains(&type_name) {
707                                        kleene_types.push(type_name);
708                                    }
709                                }
710                            }
711                        }
712                    }
713                }
714                kleene_types.sort();
715                kleene_groups
716                    .entry(kleene_types)
717                    .or_default()
718                    .push(stream_name.clone());
719            }
720        }
721
722        for (kleene_key, group_streams) in &kleene_groups {
723            if group_streams.len() < 2 || kleene_key.is_empty() {
724                continue;
725            }
726
727            info!(
728                "Hamlet sharing detected: {} streams share Kleene patterns {:?}: {:?}",
729                group_streams.len(),
730                kleene_key,
731                group_streams,
732            );
733
734            let mut builder = TemplateBuilder::new();
735            type SharingEntry = (
736                String,
737                QueryRegistration,
738                Vec<(String, crate::greta::GretaAggregate)>,
739            );
740            let mut all_registrations: Vec<SharingEntry> = Vec::new();
741            let mut next_query_id: crate::greta::QueryId = 0;
742
743            for stream_name in group_streams {
744                if let Some(stream) = self.streams.get(stream_name) {
745                    if let Some(ref agg) = stream.hamlet_aggregator {
746                        for query in agg.registered_queries() {
747                            let new_id = next_query_id;
748                            next_query_id += 1;
749
750                            let event_names: Vec<String> = query
751                                .event_types
752                                .iter()
753                                .map(|&idx| format!("type_{idx}"))
754                                .collect();
755                            let name_strs: Vec<&str> =
756                                event_names.iter().map(|s| s.as_str()).collect();
757
758                            builder.add_sequence(new_id, &name_strs);
759
760                            for &kt in &query.kleene_types {
761                                let type_name = format!("type_{kt}");
762                                let position = event_names
763                                    .iter()
764                                    .position(|n| *n == type_name)
765                                    .unwrap_or(0);
766                                builder.add_kleene(new_id, &type_name, position as u16);
767                            }
768
769                            let fields: Vec<(String, crate::greta::GretaAggregate)> = stream
770                                .operations
771                                .iter()
772                                .find_map(|op| {
773                                    if let RuntimeOp::TrendAggregate(config) = op {
774                                        Some(config.fields.clone())
775                                    } else {
776                                        None
777                                    }
778                                })
779                                .unwrap_or_default();
780
781                            all_registrations.push((
782                                stream_name.clone(),
783                                QueryRegistration {
784                                    id: new_id,
785                                    event_types: query.event_types.clone(),
786                                    kleene_types: query.kleene_types.clone(),
787                                    aggregate: query.aggregate,
788                                },
789                                fields,
790                            ));
791                        }
792                    }
793                }
794            }
795
796            let template = builder.build();
797            let config = HamletConfig {
798                window_ms: 60_000,
799                incremental: true,
800                ..Default::default()
801            };
802            let mut shared_agg = HamletAggregator::new(config, template);
803
804            for (_, registration, _) in &all_registrations {
805                shared_agg.register_query(registration.clone());
806            }
807
808            let shared_ref = std::sync::Arc::new(std::sync::Mutex::new(shared_agg));
809            self.shared_hamlet_aggregators.push(shared_ref.clone());
810
811            for (stream_name, registration, fields) in &all_registrations {
812                if let Some(stream) = self.streams.get_mut(stream_name) {
813                    stream.hamlet_aggregator = None;
814                    stream.shared_hamlet_ref = Some(shared_ref.clone());
815
816                    for op in &mut stream.operations {
817                        if let RuntimeOp::TrendAggregate(config) = op {
818                            config.query_id = registration.id;
819                            config.fields = fields.clone();
820                        }
821                    }
822                }
823            }
824
825            info!(
826                "Created shared Hamlet aggregator with {} queries",
827                next_query_id,
828            );
829        }
830    }
831
832    /// Connect all sinks that require explicit connection.
833    #[tracing::instrument(skip(self))]
834    pub async fn connect_sinks(&self) -> Result<(), error::EngineError> {
835        self.sinks
836            .connect_all()
837            .await
838            .map_err(error::EngineError::Pipeline)
839    }
840
841    /// Inject a pre-built sink into the engine's registry.
842    pub fn inject_sink(&mut self, key: &str, sink: Arc<dyn crate::sink::Sink>) {
843        self.sinks.insert(key.to_string(), sink);
844    }
845
846    /// Check whether a given key has a registered sink.
847    pub fn has_sink(&self, key: &str) -> bool {
848        self.sinks.cache().contains_key(key)
849    }
850
851    /// Return all sink keys that belong to a given connector name.
852    pub fn sink_keys_for_connector(&self, connector_name: &str) -> Vec<String> {
853        let prefix = format!("{connector_name}::");
854        self.sinks
855            .cache()
856            .keys()
857            .filter(|k| *k == connector_name || k.starts_with(&prefix))
858            .cloned()
859            .collect()
860    }
861
862    // =========================================================================
863    // Query / Introspection
864    // =========================================================================
865
866    /// Check if any registered stream uses `.to()` or `.enrich()` operations.
867    pub fn has_sink_operations(&self) -> bool {
868        self.streams.values().any(|s| {
869            s.operations
870                .iter()
871                .any(|op| matches!(op, RuntimeOp::To(_) | RuntimeOp::Enrich(_)))
872        })
873    }
874
875    /// Returns (events_in, events_out) counters for this engine.
876    pub const fn event_counters(&self) -> (u64, u64) {
877        (self.events_processed, self.output_events_emitted)
878    }
879
880    /// Check if all streams are stateless (safe for round-robin distribution).
881    pub fn is_stateless(&self) -> bool {
882        self.streams.values().all(|s| {
883            s.sase_engine.is_none()
884                && s.join_buffer.is_none()
885                && s.hamlet_aggregator.is_none()
886                && s.shared_hamlet_ref.is_none()
887                && s.operations.iter().all(|op| {
888                    matches!(
889                        op,
890                        RuntimeOp::WhereExpr(_)
891                            | RuntimeOp::WhereClosure(_)
892                            | RuntimeOp::Select(_)
893                            | RuntimeOp::Emit(_)
894                            | RuntimeOp::EmitExpr(_)
895                            | RuntimeOp::Print(_)
896                            | RuntimeOp::Log(_)
897                            | RuntimeOp::Having(_)
898                            | RuntimeOp::Process(_)
899                            | RuntimeOp::Pattern(_)
900                            | RuntimeOp::To(_)
901                    )
902                })
903        })
904    }
905
906    /// Return the partition key used by partitioned operations, if any.
907    pub fn partition_key(&self) -> Option<String> {
908        for stream in self.streams.values() {
909            if let Some(ref sase) = stream.sase_engine {
910                if let Some(key) = sase.partition_by() {
911                    return Some(key.to_string());
912                }
913            }
914            for op in &stream.operations {
915                match op {
916                    RuntimeOp::PartitionedWindow(pw) => return Some(pw.partition_key.clone()),
917                    RuntimeOp::PartitionedSlidingCountWindow(pw) => {
918                        return Some(pw.partition_key.clone())
919                    }
920                    RuntimeOp::PartitionedAggregate(pa) => return Some(pa.partition_key.clone()),
921                    _ => {}
922                }
923            }
924            if stream.sase_engine.is_some() {
925                for op in &stream.operations {
926                    if let RuntimeOp::WhereExpr(expr) = op {
927                        if let Some(key) = compilation::extract_equality_join_key(expr) {
928                            return Some(key);
929                        }
930                    }
931                }
932            }
933        }
934        None
935    }
936
937    /// Check if any registered stream has session windows.
938    pub fn has_session_windows(&self) -> bool {
939        self.streams.values().any(|s| {
940            s.operations.iter().any(|op| {
941                matches!(
942                    op,
943                    RuntimeOp::Window(WindowType::Session(_))
944                        | RuntimeOp::Window(WindowType::PartitionedSession(_))
945                )
946            })
947        })
948    }
949
950    /// Return the smallest session gap across all streams (used as sweep interval).
951    pub fn min_session_gap(&self) -> Option<chrono::Duration> {
952        let mut min_gap: Option<chrono::Duration> = None;
953        for stream in self.streams.values() {
954            for op in &stream.operations {
955                if let RuntimeOp::Window(window) = op {
956                    let gap = match window {
957                        WindowType::Session(w) => Some(w.gap()),
958                        WindowType::PartitionedSession(w) => Some(w.gap()),
959                        _ => None,
960                    };
961                    if let Some(g) = gap {
962                        min_gap = Some(match min_gap {
963                            Some(current) if g < current => g,
964                            Some(current) => current,
965                            None => g,
966                        });
967                    }
968                }
969            }
970        }
971        min_gap
972    }
973
974    /// Get metrics
975    pub fn metrics(&self) -> EngineMetrics {
976        EngineMetrics {
977            events_processed: self.events_processed,
978            output_events_emitted: self.output_events_emitted,
979            streams_count: self.streams.len(),
980        }
981    }
982
983    /// Get the names of all loaded streams.
984    pub fn stream_names(&self) -> Vec<&str> {
985        self.streams.keys().map(|s| s.as_str()).collect()
986    }
987
988    /// Get the physical plan summary (available after `load()`).
989    pub fn physical_plan_summary(&self) -> Option<String> {
990        self.physical_plan.as_ref().map(|p| p.summary())
991    }
992
993    pub fn explain(&self, program: &Program) -> Result<String, error::EngineError> {
994        let logical = planner::logical_plan(program).map_err(error::EngineError::Compilation)?;
995        let optimized = varpulis_parser::optimize_plan(logical);
996        Ok(optimized.explain())
997    }
998
999    /// Build a topology snapshot of the currently loaded streams and routes.
1000    pub fn topology(&self) -> topology::Topology {
1001        let mut builder = topology_builder::TopologyBuilder::new();
1002        for (name, stream_def) in &self.streams {
1003            builder = builder.add_stream(name, stream_def);
1004        }
1005        builder = builder.add_routes(self.router.all_routes());
1006        builder.build()
1007    }
1008
1009    /// Get a user-defined function by name
1010    pub fn get_function(&self, name: &str) -> Option<&UserFunction> {
1011        self.functions.get(name)
1012    }
1013
1014    /// Get all registered function names
1015    pub fn function_names(&self) -> Vec<&str> {
1016        self.functions.keys().map(|s| s.as_str()).collect()
1017    }
1018
1019    /// Get all timer configurations for spawning timer tasks
1020    pub fn get_timers(&self) -> Vec<(u64, Option<u64>, String)> {
1021        let mut timers = Vec::new();
1022        for stream in self.streams.values() {
1023            if let RuntimeSource::Timer(config) = &stream.source {
1024                timers.push((
1025                    config.interval_ns,
1026                    config.initial_delay_ns,
1027                    config.timer_event_type.clone(),
1028                ));
1029            }
1030        }
1031        timers
1032    }
1033
1034    /// Register a native scalar UDF.
1035    pub fn register_scalar_udf(&mut self, udf: std::sync::Arc<dyn crate::udf::ScalarUDF>) {
1036        self.udf_registry.register_scalar(udf);
1037    }
1038
1039    /// Register a native aggregate UDF.
1040    pub fn register_aggregate_udf(&mut self, udf: std::sync::Arc<dyn crate::udf::AggregateUDF>) {
1041        self.udf_registry.register_aggregate(udf);
1042    }
1043
1044    /// Get a reference to the UDF registry.
1045    pub const fn udf_registry(&self) -> &UdfRegistry {
1046        &self.udf_registry
1047    }
1048
1049    // =========================================================================
1050    // Hot Reload
1051    // =========================================================================
1052
1053    /// Reload program without losing state where possible.
1054    pub fn reload(&mut self, program: &Program) -> Result<ReloadReport, error::EngineError> {
1055        let mut report = ReloadReport::default();
1056
1057        let old_streams: FxHashSet<String> = self.streams.keys().cloned().collect();
1058
1059        let mut new_engine = Self::new_internal(self.clone_output_channel());
1060        new_engine.load(program)?;
1061
1062        let new_streams: FxHashSet<String> = new_engine.streams.keys().cloned().collect();
1063
1064        for name in new_streams.difference(&old_streams) {
1065            report.streams_added.push(name.clone());
1066        }
1067
1068        for name in old_streams.difference(&new_streams) {
1069            report.streams_removed.push(name.clone());
1070        }
1071
1072        for name in old_streams.intersection(&new_streams) {
1073            let old_stream = self.streams.get(name).unwrap();
1074            let new_stream = new_engine.streams.get(name).unwrap();
1075
1076            let source_changed = !Self::sources_compatible(&old_stream.source, &new_stream.source);
1077            let ops_changed = old_stream.operations.len() != new_stream.operations.len();
1078
1079            if source_changed || ops_changed {
1080                report.streams_updated.push(name.clone());
1081                report.state_reset.push(name.clone());
1082            } else {
1083                report.state_preserved.push(name.clone());
1084            }
1085        }
1086
1087        for name in &report.streams_removed {
1088            self.streams.remove(name);
1089        }
1090
1091        self.router.clear();
1092
1093        for name in &report.streams_added {
1094            if let Some(stream) = new_engine.streams.remove(name) {
1095                self.streams.insert(name.clone(), stream);
1096            }
1097        }
1098
1099        for name in &report.streams_updated {
1100            if let Some(stream) = new_engine.streams.remove(name) {
1101                self.streams.insert(name.clone(), stream);
1102            }
1103        }
1104
1105        let registrations: Vec<(String, String)> = self
1106            .streams
1107            .iter()
1108            .flat_map(|(name, stream)| {
1109                let mut pairs = Vec::new();
1110                match &stream.source {
1111                    RuntimeSource::EventType(et) => {
1112                        pairs.push((et.clone(), name.clone()));
1113                    }
1114                    RuntimeSource::Stream(s) => {
1115                        pairs.push((s.clone(), name.clone()));
1116                    }
1117                    RuntimeSource::Merge(sources) => {
1118                        for ms in sources {
1119                            pairs.push((ms.event_type.clone(), name.clone()));
1120                        }
1121                    }
1122                    RuntimeSource::Join(_) => {}
1123                    RuntimeSource::Timer(config) => {
1124                        pairs.push((config.timer_event_type.clone(), name.clone()));
1125                    }
1126                }
1127                pairs
1128            })
1129            .collect();
1130
1131        for (event_type, stream_name) in registrations {
1132            self.router.add_route(&event_type, &stream_name);
1133        }
1134
1135        self.functions = new_engine.functions;
1136        self.patterns = new_engine.patterns;
1137        self.configs = new_engine.configs;
1138        self.context_map = new_engine.context_map;
1139        self.connectors = new_engine.connectors;
1140        self.source_bindings = new_engine.source_bindings;
1141        *self.sinks.cache_mut() = std::mem::take(new_engine.sinks.cache_mut());
1142
1143        for (name, value) in new_engine.variables {
1144            if !self.variables.contains_key(&name) {
1145                self.variables.insert(name.clone(), value);
1146                self.mutable_vars
1147                    .extend(new_engine.mutable_vars.iter().cloned());
1148            }
1149        }
1150
1151        info!(
1152            "Hot reload complete: +{} -{} ~{} streams",
1153            report.streams_added.len(),
1154            report.streams_removed.len(),
1155            report.streams_updated.len()
1156        );
1157
1158        Ok(report)
1159    }
1160
1161    // =========================================================================
1162    // Checkpointing
1163    // =========================================================================
1164
    /// Create a checkpoint of the engine state (windows, SASE engines, joins, variables).
    ///
    /// Walks every stream and serializes window contents (including
    /// per-partition sub-windows), distinct/limit operator state, SASE
    /// pattern-engine and join-buffer state, plus the engine-level variables,
    /// event counters and (if enabled) the watermark tracker.
    pub fn create_checkpoint(&self) -> crate::persistence::EngineCheckpoint {
        use crate::persistence::{EngineCheckpoint, WindowCheckpoint};

        // One map per state category, keyed by stream name.
        let mut window_states = std::collections::HashMap::new();
        let mut sase_states = std::collections::HashMap::new();
        let mut join_states = std::collections::HashMap::new();
        let mut distinct_states = std::collections::HashMap::new();
        let mut limit_states = std::collections::HashMap::new();

        for (name, stream) in &self.streams {
            // NOTE(review): each map is keyed by stream name only, so a stream
            // with more than one window-like op would overwrite its own entry
            // — confirm pipelines hold at most one such op per stream.
            for op in &stream.operations {
                match op {
                    RuntimeOp::Window(wt) => {
                        // Every window variant serializes into the shared
                        // WindowCheckpoint representation.
                        let cp = match wt {
                            WindowType::Tumbling(w) => w.checkpoint(),
                            WindowType::Sliding(w) => w.checkpoint(),
                            WindowType::Count(w) => w.checkpoint(),
                            WindowType::SlidingCount(w) => w.checkpoint(),
                            WindowType::Session(w) => w.checkpoint(),
                            WindowType::PartitionedSession(w) => w.checkpoint(),
                            WindowType::PartitionedTumbling(w) => w.checkpoint(),
                            WindowType::PartitionedSliding(w) => w.checkpoint(),
                        };
                        window_states.insert(name.clone(), cp);
                    }
                    RuntimeOp::PartitionedWindow(pw) => {
                        // Flatten each per-key count window into the
                        // `partitions` map; the top-level event list stays
                        // empty for partitioned windows.
                        let mut partitions = std::collections::HashMap::new();
                        for (key, cw) in &pw.windows {
                            let sub_cp = cw.checkpoint();
                            partitions.insert(
                                key.clone(),
                                crate::persistence::PartitionedWindowCheckpoint {
                                    events: sub_cp.events,
                                    window_start_ms: sub_cp.window_start_ms,
                                },
                            );
                        }
                        window_states.insert(
                            name.clone(),
                            WindowCheckpoint {
                                events: Vec::new(),
                                window_start_ms: None,
                                last_emit_ms: None,
                                partitions,
                            },
                        );
                    }
                    RuntimeOp::Distinct(state) => {
                        // Keys are emitted via .rev(); restore re-reverses on
                        // insertion, so the original order round-trips —
                        // presumably `seen` is recency-ordered; TODO confirm.
                        let keys: Vec<String> =
                            state.seen.iter().rev().map(|(k, ())| k.clone()).collect();
                        distinct_states.insert(
                            name.clone(),
                            crate::persistence::DistinctCheckpoint { keys },
                        );
                    }
                    RuntimeOp::Limit(state) => {
                        // Persist both the cap and how many events passed so
                        // a restored engine keeps enforcing the same limit.
                        limit_states.insert(
                            name.clone(),
                            crate::persistence::LimitCheckpoint {
                                max: state.max,
                                count: state.count,
                            },
                        );
                    }
                    _ => {}
                }
            }

            // Pattern-matching and join state live beside the op pipeline.
            if let Some(ref sase) = stream.sase_engine {
                sase_states.insert(name.clone(), sase.checkpoint());
            }

            if let Some(ref jb) = stream.join_buffer {
                join_states.insert(name.clone(), jb.checkpoint());
            }
        }

        // Runtime Values are converted to their serializable form.
        let variables = self
            .variables
            .iter()
            .map(|(k, v)| (k.clone(), crate::persistence::value_to_ser(v)))
            .collect();

        // Only present when watermark tracking is enabled.
        let watermark_state = self.watermark_tracker.as_ref().map(|t| t.checkpoint());

        EngineCheckpoint {
            version: crate::persistence::CHECKPOINT_VERSION,
            window_states,
            sase_states,
            join_states,
            variables,
            events_processed: self.events_processed,
            output_events_emitted: self.output_events_emitted,
            watermark_state,
            distinct_states,
            limit_states,
        }
    }
1264
1265    /// Restore engine state from a checkpoint.
1266    pub fn restore_checkpoint(
1267        &mut self,
1268        cp: &crate::persistence::EngineCheckpoint,
1269    ) -> Result<(), crate::persistence::StoreError> {
1270        let mut migrated = cp.clone();
1271        migrated.validate_and_migrate()?;
1272
1273        self.events_processed = cp.events_processed;
1274        self.output_events_emitted = cp.output_events_emitted;
1275
1276        for (k, sv) in &cp.variables {
1277            self.variables
1278                .insert(k.clone(), crate::persistence::ser_to_value(sv.clone()));
1279        }
1280
1281        for (name, stream) in &mut self.streams {
1282            if let Some(wcp) = cp.window_states.get(name) {
1283                for op in &mut stream.operations {
1284                    match op {
1285                        RuntimeOp::Window(wt) => match wt {
1286                            WindowType::Tumbling(w) => w.restore(wcp),
1287                            WindowType::Sliding(w) => w.restore(wcp),
1288                            WindowType::Count(w) => w.restore(wcp),
1289                            WindowType::SlidingCount(w) => w.restore(wcp),
1290                            WindowType::Session(w) => w.restore(wcp),
1291                            WindowType::PartitionedSession(w) => w.restore(wcp),
1292                            WindowType::PartitionedTumbling(w) => w.restore(wcp),
1293                            WindowType::PartitionedSliding(w) => w.restore(wcp),
1294                        },
1295                        RuntimeOp::PartitionedWindow(pw) => {
1296                            for (key, pcp) in &wcp.partitions {
1297                                let sub_wcp = crate::persistence::WindowCheckpoint {
1298                                    events: pcp.events.clone(),
1299                                    window_start_ms: pcp.window_start_ms,
1300                                    last_emit_ms: None,
1301                                    partitions: std::collections::HashMap::new(),
1302                                };
1303                                let window = pw
1304                                    .windows
1305                                    .entry(key.clone())
1306                                    .or_insert_with(|| CountWindow::new(pw.window_size));
1307                                window.restore(&sub_wcp);
1308                            }
1309                        }
1310                        _ => {}
1311                    }
1312                }
1313            }
1314
1315            if let Some(scp) = cp.sase_states.get(name) {
1316                if let Some(ref mut sase) = stream.sase_engine {
1317                    sase.restore(scp);
1318                }
1319            }
1320
1321            if let Some(jcp) = cp.join_states.get(name) {
1322                if let Some(ref mut jb) = stream.join_buffer {
1323                    jb.restore(jcp);
1324                }
1325            }
1326
1327            if let Some(dcp) = cp.distinct_states.get(name) {
1328                for op in &mut stream.operations {
1329                    if let RuntimeOp::Distinct(state) = op {
1330                        state.seen.clear();
1331                        for key in dcp.keys.iter().rev() {
1332                            state.seen.insert(key.clone(), ());
1333                        }
1334                    }
1335                }
1336            }
1337
1338            if let Some(lcp) = cp.limit_states.get(name) {
1339                for op in &mut stream.operations {
1340                    if let RuntimeOp::Limit(state) = op {
1341                        state.max = lcp.max;
1342                        state.count = lcp.count;
1343                    }
1344                }
1345            }
1346        }
1347
1348        if let Some(ref wcp) = cp.watermark_state {
1349            if self.watermark_tracker.is_none() {
1350                self.watermark_tracker = Some(PerSourceWatermarkTracker::new());
1351            }
1352            if let Some(ref mut tracker) = self.watermark_tracker {
1353                tracker.restore(wcp);
1354                self.last_applied_watermark = wcp
1355                    .effective_watermark_ms
1356                    .and_then(DateTime::from_timestamp_millis);
1357            }
1358        }
1359
1360        info!(
1361            "Engine restored: {} events processed, {} streams with state (schema v{})",
1362            cp.events_processed,
1363            cp.window_states.len() + cp.sase_states.len() + cp.join_states.len(),
1364            cp.version
1365        );
1366
1367        Ok(())
1368    }
1369
1370    /// Enable auto-checkpointing with the given store and config.
1371    pub fn enable_checkpointing(
1372        &mut self,
1373        store: std::sync::Arc<dyn crate::persistence::StateStore>,
1374        config: crate::persistence::CheckpointConfig,
1375    ) -> Result<(), crate::persistence::StoreError> {
1376        let manager = crate::persistence::CheckpointManager::new(store, config)?;
1377
1378        if let Some(cp) = manager.recover()? {
1379            if let Some(engine_cp) = cp.context_states.get("main") {
1380                self.restore_checkpoint(engine_cp)?;
1381                info!(
1382                    "Auto-restored from checkpoint {} ({} events)",
1383                    cp.id, cp.events_processed
1384                );
1385            }
1386        }
1387
1388        self.checkpoint_manager = Some(manager);
1389        Ok(())
1390    }
1391
1392    /// Check if a checkpoint is due and create one if needed.
1393    pub fn checkpoint_tick(&mut self) -> Result<(), crate::persistence::StoreError> {
1394        let should = self
1395            .checkpoint_manager
1396            .as_ref()
1397            .is_some_and(|m| m.should_checkpoint());
1398
1399        if should {
1400            self.force_checkpoint()?;
1401        }
1402        Ok(())
1403    }
1404
1405    /// Force an immediate checkpoint regardless of the interval.
1406    pub fn force_checkpoint(&mut self) -> Result<(), crate::persistence::StoreError> {
1407        if self.checkpoint_manager.is_none() {
1408            return Ok(());
1409        }
1410
1411        let engine_cp = self.create_checkpoint();
1412        let events_processed = self.events_processed;
1413
1414        let mut context_states = std::collections::HashMap::new();
1415        context_states.insert("main".to_string(), engine_cp);
1416
1417        let cp = crate::persistence::Checkpoint {
1418            id: 0,
1419            timestamp_ms: 0,
1420            events_processed,
1421            window_states: std::collections::HashMap::new(),
1422            pattern_states: std::collections::HashMap::new(),
1423            metadata: std::collections::HashMap::new(),
1424            context_states,
1425        };
1426
1427        self.checkpoint_manager.as_mut().unwrap().checkpoint(cp)?;
1428        Ok(())
1429    }
1430
1431    /// Returns true if auto-checkpointing is enabled.
1432    pub const fn has_checkpointing(&self) -> bool {
1433        self.checkpoint_manager.is_some()
1434    }
1435
1436    // =========================================================================
1437    // Watermark Management
1438    // =========================================================================
1439
1440    /// Enable per-source watermark tracking for this engine.
1441    pub fn enable_watermark_tracking(&mut self) {
1442        if self.watermark_tracker.is_none() {
1443            self.watermark_tracker = Some(PerSourceWatermarkTracker::new());
1444        }
1445    }
1446
1447    /// Register a source for watermark tracking with its max out-of-orderness.
1448    pub fn register_watermark_source(&mut self, source: &str, max_ooo: Duration) {
1449        if let Some(ref mut tracker) = self.watermark_tracker {
1450            tracker.register_source(source, max_ooo);
1451        }
1452    }
1453
1454    /// Advance the watermark from an external source (e.g., upstream context).
1455    #[tracing::instrument(skip(self))]
1456    pub async fn advance_external_watermark(
1457        &mut self,
1458        source_context: &str,
1459        watermark_ms: i64,
1460    ) -> Result<(), error::EngineError> {
1461        if let Some(ref mut tracker) = self.watermark_tracker {
1462            if let Some(wm) = DateTime::from_timestamp_millis(watermark_ms) {
1463                tracker.advance_source_watermark(source_context, wm);
1464
1465                if let Some(new_wm) = tracker.effective_watermark() {
1466                    if self.last_applied_watermark.is_none_or(|last| new_wm > last) {
1467                        self.apply_watermark_to_windows(new_wm).await?;
1468                        self.last_applied_watermark = Some(new_wm);
1469                    }
1470                }
1471            }
1472        }
1473        Ok(())
1474    }
1475
1476    /// Check if two runtime sources are compatible for state preservation
1477    fn sources_compatible(a: &RuntimeSource, b: &RuntimeSource) -> bool {
1478        match (a, b) {
1479            (RuntimeSource::EventType(a), RuntimeSource::EventType(b)) => a == b,
1480            (RuntimeSource::Stream(a), RuntimeSource::Stream(b)) => a == b,
1481            (RuntimeSource::Timer(a), RuntimeSource::Timer(b)) => {
1482                a.interval_ns == b.interval_ns && a.timer_event_type == b.timer_event_type
1483            }
1484            (RuntimeSource::Merge(a), RuntimeSource::Merge(b)) => {
1485                a.len() == b.len()
1486                    && a.iter()
1487                        .zip(b.iter())
1488                        .all(|(x, y)| x.event_type == y.event_type)
1489            }
1490            (RuntimeSource::Join(a), RuntimeSource::Join(b)) => a == b,
1491            _ => false,
1492        }
1493    }
1494}