1mod builder;
7mod compilation;
8pub mod compiler;
9mod dispatch;
10pub mod error;
11pub mod evaluator;
12mod pattern_analyzer;
13pub mod physical_plan;
14mod pipeline;
15pub mod planner;
16mod router;
17mod sink_factory;
18pub mod topology;
19pub mod topology_builder;
20mod types;
21
22pub use builder::EngineBuilder;
24pub use sink_factory::SinkConnectorAdapter;
25pub use types::{EngineConfig, EngineMetrics, ReloadReport, SourceBinding, UserFunction};
26
27pub use evaluator::eval_filter_expr;
29
30use types::{RuntimeOp, RuntimeSource, StreamDefinition, WindowType};
32
33use crate::connector;
34use crate::context::ContextMap;
35use crate::event::{Event, SharedEvent};
36use crate::metrics::Metrics;
37use crate::sase_persistence::SaseCheckpointExt;
38use crate::sequence::SequenceContext;
39use crate::udf::UdfRegistry;
40use crate::watermark::PerSourceWatermarkTracker;
41use crate::window::CountWindow;
42use chrono::Duration;
43use chrono::{DateTime, Utc};
44use rustc_hash::{FxHashMap, FxHashSet};
45use std::sync::Arc;
46use tokio::sync::mpsc;
47use tracing::{info, warn};
48use varpulis_core::ast::{ConfigItem, Program, Stmt};
49use varpulis_core::Value;
50
51pub use types::NamedPattern;
53
/// Destination channel for events the engine emits downstream.
///
/// `Shared` carries `Arc`-wrapped events so fan-out avoids deep clones;
/// `Owned` carries plain `Event`s. `send_output`/`send_output_shared`
/// convert between the two representations as needed.
#[derive(Debug)]
pub(super) enum OutputChannel {
    /// Channel of owned events; a shared event is cloned before sending.
    Owned(mpsc::Sender<Event>),
    /// Channel of `Arc<Event>`; an owned event is wrapped before sending.
    Shared(mpsc::Sender<SharedEvent>),
}
62
/// Core stream-processing engine.
///
/// Holds the compiled stream definitions, event routing table, user
/// functions/patterns/configs, connector and sink registries, runtime
/// counters and optional subsystems (metrics, watermarks, checkpointing,
/// dead-letter queue). Built via [`EngineBuilder`] or the `new_*` constructors.
pub struct Engine {
    /// Compiled stream definitions, keyed by stream name.
    pub(super) streams: FxHashMap<String, StreamDefinition>,
    /// Maps incoming event types to the streams that consume them.
    pub(super) router: router::EventRouter,
    /// User-declared functions (`fn` statements), keyed by name.
    pub(super) functions: FxHashMap<String, UserFunction>,
    /// Named SASE+ patterns (`pattern` statements), keyed by name.
    pub(super) patterns: FxHashMap<String, NamedPattern>,
    /// Deprecated `config` blocks, keyed by name.
    pub(super) configs: FxHashMap<String, EngineConfig>,
    /// Current values of `let`/`var` program variables.
    pub(super) variables: FxHashMap<String, Value>,
    /// Names of variables declared mutable (`var`); guards assignment.
    pub(super) mutable_vars: FxHashSet<String>,
    /// Connector declarations, keyed by connector name.
    pub(super) connectors: FxHashMap<String, connector::ConnectorConfig>,
    /// Source bindings produced during compilation.
    pub(super) source_bindings: Vec<SourceBinding>,
    /// Registry of instantiated sinks (built from connectors on load).
    pub(super) sinks: sink_factory::SinkRegistry,
    /// Where emitted events go; `None` means output is discarded.
    pub(super) output_channel: Option<OutputChannel>,
    /// Total events ingested by this engine instance.
    pub(super) events_processed: u64,
    /// Total events emitted to the output channel / sinks.
    pub(super) output_events_emitted: u64,
    /// Optional metrics backend.
    pub(super) metrics: Option<Metrics>,
    /// Declared execution contexts.
    pub(super) context_map: ContextMap,
    /// Optional per-source watermark tracking.
    pub(super) watermark_tracker: Option<PerSourceWatermarkTracker>,
    /// Last watermark value propagated into operators.
    pub(super) last_applied_watermark: Option<DateTime<Utc>>,
    /// Per-stream late-data handling configuration.
    pub(super) late_data_configs: FxHashMap<String, types::LateDataConfig>,
    /// Name of the context this engine runs in, if any (used for sink keys).
    pub(super) context_name: Option<String>,
    /// Hamlet aggregators shared across streams with common Kleene patterns
    /// (see `setup_hamlet_sharing`).
    pub(super) shared_hamlet_aggregators:
        Vec<std::sync::Arc<std::sync::Mutex<crate::hamlet::HamletAggregator>>>,
    /// Optional checkpoint persistence manager.
    pub(super) checkpoint_manager: Option<crate::persistence::CheckpointManager>,
    /// Override path for the dead-letter queue file.
    pub(super) dlq_path: Option<std::path::PathBuf>,
    /// Dead-letter queue configuration (applied when sinks exist).
    pub(super) dlq_config: crate::dead_letter::DlqConfig,
    /// Dead-letter queue, created during `load` when sinks are present.
    pub(super) dlq: Option<Arc<crate::dead_letter::DeadLetterQueue>>,
    /// Physical plan summary built at the end of `load`.
    pub(super) physical_plan: Option<physical_plan::PhysicalPlan>,
    /// Registry of user-defined scalar/aggregate functions.
    pub(super) udf_registry: UdfRegistry,
}
118
119impl std::fmt::Debug for Engine {
120 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
121 f.debug_struct("Engine")
122 .field("streams", &self.streams.keys().collect::<Vec<_>>())
123 .field("functions", &self.functions.keys().collect::<Vec<_>>())
124 .field("patterns", &self.patterns.keys().collect::<Vec<_>>())
125 .field("configs", &self.configs.keys().collect::<Vec<_>>())
126 .field("connectors", &self.connectors.keys().collect::<Vec<_>>())
127 .field("events_processed", &self.events_processed)
128 .field("output_events_emitted", &self.output_events_emitted)
129 .field("context_map", &self.context_map)
130 .field("context_name", &self.context_name)
131 .finish_non_exhaustive()
132 }
133}
134
135impl Engine {
    /// Returns a fresh [`EngineBuilder`] for fluent engine construction.
    pub fn builder() -> EngineBuilder {
        EngineBuilder::new()
    }
152
153 pub fn new(output_tx: mpsc::Sender<Event>) -> Self {
154 Self {
155 streams: FxHashMap::default(),
156 router: router::EventRouter::new(),
157 functions: FxHashMap::default(),
158 patterns: FxHashMap::default(),
159 configs: FxHashMap::default(),
160 variables: FxHashMap::default(),
161 mutable_vars: FxHashSet::default(),
162 connectors: FxHashMap::default(),
163 source_bindings: Vec::new(),
164 sinks: sink_factory::SinkRegistry::new(),
165 output_channel: Some(OutputChannel::Owned(output_tx)),
166 events_processed: 0,
167 output_events_emitted: 0,
168 metrics: None,
169 context_map: ContextMap::new(),
170 watermark_tracker: None,
171 last_applied_watermark: None,
172 late_data_configs: FxHashMap::default(),
173 context_name: None,
174 shared_hamlet_aggregators: Vec::new(),
175 checkpoint_manager: None,
176 dlq_path: None,
177 dlq_config: crate::dead_letter::DlqConfig::default(),
178 dlq: None,
179 physical_plan: None,
180 udf_registry: UdfRegistry::new(),
181 }
182 }
183
    /// Creates an engine with no output channel — emitted events are
    /// discarded. Intended for benchmarking raw processing throughput.
    pub fn new_benchmark() -> Self {
        Self::new_internal(None)
    }

    /// Creates an engine that emits owned events, or discards output when
    /// `output_tx` is `None`.
    pub fn new_with_optional_output(output_tx: Option<mpsc::Sender<Event>>) -> Self {
        Self::new_internal(output_tx.map(OutputChannel::Owned))
    }

    /// Creates an engine that emits `Arc`-shared events (cheaper fan-out).
    pub fn new_shared(output_tx: mpsc::Sender<SharedEvent>) -> Self {
        Self::new_internal(Some(OutputChannel::Shared(output_tx)))
    }
198
    /// Single source of truth for a freshly-initialized engine: every field
    /// starts empty/zero except the provided output channel. All public
    /// constructors funnel through here.
    fn new_internal(output_channel: Option<OutputChannel>) -> Self {
        Self {
            streams: FxHashMap::default(),
            router: router::EventRouter::new(),
            functions: FxHashMap::default(),
            patterns: FxHashMap::default(),
            configs: FxHashMap::default(),
            variables: FxHashMap::default(),
            mutable_vars: FxHashSet::default(),
            connectors: FxHashMap::default(),
            source_bindings: Vec::new(),
            sinks: sink_factory::SinkRegistry::new(),
            output_channel,
            events_processed: 0,
            output_events_emitted: 0,
            metrics: None,
            context_map: ContextMap::new(),
            watermark_tracker: None,
            last_applied_watermark: None,
            late_data_configs: FxHashMap::default(),
            context_name: None,
            shared_hamlet_aggregators: Vec::new(),
            checkpoint_manager: None,
            dlq_path: None,
            dlq_config: crate::dead_letter::DlqConfig::default(),
            dlq: None,
            physical_plan: None,
            udf_registry: UdfRegistry::new(),
        }
    }
230
231 fn clone_output_channel(&self) -> Option<OutputChannel> {
233 match &self.output_channel {
234 Some(OutputChannel::Owned(tx)) => Some(OutputChannel::Owned(tx.clone())),
235 Some(OutputChannel::Shared(tx)) => Some(OutputChannel::Shared(tx.clone())),
236 None => None,
237 }
238 }
239
    #[inline]
    /// Sends an already-shared event to the output channel.
    ///
    /// Uses non-blocking `try_send`: if the channel is full or closed the
    /// event is dropped with only a warning (back-pressure is not applied
    /// here). When the channel carries owned events, the shared event is
    /// deep-cloned first.
    pub(super) fn send_output_shared(&mut self, event: &SharedEvent) {
        match &self.output_channel {
            Some(OutputChannel::Shared(tx)) => {
                if let Err(e) = tx.try_send(Arc::clone(event)) {
                    warn!("Failed to send output event: {}", e);
                }
            }
            Some(OutputChannel::Owned(tx)) => {
                // Owned channel needs an owned Event: clone out of the Arc.
                let owned = (**event).clone();
                if let Err(e) = tx.try_send(owned) {
                    warn!("Failed to send output event: {}", e);
                }
            }
            None => {
                // No output configured (e.g. benchmark mode): drop silently.
            }
        }
    }

    #[inline]
    /// Sends an owned event to the output channel.
    ///
    /// Non-blocking (`try_send`): a full or closed channel drops the event
    /// with a warning. For a shared channel the event is wrapped in an `Arc`.
    pub(super) fn send_output(&mut self, event: Event) {
        match &self.output_channel {
            Some(OutputChannel::Shared(tx)) => {
                if let Err(e) = tx.try_send(Arc::new(event)) {
                    warn!("Failed to send output event: {}", e);
                }
            }
            Some(OutputChannel::Owned(tx)) => {
                if let Err(e) = tx.try_send(event) {
                    warn!("Failed to send output event: {}", e);
                }
            }
            None => {
                // No output configured: drop silently.
            }
        }
    }
286
    /// Sets the context name used when deriving per-context sink keys.
    pub fn set_context_name(&mut self, name: &str) {
        self.context_name = Some(name.to_string());
    }

    /// Overrides the path of the dead-letter queue file.
    pub fn set_dlq_path(&mut self, path: std::path::PathBuf) {
        self.dlq_path = Some(path);
    }

    /// Replaces the dead-letter queue configuration.
    pub const fn set_dlq_config(&mut self, config: crate::dead_letter::DlqConfig) {
        self.dlq_config = config;
    }

    /// Returns the dead-letter queue, if one was created during load.
    pub const fn dlq(&self) -> Option<&Arc<crate::dead_letter::DeadLetterQueue>> {
        self.dlq.as_ref()
    }

    /// Looks up a named SASE+ pattern.
    pub fn get_pattern(&self, name: &str) -> Option<&NamedPattern> {
        self.patterns.get(name)
    }

    /// All registered named patterns, keyed by name.
    pub const fn patterns(&self) -> &FxHashMap<String, NamedPattern> {
        &self.patterns
    }

    /// Looks up a (deprecated) `config` block by name.
    pub fn get_config(&self, name: &str) -> Option<&EngineConfig> {
        self.configs.get(name)
    }

    /// Looks up a connector declaration by name.
    pub fn get_connector(&self, name: &str) -> Option<&connector::ConnectorConfig> {
        self.connectors.get(name)
    }

    /// All connector declarations, keyed by name.
    pub const fn connector_configs(&self) -> &FxHashMap<String, connector::ConnectorConfig> {
        &self.connectors
    }

    /// Source bindings produced during program compilation.
    pub fn source_bindings(&self) -> &[SourceBinding] {
        &self.source_bindings
    }

    /// Current value of a program variable, if declared.
    pub fn get_variable(&self, name: &str) -> Option<&Value> {
        self.variables.get(name)
    }

    /// Sets a program variable.
    ///
    /// # Errors
    /// Returns a compilation error when the variable already exists and was
    /// not declared mutable (`var`).
    pub fn set_variable(&mut self, name: &str, value: Value) -> Result<(), error::EngineError> {
        if self.variables.contains_key(name) && !self.mutable_vars.contains(name) {
            return Err(error::EngineError::Compilation(format!(
                "Cannot assign to immutable variable '{name}'. Use 'var' instead of 'let' to declare mutable variables."
            )));
        }
        self.variables.insert(name.to_string(), value);
        Ok(())
    }

    /// All program variables and their current values.
    pub const fn variables(&self) -> &FxHashMap<String, Value> {
        &self.variables
    }

    /// Declared execution contexts.
    pub const fn context_map(&self) -> &ContextMap {
        &self.context_map
    }

    /// Whether any execution context was declared.
    pub fn has_contexts(&self) -> bool {
        self.context_map.has_contexts()
    }

    /// Attaches a metrics backend (builder-style).
    pub fn with_metrics(mut self, metrics: Metrics) -> Self {
        self.metrics = Some(metrics);
        self
    }
373
374 pub fn add_filter<F>(&mut self, stream_name: &str, filter: F) -> Result<(), error::EngineError>
376 where
377 F: Fn(&Event) -> bool + Send + Sync + 'static,
378 {
379 if let Some(stream) = self.streams.get_mut(stream_name) {
380 let wrapped = move |e: &SharedEvent| filter(e.as_ref());
381 stream
382 .operations
383 .insert(0, RuntimeOp::WhereClosure(Box::new(wrapped)));
384 Ok(())
385 } else {
386 Err(error::EngineError::StreamNotFound(stream_name.to_string()))
387 }
388 }
389
    /// Validates `program` against its original source text, logging
    /// warnings and rejecting on validation errors, then loads it.
    ///
    /// # Errors
    /// Returns a compilation error containing the formatted diagnostics when
    /// validation fails, or whatever `load_program` reports.
    pub fn load_with_source(
        &mut self,
        source: &str,
        program: &Program,
    ) -> Result<(), error::EngineError> {
        let validation = varpulis_core::validate::validate(source, program);
        if validation.has_errors() {
            return Err(error::EngineError::Compilation(validation.format(source)));
        }
        // Warnings do not block loading; surface them in the log.
        for warning in validation
            .diagnostics
            .iter()
            .filter(|d| d.severity == varpulis_core::validate::Severity::Warning)
        {
            warn!("{}", warning.message);
        }
        self.load_program(program)
    }

    /// Loads `program` without source-level validation.
    ///
    /// # Errors
    /// Propagates any error from `load_program`.
    pub fn load(&mut self, program: &Program) -> Result<(), error::EngineError> {
        self.load_program(program)
    }
414
    /// Loads a compiled program into the engine: registers every statement,
    /// then builds sinks, shared Hamlet aggregators and the physical plan.
    ///
    /// Phases:
    /// 1. walk statements (streams, events, functions, configs, patterns,
    ///    variables, contexts, connectors);
    /// 2. collect sink keys/topic overrides referenced by `to` operations
    ///    and instantiate sinks from connectors;
    /// 3. if sinks exist, open the dead-letter queue and wrap sinks with
    ///    circuit-breaker resilience;
    /// 4. detect cross-stream Hamlet sharing;
    /// 5. build the physical plan summary.
    ///
    /// # Errors
    /// Fails on stream registration errors or when a variable initializer /
    /// assignment cannot be evaluated.
    fn load_program(&mut self, program: &Program) -> Result<(), error::EngineError> {
        for stmt in &program.statements {
            match &stmt.node {
                Stmt::StreamDecl {
                    name, source, ops, ..
                } => {
                    self.register_stream(name, source, ops)?;
                }
                Stmt::EventDecl { name, fields, .. } => {
                    // Event declarations carry no runtime state; log only.
                    info!(
                        "Registered event type: {} with {} fields",
                        name,
                        fields.len()
                    );
                }
                Stmt::FnDecl {
                    name,
                    params,
                    ret,
                    body,
                } => {
                    let user_fn = UserFunction {
                        name: name.clone(),
                        params: params
                            .iter()
                            .map(|p| (p.name.clone(), p.ty.clone()))
                            .collect(),
                        return_type: ret.clone(),
                        body: body.clone(),
                    };
                    info!(
                        "Registered function: {}({} params)",
                        name,
                        user_fn.params.len()
                    );
                    self.functions.insert(name.clone(), user_fn);
                }
                Stmt::Config { name, items } => {
                    // Legacy syntax kept for compatibility; still registered.
                    warn!(
                        "DEPRECATED: 'config {}' block syntax is deprecated. \
                        Use 'connector' declarations instead: \
                        connector MyConn = {} (...)",
                        name, name
                    );
                    let mut values = std::collections::HashMap::new();
                    for item in items {
                        if let ConfigItem::Value(key, val) = item {
                            values.insert(key.clone(), val.clone());
                        }
                    }
                    info!(
                        "Registered config block: {} with {} items",
                        name,
                        values.len()
                    );
                    self.configs.insert(
                        name.clone(),
                        EngineConfig {
                            name: name.clone(),
                            values,
                        },
                    );
                }
                Stmt::PatternDecl {
                    name,
                    expr,
                    within,
                    partition_by,
                } => {
                    let named_pattern = NamedPattern {
                        name: name.clone(),
                        expr: expr.clone(),
                        within: within.clone(),
                        partition_by: partition_by.clone(),
                    };
                    info!(
                        "Registered SASE+ pattern: {} (within: {}, partition: {})",
                        name,
                        within.is_some(),
                        partition_by.is_some()
                    );
                    self.patterns.insert(name.clone(), named_pattern);
                }
                Stmt::Import { path, alias } => {
                    // Imports should have been inlined by a resolver pass.
                    warn!(
                        "Unresolved import '{}' (alias: {:?}) — imports must be resolved before engine.load()",
                        path, alias
                    );
                }
                Stmt::VarDecl {
                    mutable,
                    name,
                    value,
                    ..
                } => {
                    // Initializers are evaluated against a placeholder event
                    // (no real event exists at load time).
                    let dummy_event = Event::new("__init__");
                    let empty_ctx = SequenceContext::new();
                    let initial_value = evaluator::eval_expr_with_functions(
                        value,
                        &dummy_event,
                        &empty_ctx,
                        &self.functions,
                        &self.variables,
                    )
                    .ok_or_else(|| {
                        error::EngineError::Compilation(format!(
                            "Failed to evaluate initial value for variable '{name}'"
                        ))
                    })?;

                    info!(
                        "Registered {} variable: {} = {:?}",
                        if *mutable { "mutable" } else { "immutable" },
                        name,
                        initial_value
                    );

                    self.variables.insert(name.clone(), initial_value);
                    if *mutable {
                        self.mutable_vars.insert(name.clone());
                    }
                }
                Stmt::Assignment { name, value } => {
                    let dummy_event = Event::new("__assign__");
                    let empty_ctx = SequenceContext::new();
                    let new_value = evaluator::eval_expr_with_functions(
                        value,
                        &dummy_event,
                        &empty_ctx,
                        &self.functions,
                        &self.variables,
                    )
                    .ok_or_else(|| {
                        error::EngineError::Compilation(format!(
                            "Failed to evaluate assignment value for '{name}'"
                        ))
                    })?;

                    // Enforce let/var semantics at load time.
                    if self.variables.contains_key(name) && !self.mutable_vars.contains(name) {
                        return Err(error::EngineError::Compilation(format!(
                            "Cannot assign to immutable variable '{name}'. Use 'var' instead of 'let'."
                        )));
                    }

                    // Assignment to an undeclared name implicitly creates a
                    // mutable variable.
                    if !self.variables.contains_key(name) {
                        self.mutable_vars.insert(name.clone());
                    }

                    info!("Assigned variable: {} = {:?}", name, new_value);
                    self.variables.insert(name.clone(), new_value);
                }
                Stmt::ContextDecl { name, cores } => {
                    use crate::context::ContextConfig;
                    info!("Registered context: {} (cores: {:?})", name, cores);
                    self.context_map.register_context(ContextConfig {
                        name: name.clone(),
                        cores: cores.clone(),
                    });
                }
                Stmt::ConnectorDecl {
                    name,
                    connector_type,
                    params,
                } => {
                    let config = sink_factory::connector_params_to_config(connector_type, params);
                    info!("Registered connector: {} (type: {})", name, connector_type);
                    self.connectors.insert(name.clone(), config);
                }
                _ => {
                    tracing::debug!("Skipping statement: {:?}", stmt.node);
                }
            }
        }

        // Phase 2: only build sinks actually referenced by `to` operations,
        // remembering any per-stream topic overrides.
        let mut referenced_sink_keys: FxHashSet<String> = FxHashSet::default();
        let mut topic_overrides: Vec<(String, String, String)> = Vec::new();
        for stream in self.streams.values() {
            for op in &stream.operations {
                if let RuntimeOp::To(to_config) = op {
                    referenced_sink_keys.insert(to_config.sink_key.clone());
                    if let Some(ref topic) = to_config.topic_override {
                        topic_overrides.push((
                            to_config.sink_key.clone(),
                            to_config.connector_name.clone(),
                            topic.clone(),
                        ));
                    }
                }
            }
        }

        self.sinks.build_from_connectors(
            &self.connectors,
            &referenced_sink_keys,
            &topic_overrides,
            self.context_name.as_deref(),
        );

        // Phase 3: DLQ + resilience wrapping, only when sinks exist.
        if !self.sinks.cache().is_empty() {
            let dlq_path = self
                .dlq_path
                .clone()
                .unwrap_or_else(|| std::path::PathBuf::from("varpulis-dlq.jsonl"));
            // Best-effort: a DLQ open failure disables the DLQ rather than
            // failing the load.
            let dlq = crate::dead_letter::DeadLetterQueue::open_with_config(
                &dlq_path,
                self.dlq_config.clone(),
            )
            .map(Arc::new)
            .ok();
            if dlq.is_some() {
                tracing::info!(
                    "Dead letter queue enabled at {} for {} sink(s)",
                    dlq_path.display(),
                    self.sinks.cache().len()
                );
            }
            self.dlq = dlq.clone();
            self.sinks.wrap_with_resilience(
                crate::circuit_breaker::CircuitBreakerConfig::default(),
                dlq,
                self.metrics.clone(),
            );
        }

        // Phase 4: merge Hamlet aggregators across streams sharing Kleene
        // patterns.
        self.setup_hamlet_sharing();

        // Phase 5: build a physical plan summary (per-stream op chain plus
        // the event types routed to it).
        let mut plan = physical_plan::PhysicalPlan::new();
        let mut stream_event_types: FxHashMap<String, Vec<String>> = FxHashMap::default();
        for (event_type, targets) in self.router.all_routes() {
            for target in targets.iter() {
                stream_event_types
                    .entry(target.clone())
                    .or_default()
                    .push(event_type.clone());
            }
        }
        for (name, stream_def) in &self.streams {
            let op_summary = stream_def
                .operations
                .iter()
                .map(|op| op.summary_name())
                .collect::<Vec<_>>()
                .join(" → ");
            plan.add_stream(physical_plan::PhysicalStream {
                name: name.clone(),
                operation_count: stream_def.operations.len(),
                operation_summary: op_summary,
                logical_id: plan.stream_count() as u32,
                registered_event_types: stream_event_types
                    .remove(name.as_str())
                    .unwrap_or_default(),
            });
        }
        self.physical_plan = Some(plan);

        Ok(())
    }
677
    /// Detects streams whose trend-aggregation queries use the same set of
    /// Kleene event types and replaces their per-stream Hamlet aggregators
    /// with one shared aggregator per group.
    ///
    /// Streams are grouped by their sorted list of Kleene type names; groups
    /// with fewer than two streams (or no Kleene types) are left untouched.
    /// For each shared group, queries are re-registered under fresh ids and
    /// each member stream's `TrendAggregate` op is rewired to the shared
    /// aggregator.
    fn setup_hamlet_sharing(&mut self) {
        use crate::hamlet::template::TemplateBuilder;
        use crate::hamlet::{HamletAggregator, HamletConfig, QueryRegistration};

        let hamlet_streams: Vec<String> = self
            .streams
            .iter()
            .filter(|(_, s)| s.hamlet_aggregator.is_some())
            .map(|(name, _)| name.clone())
            .collect();

        // Sharing only pays off with at least two Hamlet streams.
        if hamlet_streams.len() < 2 {
            return;
        }

        // Group streams by their (sorted, deduped) Kleene type-name set.
        let mut kleene_groups: FxHashMap<Vec<String>, Vec<String>> = FxHashMap::default();

        for stream_name in &hamlet_streams {
            if let Some(stream) = self.streams.get(stream_name) {
                let mut kleene_types = Vec::new();
                for op in &stream.operations {
                    if let RuntimeOp::TrendAggregate(_) = op {
                        if let Some(ref agg) = stream.hamlet_aggregator {
                            for query in agg.registered_queries() {
                                for &kt in &query.kleene_types {
                                    let type_name = format!("type_{kt}");
                                    if !kleene_types.contains(&type_name) {
                                        kleene_types.push(type_name);
                                    }
                                }
                            }
                        }
                    }
                }
                // Sort so the grouping key is order-independent.
                kleene_types.sort();
                kleene_groups
                    .entry(kleene_types)
                    .or_default()
                    .push(stream_name.clone());
            }
        }

        for (kleene_key, group_streams) in &kleene_groups {
            if group_streams.len() < 2 || kleene_key.is_empty() {
                continue;
            }

            info!(
                "Hamlet sharing detected: {} streams share Kleene patterns {:?}: {:?}",
                group_streams.len(),
                kleene_key,
                group_streams,
            );

            let mut builder = TemplateBuilder::new();
            // (stream name, re-registered query, its TrendAggregate fields)
            type SharingEntry = (
                String,
                QueryRegistration,
                Vec<(String, crate::greta::GretaAggregate)>,
            );
            let mut all_registrations: Vec<SharingEntry> = Vec::new();
            let mut next_query_id: crate::greta::QueryId = 0;

            // Rebuild every member stream's queries in the shared template,
            // assigning fresh sequential query ids.
            for stream_name in group_streams {
                if let Some(stream) = self.streams.get(stream_name) {
                    if let Some(ref agg) = stream.hamlet_aggregator {
                        for query in agg.registered_queries() {
                            let new_id = next_query_id;
                            next_query_id += 1;

                            let event_names: Vec<String> = query
                                .event_types
                                .iter()
                                .map(|&idx| format!("type_{idx}"))
                                .collect();
                            let name_strs: Vec<&str> =
                                event_names.iter().map(|s| s.as_str()).collect();

                            builder.add_sequence(new_id, &name_strs);

                            for &kt in &query.kleene_types {
                                let type_name = format!("type_{kt}");
                                // Position of the Kleene type within the
                                // sequence; falls back to 0 if not found.
                                let position = event_names
                                    .iter()
                                    .position(|n| *n == type_name)
                                    .unwrap_or(0);
                                builder.add_kleene(new_id, &type_name, position as u16);
                            }

                            // Aggregate fields come from the stream's first
                            // TrendAggregate operation, if any.
                            let fields: Vec<(String, crate::greta::GretaAggregate)> = stream
                                .operations
                                .iter()
                                .find_map(|op| {
                                    if let RuntimeOp::TrendAggregate(config) = op {
                                        Some(config.fields.clone())
                                    } else {
                                        None
                                    }
                                })
                                .unwrap_or_default();

                            all_registrations.push((
                                stream_name.clone(),
                                QueryRegistration {
                                    id: new_id,
                                    event_types: query.event_types.clone(),
                                    kleene_types: query.kleene_types.clone(),
                                    aggregate: query.aggregate,
                                },
                                fields,
                            ));
                        }
                    }
                }
            }

            let template = builder.build();
            let config = HamletConfig {
                window_ms: 60_000,
                incremental: true,
                ..Default::default()
            };
            let mut shared_agg = HamletAggregator::new(config, template);

            for (_, registration, _) in &all_registrations {
                shared_agg.register_query(registration.clone());
            }

            let shared_ref = std::sync::Arc::new(std::sync::Mutex::new(shared_agg));
            self.shared_hamlet_aggregators.push(shared_ref.clone());

            // Point every member stream at the shared aggregator and rewire
            // its TrendAggregate op to the new query id/fields.
            for (stream_name, registration, fields) in &all_registrations {
                if let Some(stream) = self.streams.get_mut(stream_name) {
                    stream.hamlet_aggregator = None;
                    stream.shared_hamlet_ref = Some(shared_ref.clone());

                    for op in &mut stream.operations {
                        if let RuntimeOp::TrendAggregate(config) = op {
                            config.query_id = registration.id;
                            config.fields = fields.clone();
                        }
                    }
                }
            }

            info!(
                "Created shared Hamlet aggregator with {} queries",
                next_query_id,
            );
        }
    }
831
    #[tracing::instrument(skip(self))]
    /// Connects every registered sink.
    ///
    /// # Errors
    /// Returns a pipeline error if any sink fails to connect.
    pub async fn connect_sinks(&self) -> Result<(), error::EngineError> {
        self.sinks
            .connect_all()
            .await
            .map_err(error::EngineError::Pipeline)
    }

    /// Registers (or replaces) a sink under `key`, bypassing connector-based
    /// construction — useful for tests and programmatic setups.
    pub fn inject_sink(&mut self, key: &str, sink: Arc<dyn crate::sink::Sink>) {
        self.sinks.insert(key.to_string(), sink);
    }

    /// Whether a sink is registered under `key`.
    pub fn has_sink(&self, key: &str) -> bool {
        self.sinks.cache().contains_key(key)
    }

    /// All sink keys belonging to `connector_name`: the bare name itself
    /// plus any topic-specific keys of the form `"{connector_name}::…"`.
    pub fn sink_keys_for_connector(&self, connector_name: &str) -> Vec<String> {
        let prefix = format!("{connector_name}::");
        self.sinks
            .cache()
            .keys()
            .filter(|k| *k == connector_name || k.starts_with(&prefix))
            .cloned()
            .collect()
    }
861
    /// Whether any stream writes to a sink (`to`) or performs enrichment —
    /// i.e. whether external connections are needed at runtime.
    pub fn has_sink_operations(&self) -> bool {
        self.streams.values().any(|s| {
            s.operations
                .iter()
                .any(|op| matches!(op, RuntimeOp::To(_) | RuntimeOp::Enrich(_)))
        })
    }

    /// Returns `(events_processed, output_events_emitted)`.
    pub const fn event_counters(&self) -> (u64, u64) {
        (self.events_processed, self.output_events_emitted)
    }

    /// Whether the engine holds no cross-event state: no SASE engines, join
    /// buffers or Hamlet aggregators, and only per-event operations in every
    /// stream. A stateless engine can be scaled/restarted freely.
    pub fn is_stateless(&self) -> bool {
        self.streams.values().all(|s| {
            s.sase_engine.is_none()
                && s.join_buffer.is_none()
                && s.hamlet_aggregator.is_none()
                && s.shared_hamlet_ref.is_none()
                && s.operations.iter().all(|op| {
                    // Allow-list of operations that never carry state across
                    // events.
                    matches!(
                        op,
                        RuntimeOp::WhereExpr(_)
                            | RuntimeOp::WhereClosure(_)
                            | RuntimeOp::Select(_)
                            | RuntimeOp::Emit(_)
                            | RuntimeOp::EmitExpr(_)
                            | RuntimeOp::Print(_)
                            | RuntimeOp::Log(_)
                            | RuntimeOp::Having(_)
                            | RuntimeOp::Process(_)
                            | RuntimeOp::Pattern(_)
                            | RuntimeOp::To(_)
                    )
                })
        })
    }
905
    /// Returns the first partitioning key found across all streams, used for
    /// sharding decisions.
    ///
    /// Search order per stream: explicit SASE `partition by`, then any
    /// partitioned window/aggregate operation, then — for SASE streams — an
    /// equality join key extracted from a `where` expression. Returns `None`
    /// when no stream is partitioned.
    pub fn partition_key(&self) -> Option<String> {
        for stream in self.streams.values() {
            if let Some(ref sase) = stream.sase_engine {
                if let Some(key) = sase.partition_by() {
                    return Some(key.to_string());
                }
            }
            for op in &stream.operations {
                match op {
                    RuntimeOp::PartitionedWindow(pw) => return Some(pw.partition_key.clone()),
                    RuntimeOp::PartitionedSlidingCountWindow(pw) => {
                        return Some(pw.partition_key.clone())
                    }
                    RuntimeOp::PartitionedAggregate(pa) => return Some(pa.partition_key.clone()),
                    _ => {}
                }
            }
            // Fallback: infer an implicit partition key from an equality
            // predicate on a SASE stream (e.g. `where a.id == b.id`).
            if stream.sase_engine.is_some() {
                for op in &stream.operations {
                    if let RuntimeOp::WhereExpr(expr) = op {
                        if let Some(key) = compilation::extract_equality_join_key(expr) {
                            return Some(key);
                        }
                    }
                }
            }
        }
        None
    }
936
937 pub fn has_session_windows(&self) -> bool {
939 self.streams.values().any(|s| {
940 s.operations.iter().any(|op| {
941 matches!(
942 op,
943 RuntimeOp::Window(WindowType::Session(_))
944 | RuntimeOp::Window(WindowType::PartitionedSession(_))
945 )
946 })
947 })
948 }
949
950 pub fn min_session_gap(&self) -> Option<chrono::Duration> {
952 let mut min_gap: Option<chrono::Duration> = None;
953 for stream in self.streams.values() {
954 for op in &stream.operations {
955 if let RuntimeOp::Window(window) = op {
956 let gap = match window {
957 WindowType::Session(w) => Some(w.gap()),
958 WindowType::PartitionedSession(w) => Some(w.gap()),
959 _ => None,
960 };
961 if let Some(g) = gap {
962 min_gap = Some(match min_gap {
963 Some(current) if g < current => g,
964 Some(current) => current,
965 None => g,
966 });
967 }
968 }
969 }
970 }
971 min_gap
972 }
973
    /// Snapshot of the engine's throughput counters and stream count.
    pub fn metrics(&self) -> EngineMetrics {
        EngineMetrics {
            events_processed: self.events_processed,
            output_events_emitted: self.output_events_emitted,
            streams_count: self.streams.len(),
        }
    }

    /// Names of all registered streams (unordered).
    pub fn stream_names(&self) -> Vec<&str> {
        self.streams.keys().map(|s| s.as_str()).collect()
    }

    /// Human-readable summary of the physical plan, if one has been built
    /// (i.e. after a successful load).
    pub fn physical_plan_summary(&self) -> Option<String> {
        self.physical_plan.as_ref().map(|p| p.summary())
    }

    /// Produces an `EXPLAIN`-style description of the optimized logical plan
    /// for `program`, without loading it.
    ///
    /// # Errors
    /// Returns a compilation error if logical planning fails.
    pub fn explain(&self, program: &Program) -> Result<String, error::EngineError> {
        let logical = planner::logical_plan(program).map_err(error::EngineError::Compilation)?;
        let optimized = varpulis_parser::optimize_plan(logical);
        Ok(optimized.explain())
    }

    /// Builds a topology view of all streams and their event-type routes.
    pub fn topology(&self) -> topology::Topology {
        let mut builder = topology_builder::TopologyBuilder::new();
        for (name, stream_def) in &self.streams {
            builder = builder.add_stream(name, stream_def);
        }
        builder = builder.add_routes(self.router.all_routes());
        builder.build()
    }

    /// Looks up a user-declared function by name.
    pub fn get_function(&self, name: &str) -> Option<&UserFunction> {
        self.functions.get(name)
    }

    /// Names of all user-declared functions (unordered).
    pub fn function_names(&self) -> Vec<&str> {
        self.functions.keys().map(|s| s.as_str()).collect()
    }
1018
1019 pub fn get_timers(&self) -> Vec<(u64, Option<u64>, String)> {
1021 let mut timers = Vec::new();
1022 for stream in self.streams.values() {
1023 if let RuntimeSource::Timer(config) = &stream.source {
1024 timers.push((
1025 config.interval_ns,
1026 config.initial_delay_ns,
1027 config.timer_event_type.clone(),
1028 ));
1029 }
1030 }
1031 timers
1032 }
1033
    /// Registers a user-defined scalar function with the engine's UDF
    /// registry.
    pub fn register_scalar_udf(&mut self, udf: std::sync::Arc<dyn crate::udf::ScalarUDF>) {
        self.udf_registry.register_scalar(udf);
    }

    /// Registers a user-defined aggregate function with the engine's UDF
    /// registry.
    pub fn register_aggregate_udf(&mut self, udf: std::sync::Arc<dyn crate::udf::AggregateUDF>) {
        self.udf_registry.register_aggregate(udf);
    }

    /// Read-only access to the UDF registry.
    pub const fn udf_registry(&self) -> &UdfRegistry {
        &self.udf_registry
    }
1048
1049 pub fn reload(&mut self, program: &Program) -> Result<ReloadReport, error::EngineError> {
1055 let mut report = ReloadReport::default();
1056
1057 let old_streams: FxHashSet<String> = self.streams.keys().cloned().collect();
1058
1059 let mut new_engine = Self::new_internal(self.clone_output_channel());
1060 new_engine.load(program)?;
1061
1062 let new_streams: FxHashSet<String> = new_engine.streams.keys().cloned().collect();
1063
1064 for name in new_streams.difference(&old_streams) {
1065 report.streams_added.push(name.clone());
1066 }
1067
1068 for name in old_streams.difference(&new_streams) {
1069 report.streams_removed.push(name.clone());
1070 }
1071
1072 for name in old_streams.intersection(&new_streams) {
1073 let old_stream = self.streams.get(name).unwrap();
1074 let new_stream = new_engine.streams.get(name).unwrap();
1075
1076 let source_changed = !Self::sources_compatible(&old_stream.source, &new_stream.source);
1077 let ops_changed = old_stream.operations.len() != new_stream.operations.len();
1078
1079 if source_changed || ops_changed {
1080 report.streams_updated.push(name.clone());
1081 report.state_reset.push(name.clone());
1082 } else {
1083 report.state_preserved.push(name.clone());
1084 }
1085 }
1086
1087 for name in &report.streams_removed {
1088 self.streams.remove(name);
1089 }
1090
1091 self.router.clear();
1092
1093 for name in &report.streams_added {
1094 if let Some(stream) = new_engine.streams.remove(name) {
1095 self.streams.insert(name.clone(), stream);
1096 }
1097 }
1098
1099 for name in &report.streams_updated {
1100 if let Some(stream) = new_engine.streams.remove(name) {
1101 self.streams.insert(name.clone(), stream);
1102 }
1103 }
1104
1105 let registrations: Vec<(String, String)> = self
1106 .streams
1107 .iter()
1108 .flat_map(|(name, stream)| {
1109 let mut pairs = Vec::new();
1110 match &stream.source {
1111 RuntimeSource::EventType(et) => {
1112 pairs.push((et.clone(), name.clone()));
1113 }
1114 RuntimeSource::Stream(s) => {
1115 pairs.push((s.clone(), name.clone()));
1116 }
1117 RuntimeSource::Merge(sources) => {
1118 for ms in sources {
1119 pairs.push((ms.event_type.clone(), name.clone()));
1120 }
1121 }
1122 RuntimeSource::Join(_) => {}
1123 RuntimeSource::Timer(config) => {
1124 pairs.push((config.timer_event_type.clone(), name.clone()));
1125 }
1126 }
1127 pairs
1128 })
1129 .collect();
1130
1131 for (event_type, stream_name) in registrations {
1132 self.router.add_route(&event_type, &stream_name);
1133 }
1134
1135 self.functions = new_engine.functions;
1136 self.patterns = new_engine.patterns;
1137 self.configs = new_engine.configs;
1138 self.context_map = new_engine.context_map;
1139 self.connectors = new_engine.connectors;
1140 self.source_bindings = new_engine.source_bindings;
1141 *self.sinks.cache_mut() = std::mem::take(new_engine.sinks.cache_mut());
1142
1143 for (name, value) in new_engine.variables {
1144 if !self.variables.contains_key(&name) {
1145 self.variables.insert(name.clone(), value);
1146 self.mutable_vars
1147 .extend(new_engine.mutable_vars.iter().cloned());
1148 }
1149 }
1150
1151 info!(
1152 "Hot reload complete: +{} -{} ~{} streams",
1153 report.streams_added.len(),
1154 report.streams_removed.len(),
1155 report.streams_updated.len()
1156 );
1157
1158 Ok(report)
1159 }
1160
    /// Snapshots all recoverable engine state into an `EngineCheckpoint`:
    /// window contents, SASE engine state, join buffers, distinct/limit
    /// operator state, variables, watermarks and throughput counters.
    /// The inverse operation is `restore_checkpoint`.
    pub fn create_checkpoint(&self) -> crate::persistence::EngineCheckpoint {
        use crate::persistence::{EngineCheckpoint, WindowCheckpoint};

        // Each map is keyed by stream name; a stream appears only when it
        // has the corresponding kind of state.
        let mut window_states = std::collections::HashMap::new();
        let mut sase_states = std::collections::HashMap::new();
        let mut join_states = std::collections::HashMap::new();
        let mut distinct_states = std::collections::HashMap::new();
        let mut limit_states = std::collections::HashMap::new();

        for (name, stream) in &self.streams {
            for op in &stream.operations {
                match op {
                    RuntimeOp::Window(wt) => {
                        let cp = match wt {
                            WindowType::Tumbling(w) => w.checkpoint(),
                            WindowType::Sliding(w) => w.checkpoint(),
                            WindowType::Count(w) => w.checkpoint(),
                            WindowType::SlidingCount(w) => w.checkpoint(),
                            WindowType::Session(w) => w.checkpoint(),
                            WindowType::PartitionedSession(w) => w.checkpoint(),
                            WindowType::PartitionedTumbling(w) => w.checkpoint(),
                            WindowType::PartitionedSliding(w) => w.checkpoint(),
                        };
                        window_states.insert(name.clone(), cp);
                    }
                    RuntimeOp::PartitionedWindow(pw) => {
                        // Per-partition sub-checkpoints are nested inside a
                        // single WindowCheckpoint with empty top-level fields.
                        let mut partitions = std::collections::HashMap::new();
                        for (key, cw) in &pw.windows {
                            let sub_cp = cw.checkpoint();
                            partitions.insert(
                                key.clone(),
                                crate::persistence::PartitionedWindowCheckpoint {
                                    events: sub_cp.events,
                                    window_start_ms: sub_cp.window_start_ms,
                                },
                            );
                        }
                        window_states.insert(
                            name.clone(),
                            WindowCheckpoint {
                                events: Vec::new(),
                                window_start_ms: None,
                                last_emit_ms: None,
                                partitions,
                            },
                        );
                    }
                    RuntimeOp::Distinct(state) => {
                        // `.rev()` preserves the original insertion order of
                        // seen keys in the serialized form.
                        let keys: Vec<String> =
                            state.seen.iter().rev().map(|(k, ())| k.clone()).collect();
                        distinct_states.insert(
                            name.clone(),
                            crate::persistence::DistinctCheckpoint { keys },
                        );
                    }
                    RuntimeOp::Limit(state) => {
                        limit_states.insert(
                            name.clone(),
                            crate::persistence::LimitCheckpoint {
                                max: state.max,
                                count: state.count,
                            },
                        );
                    }
                    _ => {}
                }
            }

            if let Some(ref sase) = stream.sase_engine {
                sase_states.insert(name.clone(), sase.checkpoint());
            }

            if let Some(ref jb) = stream.join_buffer {
                join_states.insert(name.clone(), jb.checkpoint());
            }
        }

        // Variables are converted to their serializable representation.
        let variables = self
            .variables
            .iter()
            .map(|(k, v)| (k.clone(), crate::persistence::value_to_ser(v)))
            .collect();

        let watermark_state = self.watermark_tracker.as_ref().map(|t| t.checkpoint());

        EngineCheckpoint {
            version: crate::persistence::CHECKPOINT_VERSION,
            window_states,
            sase_states,
            join_states,
            variables,
            events_processed: self.events_processed,
            output_events_emitted: self.output_events_emitted,
            watermark_state,
            distinct_states,
            limit_states,
        }
    }
1264
1265 pub fn restore_checkpoint(
1267 &mut self,
1268 cp: &crate::persistence::EngineCheckpoint,
1269 ) -> Result<(), crate::persistence::StoreError> {
1270 let mut migrated = cp.clone();
1271 migrated.validate_and_migrate()?;
1272
1273 self.events_processed = cp.events_processed;
1274 self.output_events_emitted = cp.output_events_emitted;
1275
1276 for (k, sv) in &cp.variables {
1277 self.variables
1278 .insert(k.clone(), crate::persistence::ser_to_value(sv.clone()));
1279 }
1280
1281 for (name, stream) in &mut self.streams {
1282 if let Some(wcp) = cp.window_states.get(name) {
1283 for op in &mut stream.operations {
1284 match op {
1285 RuntimeOp::Window(wt) => match wt {
1286 WindowType::Tumbling(w) => w.restore(wcp),
1287 WindowType::Sliding(w) => w.restore(wcp),
1288 WindowType::Count(w) => w.restore(wcp),
1289 WindowType::SlidingCount(w) => w.restore(wcp),
1290 WindowType::Session(w) => w.restore(wcp),
1291 WindowType::PartitionedSession(w) => w.restore(wcp),
1292 WindowType::PartitionedTumbling(w) => w.restore(wcp),
1293 WindowType::PartitionedSliding(w) => w.restore(wcp),
1294 },
1295 RuntimeOp::PartitionedWindow(pw) => {
1296 for (key, pcp) in &wcp.partitions {
1297 let sub_wcp = crate::persistence::WindowCheckpoint {
1298 events: pcp.events.clone(),
1299 window_start_ms: pcp.window_start_ms,
1300 last_emit_ms: None,
1301 partitions: std::collections::HashMap::new(),
1302 };
1303 let window = pw
1304 .windows
1305 .entry(key.clone())
1306 .or_insert_with(|| CountWindow::new(pw.window_size));
1307 window.restore(&sub_wcp);
1308 }
1309 }
1310 _ => {}
1311 }
1312 }
1313 }
1314
1315 if let Some(scp) = cp.sase_states.get(name) {
1316 if let Some(ref mut sase) = stream.sase_engine {
1317 sase.restore(scp);
1318 }
1319 }
1320
1321 if let Some(jcp) = cp.join_states.get(name) {
1322 if let Some(ref mut jb) = stream.join_buffer {
1323 jb.restore(jcp);
1324 }
1325 }
1326
1327 if let Some(dcp) = cp.distinct_states.get(name) {
1328 for op in &mut stream.operations {
1329 if let RuntimeOp::Distinct(state) = op {
1330 state.seen.clear();
1331 for key in dcp.keys.iter().rev() {
1332 state.seen.insert(key.clone(), ());
1333 }
1334 }
1335 }
1336 }
1337
1338 if let Some(lcp) = cp.limit_states.get(name) {
1339 for op in &mut stream.operations {
1340 if let RuntimeOp::Limit(state) = op {
1341 state.max = lcp.max;
1342 state.count = lcp.count;
1343 }
1344 }
1345 }
1346 }
1347
1348 if let Some(ref wcp) = cp.watermark_state {
1349 if self.watermark_tracker.is_none() {
1350 self.watermark_tracker = Some(PerSourceWatermarkTracker::new());
1351 }
1352 if let Some(ref mut tracker) = self.watermark_tracker {
1353 tracker.restore(wcp);
1354 self.last_applied_watermark = wcp
1355 .effective_watermark_ms
1356 .and_then(DateTime::from_timestamp_millis);
1357 }
1358 }
1359
1360 info!(
1361 "Engine restored: {} events processed, {} streams with state (schema v{})",
1362 cp.events_processed,
1363 cp.window_states.len() + cp.sase_states.len() + cp.join_states.len(),
1364 cp.version
1365 );
1366
1367 Ok(())
1368 }
1369
1370 pub fn enable_checkpointing(
1372 &mut self,
1373 store: std::sync::Arc<dyn crate::persistence::StateStore>,
1374 config: crate::persistence::CheckpointConfig,
1375 ) -> Result<(), crate::persistence::StoreError> {
1376 let manager = crate::persistence::CheckpointManager::new(store, config)?;
1377
1378 if let Some(cp) = manager.recover()? {
1379 if let Some(engine_cp) = cp.context_states.get("main") {
1380 self.restore_checkpoint(engine_cp)?;
1381 info!(
1382 "Auto-restored from checkpoint {} ({} events)",
1383 cp.id, cp.events_processed
1384 );
1385 }
1386 }
1387
1388 self.checkpoint_manager = Some(manager);
1389 Ok(())
1390 }
1391
1392 pub fn checkpoint_tick(&mut self) -> Result<(), crate::persistence::StoreError> {
1394 let should = self
1395 .checkpoint_manager
1396 .as_ref()
1397 .is_some_and(|m| m.should_checkpoint());
1398
1399 if should {
1400 self.force_checkpoint()?;
1401 }
1402 Ok(())
1403 }
1404
1405 pub fn force_checkpoint(&mut self) -> Result<(), crate::persistence::StoreError> {
1407 if self.checkpoint_manager.is_none() {
1408 return Ok(());
1409 }
1410
1411 let engine_cp = self.create_checkpoint();
1412 let events_processed = self.events_processed;
1413
1414 let mut context_states = std::collections::HashMap::new();
1415 context_states.insert("main".to_string(), engine_cp);
1416
1417 let cp = crate::persistence::Checkpoint {
1418 id: 0,
1419 timestamp_ms: 0,
1420 events_processed,
1421 window_states: std::collections::HashMap::new(),
1422 pattern_states: std::collections::HashMap::new(),
1423 metadata: std::collections::HashMap::new(),
1424 context_states,
1425 };
1426
1427 self.checkpoint_manager.as_mut().unwrap().checkpoint(cp)?;
1428 Ok(())
1429 }
1430
1431 pub const fn has_checkpointing(&self) -> bool {
1433 self.checkpoint_manager.is_some()
1434 }
1435
1436 pub fn enable_watermark_tracking(&mut self) {
1442 if self.watermark_tracker.is_none() {
1443 self.watermark_tracker = Some(PerSourceWatermarkTracker::new());
1444 }
1445 }
1446
1447 pub fn register_watermark_source(&mut self, source: &str, max_ooo: Duration) {
1449 if let Some(ref mut tracker) = self.watermark_tracker {
1450 tracker.register_source(source, max_ooo);
1451 }
1452 }
1453
1454 #[tracing::instrument(skip(self))]
1456 pub async fn advance_external_watermark(
1457 &mut self,
1458 source_context: &str,
1459 watermark_ms: i64,
1460 ) -> Result<(), error::EngineError> {
1461 if let Some(ref mut tracker) = self.watermark_tracker {
1462 if let Some(wm) = DateTime::from_timestamp_millis(watermark_ms) {
1463 tracker.advance_source_watermark(source_context, wm);
1464
1465 if let Some(new_wm) = tracker.effective_watermark() {
1466 if self.last_applied_watermark.is_none_or(|last| new_wm > last) {
1467 self.apply_watermark_to_windows(new_wm).await?;
1468 self.last_applied_watermark = Some(new_wm);
1469 }
1470 }
1471 }
1472 }
1473 Ok(())
1474 }
1475
1476 fn sources_compatible(a: &RuntimeSource, b: &RuntimeSource) -> bool {
1478 match (a, b) {
1479 (RuntimeSource::EventType(a), RuntimeSource::EventType(b)) => a == b,
1480 (RuntimeSource::Stream(a), RuntimeSource::Stream(b)) => a == b,
1481 (RuntimeSource::Timer(a), RuntimeSource::Timer(b)) => {
1482 a.interval_ns == b.interval_ns && a.timer_event_type == b.timer_event_type
1483 }
1484 (RuntimeSource::Merge(a), RuntimeSource::Merge(b)) => {
1485 a.len() == b.len()
1486 && a.iter()
1487 .zip(b.iter())
1488 .all(|(x, y)| x.event_type == y.event_type)
1489 }
1490 (RuntimeSource::Join(a), RuntimeSource::Join(b)) => a == b,
1491 _ => false,
1492 }
1493 }
1494}