// varpulis_runtime/engine/mod.rs
//! Main execution engine for Varpulis
//!
//! This module provides the core engine that processes events and executes
//! stream definitions written in VPL.
6mod builder;
7mod compilation;
8pub mod compiler;
9mod dispatch;
10pub mod error;
11pub mod evaluator;
12mod pattern_analyzer;
13pub mod physical_plan;
14mod pipeline;
15pub mod planner;
16mod router;
17mod sink_factory;
18pub mod topology;
19pub mod topology_builder;
20mod types;
21
22// Re-export public types
23use std::sync::Arc;
24
25pub use builder::EngineBuilder;
26use chrono::{DateTime, Duration, Utc};
27// Re-export evaluator for use by other modules (e.g., SASE+)
28pub use evaluator::eval_filter_expr;
29use rustc_hash::{FxHashMap, FxHashSet};
30pub use sink_factory::SinkConnectorAdapter;
31use tokio::sync::mpsc;
32use tracing::{info, warn};
33// Re-export NamedPattern from types
34pub use types::NamedPattern;
35pub use types::{EngineConfig, EngineMetrics, ReloadReport, SourceBinding, UserFunction};
36// Re-export internal types for use within the engine module
37use types::{RuntimeOp, RuntimeSource, StreamDefinition, WindowType};
38use varpulis_core::ast::{ConfigItem, Program, Stmt};
39use varpulis_core::Value;
40
41use crate::connector;
42use crate::context::ContextMap;
43use crate::event::{Event, SharedEvent};
44use crate::metrics::Metrics;
45use crate::sase_persistence::SaseCheckpointExt;
46use crate::sequence::SequenceContext;
47use crate::udf::UdfRegistry;
48use crate::watermark::PerSourceWatermarkTracker;
49use crate::window::CountWindow;
50
/// Output channel type enumeration for zero-copy or owned event sending.
///
/// `Engine` holds at most one of these; `None` (benchmark mode) skips
/// output emission entirely. See `send_output` / `send_output_shared`.
#[derive(Debug)]
pub(super) enum OutputChannel {
    /// Legacy channel that requires cloning (for backwards compatibility)
    Owned(mpsc::Sender<Event>),
    /// Zero-copy channel using SharedEvent (Arc<Event>) — sending only
    /// bumps the Arc refcount instead of deep-cloning the event.
    Shared(mpsc::Sender<SharedEvent>),
}
59
/// The main Varpulis engine.
///
/// Holds all state produced by loading a VPL [`Program`]: stream
/// definitions, event routing, user functions, named patterns, connector
/// and sink registries, plus runtime bookkeeping (counters, watermarks,
/// checkpointing, DLQ, physical-plan snapshot).
pub struct Engine {
    /// Registered stream definitions
    pub(super) streams: FxHashMap<String, StreamDefinition>,
    /// Event routing: maps event types to stream names
    pub(super) router: router::EventRouter,
    /// User-defined functions
    pub(super) functions: FxHashMap<String, UserFunction>,
    /// Named patterns for reuse
    pub(super) patterns: FxHashMap<String, NamedPattern>,
    /// Configuration blocks (e.g., mqtt, kafka)
    pub(super) configs: FxHashMap<String, EngineConfig>,
    /// Mutable variables accessible across events
    pub(super) variables: FxHashMap<String, Value>,
    /// Tracks which variables are declared as mutable (var vs let)
    pub(super) mutable_vars: FxHashSet<String>,
    /// Declared connectors from VPL
    pub(super) connectors: FxHashMap<String, connector::ConnectorConfig>,
    /// Source connector bindings from .from() declarations
    pub(super) source_bindings: Vec<SourceBinding>,
    /// Sink registry for .to() operations
    pub(super) sinks: sink_factory::SinkRegistry,
    /// Output event sender (None for benchmark/quiet mode - skips cloning overhead)
    pub(super) output_channel: Option<OutputChannel>,
    /// Metrics: total events processed by the engine
    pub(super) events_processed: u64,
    /// Metrics: total output events emitted
    pub(super) output_events_emitted: u64,
    /// Prometheus metrics
    pub(super) metrics: Option<Metrics>,
    /// Context assignments for multi-threaded execution
    pub(super) context_map: ContextMap,
    /// Per-source watermark tracker for event-time processing
    pub(super) watermark_tracker: Option<PerSourceWatermarkTracker>,
    /// Last applied watermark (for detecting advances)
    pub(super) last_applied_watermark: Option<DateTime<Utc>>,
    /// Late data configurations per stream
    pub(super) late_data_configs: FxHashMap<String, types::LateDataConfig>,
    /// Context name when running inside a context thread (used for unique connector IDs)
    pub(super) context_name: Option<String>,
    /// Topic prefix for multi-tenant Kafka/MQTT isolation (prepended to all topic names)
    pub(super) topic_prefix: Option<String>,
    /// Shared Hamlet aggregators for multi-query optimization
    pub(super) shared_hamlet_aggregators:
        Vec<std::sync::Arc<std::sync::Mutex<crate::hamlet::HamletAggregator>>>,
    /// Auto-checkpointing manager (None = checkpointing disabled)
    pub(super) checkpoint_manager: Option<crate::persistence::CheckpointManager>,
    /// Connector credentials store for secure profile resolution
    pub(super) credentials_store: Option<Arc<connector::credentials::CredentialsStore>>,
    /// Custom DLQ file path (defaults to "varpulis-dlq.jsonl")
    pub(super) dlq_path: Option<std::path::PathBuf>,
    /// DLQ configuration (rotation limits etc.)
    pub(super) dlq_config: crate::dead_letter::DlqConfig,
    /// Shared DLQ instance (created during load())
    pub(super) dlq: Option<Arc<crate::dead_letter::DeadLetterQueue>>,
    /// Physical plan snapshot for introspection (built during load_program)
    pub(super) physical_plan: Option<physical_plan::PhysicalPlan>,
    /// Registry for native Rust UDFs (scalar + aggregate)
    pub(super) udf_registry: UdfRegistry,
}
119
120impl std::fmt::Debug for Engine {
121    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
122        f.debug_struct("Engine")
123            .field("streams", &self.streams.keys().collect::<Vec<_>>())
124            .field("functions", &self.functions.keys().collect::<Vec<_>>())
125            .field("patterns", &self.patterns.keys().collect::<Vec<_>>())
126            .field("configs", &self.configs.keys().collect::<Vec<_>>())
127            .field("connectors", &self.connectors.keys().collect::<Vec<_>>())
128            .field("events_processed", &self.events_processed)
129            .field("output_events_emitted", &self.output_events_emitted)
130            .field("context_map", &self.context_map)
131            .field("context_name", &self.context_name)
132            .field("topic_prefix", &self.topic_prefix)
133            .finish_non_exhaustive()
134    }
135}
136
137impl Engine {
    /// Create an [`EngineBuilder`] for fluent engine construction.
    ///
    /// This is the preferred entry point over the various `new_*`
    /// constructors when several options need to be combined.
    ///
    /// # Examples
    ///
    /// ```rust,no_run
    /// use varpulis_runtime::Engine;
    /// use tokio::sync::mpsc;
    ///
    /// let (tx, _rx) = mpsc::channel(100);
    /// let mut engine = Engine::builder()
    ///     .output(tx)
    ///     .build();
    /// ```
    pub fn builder() -> EngineBuilder {
        EngineBuilder::new()
    }
154
155    pub fn new(output_tx: mpsc::Sender<Event>) -> Self {
156        Self {
157            streams: FxHashMap::default(),
158            router: router::EventRouter::new(),
159            functions: FxHashMap::default(),
160            patterns: FxHashMap::default(),
161            configs: FxHashMap::default(),
162            variables: FxHashMap::default(),
163            mutable_vars: FxHashSet::default(),
164            connectors: FxHashMap::default(),
165            source_bindings: Vec::new(),
166            sinks: sink_factory::SinkRegistry::new(),
167            output_channel: Some(OutputChannel::Owned(output_tx)),
168            events_processed: 0,
169            output_events_emitted: 0,
170            metrics: None,
171            context_map: ContextMap::new(),
172            watermark_tracker: None,
173            last_applied_watermark: None,
174            late_data_configs: FxHashMap::default(),
175            context_name: None,
176            topic_prefix: None,
177            shared_hamlet_aggregators: Vec::new(),
178            checkpoint_manager: None,
179            credentials_store: None,
180            dlq_path: None,
181            dlq_config: crate::dead_letter::DlqConfig::default(),
182            dlq: None,
183            physical_plan: None,
184            udf_registry: UdfRegistry::new(),
185        }
186    }
187
    /// Create engine without output channel (for benchmarking - skips Event cloning overhead)
    pub fn new_benchmark() -> Self {
        Self::new_internal(None)
    }

    /// Create engine with optional output channel (legacy API, requires cloning)
    ///
    /// `None` behaves like [`Engine::new_benchmark`]; `Some(tx)` behaves
    /// like [`Engine::new`].
    pub fn new_with_optional_output(output_tx: Option<mpsc::Sender<Event>>) -> Self {
        Self::new_internal(output_tx.map(OutputChannel::Owned))
    }

    /// Create engine with zero-copy SharedEvent output channel (PERF: avoids cloning)
    pub fn new_shared(output_tx: mpsc::Sender<SharedEvent>) -> Self {
        Self::new_internal(Some(OutputChannel::Shared(output_tx)))
    }

    /// Set the connector credentials store for profile resolution.
    ///
    /// Connector declarations that carry a `profile` property are resolved
    /// against this store during `load()`, so set it beforehand.
    pub fn set_credentials_store(&mut self, store: Arc<connector::credentials::CredentialsStore>) {
        self.credentials_store = Some(store);
    }
207
    /// Internal constructor.
    ///
    /// Single source of truth for the engine's initial state; the public
    /// constructors only differ in the `output_channel` they pass here.
    /// Everything else starts empty/disabled and is populated by `load()`.
    fn new_internal(output_channel: Option<OutputChannel>) -> Self {
        Self {
            streams: FxHashMap::default(),
            router: router::EventRouter::new(),
            functions: FxHashMap::default(),
            patterns: FxHashMap::default(),
            configs: FxHashMap::default(),
            variables: FxHashMap::default(),
            mutable_vars: FxHashSet::default(),
            connectors: FxHashMap::default(),
            source_bindings: Vec::new(),
            sinks: sink_factory::SinkRegistry::new(),
            output_channel,
            events_processed: 0,
            output_events_emitted: 0,
            metrics: None,
            context_map: ContextMap::new(),
            watermark_tracker: None,
            last_applied_watermark: None,
            late_data_configs: FxHashMap::default(),
            context_name: None,
            topic_prefix: None,
            shared_hamlet_aggregators: Vec::new(),
            checkpoint_manager: None,
            credentials_store: None,
            dlq_path: None,
            dlq_config: crate::dead_letter::DlqConfig::default(),
            dlq: None,
            physical_plan: None,
            udf_registry: UdfRegistry::new(),
        }
    }
241
242    /// Clone the output channel for use in engine reload
243    fn clone_output_channel(&self) -> Option<OutputChannel> {
244        match &self.output_channel {
245            Some(OutputChannel::Owned(tx)) => Some(OutputChannel::Owned(tx.clone())),
246            Some(OutputChannel::Shared(tx)) => Some(OutputChannel::Shared(tx.clone())),
247            None => None,
248        }
249    }
250
251    /// Send an output event to the output channel (if configured).
252    /// In benchmark mode (no output channel), this is a no-op to avoid cloning overhead.
253    /// PERF: Uses zero-copy for SharedEvent channels, clones only for legacy Owned channels.
254    #[inline]
255    pub(super) fn send_output_shared(&mut self, event: &SharedEvent) {
256        match &self.output_channel {
257            Some(OutputChannel::Shared(tx)) => {
258                // PERF: Zero-copy - just increment Arc refcount
259                if let Err(e) = tx.try_send(Arc::clone(event)) {
260                    warn!("Failed to send output event: {}", e);
261                }
262            }
263            Some(OutputChannel::Owned(tx)) => {
264                // Legacy path: must clone the Event
265                let owned = (**event).clone();
266                if let Err(e) = tx.try_send(owned) {
267                    warn!("Failed to send output event: {}", e);
268                }
269            }
270            None => {
271                // Benchmark mode: skip sending entirely - no clone!
272            }
273        }
274    }
275
276    /// Send an output event to the output channel (if configured).
277    /// In benchmark mode (no output channel), this is a no-op to avoid cloning overhead.
278    #[inline]
279    pub(super) fn send_output(&mut self, event: Event) {
280        match &self.output_channel {
281            Some(OutputChannel::Shared(tx)) => {
282                // Wrap in Arc for shared channel
283                if let Err(e) = tx.try_send(Arc::new(event)) {
284                    warn!("Failed to send output event: {}", e);
285                }
286            }
287            Some(OutputChannel::Owned(tx)) => {
288                if let Err(e) = tx.try_send(event) {
289                    warn!("Failed to send output event: {}", e);
290                }
291            }
292            None => {
293                // Benchmark mode: skip sending entirely
294            }
295        }
296    }
297
    /// Set the context name for this engine instance.
    ///
    /// Used to derive unique connector IDs when running inside a context
    /// thread; also passed to the sink registry during `load()`.
    pub fn set_context_name(&mut self, name: &str) {
        self.context_name = Some(name.to_string());
    }

    /// Set a topic prefix for multi-tenant isolation (call before `load()`).
    ///
    /// When set, all Kafka and MQTT topic names will be prefixed with
    /// `{prefix}.` to enforce per-tenant topic isolation.
    pub fn set_topic_prefix(&mut self, prefix: &str) {
        self.topic_prefix = Some(prefix.to_string());
    }

    /// Set a custom DLQ file path (call before `load()`).
    pub fn set_dlq_path(&mut self, path: std::path::PathBuf) {
        self.dlq_path = Some(path);
    }

    /// Set custom DLQ configuration (call before `load()`).
    pub const fn set_dlq_config(&mut self, config: crate::dead_letter::DlqConfig) {
        self.dlq_config = config;
    }

    /// Access the shared DLQ instance (created during `load()`).
    ///
    /// Returns `None` before `load()` or when no sinks required a DLQ.
    pub const fn dlq(&self) -> Option<&Arc<crate::dead_letter::DeadLetterQueue>> {
        self.dlq.as_ref()
    }
325
    /// Get a named pattern by name
    pub fn get_pattern(&self, name: &str) -> Option<&NamedPattern> {
        self.patterns.get(name)
    }

    /// Get all registered patterns
    pub const fn patterns(&self) -> &FxHashMap<String, NamedPattern> {
        &self.patterns
    }

    /// Get a configuration block by name
    pub fn get_config(&self, name: &str) -> Option<&EngineConfig> {
        self.configs.get(name)
    }

    /// Get a declared connector by name
    pub fn get_connector(&self, name: &str) -> Option<&connector::ConnectorConfig> {
        self.connectors.get(name)
    }

    /// Get all declared connector configs (for building a ManagedConnectorRegistry).
    pub const fn connector_configs(&self) -> &FxHashMap<String, connector::ConnectorConfig> {
        &self.connectors
    }

    /// Get source connector bindings from .from() declarations
    pub fn source_bindings(&self) -> &[SourceBinding] {
        &self.source_bindings
    }

    /// Get a variable value by name
    pub fn get_variable(&self, name: &str) -> Option<&Value> {
        self.variables.get(name)
    }
360
361    /// Set a variable value (must be mutable or new)
362    pub fn set_variable(&mut self, name: &str, value: Value) -> Result<(), error::EngineError> {
363        if self.variables.contains_key(name) && !self.mutable_vars.contains(name) {
364            return Err(error::EngineError::Compilation(format!(
365                "Cannot assign to immutable variable '{name}'. Use 'var' instead of 'let' to declare mutable variables."
366            )));
367        }
368        self.variables.insert(name.to_string(), value);
369        Ok(())
370    }
371
    /// Get all variables (for debugging/testing)
    pub const fn variables(&self) -> &FxHashMap<String, Value> {
        &self.variables
    }

    /// Get the context map (for orchestrator setup)
    pub const fn context_map(&self) -> &ContextMap {
        &self.context_map
    }

    /// Check if the loaded program declares any contexts
    pub fn has_contexts(&self) -> bool {
        self.context_map.has_contexts()
    }

    /// Enable Prometheus metrics (builder-style: consumes and returns `self`)
    pub fn with_metrics(mut self, metrics: Metrics) -> Self {
        self.metrics = Some(metrics);
        self
    }
392
393    /// Add a programmatic filter to a stream using a closure
394    pub fn add_filter<F>(&mut self, stream_name: &str, filter: F) -> Result<(), error::EngineError>
395    where
396        F: Fn(&Event) -> bool + Send + Sync + 'static,
397    {
398        if let Some(stream) = self.streams.get_mut(stream_name) {
399            let wrapped = move |e: &SharedEvent| filter(e.as_ref());
400            stream
401                .operations
402                .insert(0, RuntimeOp::WhereClosure(Box::new(wrapped)));
403            Ok(())
404        } else {
405            Err(error::EngineError::StreamNotFound(stream_name.to_string()))
406        }
407    }
408
409    /// Load a program into the engine with semantic validation.
410    pub fn load_with_source(
411        &mut self,
412        source: &str,
413        program: &Program,
414    ) -> Result<(), error::EngineError> {
415        let validation = varpulis_core::validate::validate(source, program);
416        if validation.has_errors() {
417            return Err(error::EngineError::Compilation(validation.format(source)));
418        }
419        for warning in validation
420            .diagnostics
421            .iter()
422            .filter(|d| d.severity == varpulis_core::validate::Severity::Warning)
423        {
424            warn!("{}", warning.message);
425        }
426        self.load_program(program)
427    }
428
    /// Load a program into the engine (no semantic validation).
    ///
    /// Prefer [`Engine::load_with_source`] when the original source text is
    /// available so diagnostics can be checked before loading.
    pub fn load(&mut self, program: &Program) -> Result<(), error::EngineError> {
        self.load_program(program)
    }
433
    /// Core loader: walks the program's statements and registers each
    /// declaration (streams, events, functions, configs, patterns,
    /// variables, contexts, connectors), then builds sinks, wires the
    /// DLQ/circuit-breaker resilience layer, detects Hamlet sharing, and
    /// snapshots the physical plan for introspection.
    fn load_program(&mut self, program: &Program) -> Result<(), error::EngineError> {
        for stmt in &program.statements {
            match &stmt.node {
                Stmt::StreamDecl {
                    name, source, ops, ..
                } => {
                    self.register_stream(name, source, ops)?;
                }
                Stmt::EventDecl { name, fields, .. } => {
                    // Event declarations carry no runtime state here; log only.
                    info!(
                        "Registered event type: {} with {} fields",
                        name,
                        fields.len()
                    );
                }
                Stmt::FnDecl {
                    name,
                    params,
                    ret,
                    body,
                } => {
                    let user_fn = UserFunction {
                        name: name.clone(),
                        params: params
                            .iter()
                            .map(|p| (p.name.clone(), p.ty.clone()))
                            .collect(),
                        return_type: ret.clone(),
                        body: body.clone(),
                    };
                    info!(
                        "Registered function: {}({} params)",
                        name,
                        user_fn.params.len()
                    );
                    self.functions.insert(name.clone(), user_fn);
                }
                Stmt::Config { name, items } => {
                    warn!(
                        "DEPRECATED: 'config {}' block syntax is deprecated. \
                         Use 'connector' declarations instead: \
                         connector MyConn = {} (...)",
                        name, name
                    );
                    // Only ConfigItem::Value entries are kept; other item
                    // kinds are ignored for deprecated config blocks.
                    let mut values = std::collections::HashMap::new();
                    for item in items {
                        if let ConfigItem::Value(key, val) = item {
                            values.insert(key.clone(), val.clone());
                        }
                    }
                    info!(
                        "Registered config block: {} with {} items",
                        name,
                        values.len()
                    );
                    self.configs.insert(
                        name.clone(),
                        EngineConfig {
                            name: name.clone(),
                            values,
                        },
                    );
                }
                Stmt::PatternDecl {
                    name,
                    expr,
                    within,
                    partition_by,
                } => {
                    let named_pattern = NamedPattern {
                        name: name.clone(),
                        expr: expr.clone(),
                        within: within.clone(),
                        partition_by: partition_by.clone(),
                    };
                    info!(
                        "Registered SASE+ pattern: {} (within: {}, partition: {})",
                        name,
                        within.is_some(),
                        partition_by.is_some()
                    );
                    self.patterns.insert(name.clone(), named_pattern);
                }
                Stmt::Import { path, alias } => {
                    // Imports are expected to be resolved by a prior pass;
                    // reaching here means that pass was skipped.
                    warn!(
                        "Unresolved import '{}' (alias: {:?}) — imports must be resolved before engine.load()",
                        path, alias
                    );
                }
                Stmt::VarDecl {
                    mutable,
                    name,
                    value,
                    ..
                } => {
                    // Evaluate the initializer against a placeholder event:
                    // top-level initializers may use functions and earlier
                    // variables, but have no real event to read fields from.
                    let dummy_event = Event::new("__init__");
                    let empty_ctx = SequenceContext::new();
                    let initial_value = evaluator::eval_expr_with_functions(
                        value,
                        &dummy_event,
                        &empty_ctx,
                        &self.functions,
                        &self.variables,
                    )
                    .ok_or_else(|| {
                        error::EngineError::Compilation(format!(
                            "Failed to evaluate initial value for variable '{name}'"
                        ))
                    })?;

                    info!(
                        "Registered {} variable: {} = {:?}",
                        if *mutable { "mutable" } else { "immutable" },
                        name,
                        initial_value
                    );

                    self.variables.insert(name.clone(), initial_value);
                    if *mutable {
                        self.mutable_vars.insert(name.clone());
                    }
                }
                Stmt::Assignment { name, value } => {
                    let dummy_event = Event::new("__assign__");
                    let empty_ctx = SequenceContext::new();
                    let new_value = evaluator::eval_expr_with_functions(
                        value,
                        &dummy_event,
                        &empty_ctx,
                        &self.functions,
                        &self.variables,
                    )
                    .ok_or_else(|| {
                        error::EngineError::Compilation(format!(
                            "Failed to evaluate assignment value for '{name}'"
                        ))
                    })?;

                    // Reject writes to variables declared with 'let'.
                    if self.variables.contains_key(name) && !self.mutable_vars.contains(name) {
                        return Err(error::EngineError::Compilation(format!(
                            "Cannot assign to immutable variable '{name}'. Use 'var' instead of 'let'."
                        )));
                    }

                    // First-time assignment implicitly creates a mutable variable.
                    if !self.variables.contains_key(name) {
                        self.mutable_vars.insert(name.clone());
                    }

                    info!("Assigned variable: {} = {:?}", name, new_value);
                    self.variables.insert(name.clone(), new_value);
                }
                Stmt::ContextDecl { name, cores } => {
                    use crate::context::ContextConfig;
                    info!("Registered context: {} (cores: {:?})", name, cores);
                    self.context_map.register_context(ContextConfig {
                        name: name.clone(),
                        cores: cores.clone(),
                    });
                }
                Stmt::ConnectorDecl {
                    name,
                    connector_type,
                    params,
                } => {
                    let mut config =
                        sink_factory::connector_params_to_config(connector_type, params);

                    // Resolve credentials profile if specified. The "profile"
                    // property is removed from the config before merging.
                    if let Some(profile_name) = config.properties.swap_remove("profile") {
                        if let Some(ref store) = self.credentials_store {
                            store
                                .merge_profile(&profile_name, &mut config)
                                .map_err(|e| {
                                    error::EngineError::Compilation(format!(
                                        "Failed to resolve credentials profile '{}' for connector '{}': {}",
                                        profile_name, name, e
                                    ))
                                })?;
                            info!(
                                "Registered connector: {} (type: {}, profile: {})",
                                name, connector_type, profile_name
                            );
                        } else {
                            // A profile without a store is a hard error: the
                            // connector would otherwise run with missing creds.
                            return Err(error::EngineError::Compilation(format!(
                                "Connector '{}' references profile '{}' but no credentials file was provided. \
                                 Use --credentials or set VARPULIS_CREDENTIALS.",
                                name, profile_name
                            )));
                        }
                    } else {
                        info!("Registered connector: {} (type: {})", name, connector_type);
                    }

                    self.connectors.insert(name.clone(), config);
                }
                _ => {
                    tracing::debug!("Skipping statement: {:?}", stmt.node);
                }
            }
        }

        // Collect all sink_keys actually referenced by stream operations
        let mut referenced_sink_keys: FxHashSet<String> = FxHashSet::default();
        let mut topic_overrides: Vec<(String, String, String)> = Vec::new();
        for stream in self.streams.values() {
            for op in &stream.operations {
                if let RuntimeOp::To(to_config) = op {
                    referenced_sink_keys.insert(to_config.sink_key.clone());
                    match &to_config.topic {
                        Some(types::TopicSpec::Static(topic)) => {
                            // (sink_key, connector_name, topic) triple used
                            // by the sink registry to override topics.
                            topic_overrides.push((
                                to_config.sink_key.clone(),
                                to_config.connector_name.clone(),
                                topic.clone(),
                            ));
                        }
                        Some(types::TopicSpec::Dynamic(_)) => {
                            // Dynamic topics use the base connector — ensure it's registered
                            referenced_sink_keys.insert(to_config.connector_name.clone());
                        }
                        None => {}
                    }
                }
            }
        }

        // Build sinks using the registry
        self.sinks.build_from_connectors(
            &self.connectors,
            &referenced_sink_keys,
            &topic_overrides,
            self.context_name.as_deref(),
            self.topic_prefix.as_deref(),
        );

        // Wrap sinks with circuit breaker + DLQ when sink operations exist
        if !self.sinks.cache().is_empty() {
            let dlq_path = self
                .dlq_path
                .clone()
                .unwrap_or_else(|| std::path::PathBuf::from("varpulis-dlq.jsonl"));
            // DLQ creation is best-effort: `.ok()` downgrades an open failure
            // to "run without a DLQ" rather than failing the whole load.
            let dlq = crate::dead_letter::DeadLetterQueue::open_with_config(
                &dlq_path,
                self.dlq_config.clone(),
            )
            .map(Arc::new)
            .ok();
            if dlq.is_some() {
                tracing::info!(
                    "Dead letter queue enabled at {} for {} sink(s)",
                    dlq_path.display(),
                    self.sinks.cache().len()
                );
            }
            self.dlq = dlq.clone();
            self.sinks.wrap_with_resilience(
                crate::circuit_breaker::CircuitBreakerConfig::default(),
                dlq,
                self.metrics.clone(),
            );
        }

        // Phase 2: Detect multi-query Hamlet sharing opportunities
        self.setup_hamlet_sharing();

        // Build physical plan snapshot for introspection.
        // Invert the router map (event type -> streams) into
        // stream -> event types for per-stream reporting.
        let mut plan = physical_plan::PhysicalPlan::new();
        let mut stream_event_types: FxHashMap<String, Vec<String>> = FxHashMap::default();
        for (event_type, targets) in self.router.all_routes() {
            for target in targets.iter() {
                stream_event_types
                    .entry(target.clone())
                    .or_default()
                    .push(event_type.clone());
            }
        }
        for (name, stream_def) in &self.streams {
            let op_summary = stream_def
                .operations
                .iter()
                .map(|op| op.summary_name())
                .collect::<Vec<_>>()
                .join(" → ");
            plan.add_stream(physical_plan::PhysicalStream {
                name: name.clone(),
                operation_count: stream_def.operations.len(),
                operation_summary: op_summary,
                logical_id: plan.stream_count() as u32,
                registered_event_types: stream_event_types
                    .remove(name.as_str())
                    .unwrap_or_default(),
            });
        }
        self.physical_plan = Some(plan);

        Ok(())
    }
731
    /// Detect overlapping Kleene patterns across streams using `.trend_aggregate()`
    /// and replace per-stream aggregators with shared ones.
    ///
    /// Streams are grouped by the sorted set of event-type names appearing
    /// under a Kleene closure; each group of two or more streams gets one
    /// shared `HamletAggregator` built from a merged template, and the
    /// per-stream aggregators in that group are dropped in favour of it.
    fn setup_hamlet_sharing(&mut self) {
        use crate::hamlet::template::TemplateBuilder;
        use crate::hamlet::{HamletAggregator, HamletConfig, QueryRegistration};

        // Streams that currently own a private Hamlet aggregator.
        let hamlet_streams: Vec<String> = self
            .streams
            .iter()
            .filter(|(_, s)| s.hamlet_aggregator.is_some())
            .map(|(name, _)| name.clone())
            .collect();

        // Sharing only pays off with at least two candidate streams.
        if hamlet_streams.len() < 2 {
            return;
        }

        // Group key: sorted, deduped list of Kleene event-type names.
        let mut kleene_groups: FxHashMap<Vec<String>, Vec<String>> = FxHashMap::default();

        for stream_name in &hamlet_streams {
            if let Some(stream) = self.streams.get(stream_name) {
                let mut kleene_types = Vec::new();
                for op in &stream.operations {
                    if let RuntimeOp::TrendAggregate(_) = op {
                        if let Some(ref agg) = stream.hamlet_aggregator {
                            let tmpl = agg.merged_template();
                            for query in agg.registered_queries() {
                                for &kt in &query.kleene_types {
                                    // Resolve the type index to a name; dedupe.
                                    let type_name =
                                        tmpl.type_name(kt).unwrap_or("unknown").to_string();
                                    if !kleene_types.contains(&type_name) {
                                        kleene_types.push(type_name);
                                    }
                                }
                            }
                        }
                    }
                }
                // Sort so the group key is insertion-order independent.
                kleene_types.sort();
                kleene_groups
                    .entry(kleene_types)
                    .or_default()
                    .push(stream_name.clone());
            }
        }

        for (kleene_key, group_streams) in &kleene_groups {
            // Skip singleton groups and streams with no Kleene types at all.
            if group_streams.len() < 2 || kleene_key.is_empty() {
                continue;
            }

            info!(
                "Hamlet sharing detected: {} streams share Kleene patterns {:?}: {:?}",
                group_streams.len(),
                kleene_key,
                group_streams,
            );

            // Build a single merged template covering every query in the group.
            let mut builder = TemplateBuilder::new();
            // (stream name, re-registered query, trend-aggregate fields)
            type SharingEntry = (
                String,
                QueryRegistration,
                Vec<(String, crate::greta::GretaAggregate)>,
            );
            let mut all_registrations: Vec<SharingEntry> = Vec::new();
            let mut next_query_id: crate::greta::QueryId = 0;

            for stream_name in group_streams {
                if let Some(stream) = self.streams.get(stream_name) {
                    if let Some(ref agg) = stream.hamlet_aggregator {
                        let tmpl = agg.merged_template();
                        for query in agg.registered_queries() {
                            // Assign a fresh id in the shared aggregator's id space.
                            let new_id = next_query_id;
                            next_query_id += 1;

                            // Resolve type indices to actual event type names
                            let event_names: Vec<String> = query
                                .event_types
                                .iter()
                                .map(|&idx| tmpl.type_name(idx).unwrap_or("unknown").to_string())
                                .collect();
                            let name_strs: Vec<&str> =
                                event_names.iter().map(String::as_str).collect();

                            // Record base state before adding states for this query
                            let base_state = builder.num_states() as u16;
                            builder.add_sequence(new_id, &name_strs);

                            for &kt in &query.kleene_types {
                                let type_name = tmpl.type_name(kt).unwrap_or("unknown").to_string();
                                let position = event_names
                                    .iter()
                                    .position(|n| *n == type_name)
                                    .unwrap_or(0);
                                // Self-loop at target state (base + position + 1)
                                // since add_sequence creates states starting at base_state
                                let kleene_state = base_state + position as u16 + 1;
                                builder.add_kleene(new_id, &type_name, kleene_state);
                            }

                            // Carry over the stream's trend-aggregate fields so
                            // they can be re-applied once sharing is wired up.
                            let fields: Vec<(String, crate::greta::GretaAggregate)> = stream
                                .operations
                                .iter()
                                .find_map(|op| {
                                    if let RuntimeOp::TrendAggregate(config) = op {
                                        Some(config.fields.clone())
                                    } else {
                                        None
                                    }
                                })
                                .unwrap_or_default();

                            // NOTE(review): event/kleene type indices are copied
                            // from the per-stream template; confirm the merged
                            // template preserves the same index assignment.
                            all_registrations.push((
                                stream_name.clone(),
                                QueryRegistration {
                                    id: new_id,
                                    event_types: query.event_types.clone(),
                                    kleene_types: query.kleene_types.clone(),
                                    aggregate: query.aggregate,
                                },
                                fields,
                            ));
                        }
                    }
                }
            }

            // NOTE(review): window_ms is hard-coded to 60s here — confirm this
            // matches the per-stream window configuration it replaces.
            let template = builder.build();
            let config = HamletConfig {
                window_ms: 60_000,
                incremental: true,
                ..Default::default()
            };
            let mut shared_agg = HamletAggregator::new(config, template);

            for (_, registration, _) in &all_registrations {
                shared_agg.register_query(registration.clone());
            }

            // Keep the shared aggregator alive engine-wide; streams hold refs.
            let shared_ref = std::sync::Arc::new(std::sync::Mutex::new(shared_agg));
            self.shared_hamlet_aggregators.push(shared_ref.clone());

            for (stream_name, registration, fields) in &all_registrations {
                if let Some(stream) = self.streams.get_mut(stream_name) {
                    // Drop the private aggregator; route through the shared one.
                    stream.hamlet_aggregator = None;
                    stream.shared_hamlet_ref = Some(shared_ref.clone());

                    // Point the stream's trend-aggregate op at its new query id.
                    for op in &mut stream.operations {
                        if let RuntimeOp::TrendAggregate(config) = op {
                            config.query_id = registration.id;
                            config.fields = fields.clone();
                        }
                    }
                }
            }

            info!(
                "Created shared Hamlet aggregator with {} queries",
                next_query_id,
            );
        }
    }
894
895    /// Connect all sinks that require explicit connection.
896    #[tracing::instrument(skip(self))]
897    pub async fn connect_sinks(&self) -> Result<(), error::EngineError> {
898        self.sinks
899            .connect_all()
900            .await
901            .map_err(error::EngineError::Pipeline)
902    }
903
904    /// Inject a pre-built sink into the engine's registry.
905    pub fn inject_sink(&mut self, key: &str, sink: Arc<dyn crate::sink::Sink>) {
906        self.sinks.insert(key.to_string(), sink);
907    }
908
909    /// Check whether a given key has a registered sink.
910    pub fn has_sink(&self, key: &str) -> bool {
911        self.sinks.cache().contains_key(key)
912    }
913
914    /// Return all sink keys that belong to a given connector name.
915    pub fn sink_keys_for_connector(&self, connector_name: &str) -> Vec<String> {
916        let prefix = format!("{connector_name}::");
917        self.sinks
918            .cache()
919            .keys()
920            .filter(|k| *k == connector_name || k.starts_with(&prefix))
921            .cloned()
922            .collect()
923    }
924
925    // =========================================================================
926    // Query / Introspection
927    // =========================================================================
928
929    /// Check if any registered stream uses `.to()` or `.enrich()` operations.
930    pub fn has_sink_operations(&self) -> bool {
931        self.streams.values().any(|s| {
932            s.operations
933                .iter()
934                .any(|op| matches!(op, RuntimeOp::To(_) | RuntimeOp::Enrich(_)))
935        })
936    }
937
938    /// Returns (events_in, events_out) counters for this engine.
939    pub const fn event_counters(&self) -> (u64, u64) {
940        (self.events_processed, self.output_events_emitted)
941    }
942
943    /// Check if all streams are stateless (safe for round-robin distribution).
944    pub fn is_stateless(&self) -> bool {
945        self.streams.values().all(|s| {
946            s.sase_engine.is_none()
947                && s.join_buffer.is_none()
948                && s.hamlet_aggregator.is_none()
949                && s.shared_hamlet_ref.is_none()
950                && s.operations.iter().all(|op| {
951                    matches!(
952                        op,
953                        RuntimeOp::WhereExpr(_)
954                            | RuntimeOp::WhereClosure(_)
955                            | RuntimeOp::Select(_)
956                            | RuntimeOp::Emit(_)
957                            | RuntimeOp::EmitExpr(_)
958                            | RuntimeOp::Print(_)
959                            | RuntimeOp::Log(_)
960                            | RuntimeOp::Having(_)
961                            | RuntimeOp::Process(_)
962                            | RuntimeOp::Pattern(_)
963                            | RuntimeOp::To(_)
964                    )
965                })
966        })
967    }
968
969    /// Return the partition key used by partitioned operations, if any.
970    pub fn partition_key(&self) -> Option<String> {
971        for stream in self.streams.values() {
972            if let Some(ref sase) = stream.sase_engine {
973                if let Some(key) = sase.partition_by() {
974                    return Some(key.to_string());
975                }
976            }
977            for op in &stream.operations {
978                match op {
979                    RuntimeOp::PartitionedWindow(pw) => return Some(pw.partition_key.clone()),
980                    RuntimeOp::PartitionedSlidingCountWindow(pw) => {
981                        return Some(pw.partition_key.clone())
982                    }
983                    RuntimeOp::PartitionedAggregate(pa) => return Some(pa.partition_key.clone()),
984                    _ => {}
985                }
986            }
987            if stream.sase_engine.is_some() {
988                for op in &stream.operations {
989                    if let RuntimeOp::WhereExpr(expr) = op {
990                        if let Some(key) = compilation::extract_equality_join_key(expr) {
991                            return Some(key);
992                        }
993                    }
994                }
995            }
996        }
997        None
998    }
999
1000    /// Check if any registered stream has session windows.
1001    pub fn has_session_windows(&self) -> bool {
1002        self.streams.values().any(|s| {
1003            s.operations.iter().any(|op| {
1004                matches!(
1005                    op,
1006                    RuntimeOp::Window(WindowType::Session(_))
1007                        | RuntimeOp::Window(WindowType::PartitionedSession(_))
1008                )
1009            })
1010        })
1011    }
1012
1013    /// Return the smallest session gap across all streams (used as sweep interval).
1014    pub fn min_session_gap(&self) -> Option<chrono::Duration> {
1015        let mut min_gap: Option<chrono::Duration> = None;
1016        for stream in self.streams.values() {
1017            for op in &stream.operations {
1018                if let RuntimeOp::Window(window) = op {
1019                    let gap = match window {
1020                        WindowType::Session(w) => Some(w.gap()),
1021                        WindowType::PartitionedSession(w) => Some(w.gap()),
1022                        _ => None,
1023                    };
1024                    if let Some(g) = gap {
1025                        min_gap = Some(match min_gap {
1026                            Some(current) if g < current => g,
1027                            Some(current) => current,
1028                            None => g,
1029                        });
1030                    }
1031                }
1032            }
1033        }
1034        min_gap
1035    }
1036
1037    /// Get metrics
1038    pub fn metrics(&self) -> EngineMetrics {
1039        EngineMetrics {
1040            events_processed: self.events_processed,
1041            output_events_emitted: self.output_events_emitted,
1042            streams_count: self.streams.len(),
1043        }
1044    }
1045
1046    /// Get the names of all loaded streams.
1047    pub fn stream_names(&self) -> Vec<&str> {
1048        self.streams.keys().map(|s| s.as_str()).collect()
1049    }
1050
1051    /// Get the physical plan summary (available after `load()`).
1052    pub fn physical_plan_summary(&self) -> Option<String> {
1053        self.physical_plan.as_ref().map(|p| p.summary())
1054    }
1055
1056    pub fn explain(&self, program: &Program) -> Result<String, error::EngineError> {
1057        let logical = planner::logical_plan(program).map_err(error::EngineError::Compilation)?;
1058        let optimized = varpulis_parser::optimize_plan(logical);
1059        Ok(optimized.explain())
1060    }
1061
1062    /// Build a topology snapshot of the currently loaded streams and routes.
1063    pub fn topology(&self) -> topology::Topology {
1064        let mut builder = topology_builder::TopologyBuilder::new();
1065        for (name, stream_def) in &self.streams {
1066            builder = builder.add_stream(name, stream_def);
1067        }
1068        builder = builder.add_routes(self.router.all_routes());
1069        builder.build()
1070    }
1071
1072    /// Get a user-defined function by name
1073    pub fn get_function(&self, name: &str) -> Option<&UserFunction> {
1074        self.functions.get(name)
1075    }
1076
1077    /// Get all registered function names
1078    pub fn function_names(&self) -> Vec<&str> {
1079        self.functions.keys().map(|s| s.as_str()).collect()
1080    }
1081
1082    /// Get all timer configurations for spawning timer tasks
1083    pub fn get_timers(&self) -> Vec<(u64, Option<u64>, String)> {
1084        let mut timers = Vec::new();
1085        for stream in self.streams.values() {
1086            if let RuntimeSource::Timer(config) = &stream.source {
1087                timers.push((
1088                    config.interval_ns,
1089                    config.initial_delay_ns,
1090                    config.timer_event_type.clone(),
1091                ));
1092            }
1093        }
1094        timers
1095    }
1096
1097    /// Register a native scalar UDF.
1098    pub fn register_scalar_udf(&mut self, udf: std::sync::Arc<dyn crate::udf::ScalarUDF>) {
1099        self.udf_registry.register_scalar(udf);
1100    }
1101
1102    /// Register a native aggregate UDF.
1103    pub fn register_aggregate_udf(&mut self, udf: std::sync::Arc<dyn crate::udf::AggregateUDF>) {
1104        self.udf_registry.register_aggregate(udf);
1105    }
1106
    /// Get a shared reference to the engine's UDF registry (scalar and
    /// aggregate UDFs registered via `register_scalar_udf` /
    /// `register_aggregate_udf`).
    pub const fn udf_registry(&self) -> &UdfRegistry {
        &self.udf_registry
    }
1111
1112    // =========================================================================
1113    // Hot Reload
1114    // =========================================================================
1115
    /// Reload program without losing state where possible.
    ///
    /// Streams are diffed by name against a freshly-loaded scratch engine:
    /// added/removed streams are swapped in/out, and surviving streams keep
    /// their state only when the source is compatible and the operation
    /// count is unchanged. Routes are rebuilt from scratch afterwards, and
    /// non-stream definitions (functions, patterns, configs, …) are
    /// replaced wholesale.
    pub fn reload(&mut self, program: &Program) -> Result<ReloadReport, error::EngineError> {
        let mut report = ReloadReport::default();

        let old_streams: FxHashSet<String> = self.streams.keys().cloned().collect();

        // Load the new program into a scratch engine that shares our output
        // channel, so compiled streams can be moved over directly.
        let mut new_engine = Self::new_internal(self.clone_output_channel());
        new_engine.credentials_store = self.credentials_store.clone();
        new_engine.load(program)?;

        let new_streams: FxHashSet<String> = new_engine.streams.keys().cloned().collect();

        for name in new_streams.difference(&old_streams) {
            report.streams_added.push(name.clone());
        }

        for name in old_streams.difference(&new_streams) {
            report.streams_removed.push(name.clone());
        }

        // Classify surviving streams: preserve state only when nothing
        // observable about the pipeline changed.
        for name in old_streams.intersection(&new_streams) {
            let old_stream = self.streams.get(name).unwrap();
            let new_stream = new_engine.streams.get(name).unwrap();

            let source_changed = !Self::sources_compatible(&old_stream.source, &new_stream.source);
            // NOTE(review): only the operation *count* is compared — two
            // different pipelines of equal length are treated as unchanged.
            let ops_changed = old_stream.operations.len() != new_stream.operations.len();

            if source_changed || ops_changed {
                report.streams_updated.push(name.clone());
                report.state_reset.push(name.clone());
            } else {
                report.state_preserved.push(name.clone());
            }
        }

        for name in &report.streams_removed {
            self.streams.remove(name);
        }

        // Routes are rebuilt below from the merged stream set.
        self.router.clear();

        for name in &report.streams_added {
            if let Some(stream) = new_engine.streams.remove(name) {
                self.streams.insert(name.clone(), stream);
            }
        }

        for name in &report.streams_updated {
            if let Some(stream) = new_engine.streams.remove(name) {
                self.streams.insert(name.clone(), stream);
            }
        }

        // Re-derive (event type -> stream) routes from every stream's source.
        let registrations: Vec<(String, String)> = self
            .streams
            .iter()
            .flat_map(|(name, stream)| {
                let mut pairs = Vec::new();
                match &stream.source {
                    RuntimeSource::EventType(et) => {
                        pairs.push((et.clone(), name.clone()));
                    }
                    RuntimeSource::Stream(s) => {
                        pairs.push((s.clone(), name.clone()));
                    }
                    RuntimeSource::Merge(sources) => {
                        for ms in sources {
                            pairs.push((ms.event_type.clone(), name.clone()));
                        }
                    }
                    // Join sources get no direct event-type route here.
                    RuntimeSource::Join(_) => {}
                    RuntimeSource::Timer(config) => {
                        pairs.push((config.timer_event_type.clone(), name.clone()));
                    }
                }
                pairs
            })
            .collect();

        for (event_type, stream_name) in registrations {
            self.router.add_route(&event_type, &stream_name);
        }

        // Non-stream definitions are replaced wholesale from the new program.
        self.functions = new_engine.functions;
        self.patterns = new_engine.patterns;
        self.configs = new_engine.configs;
        self.context_map = new_engine.context_map;
        self.connectors = new_engine.connectors;
        self.source_bindings = new_engine.source_bindings;
        *self.sinks.cache_mut() = std::mem::take(new_engine.sinks.cache_mut());

        // Keep existing variable values; only adopt variables we don't have.
        for (name, value) in new_engine.variables {
            if !self.variables.contains_key(&name) {
                self.variables.insert(name.clone(), value);
                // NOTE(review): this extends *all* mutable vars once per newly
                // adopted variable — a no-op on repeats if `mutable_vars` is a
                // set, but it looks intended to run once outside the loop;
                // confirm.
                self.mutable_vars
                    .extend(new_engine.mutable_vars.iter().cloned());
            }
        }

        info!(
            "Hot reload complete: +{} -{} ~{} streams",
            report.streams_added.len(),
            report.streams_removed.len(),
            report.streams_updated.len()
        );

        Ok(report)
    }
1224
1225    // =========================================================================
1226    // Checkpointing
1227    // =========================================================================
1228
    /// Create a checkpoint of the engine state (windows, SASE engines, joins, variables).
    ///
    /// Snapshots, per stream: window contents (including partitioned
    /// windows), distinct/limit operator state, SASE engine state and join
    /// buffers — plus engine-wide variables, event counters and watermark
    /// state. The result is fully serializable for persistence.
    pub fn create_checkpoint(&self) -> crate::persistence::EngineCheckpoint {
        use crate::persistence::{EngineCheckpoint, WindowCheckpoint};

        let mut window_states = std::collections::HashMap::new();
        let mut sase_states = std::collections::HashMap::new();
        let mut join_states = std::collections::HashMap::new();
        let mut distinct_states = std::collections::HashMap::new();
        let mut limit_states = std::collections::HashMap::new();

        for (name, stream) in &self.streams {
            for op in &stream.operations {
                match op {
                    RuntimeOp::Window(wt) => {
                        // Every window variant provides its own checkpoint().
                        let cp = match wt {
                            WindowType::Tumbling(w) => w.checkpoint(),
                            WindowType::Sliding(w) => w.checkpoint(),
                            WindowType::Count(w) => w.checkpoint(),
                            WindowType::SlidingCount(w) => w.checkpoint(),
                            WindowType::Session(w) => w.checkpoint(),
                            WindowType::PartitionedSession(w) => w.checkpoint(),
                            WindowType::PartitionedTumbling(w) => w.checkpoint(),
                            WindowType::PartitionedSliding(w) => w.checkpoint(),
                        };
                        window_states.insert(name.clone(), cp);
                    }
                    RuntimeOp::PartitionedWindow(pw) => {
                        // One sub-checkpoint per partition key, wrapped in an
                        // otherwise-empty top-level WindowCheckpoint.
                        let mut partitions = std::collections::HashMap::new();
                        for (key, cw) in &pw.windows {
                            let sub_cp = cw.checkpoint();
                            partitions.insert(
                                key.clone(),
                                crate::persistence::PartitionedWindowCheckpoint {
                                    events: sub_cp.events,
                                    window_start_ms: sub_cp.window_start_ms,
                                },
                            );
                        }
                        window_states.insert(
                            name.clone(),
                            WindowCheckpoint {
                                events: Vec::new(),
                                window_start_ms: None,
                                last_emit_ms: None,
                                partitions,
                            },
                        );
                    }
                    RuntimeOp::Distinct(state) => {
                        // `seen` is iterated in reverse here, and restore
                        // reverses again, so insertion order round-trips —
                        // presumably an order-sensitive (LRU-like) map; verify.
                        let keys: Vec<String> =
                            state.seen.iter().rev().map(|(k, ())| k.clone()).collect();
                        distinct_states.insert(
                            name.clone(),
                            crate::persistence::DistinctCheckpoint { keys },
                        );
                    }
                    RuntimeOp::Limit(state) => {
                        limit_states.insert(
                            name.clone(),
                            crate::persistence::LimitCheckpoint {
                                max: state.max,
                                count: state.count,
                            },
                        );
                    }
                    _ => {}
                }
            }

            if let Some(ref sase) = stream.sase_engine {
                sase_states.insert(name.clone(), sase.checkpoint());
            }

            if let Some(ref jb) = stream.join_buffer {
                join_states.insert(name.clone(), jb.checkpoint());
            }
        }

        // Variables are converted to a serializable representation.
        let variables = self
            .variables
            .iter()
            .map(|(k, v)| (k.clone(), crate::persistence::value_to_ser(v)))
            .collect();

        let watermark_state = self.watermark_tracker.as_ref().map(|t| t.checkpoint());

        EngineCheckpoint {
            version: crate::persistence::CHECKPOINT_VERSION,
            window_states,
            sase_states,
            join_states,
            variables,
            events_processed: self.events_processed,
            output_events_emitted: self.output_events_emitted,
            watermark_state,
            distinct_states,
            limit_states,
        }
    }
1328
1329    /// Restore engine state from a checkpoint.
1330    pub fn restore_checkpoint(
1331        &mut self,
1332        cp: &crate::persistence::EngineCheckpoint,
1333    ) -> Result<(), crate::persistence::StoreError> {
1334        let mut migrated = cp.clone();
1335        migrated.validate_and_migrate()?;
1336
1337        self.events_processed = cp.events_processed;
1338        self.output_events_emitted = cp.output_events_emitted;
1339
1340        for (k, sv) in &cp.variables {
1341            self.variables
1342                .insert(k.clone(), crate::persistence::ser_to_value(sv.clone()));
1343        }
1344
1345        for (name, stream) in &mut self.streams {
1346            if let Some(wcp) = cp.window_states.get(name) {
1347                for op in &mut stream.operations {
1348                    match op {
1349                        RuntimeOp::Window(wt) => match wt {
1350                            WindowType::Tumbling(w) => w.restore(wcp),
1351                            WindowType::Sliding(w) => w.restore(wcp),
1352                            WindowType::Count(w) => w.restore(wcp),
1353                            WindowType::SlidingCount(w) => w.restore(wcp),
1354                            WindowType::Session(w) => w.restore(wcp),
1355                            WindowType::PartitionedSession(w) => w.restore(wcp),
1356                            WindowType::PartitionedTumbling(w) => w.restore(wcp),
1357                            WindowType::PartitionedSliding(w) => w.restore(wcp),
1358                        },
1359                        RuntimeOp::PartitionedWindow(pw) => {
1360                            for (key, pcp) in &wcp.partitions {
1361                                let sub_wcp = crate::persistence::WindowCheckpoint {
1362                                    events: pcp.events.clone(),
1363                                    window_start_ms: pcp.window_start_ms,
1364                                    last_emit_ms: None,
1365                                    partitions: std::collections::HashMap::new(),
1366                                };
1367                                let window = pw
1368                                    .windows
1369                                    .entry(key.clone())
1370                                    .or_insert_with(|| CountWindow::new(pw.window_size));
1371                                window.restore(&sub_wcp);
1372                            }
1373                        }
1374                        _ => {}
1375                    }
1376                }
1377            }
1378
1379            if let Some(scp) = cp.sase_states.get(name) {
1380                if let Some(ref mut sase) = stream.sase_engine {
1381                    sase.restore(scp);
1382                }
1383            }
1384
1385            if let Some(jcp) = cp.join_states.get(name) {
1386                if let Some(ref mut jb) = stream.join_buffer {
1387                    jb.restore(jcp);
1388                }
1389            }
1390
1391            if let Some(dcp) = cp.distinct_states.get(name) {
1392                for op in &mut stream.operations {
1393                    if let RuntimeOp::Distinct(state) = op {
1394                        state.seen.clear();
1395                        for key in dcp.keys.iter().rev() {
1396                            state.seen.insert(key.clone(), ());
1397                        }
1398                    }
1399                }
1400            }
1401
1402            if let Some(lcp) = cp.limit_states.get(name) {
1403                for op in &mut stream.operations {
1404                    if let RuntimeOp::Limit(state) = op {
1405                        state.max = lcp.max;
1406                        state.count = lcp.count;
1407                    }
1408                }
1409            }
1410        }
1411
1412        if let Some(ref wcp) = cp.watermark_state {
1413            if self.watermark_tracker.is_none() {
1414                self.watermark_tracker = Some(PerSourceWatermarkTracker::new());
1415            }
1416            if let Some(ref mut tracker) = self.watermark_tracker {
1417                tracker.restore(wcp);
1418                self.last_applied_watermark = wcp
1419                    .effective_watermark_ms
1420                    .and_then(DateTime::from_timestamp_millis);
1421            }
1422        }
1423
1424        info!(
1425            "Engine restored: {} events processed, {} streams with state (schema v{})",
1426            cp.events_processed,
1427            cp.window_states.len() + cp.sase_states.len() + cp.join_states.len(),
1428            cp.version
1429        );
1430
1431        Ok(())
1432    }
1433
1434    /// Enable auto-checkpointing with the given store and config.
1435    pub fn enable_checkpointing(
1436        &mut self,
1437        store: std::sync::Arc<dyn crate::persistence::StateStore>,
1438        config: crate::persistence::CheckpointConfig,
1439    ) -> Result<(), crate::persistence::StoreError> {
1440        let manager = crate::persistence::CheckpointManager::new(store, config)?;
1441
1442        if let Some(cp) = manager.recover()? {
1443            if let Some(engine_cp) = cp.context_states.get("main") {
1444                self.restore_checkpoint(engine_cp)?;
1445                info!(
1446                    "Auto-restored from checkpoint {} ({} events)",
1447                    cp.id, cp.events_processed
1448                );
1449            }
1450        }
1451
1452        self.checkpoint_manager = Some(manager);
1453        Ok(())
1454    }
1455
1456    /// Check if a checkpoint is due and create one if needed.
1457    pub fn checkpoint_tick(&mut self) -> Result<(), crate::persistence::StoreError> {
1458        let should = self
1459            .checkpoint_manager
1460            .as_ref()
1461            .is_some_and(|m| m.should_checkpoint());
1462
1463        if should {
1464            self.force_checkpoint()?;
1465        }
1466        Ok(())
1467    }
1468
1469    /// Force an immediate checkpoint regardless of the interval.
1470    pub fn force_checkpoint(&mut self) -> Result<(), crate::persistence::StoreError> {
1471        if self.checkpoint_manager.is_none() {
1472            return Ok(());
1473        }
1474
1475        let engine_cp = self.create_checkpoint();
1476        let events_processed = self.events_processed;
1477
1478        let mut context_states = std::collections::HashMap::new();
1479        context_states.insert("main".to_string(), engine_cp);
1480
1481        let cp = crate::persistence::Checkpoint {
1482            id: 0,
1483            timestamp_ms: 0,
1484            events_processed,
1485            window_states: std::collections::HashMap::new(),
1486            pattern_states: std::collections::HashMap::new(),
1487            metadata: std::collections::HashMap::new(),
1488            context_states,
1489        };
1490
1491        self.checkpoint_manager.as_mut().unwrap().checkpoint(cp)?;
1492        Ok(())
1493    }
1494
1495    /// Returns true if auto-checkpointing is enabled.
1496    pub const fn has_checkpointing(&self) -> bool {
1497        self.checkpoint_manager.is_some()
1498    }
1499
1500    // =========================================================================
1501    // Watermark Management
1502    // =========================================================================
1503
1504    /// Enable per-source watermark tracking for this engine.
1505    pub fn enable_watermark_tracking(&mut self) {
1506        if self.watermark_tracker.is_none() {
1507            self.watermark_tracker = Some(PerSourceWatermarkTracker::new());
1508        }
1509    }
1510
1511    /// Register a source for watermark tracking with its max out-of-orderness.
1512    pub fn register_watermark_source(&mut self, source: &str, max_ooo: Duration) {
1513        if let Some(ref mut tracker) = self.watermark_tracker {
1514            tracker.register_source(source, max_ooo);
1515        }
1516    }
1517
1518    /// Advance the watermark from an external source (e.g., upstream context).
1519    #[tracing::instrument(skip(self))]
1520    pub async fn advance_external_watermark(
1521        &mut self,
1522        source_context: &str,
1523        watermark_ms: i64,
1524    ) -> Result<(), error::EngineError> {
1525        if let Some(ref mut tracker) = self.watermark_tracker {
1526            if let Some(wm) = DateTime::from_timestamp_millis(watermark_ms) {
1527                tracker.advance_source_watermark(source_context, wm);
1528
1529                if let Some(new_wm) = tracker.effective_watermark() {
1530                    if self.last_applied_watermark.is_none_or(|last| new_wm > last) {
1531                        self.apply_watermark_to_windows(new_wm).await?;
1532                        self.last_applied_watermark = Some(new_wm);
1533                    }
1534                }
1535            }
1536        }
1537        Ok(())
1538    }
1539
1540    /// Check if two runtime sources are compatible for state preservation
1541    fn sources_compatible(a: &RuntimeSource, b: &RuntimeSource) -> bool {
1542        match (a, b) {
1543            (RuntimeSource::EventType(a), RuntimeSource::EventType(b)) => a == b,
1544            (RuntimeSource::Stream(a), RuntimeSource::Stream(b)) => a == b,
1545            (RuntimeSource::Timer(a), RuntimeSource::Timer(b)) => {
1546                a.interval_ns == b.interval_ns && a.timer_event_type == b.timer_event_type
1547            }
1548            (RuntimeSource::Merge(a), RuntimeSource::Merge(b)) => {
1549                a.len() == b.len()
1550                    && a.iter()
1551                        .zip(b.iter())
1552                        .all(|(x, y)| x.event_type == y.event_type)
1553            }
1554            (RuntimeSource::Join(a), RuntimeSource::Join(b)) => a == b,
1555            _ => false,
1556        }
1557    }
1558}