1#[cfg(feature = "async-runtime")]
7mod builder;
8mod compilation;
9pub mod compiler;
10mod dispatch;
11pub mod error;
12pub mod evaluator;
13pub mod graph;
14mod pattern_analyzer;
15pub mod physical_plan;
16mod pipeline;
17pub mod planner;
18mod router;
19#[cfg(feature = "async-runtime")]
20mod sink_factory;
21pub mod topology;
22pub mod topology_builder;
23pub mod trace;
24mod types;
25
26use std::sync::Arc;
28
29#[cfg(feature = "async-runtime")]
30pub use builder::EngineBuilder;
31use chrono::{DateTime, Duration, Utc};
32pub use evaluator::eval_filter_expr;
34use rustc_hash::{FxHashMap, FxHashSet};
35#[cfg(feature = "async-runtime")]
36pub use sink_factory::SinkConnectorAdapter;
37#[cfg(feature = "async-runtime")]
38use tokio::sync::mpsc;
39use tracing::{info, warn};
40pub use types::NamedPattern;
42pub use types::{EngineConfig, EngineMetrics, ReloadReport, SourceBinding, UserFunction};
43use types::{RuntimeOp, RuntimeSource, StreamDefinition, WindowType};
45use varpulis_core::ast::{ConfigItem, Program, Stmt};
46use varpulis_core::Value;
47
48#[cfg(feature = "async-runtime")]
49use crate::connector;
50#[cfg(feature = "async-runtime")]
51use crate::context::ContextMap;
52use crate::event::{Event, SharedEvent};
53#[cfg(feature = "async-runtime")]
54use crate::metrics::Metrics;
55use crate::sase_persistence::SaseCheckpointExt;
56use crate::sequence::SequenceContext;
57use crate::udf::UdfRegistry;
58use crate::watermark::PerSourceWatermarkTracker;
59use crate::window::CountWindow;
60
/// Destination for events emitted by the engine when an async runtime is
/// available. `send_output` / `send_output_shared` pick the variant that
/// avoids an unnecessary clone where possible.
#[cfg(feature = "async-runtime")]
#[derive(Debug)]
pub(super) enum OutputChannel {
    /// Channel expecting owned `Event` values.
    Owned(mpsc::Sender<Event>),
    /// Channel expecting `Arc`-shared events (cheap to send, no deep clone).
    Shared(mpsc::Sender<SharedEvent>),
}
71
pub struct Engine {
    // Stream definitions keyed by stream name (populated by `register_stream`).
    pub(super) streams: FxHashMap<String, StreamDefinition>,
    // Maps incoming event types to the streams that consume them.
    pub(super) router: router::EventRouter,
    // User-declared functions (`fn` statements) by name.
    pub(super) functions: FxHashMap<String, UserFunction>,
    // Named SASE+ patterns (`pattern` statements) by name.
    pub(super) patterns: FxHashMap<String, NamedPattern>,
    // Deprecated `config` blocks, kept for backward compatibility.
    pub(super) configs: FxHashMap<String, EngineConfig>,
    // Top-level variable values (`let` / `var`).
    pub(super) variables: FxHashMap<String, Value>,
    // Names declared with `var`; only these may be reassigned.
    pub(super) mutable_vars: FxHashSet<String>,
    #[cfg(feature = "async-runtime")]
    // Connector configurations by connector name.
    pub(super) connectors: FxHashMap<String, connector::ConnectorConfig>,
    pub(super) source_bindings: Vec<SourceBinding>,
    #[cfg(feature = "async-runtime")]
    // Sink instances built from `connectors` during `load_program`.
    pub(super) sinks: sink_factory::SinkRegistry,
    #[cfg(feature = "async-runtime")]
    // Where emitted events are sent; `None` falls back to `collected_outputs`.
    pub(super) output_channel: Option<OutputChannel>,
    // Buffer for emitted events when no output channel is attached (sync mode).
    pub(super) collected_outputs: Vec<Event>,
    pub(super) events_processed: u64,
    pub(super) output_events_emitted: u64,
    #[cfg(feature = "async-runtime")]
    pub(super) metrics: Option<Metrics>,
    #[cfg(feature = "async-runtime")]
    pub(super) context_map: ContextMap,
    // Per-source watermark tracking; NOTE(review): exact advancement policy
    // lives in the watermark module — confirm there before documenting further.
    pub(super) watermark_tracker: Option<PerSourceWatermarkTracker>,
    pub(super) last_applied_watermark: Option<DateTime<Utc>>,
    pub(super) late_data_configs: FxHashMap<String, types::LateDataConfig>,
    // Context name / topic prefix are forwarded to sink topic resolution.
    pub(super) context_name: Option<String>,
    pub(super) topic_prefix: Option<String>,
    // Aggregators shared across streams, created by `setup_hamlet_sharing`.
    pub(super) shared_hamlet_aggregators:
        Vec<std::sync::Arc<std::sync::Mutex<crate::hamlet::HamletAggregator>>>,
    pub(super) checkpoint_manager: Option<crate::persistence::CheckpointManager>,
    #[cfg(feature = "async-runtime")]
    // Credentials used to resolve connector `profile` references.
    pub(super) credentials_store: Option<Arc<connector::credentials::CredentialsStore>>,
    // Dead-letter queue configuration and handle (opened in `load_program`
    // once at least one sink exists).
    pub(super) dlq_path: Option<std::path::PathBuf>,
    pub(super) dlq_config: crate::dead_letter::DlqConfig,
    pub(super) dlq: Option<Arc<crate::dead_letter::DeadLetterQueue>>,
    // Built at the end of `load_program`; summarizes streams for explain/inspect.
    pub(super) physical_plan: Option<physical_plan::PhysicalPlan>,
    pub(super) udf_registry: UdfRegistry,
    pub(super) trace_collector: trace::TraceCollector,
}
142
143impl std::fmt::Debug for Engine {
144 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
145 let mut s = f.debug_struct("Engine");
146 s.field("streams", &self.streams.keys().collect::<Vec<_>>())
147 .field("functions", &self.functions.keys().collect::<Vec<_>>())
148 .field("patterns", &self.patterns.keys().collect::<Vec<_>>())
149 .field("configs", &self.configs.keys().collect::<Vec<_>>());
150 #[cfg(feature = "async-runtime")]
151 s.field("connectors", &self.connectors.keys().collect::<Vec<_>>())
152 .field("context_map", &self.context_map);
153 s.field("events_processed", &self.events_processed)
154 .field("output_events_emitted", &self.output_events_emitted)
155 .field("context_name", &self.context_name)
156 .field("topic_prefix", &self.topic_prefix)
157 .finish_non_exhaustive()
158 }
159}
160
161impl Engine {
    /// Entry point for the fluent builder API.
    #[cfg(feature = "async-runtime")]
    pub fn builder() -> EngineBuilder {
        EngineBuilder::new()
    }

    /// Create an engine that emits owned `Event`s on `output_tx`.
    #[cfg(feature = "async-runtime")]
    pub fn new(output_tx: mpsc::Sender<Event>) -> Self {
        Self::new_internal(Some(OutputChannel::Owned(output_tx)))
    }

    /// Create an engine with no output channel; emitted events are buffered
    /// in `collected_outputs`. Intended for benchmarks.
    pub fn new_benchmark() -> Self {
        #[cfg(feature = "async-runtime")]
        {
            Self::new_internal(None)
        }
        #[cfg(not(feature = "async-runtime"))]
        {
            Self::new_sync()
        }
    }

    /// Like `new`, but the output channel is optional.
    #[cfg(feature = "async-runtime")]
    pub fn new_with_optional_output(output_tx: Option<mpsc::Sender<Event>>) -> Self {
        Self::new_internal(output_tx.map(OutputChannel::Owned))
    }

    /// Create an engine that emits `Arc`-shared events (avoids a deep clone
    /// per output event).
    #[cfg(feature = "async-runtime")]
    pub fn new_shared(output_tx: mpsc::Sender<SharedEvent>) -> Self {
        Self::new_internal(Some(OutputChannel::Shared(output_tx)))
    }

    /// Install the credentials store used to resolve connector `profile`
    /// references during `load_program`.
    #[cfg(feature = "async-runtime")]
    pub fn set_credentials_store(&mut self, store: Arc<connector::credentials::CredentialsStore>) {
        self.credentials_store = Some(store);
    }
204
    /// Shared constructor: all public async-runtime constructors delegate
    /// here, differing only in the `output_channel` they pass.
    #[cfg(feature = "async-runtime")]
    fn new_internal(output_channel: Option<OutputChannel>) -> Self {
        Self {
            streams: FxHashMap::default(),
            router: router::EventRouter::new(),
            functions: FxHashMap::default(),
            patterns: FxHashMap::default(),
            configs: FxHashMap::default(),
            variables: FxHashMap::default(),
            mutable_vars: FxHashSet::default(),
            connectors: FxHashMap::default(),
            source_bindings: Vec::new(),
            sinks: sink_factory::SinkRegistry::new(),
            output_channel,
            events_processed: 0,
            output_events_emitted: 0,
            metrics: None,
            context_map: ContextMap::new(),
            watermark_tracker: None,
            last_applied_watermark: None,
            late_data_configs: FxHashMap::default(),
            context_name: None,
            topic_prefix: None,
            shared_hamlet_aggregators: Vec::new(),
            checkpoint_manager: None,
            credentials_store: None,
            dlq_path: None,
            dlq_config: crate::dead_letter::DlqConfig::default(),
            dlq: None,
            physical_plan: None,
            udf_registry: UdfRegistry::new(),
            trace_collector: trace::TraceCollector::new(),
            collected_outputs: Vec::new(),
        }
    }
241
242 pub fn new_sync() -> Self {
245 #[cfg(feature = "async-runtime")]
246 {
247 let mut engine = Self::new_internal(None);
248 engine.collected_outputs = Vec::new();
249 engine
250 }
251 #[cfg(not(feature = "async-runtime"))]
252 {
253 Self {
254 streams: FxHashMap::default(),
255 router: router::EventRouter::new(),
256 functions: FxHashMap::default(),
257 patterns: FxHashMap::default(),
258 configs: FxHashMap::default(),
259 variables: FxHashMap::default(),
260 mutable_vars: FxHashSet::default(),
261 source_bindings: Vec::new(),
262 collected_outputs: Vec::new(),
263 events_processed: 0,
264 output_events_emitted: 0,
265 watermark_tracker: None,
266 last_applied_watermark: None,
267 late_data_configs: FxHashMap::default(),
268 context_name: None,
269 topic_prefix: None,
270 shared_hamlet_aggregators: Vec::new(),
271 checkpoint_manager: None,
272 dlq_path: None,
273 dlq_config: crate::dead_letter::DlqConfig::default(),
274 dlq: None,
275 physical_plan: None,
276 udf_registry: UdfRegistry::new(),
277 trace_collector: trace::TraceCollector::new(),
278 }
279 }
280 }
281
282 #[cfg(feature = "async-runtime")]
284 fn clone_output_channel(&self) -> Option<OutputChannel> {
285 match &self.output_channel {
286 Some(OutputChannel::Owned(tx)) => Some(OutputChannel::Owned(tx.clone())),
287 Some(OutputChannel::Shared(tx)) => Some(OutputChannel::Shared(tx.clone())),
288 None => None,
289 }
290 }
291
    /// Forward a shared output event to the configured channel, or buffer it
    /// when no channel is attached.
    ///
    /// Uses non-blocking `try_send`; if the channel is full or closed the
    /// event is dropped and a warning is logged.
    #[cfg(feature = "async-runtime")]
    #[inline]
    pub(super) fn send_output_shared(&mut self, event: &SharedEvent) {
        match &self.output_channel {
            Some(OutputChannel::Shared(tx)) => {
                // Cheap: bump the Arc refcount, no deep clone.
                if let Err(e) = tx.try_send(Arc::clone(event)) {
                    warn!("Failed to send output event: {}", e);
                }
            }
            Some(OutputChannel::Owned(tx)) => {
                // Owned channel requires a deep clone of the event.
                let owned = (**event).clone();
                if let Err(e) = tx.try_send(owned) {
                    warn!("Failed to send output event: {}", e);
                }
            }
            None => {
                // Sync/benchmark mode: accumulate for later collection.
                self.collected_outputs.push((**event).clone());
            }
        }
    }

    /// Sync-only build: always buffer into `collected_outputs`.
    #[cfg(not(feature = "async-runtime"))]
    #[inline]
    pub(super) fn send_output_shared(&mut self, event: &SharedEvent) {
        self.collected_outputs.push((**event).clone());
    }
325
    /// Forward an owned output event to the configured channel, or buffer it
    /// when no channel is attached.
    ///
    /// Uses non-blocking `try_send`; if the channel is full or closed the
    /// event is dropped and a warning is logged.
    #[cfg(feature = "async-runtime")]
    #[inline]
    pub(super) fn send_output(&mut self, event: Event) {
        match &self.output_channel {
            Some(OutputChannel::Shared(tx)) => {
                // Wrap in Arc to match the shared channel's item type.
                if let Err(e) = tx.try_send(Arc::new(event)) {
                    warn!("Failed to send output event: {}", e);
                }
            }
            Some(OutputChannel::Owned(tx)) => {
                if let Err(e) = tx.try_send(event) {
                    warn!("Failed to send output event: {}", e);
                }
            }
            None => {
                // Sync/benchmark mode: accumulate for later collection.
                self.collected_outputs.push(event);
            }
        }
    }

    /// Sync-only build: always buffer into `collected_outputs`.
    #[cfg(not(feature = "async-runtime"))]
    #[inline]
    #[allow(dead_code)]
    pub(super) fn send_output(&mut self, event: Event) {
        self.collected_outputs.push(event);
    }
357
    /// Process a batch synchronously and return all events emitted while
    /// processing it. Any previously buffered outputs are discarded first,
    /// so the result contains only this batch's emissions.
    pub fn process_batch_sync_collect(
        &mut self,
        events: Vec<Event>,
    ) -> Result<Vec<Event>, error::EngineError> {
        self.collected_outputs.clear();
        self.process_batch_sync(events)?;
        // `mem::take` hands back the buffer and leaves an empty Vec in place.
        Ok(std::mem::take(&mut self.collected_outputs))
    }
368
    /// Set the context name used during sink topic resolution.
    pub fn set_context_name(&mut self, name: &str) {
        self.context_name = Some(name.to_string());
    }

    /// Set the topic prefix used during sink topic resolution.
    pub fn set_topic_prefix(&mut self, prefix: &str) {
        self.topic_prefix = Some(prefix.to_string());
    }

    /// Enable or disable execution tracing.
    pub fn set_trace_enabled(&mut self, enabled: bool) {
        self.trace_collector.set_enabled(enabled);
    }

    /// True when execution tracing is enabled.
    #[inline]
    pub fn is_trace_enabled(&self) -> bool {
        self.trace_collector.is_enabled()
    }

    /// Remove and return all accumulated trace entries.
    pub fn drain_trace(&mut self) -> Vec<trace::TraceEntry> {
        self.trace_collector.drain()
    }

    /// Override where the dead-letter queue file is created (takes effect on
    /// the next `load`).
    pub fn set_dlq_path(&mut self, path: std::path::PathBuf) {
        self.dlq_path = Some(path);
    }

    /// Override the dead-letter queue configuration (takes effect on the
    /// next `load`).
    pub const fn set_dlq_config(&mut self, config: crate::dead_letter::DlqConfig) {
        self.dlq_config = config;
    }

    /// Handle to the dead-letter queue, if one was opened during `load`.
    pub const fn dlq(&self) -> Option<&Arc<crate::dead_letter::DeadLetterQueue>> {
        self.dlq.as_ref()
    }
416
    /// Look up a named SASE+ pattern.
    pub fn get_pattern(&self, name: &str) -> Option<&NamedPattern> {
        self.patterns.get(name)
    }

    /// All registered named patterns.
    pub const fn patterns(&self) -> &FxHashMap<String, NamedPattern> {
        &self.patterns
    }

    /// Look up a (deprecated) `config` block by name.
    pub fn get_config(&self, name: &str) -> Option<&EngineConfig> {
        self.configs.get(name)
    }

    /// Look up a connector configuration by name.
    #[cfg(feature = "async-runtime")]
    pub fn get_connector(&self, name: &str) -> Option<&connector::ConnectorConfig> {
        self.connectors.get(name)
    }

    /// All registered connector configurations.
    #[cfg(feature = "async-runtime")]
    pub const fn connector_configs(&self) -> &FxHashMap<String, connector::ConnectorConfig> {
        &self.connectors
    }

    /// Source bindings registered with this engine.
    pub fn source_bindings(&self) -> &[SourceBinding] {
        &self.source_bindings
    }

    /// Look up a top-level variable's current value.
    pub fn get_variable(&self, name: &str) -> Option<&Value> {
        self.variables.get(name)
    }
453
454 pub fn set_variable(&mut self, name: &str, value: Value) -> Result<(), error::EngineError> {
456 if self.variables.contains_key(name) && !self.mutable_vars.contains(name) {
457 return Err(error::EngineError::Compilation(format!(
458 "Cannot assign to immutable variable '{name}'. Use 'var' instead of 'let' to declare mutable variables."
459 )));
460 }
461 self.variables.insert(name.to_string(), value);
462 Ok(())
463 }
464
    /// All top-level variables and their current values.
    pub const fn variables(&self) -> &FxHashMap<String, Value> {
        &self.variables
    }

    /// The engine's execution-context registry.
    #[cfg(feature = "async-runtime")]
    pub const fn context_map(&self) -> &ContextMap {
        &self.context_map
    }

    /// True when at least one execution context has been declared.
    #[cfg(feature = "async-runtime")]
    pub fn has_contexts(&self) -> bool {
        self.context_map.has_contexts()
    }

    /// Builder-style: attach a metrics handle and return the engine.
    #[cfg(feature = "async-runtime")]
    pub fn with_metrics(mut self, metrics: Metrics) -> Self {
        self.metrics = Some(metrics);
        self
    }
488
489 pub fn add_filter<F>(&mut self, stream_name: &str, filter: F) -> Result<(), error::EngineError>
491 where
492 F: Fn(&Event) -> bool + Send + Sync + 'static,
493 {
494 if let Some(stream) = self.streams.get_mut(stream_name) {
495 let wrapped = move |e: &SharedEvent| filter(e.as_ref());
496 stream
497 .operations
498 .insert(0, RuntimeOp::WhereClosure(Box::new(wrapped)));
499 Ok(())
500 } else {
501 Err(error::EngineError::StreamNotFound(stream_name.to_string()))
502 }
503 }
504
505 pub fn load_with_source(
507 &mut self,
508 source: &str,
509 program: &Program,
510 ) -> Result<(), error::EngineError> {
511 let validation = varpulis_core::validate::validate(source, program);
512 if validation.has_errors() {
513 return Err(error::EngineError::Compilation(validation.format(source)));
514 }
515 for warning in validation
516 .diagnostics
517 .iter()
518 .filter(|d| d.severity == varpulis_core::validate::Severity::Warning)
519 {
520 warn!("{}", warning.message);
521 }
522 self.load_program(program)
523 }
524
    /// Load a program without source-level validation (see
    /// `load_with_source` for the validating variant).
    pub fn load(&mut self, program: &Program) -> Result<(), error::EngineError> {
        self.load_program(program)
    }
529
    /// Register every statement of `program` into the engine, then build the
    /// sink registry (async builds), set up Hamlet sharing, and derive the
    /// physical plan. Statements are processed in program order, so later
    /// declarations may reference earlier ones.
    fn load_program(&mut self, program: &Program) -> Result<(), error::EngineError> {
        for stmt in &program.statements {
            match &stmt.node {
                Stmt::StreamDecl {
                    name, source, ops, ..
                } => {
                    self.register_stream(name, source, ops)?;
                }
                Stmt::EventDecl { name, fields, .. } => {
                    // Event declarations carry no runtime state; log only.
                    info!(
                        "Registered event type: {} with {} fields",
                        name,
                        fields.len()
                    );
                }
                Stmt::FnDecl {
                    name,
                    params,
                    ret,
                    body,
                } => {
                    let user_fn = UserFunction {
                        name: name.clone(),
                        params: params
                            .iter()
                            .map(|p| (p.name.clone(), p.ty.clone()))
                            .collect(),
                        return_type: ret.clone(),
                        body: body.clone(),
                    };
                    info!(
                        "Registered function: {}({} params)",
                        name,
                        user_fn.params.len()
                    );
                    self.functions.insert(name.clone(), user_fn);
                }
                Stmt::Config { name, items } => {
                    // Legacy syntax, still accepted but deprecated in favor
                    // of `connector` declarations.
                    warn!(
                        "DEPRECATED: 'config {}' block syntax is deprecated. \
                         Use 'connector' declarations instead: \
                         connector MyConn = {} (...)",
                        name, name
                    );
                    let mut values = std::collections::HashMap::new();
                    for item in items {
                        if let ConfigItem::Value(key, val) = item {
                            values.insert(key.clone(), val.clone());
                        }
                    }
                    info!(
                        "Registered config block: {} with {} items",
                        name,
                        values.len()
                    );
                    self.configs.insert(
                        name.clone(),
                        EngineConfig {
                            name: name.clone(),
                            values,
                        },
                    );
                }
                Stmt::PatternDecl {
                    name,
                    expr,
                    within,
                    partition_by,
                } => {
                    let named_pattern = NamedPattern {
                        name: name.clone(),
                        expr: expr.clone(),
                        within: within.clone(),
                        partition_by: partition_by.clone(),
                    };
                    info!(
                        "Registered SASE+ pattern: {} (within: {}, partition: {})",
                        name,
                        within.is_some(),
                        partition_by.is_some()
                    );
                    self.patterns.insert(name.clone(), named_pattern);
                }
                Stmt::Import { path, alias } => {
                    // Imports must have been flattened by an earlier pass;
                    // seeing one here means that pass did not run.
                    warn!(
                        "Unresolved import '{}' (alias: {:?}) — imports must be resolved before engine.load()",
                        path, alias
                    );
                }
                Stmt::VarDecl {
                    mutable,
                    name,
                    value,
                    ..
                } => {
                    // Initializers are evaluated eagerly against a synthetic
                    // event, since no real event exists at load time.
                    let dummy_event = Event::new("__init__");
                    let empty_ctx = SequenceContext::new();
                    let initial_value = evaluator::eval_expr_with_functions(
                        value,
                        &dummy_event,
                        &empty_ctx,
                        &self.functions,
                        &self.variables,
                    )
                    .ok_or_else(|| {
                        error::EngineError::Compilation(format!(
                            "Failed to evaluate initial value for variable '{name}'"
                        ))
                    })?;

                    info!(
                        "Registered {} variable: {} = {:?}",
                        if *mutable { "mutable" } else { "immutable" },
                        name,
                        initial_value
                    );

                    self.variables.insert(name.clone(), initial_value);
                    if *mutable {
                        self.mutable_vars.insert(name.clone());
                    }
                }
                Stmt::Assignment { name, value } => {
                    let dummy_event = Event::new("__assign__");
                    let empty_ctx = SequenceContext::new();
                    let new_value = evaluator::eval_expr_with_functions(
                        value,
                        &dummy_event,
                        &empty_ctx,
                        &self.functions,
                        &self.variables,
                    )
                    .ok_or_else(|| {
                        error::EngineError::Compilation(format!(
                            "Failed to evaluate assignment value for '{name}'"
                        ))
                    })?;

                    // Reassigning a `let` binding is a compile error.
                    if self.variables.contains_key(name) && !self.mutable_vars.contains(name) {
                        return Err(error::EngineError::Compilation(format!(
                            "Cannot assign to immutable variable '{name}'. Use 'var' instead of 'let'."
                        )));
                    }

                    // First assignment to an undeclared name implicitly
                    // creates it as mutable.
                    if !self.variables.contains_key(name) {
                        self.mutable_vars.insert(name.clone());
                    }

                    info!("Assigned variable: {} = {:?}", name, new_value);
                    self.variables.insert(name.clone(), new_value);
                }
                #[cfg(feature = "async-runtime")]
                Stmt::ContextDecl { name, cores } => {
                    use crate::context::ContextConfig;
                    info!("Registered context: {} (cores: {:?})", name, cores);
                    self.context_map.register_context(ContextConfig {
                        name: name.clone(),
                        cores: cores.clone(),
                    });
                }
                #[cfg(not(feature = "async-runtime"))]
                Stmt::ContextDecl { .. } => {
                    warn!("Context declarations require async-runtime feature; ignoring");
                }
                #[cfg(feature = "async-runtime")]
                Stmt::ConnectorDecl {
                    name,
                    connector_type,
                    params,
                } => {
                    let mut config =
                        sink_factory::connector_params_to_config(connector_type, params);

                    // A `profile` property pulls credentials from the store;
                    // it is removed from the config once resolved.
                    if let Some(profile_name) = config.properties.swap_remove("profile") {
                        if let Some(ref store) = self.credentials_store {
                            store
                                .merge_profile(&profile_name, &mut config)
                                .map_err(|e| {
                                    error::EngineError::Compilation(format!(
                                        "Failed to resolve credentials profile '{}' for connector '{}': {}",
                                        profile_name, name, e
                                    ))
                                })?;
                            info!(
                                "Registered connector: {} (type: {}, profile: {})",
                                name, connector_type, profile_name
                            );
                        } else {
                            return Err(error::EngineError::Compilation(format!(
                                "Connector '{}' references profile '{}' but no credentials file was provided. \
                                 Use --credentials or set VARPULIS_CREDENTIALS.",
                                name, profile_name
                            )));
                        }
                    } else {
                        info!("Registered connector: {} (type: {})", name, connector_type);
                    }

                    self.connectors.insert(name.clone(), config);
                }
                #[cfg(not(feature = "async-runtime"))]
                Stmt::ConnectorDecl { name, .. } => {
                    warn!(
                        "Connector '{}' declarations require async-runtime feature; ignoring",
                        name
                    );
                }
                _ => {
                    tracing::debug!("Skipping statement: {:?}", stmt.node);
                }
            }
        }

        // Sink construction: collect every sink key referenced by a `To`
        // operation, plus static topic overrides, then instantiate sinks.
        #[cfg(feature = "async-runtime")]
        {
            let mut referenced_sink_keys: FxHashSet<String> = FxHashSet::default();
            let mut topic_overrides: Vec<(String, String, String)> = Vec::new();
            for stream in self.streams.values() {
                for op in &stream.operations {
                    if let RuntimeOp::To(to_config) = op {
                        referenced_sink_keys.insert(to_config.sink_key.clone());
                        match &to_config.topic {
                            Some(types::TopicSpec::Static(topic)) => {
                                topic_overrides.push((
                                    to_config.sink_key.clone(),
                                    to_config.connector_name.clone(),
                                    topic.clone(),
                                ));
                            }
                            Some(types::TopicSpec::Dynamic(_)) => {
                                // Dynamic topics resolve at runtime, so the
                                // bare connector itself must be materialized.
                                referenced_sink_keys.insert(to_config.connector_name.clone());
                            }
                            None => {}
                        }
                    }
                }
            }

            self.sinks.build_from_connectors(
                &self.connectors,
                &referenced_sink_keys,
                &topic_overrides,
                self.context_name.as_deref(),
                self.topic_prefix.as_deref(),
            );

            // With at least one sink, open a DLQ (best effort — a failed
            // open silently disables it) and wrap sinks with resilience.
            if !self.sinks.cache().is_empty() {
                let dlq_path = self
                    .dlq_path
                    .clone()
                    .unwrap_or_else(|| std::path::PathBuf::from("varpulis-dlq.jsonl"));
                let dlq = crate::dead_letter::DeadLetterQueue::open_with_config(
                    &dlq_path,
                    self.dlq_config.clone(),
                )
                .map(Arc::new)
                .ok();
                if dlq.is_some() {
                    tracing::info!(
                        "Dead letter queue enabled at {} for {} sink(s)",
                        dlq_path.display(),
                        self.sinks.cache().len()
                    );
                }
                self.dlq = dlq.clone();
                self.sinks.wrap_with_resilience(
                    crate::circuit_breaker::CircuitBreakerConfig::default(),
                    dlq,
                    self.metrics.clone(),
                );
            }
        }

        self.setup_hamlet_sharing();

        // Derive the physical plan: map each stream to the event types
        // routed to it, then record a summary of its operation pipeline.
        let mut plan = physical_plan::PhysicalPlan::new();
        let mut stream_event_types: FxHashMap<String, Vec<String>> = FxHashMap::default();
        for (event_type, targets) in self.router.all_routes() {
            for target in targets.iter() {
                stream_event_types
                    .entry(target.clone())
                    .or_default()
                    .push(event_type.clone());
            }
        }
        for (name, stream_def) in &self.streams {
            let op_summary = stream_def
                .operations
                .iter()
                .map(|op| op.summary_name())
                .collect::<Vec<_>>()
                .join(" → ");
            plan.add_stream(physical_plan::PhysicalStream {
                name: name.clone(),
                operation_count: stream_def.operations.len(),
                operation_summary: op_summary,
                logical_id: plan.stream_count() as u32,
                registered_event_types: stream_event_types
                    .remove(name.as_str())
                    .unwrap_or_default(),
            });
        }
        self.physical_plan = Some(plan);

        Ok(())
    }
844
    /// Merge per-stream Hamlet aggregators that share the same Kleene event
    /// types into one shared aggregator per group, so equivalent trend
    /// aggregation work is computed once.
    ///
    /// Runs after all streams are registered (called from `load_program`).
    /// Streams whose Kleene-type sets match are grouped; groups of fewer
    /// than two streams are left untouched.
    fn setup_hamlet_sharing(&mut self) {
        use crate::hamlet::template::TemplateBuilder;
        use crate::hamlet::{HamletAggregator, HamletConfig, QueryRegistration};

        // Candidates: streams that currently own a private aggregator.
        let hamlet_streams: Vec<String> = self
            .streams
            .iter()
            .filter(|(_, s)| s.hamlet_aggregator.is_some())
            .map(|(name, _)| name.clone())
            .collect();

        // Sharing only pays off with at least two participating streams.
        if hamlet_streams.len() < 2 {
            return;
        }

        // Group streams by their sorted set of Kleene event-type names.
        let mut kleene_groups: FxHashMap<Vec<String>, Vec<String>> = FxHashMap::default();

        for stream_name in &hamlet_streams {
            if let Some(stream) = self.streams.get(stream_name) {
                let mut kleene_types = Vec::new();
                for op in &stream.operations {
                    if let RuntimeOp::TrendAggregate(_) = op {
                        if let Some(ref agg) = stream.hamlet_aggregator {
                            let tmpl = agg.merged_template();
                            for query in agg.registered_queries() {
                                for &kt in &query.kleene_types {
                                    let type_name =
                                        tmpl.type_name(kt).unwrap_or("unknown").to_string();
                                    if !kleene_types.contains(&type_name) {
                                        kleene_types.push(type_name);
                                    }
                                }
                            }
                        }
                    }
                }
                // Sort so that the Vec acts as a canonical group key.
                kleene_types.sort();
                kleene_groups
                    .entry(kleene_types)
                    .or_default()
                    .push(stream_name.clone());
            }
        }

        for (kleene_key, group_streams) in &kleene_groups {
            if group_streams.len() < 2 || kleene_key.is_empty() {
                continue;
            }

            info!(
                "Hamlet sharing detected: {} streams share Kleene patterns {:?}: {:?}",
                group_streams.len(),
                kleene_key,
                group_streams,
            );

            // Build one merged template; queries are renumbered sequentially
            // across the whole group so ids stay unique in the shared NFA.
            let mut builder = TemplateBuilder::new();
            type SharingEntry = (
                String,
                QueryRegistration,
                Vec<(String, crate::greta::GretaAggregate)>,
            );
            let mut all_registrations: Vec<SharingEntry> = Vec::new();
            let mut next_query_id: crate::greta::QueryId = 0;

            for stream_name in group_streams {
                if let Some(stream) = self.streams.get(stream_name) {
                    if let Some(ref agg) = stream.hamlet_aggregator {
                        let tmpl = agg.merged_template();
                        for query in agg.registered_queries() {
                            let new_id = next_query_id;
                            next_query_id += 1;

                            // Resolve type indices back to names so the
                            // sequence can be re-registered on the new
                            // merged template.
                            let event_names: Vec<String> = query
                                .event_types
                                .iter()
                                .map(|&idx| tmpl.type_name(idx).unwrap_or("unknown").to_string())
                                .collect();
                            let name_strs: Vec<&str> =
                                event_names.iter().map(String::as_str).collect();

                            let base_state = builder.num_states() as u16;
                            builder.add_sequence(new_id, &name_strs);

                            for &kt in &query.kleene_types {
                                let type_name = tmpl.type_name(kt).unwrap_or("unknown").to_string();
                                // Kleene state = state after consuming the
                                // matching sequence position (hence the +1).
                                let position = event_names
                                    .iter()
                                    .position(|n| *n == type_name)
                                    .unwrap_or(0);
                                let kleene_state = base_state + position as u16 + 1;
                                builder.add_kleene(new_id, &type_name, kleene_state);
                            }

                            // Carry over the stream's aggregate field spec.
                            let fields: Vec<(String, crate::greta::GretaAggregate)> = stream
                                .operations
                                .iter()
                                .find_map(|op| {
                                    if let RuntimeOp::TrendAggregate(config) = op {
                                        Some(config.fields.clone())
                                    } else {
                                        None
                                    }
                                })
                                .unwrap_or_default();

                            all_registrations.push((
                                stream_name.clone(),
                                QueryRegistration {
                                    id: new_id,
                                    event_types: query.event_types.clone(),
                                    kleene_types: query.kleene_types.clone(),
                                    aggregate: query.aggregate,
                                },
                                fields,
                            ));
                        }
                    }
                }
            }

            let template = builder.build();
            let config = HamletConfig {
                window_ms: 60_000,
                incremental: true,
                ..Default::default()
            };
            let mut shared_agg = HamletAggregator::new(config, template);

            for (_, registration, _) in &all_registrations {
                shared_agg.register_query(registration.clone());
            }

            let shared_ref = std::sync::Arc::new(std::sync::Mutex::new(shared_agg));
            self.shared_hamlet_aggregators.push(shared_ref.clone());

            // Swap each stream's private aggregator for the shared one and
            // point its TrendAggregate ops at the renumbered query id.
            for (stream_name, registration, fields) in &all_registrations {
                if let Some(stream) = self.streams.get_mut(stream_name) {
                    stream.hamlet_aggregator = None;
                    stream.shared_hamlet_ref = Some(shared_ref.clone());

                    for op in &mut stream.operations {
                        if let RuntimeOp::TrendAggregate(config) = op {
                            config.query_id = registration.id;
                            config.fields = fields.clone();
                        }
                    }
                }
            }

            info!(
                "Created shared Hamlet aggregator with {} queries",
                next_query_id,
            );
        }
    }
1007
    /// Connect every registered sink; fails with the first connection error.
    #[cfg(feature = "async-runtime")]
    #[tracing::instrument(skip(self))]
    pub async fn connect_sinks(&self) -> Result<(), error::EngineError> {
        self.sinks
            .connect_all()
            .await
            .map_err(error::EngineError::Pipeline)
    }

    /// Register (or replace) a sink under `key`, bypassing connector-based
    /// construction — useful for tests and embedding.
    #[cfg(feature = "async-runtime")]
    pub fn inject_sink(&mut self, key: &str, sink: Arc<dyn crate::sink::Sink>) {
        self.sinks.insert(key.to_string(), sink);
    }

    /// True when a sink is registered under `key`.
    #[cfg(feature = "async-runtime")]
    pub fn has_sink(&self, key: &str) -> bool {
        self.sinks.cache().contains_key(key)
    }

    /// All sink keys belonging to `connector_name`: the bare connector key
    /// itself plus any `"connector::topic"`-style derived keys.
    #[cfg(feature = "async-runtime")]
    pub fn sink_keys_for_connector(&self, connector_name: &str) -> Vec<String> {
        let prefix = format!("{connector_name}::");
        self.sinks
            .cache()
            .keys()
            .filter(|k| *k == connector_name || k.starts_with(&prefix))
            .cloned()
            .collect()
    }
1041
1042 pub fn has_sink_operations(&self) -> bool {
1048 self.streams.values().any(|s| {
1049 s.operations
1050 .iter()
1051 .any(|op| matches!(op, RuntimeOp::To(_) | RuntimeOp::Enrich(_)))
1052 })
1053 }
1054
    /// `(events_processed, output_events_emitted)` counters.
    pub const fn event_counters(&self) -> (u64, u64) {
        (self.events_processed, self.output_events_emitted)
    }

    /// True when no stream carries cross-event state: no SASE engine, join
    /// buffer, or Hamlet aggregator, and every operation is from the
    /// per-event (stateless) subset below. Stateless engines can be scaled
    /// or restarted without state migration.
    pub fn is_stateless(&self) -> bool {
        self.streams.values().all(|s| {
            s.sase_engine.is_none()
                && s.join_buffer.is_none()
                && s.hamlet_aggregator.is_none()
                && s.shared_hamlet_ref.is_none()
                && s.operations.iter().all(|op| {
                    matches!(
                        op,
                        RuntimeOp::WhereExpr(_)
                            | RuntimeOp::WhereClosure(_)
                            | RuntimeOp::Select(_)
                            | RuntimeOp::Emit(_)
                            | RuntimeOp::EmitExpr(_)
                            | RuntimeOp::Print(_)
                            | RuntimeOp::Log(_)
                            | RuntimeOp::Having(_)
                            | RuntimeOp::Process(_)
                            | RuntimeOp::Pattern(_)
                            | RuntimeOp::To(_)
                    )
                })
        })
    }
1085
    /// The first partition key found across all streams, checked in priority
    /// order per stream: explicit SASE `partition by`, then partitioned
    /// window/aggregate operations, then an equality join key extracted from
    /// a `where` expression on a SASE stream. Returns `None` when no stream
    /// is partitioned.
    pub fn partition_key(&self) -> Option<String> {
        for stream in self.streams.values() {
            if let Some(ref sase) = stream.sase_engine {
                if let Some(key) = sase.partition_by() {
                    return Some(key.to_string());
                }
            }
            for op in &stream.operations {
                match op {
                    RuntimeOp::PartitionedWindow(pw) => return Some(pw.partition_key.clone()),
                    RuntimeOp::PartitionedSlidingCountWindow(pw) => {
                        return Some(pw.partition_key.clone())
                    }
                    RuntimeOp::PartitionedAggregate(pa) => return Some(pa.partition_key.clone()),
                    _ => {}
                }
            }
            // Fallback for SASE streams without explicit partitioning: an
            // equality predicate in a where-clause can serve as the key.
            if stream.sase_engine.is_some() {
                for op in &stream.operations {
                    if let RuntimeOp::WhereExpr(expr) = op {
                        if let Some(key) = compilation::extract_equality_join_key(expr) {
                            return Some(key);
                        }
                    }
                }
            }
        }
        None
    }
1116
1117 pub fn has_session_windows(&self) -> bool {
1119 self.streams.values().any(|s| {
1120 s.operations.iter().any(|op| {
1121 matches!(
1122 op,
1123 RuntimeOp::Window(WindowType::Session(_))
1124 | RuntimeOp::Window(WindowType::PartitionedSession(_))
1125 )
1126 })
1127 })
1128 }
1129
1130 pub fn min_session_gap(&self) -> Option<chrono::Duration> {
1132 let mut min_gap: Option<chrono::Duration> = None;
1133 for stream in self.streams.values() {
1134 for op in &stream.operations {
1135 if let RuntimeOp::Window(window) = op {
1136 let gap = match window {
1137 WindowType::Session(w) => Some(w.gap()),
1138 WindowType::PartitionedSession(w) => Some(w.gap()),
1139 _ => None,
1140 };
1141 if let Some(g) = gap {
1142 min_gap = Some(match min_gap {
1143 Some(current) if g < current => g,
1144 Some(current) => current,
1145 None => g,
1146 });
1147 }
1148 }
1149 }
1150 }
1151 min_gap
1152 }
1153
    /// Snapshot of the engine's throughput counters and stream count.
    pub fn metrics(&self) -> EngineMetrics {
        EngineMetrics {
            events_processed: self.events_processed,
            output_events_emitted: self.output_events_emitted,
            streams_count: self.streams.len(),
        }
    }

    /// Names of all registered streams (unordered).
    pub fn stream_names(&self) -> Vec<&str> {
        self.streams.keys().map(|s| s.as_str()).collect()
    }

    /// Human-readable summary of the physical plan, if one has been built
    /// (i.e. after a successful `load`).
    pub fn physical_plan_summary(&self) -> Option<String> {
        self.physical_plan.as_ref().map(|p| p.summary())
    }

    /// Plan and optimize `program` without loading it, returning the
    /// optimizer's explanation text.
    pub fn explain(&self, program: &Program) -> Result<String, error::EngineError> {
        let logical = planner::logical_plan(program).map_err(error::EngineError::Compilation)?;
        let optimized = varpulis_parser::optimize_plan(logical);
        Ok(optimized.explain())
    }
1178
1179 pub fn topology(&self) -> topology::Topology {
1181 let mut builder = topology_builder::TopologyBuilder::new();
1182 for (name, stream_def) in &self.streams {
1183 builder = builder.add_stream(name, stream_def);
1184 }
1185 builder = builder.add_routes(self.router.all_routes());
1186 builder.build()
1187 }
1188
    /// Look up a user-declared function by name.
    pub fn get_function(&self, name: &str) -> Option<&UserFunction> {
        self.functions.get(name)
    }

    /// Names of all user-declared functions (unordered).
    pub fn function_names(&self) -> Vec<&str> {
        self.functions.keys().map(|s| s.as_str()).collect()
    }
1198
1199 pub fn get_timers(&self) -> Vec<(u64, Option<u64>, String)> {
1201 let mut timers = Vec::new();
1202 for stream in self.streams.values() {
1203 if let RuntimeSource::Timer(config) = &stream.source {
1204 timers.push((
1205 config.interval_ns,
1206 config.initial_delay_ns,
1207 config.timer_event_type.clone(),
1208 ));
1209 }
1210 }
1211 timers
1212 }
1213
    /// Register a scalar user-defined function.
    pub fn register_scalar_udf(&mut self, udf: std::sync::Arc<dyn crate::udf::ScalarUDF>) {
        self.udf_registry.register_scalar(udf);
    }

    /// Register an aggregate user-defined function.
    pub fn register_aggregate_udf(&mut self, udf: std::sync::Arc<dyn crate::udf::AggregateUDF>) {
        self.udf_registry.register_aggregate(udf);
    }

    /// The engine's UDF registry.
    pub const fn udf_registry(&self) -> &UdfRegistry {
        &self.udf_registry
    }
1228
    /// Hot-reload `program` into the running engine, preserving state for
    /// streams whose source and operation pipeline look unchanged.
    ///
    /// Strategy: load the new program into a scratch engine (so a failed
    /// load leaves `self` untouched), diff its stream set against the
    /// current one, splice in added/updated streams, rebuild all routes,
    /// and carry over functions, patterns, configs, contexts, connectors,
    /// bindings and sinks wholesale. The returned [`ReloadReport`] records
    /// which streams were added/removed/updated and whose state was reset
    /// or preserved.
    ///
    /// # Errors
    /// Propagates any [`error::EngineError`] raised while loading `program`.
    #[cfg(feature = "async-runtime")]
    pub fn reload(&mut self, program: &Program) -> Result<ReloadReport, error::EngineError> {
        let mut report = ReloadReport::default();

        let old_streams: FxHashSet<String> = self.streams.keys().cloned().collect();

        // Scratch engine shares our output channel and credentials so that
        // the new streams emit to the same place after the splice.
        let mut new_engine = Self::new_internal(self.clone_output_channel());
        new_engine.credentials_store = self.credentials_store.clone();
        new_engine.load(program)?;

        let new_streams: FxHashSet<String> = new_engine.streams.keys().cloned().collect();

        for name in new_streams.difference(&old_streams) {
            report.streams_added.push(name.clone());
        }

        for name in old_streams.difference(&new_streams) {
            report.streams_removed.push(name.clone());
        }

        // Streams present on both sides: state survives only if the source
        // is compatible and the pipeline has the same length.
        // NOTE(review): comparing only `operations.len()` treats a
        // same-length pipeline with *different* operations as unchanged —
        // confirm this coarse check is intended.
        for name in old_streams.intersection(&new_streams) {
            let old_stream = self.streams.get(name).unwrap();
            let new_stream = new_engine.streams.get(name).unwrap();

            let source_changed = !Self::sources_compatible(&old_stream.source, &new_stream.source);
            let ops_changed = old_stream.operations.len() != new_stream.operations.len();

            if source_changed || ops_changed {
                report.streams_updated.push(name.clone());
                report.state_reset.push(name.clone());
            } else {
                report.state_preserved.push(name.clone());
            }
        }

        for name in &report.streams_removed {
            self.streams.remove(name);
        }

        // Routes are rebuilt from scratch below for the surviving stream set.
        self.router.clear();

        for name in &report.streams_added {
            if let Some(stream) = new_engine.streams.remove(name) {
                self.streams.insert(name.clone(), stream);
            }
        }

        for name in &report.streams_updated {
            if let Some(stream) = new_engine.streams.remove(name) {
                self.streams.insert(name.clone(), stream);
            }
        }

        // Re-derive (event_type -> stream) registrations from every stream's
        // source. Collected into a Vec first so the borrow of `self.streams`
        // ends before `self.router` is mutated.
        let registrations: Vec<(String, String)> = self
            .streams
            .iter()
            .flat_map(|(name, stream)| {
                let mut pairs = Vec::new();
                match &stream.source {
                    RuntimeSource::EventType(et) => {
                        pairs.push((et.clone(), name.clone()));
                    }
                    RuntimeSource::Stream(s) => {
                        pairs.push((s.clone(), name.clone()));
                    }
                    RuntimeSource::Merge(sources) => {
                        for ms in sources {
                            pairs.push((ms.event_type.clone(), name.clone()));
                        }
                    }
                    // Join streams get no direct event-type route here.
                    RuntimeSource::Join(_) => {}
                    RuntimeSource::Timer(config) => {
                        pairs.push((config.timer_event_type.clone(), name.clone()));
                    }
                }
                pairs
            })
            .collect();

        for (event_type, stream_name) in registrations {
            self.router.add_route(&event_type, &stream_name);
        }

        // Non-stream state is replaced wholesale from the scratch engine.
        self.functions = new_engine.functions;
        self.patterns = new_engine.patterns;
        self.configs = new_engine.configs;
        self.context_map = new_engine.context_map;
        self.connectors = new_engine.connectors;
        self.source_bindings = new_engine.source_bindings;
        *self.sinks.cache_mut() = std::mem::take(new_engine.sinks.cache_mut());

        // Existing variable values win; only variables we don't have yet are
        // adopted from the new program.
        // NOTE(review): `mutable_vars` is extended (with the *entire* new
        // set) only when at least one new variable is found — verify this
        // isn't meant to run unconditionally, outside the loop.
        for (name, value) in new_engine.variables {
            if !self.variables.contains_key(&name) {
                self.variables.insert(name.clone(), value);
                self.mutable_vars
                    .extend(new_engine.mutable_vars.iter().cloned());
            }
        }

        info!(
            "Hot reload complete: +{} -{} ~{} streams",
            report.streams_added.len(),
            report.streams_removed.len(),
            report.streams_updated.len()
        );

        Ok(report)
    }
1342
    /// Snapshot all restorable engine state into an [`EngineCheckpoint`].
    ///
    /// Captures, per stream: window contents (including per-partition
    /// windows), SASE pattern-matcher state, join buffers, distinct-key
    /// sets and limit counters — plus global variables, event counters and
    /// watermark-tracker state.
    pub fn create_checkpoint(&self) -> crate::persistence::EngineCheckpoint {
        use crate::persistence::{EngineCheckpoint, WindowCheckpoint};

        let mut window_states = std::collections::HashMap::new();
        let mut sase_states = std::collections::HashMap::new();
        let mut join_states = std::collections::HashMap::new();
        let mut distinct_states = std::collections::HashMap::new();
        let mut limit_states = std::collections::HashMap::new();

        // NOTE(review): all per-op inserts below key on the stream name, so
        // a stream with several window/distinct/limit ops keeps only the
        // *last* op's state — confirm pipelines never carry more than one.
        for (name, stream) in &self.streams {
            for op in &stream.operations {
                match op {
                    RuntimeOp::Window(wt) => {
                        let cp = match wt {
                            WindowType::Tumbling(w) => w.checkpoint(),
                            WindowType::Sliding(w) => w.checkpoint(),
                            WindowType::Count(w) => w.checkpoint(),
                            WindowType::SlidingCount(w) => w.checkpoint(),
                            WindowType::Session(w) => w.checkpoint(),
                            WindowType::PartitionedSession(w) => w.checkpoint(),
                            WindowType::PartitionedTumbling(w) => w.checkpoint(),
                            WindowType::PartitionedSliding(w) => w.checkpoint(),
                        };
                        window_states.insert(name.clone(), cp);
                    }
                    RuntimeOp::PartitionedWindow(pw) => {
                        // Encoded as an empty top-level window whose
                        // `partitions` map holds one checkpoint per key.
                        let mut partitions = std::collections::HashMap::new();
                        for (key, cw) in &pw.windows {
                            let sub_cp = cw.checkpoint();
                            partitions.insert(
                                key.clone(),
                                crate::persistence::PartitionedWindowCheckpoint {
                                    events: sub_cp.events,
                                    window_start_ms: sub_cp.window_start_ms,
                                },
                            );
                        }
                        window_states.insert(
                            name.clone(),
                            WindowCheckpoint {
                                events: Vec::new(),
                                window_start_ms: None,
                                last_emit_ms: None,
                                partitions,
                            },
                        );
                    }
                    RuntimeOp::Distinct(state) => {
                        // Keys serialized in reverse iteration order;
                        // `restore_checkpoint` reverses again on re-insert,
                        // so the original order round-trips.
                        let keys: Vec<String> =
                            state.seen.iter().rev().map(|(k, ())| k.clone()).collect();
                        distinct_states.insert(
                            name.clone(),
                            crate::persistence::DistinctCheckpoint { keys },
                        );
                    }
                    RuntimeOp::Limit(state) => {
                        limit_states.insert(
                            name.clone(),
                            crate::persistence::LimitCheckpoint {
                                max: state.max,
                                count: state.count,
                            },
                        );
                    }
                    _ => {}
                }
            }

            if let Some(ref sase) = stream.sase_engine {
                sase_states.insert(name.clone(), sase.checkpoint());
            }

            if let Some(ref jb) = stream.join_buffer {
                join_states.insert(name.clone(), jb.checkpoint());
            }
        }

        // Variables are converted to their serializable representation.
        let variables = self
            .variables
            .iter()
            .map(|(k, v)| (k.clone(), crate::persistence::value_to_ser(v)))
            .collect();

        let watermark_state = self.watermark_tracker.as_ref().map(|t| t.checkpoint());

        EngineCheckpoint {
            version: crate::persistence::CHECKPOINT_VERSION,
            window_states,
            sase_states,
            join_states,
            variables,
            events_processed: self.events_processed,
            output_events_emitted: self.output_events_emitted,
            watermark_state,
            distinct_states,
            limit_states,
        }
    }
1446
1447 pub fn restore_checkpoint(
1449 &mut self,
1450 cp: &crate::persistence::EngineCheckpoint,
1451 ) -> Result<(), crate::persistence::StoreError> {
1452 let mut migrated = cp.clone();
1453 migrated.validate_and_migrate()?;
1454
1455 self.events_processed = cp.events_processed;
1456 self.output_events_emitted = cp.output_events_emitted;
1457
1458 for (k, sv) in &cp.variables {
1459 self.variables
1460 .insert(k.clone(), crate::persistence::ser_to_value(sv.clone()));
1461 }
1462
1463 for (name, stream) in &mut self.streams {
1464 if let Some(wcp) = cp.window_states.get(name) {
1465 for op in &mut stream.operations {
1466 match op {
1467 RuntimeOp::Window(wt) => match wt {
1468 WindowType::Tumbling(w) => w.restore(wcp),
1469 WindowType::Sliding(w) => w.restore(wcp),
1470 WindowType::Count(w) => w.restore(wcp),
1471 WindowType::SlidingCount(w) => w.restore(wcp),
1472 WindowType::Session(w) => w.restore(wcp),
1473 WindowType::PartitionedSession(w) => w.restore(wcp),
1474 WindowType::PartitionedTumbling(w) => w.restore(wcp),
1475 WindowType::PartitionedSliding(w) => w.restore(wcp),
1476 },
1477 RuntimeOp::PartitionedWindow(pw) => {
1478 for (key, pcp) in &wcp.partitions {
1479 let sub_wcp = crate::persistence::WindowCheckpoint {
1480 events: pcp.events.clone(),
1481 window_start_ms: pcp.window_start_ms,
1482 last_emit_ms: None,
1483 partitions: std::collections::HashMap::new(),
1484 };
1485 let window = pw
1486 .windows
1487 .entry(key.clone())
1488 .or_insert_with(|| CountWindow::new(pw.window_size));
1489 window.restore(&sub_wcp);
1490 }
1491 }
1492 _ => {}
1493 }
1494 }
1495 }
1496
1497 if let Some(scp) = cp.sase_states.get(name) {
1498 if let Some(ref mut sase) = stream.sase_engine {
1499 sase.restore(scp);
1500 }
1501 }
1502
1503 if let Some(jcp) = cp.join_states.get(name) {
1504 if let Some(ref mut jb) = stream.join_buffer {
1505 jb.restore(jcp);
1506 }
1507 }
1508
1509 if let Some(dcp) = cp.distinct_states.get(name) {
1510 for op in &mut stream.operations {
1511 if let RuntimeOp::Distinct(state) = op {
1512 state.seen.clear();
1513 for key in dcp.keys.iter().rev() {
1514 state.seen.insert(key.clone(), ());
1515 }
1516 }
1517 }
1518 }
1519
1520 if let Some(lcp) = cp.limit_states.get(name) {
1521 for op in &mut stream.operations {
1522 if let RuntimeOp::Limit(state) = op {
1523 state.max = lcp.max;
1524 state.count = lcp.count;
1525 }
1526 }
1527 }
1528 }
1529
1530 if let Some(ref wcp) = cp.watermark_state {
1531 if self.watermark_tracker.is_none() {
1532 self.watermark_tracker = Some(PerSourceWatermarkTracker::new());
1533 }
1534 if let Some(ref mut tracker) = self.watermark_tracker {
1535 tracker.restore(wcp);
1536 self.last_applied_watermark = wcp
1537 .effective_watermark_ms
1538 .and_then(DateTime::from_timestamp_millis);
1539 }
1540 }
1541
1542 info!(
1543 "Engine restored: {} events processed, {} streams with state (schema v{})",
1544 cp.events_processed,
1545 cp.window_states.len() + cp.sase_states.len() + cp.join_states.len(),
1546 cp.version
1547 );
1548
1549 Ok(())
1550 }
1551
1552 pub fn enable_checkpointing(
1554 &mut self,
1555 store: std::sync::Arc<dyn crate::persistence::StateStore>,
1556 config: crate::persistence::CheckpointConfig,
1557 ) -> Result<(), crate::persistence::StoreError> {
1558 let manager = crate::persistence::CheckpointManager::new(store, config)?;
1559
1560 if let Some(cp) = manager.recover()? {
1561 if let Some(engine_cp) = cp.context_states.get("main") {
1562 self.restore_checkpoint(engine_cp)?;
1563 info!(
1564 "Auto-restored from checkpoint {} ({} events)",
1565 cp.id, cp.events_processed
1566 );
1567 }
1568 }
1569
1570 self.checkpoint_manager = Some(manager);
1571 Ok(())
1572 }
1573
1574 pub fn checkpoint_tick(&mut self) -> Result<(), crate::persistence::StoreError> {
1576 let should = self
1577 .checkpoint_manager
1578 .as_ref()
1579 .is_some_and(|m| m.should_checkpoint());
1580
1581 if should {
1582 self.force_checkpoint()?;
1583 }
1584 Ok(())
1585 }
1586
1587 pub fn force_checkpoint(&mut self) -> Result<(), crate::persistence::StoreError> {
1589 if self.checkpoint_manager.is_none() {
1590 return Ok(());
1591 }
1592
1593 let engine_cp = self.create_checkpoint();
1594 let events_processed = self.events_processed;
1595
1596 let mut context_states = std::collections::HashMap::new();
1597 context_states.insert("main".to_string(), engine_cp);
1598
1599 let cp = crate::persistence::Checkpoint {
1600 id: 0,
1601 timestamp_ms: 0,
1602 events_processed,
1603 window_states: std::collections::HashMap::new(),
1604 pattern_states: std::collections::HashMap::new(),
1605 metadata: std::collections::HashMap::new(),
1606 context_states,
1607 };
1608
1609 self.checkpoint_manager.as_mut().unwrap().checkpoint(cp)?;
1610 Ok(())
1611 }
1612
    /// Whether a checkpoint manager has been installed (see
    /// `enable_checkpointing`).
    pub const fn has_checkpointing(&self) -> bool {
        self.checkpoint_manager.is_some()
    }
1617
1618 pub fn enable_watermark_tracking(&mut self) {
1624 if self.watermark_tracker.is_none() {
1625 self.watermark_tracker = Some(PerSourceWatermarkTracker::new());
1626 }
1627 }
1628
1629 pub fn register_watermark_source(&mut self, source: &str, max_ooo: Duration) {
1631 if let Some(ref mut tracker) = self.watermark_tracker {
1632 tracker.register_source(source, max_ooo);
1633 }
1634 }
1635
1636 #[cfg(feature = "async-runtime")]
1638 #[tracing::instrument(skip(self))]
1639 pub async fn advance_external_watermark(
1640 &mut self,
1641 source_context: &str,
1642 watermark_ms: i64,
1643 ) -> Result<(), error::EngineError> {
1644 if let Some(ref mut tracker) = self.watermark_tracker {
1645 if let Some(wm) = DateTime::from_timestamp_millis(watermark_ms) {
1646 tracker.advance_source_watermark(source_context, wm);
1647
1648 if let Some(new_wm) = tracker.effective_watermark() {
1649 if self.last_applied_watermark.is_none_or(|last| new_wm > last) {
1650 self.apply_watermark_to_windows(new_wm).await?;
1651 self.last_applied_watermark = Some(new_wm);
1652 }
1653 }
1654 }
1655 }
1656 Ok(())
1657 }
1658
1659 #[cfg(feature = "async-runtime")]
1661 fn sources_compatible(a: &RuntimeSource, b: &RuntimeSource) -> bool {
1662 match (a, b) {
1663 (RuntimeSource::EventType(a), RuntimeSource::EventType(b)) => a == b,
1664 (RuntimeSource::Stream(a), RuntimeSource::Stream(b)) => a == b,
1665 (RuntimeSource::Timer(a), RuntimeSource::Timer(b)) => {
1666 a.interval_ns == b.interval_ns && a.timer_event_type == b.timer_event_type
1667 }
1668 (RuntimeSource::Merge(a), RuntimeSource::Merge(b)) => {
1669 a.len() == b.len()
1670 && a.iter()
1671 .zip(b.iter())
1672 .all(|(x, y)| x.event_type == y.event_type)
1673 }
1674 (RuntimeSource::Join(a), RuntimeSource::Join(b)) => a == b,
1675 _ => false,
1676 }
1677 }
1678}