1mod builder;
7mod compilation;
8pub mod compiler;
9mod dispatch;
10pub mod error;
11pub mod evaluator;
12mod pattern_analyzer;
13pub mod physical_plan;
14mod pipeline;
15pub mod planner;
16mod router;
17mod sink_factory;
18pub mod topology;
19pub mod topology_builder;
20mod types;
21
22use std::sync::Arc;
24
25pub use builder::EngineBuilder;
26use chrono::{DateTime, Duration, Utc};
27pub use evaluator::eval_filter_expr;
29use rustc_hash::{FxHashMap, FxHashSet};
30pub use sink_factory::SinkConnectorAdapter;
31use tokio::sync::mpsc;
32use tracing::{info, warn};
33pub use types::NamedPattern;
35pub use types::{EngineConfig, EngineMetrics, ReloadReport, SourceBinding, UserFunction};
36use types::{RuntimeOp, RuntimeSource, StreamDefinition, WindowType};
38use varpulis_core::ast::{ConfigItem, Program, Stmt};
39use varpulis_core::Value;
40
41use crate::connector;
42use crate::context::ContextMap;
43use crate::event::{Event, SharedEvent};
44use crate::metrics::Metrics;
45use crate::sase_persistence::SaseCheckpointExt;
46use crate::sequence::SequenceContext;
47use crate::udf::UdfRegistry;
48use crate::watermark::PerSourceWatermarkTracker;
49use crate::window::CountWindow;
50
/// Destination channel for events the engine emits downstream.
///
/// `Owned` sends events by value; `Shared` sends `Arc`-wrapped events so the
/// payload can be handed to multiple readers without a deep clone.
#[derive(Debug)]
pub(super) enum OutputChannel {
    /// Sender taking events by value.
    Owned(mpsc::Sender<Event>),
    /// Sender taking reference-counted events.
    Shared(mpsc::Sender<SharedEvent>),
}
59
/// The streaming engine: holds all compiled program state (streams, routing,
/// functions, patterns, connectors) plus runtime counters and infrastructure
/// (sinks, DLQ, checkpointing, UDFs).
pub struct Engine {
    /// Compiled stream pipelines keyed by stream name.
    pub(super) streams: FxHashMap<String, StreamDefinition>,
    /// Maps event types to the streams that consume them.
    pub(super) router: router::EventRouter,
    /// User-defined functions declared in the program.
    pub(super) functions: FxHashMap<String, UserFunction>,
    /// Named SASE+ patterns declared in the program.
    pub(super) patterns: FxHashMap<String, NamedPattern>,
    /// Legacy `config` blocks (deprecated in favor of connectors).
    pub(super) configs: FxHashMap<String, EngineConfig>,
    /// Top-level variables (both `let` and `var`).
    pub(super) variables: FxHashMap<String, Value>,
    /// Names of variables declared mutable (`var`).
    pub(super) mutable_vars: FxHashSet<String>,
    /// Connector declarations by name.
    pub(super) connectors: FxHashMap<String, connector::ConnectorConfig>,
    /// Bindings from external sources to streams.
    pub(super) source_bindings: Vec<SourceBinding>,
    /// Instantiated sinks, keyed by sink key.
    pub(super) sinks: sink_factory::SinkRegistry,
    /// Where emitted events go; `None` drops output (benchmark mode).
    pub(super) output_channel: Option<OutputChannel>,
    /// Total events ingested.
    pub(super) events_processed: u64,
    /// Total events emitted downstream.
    pub(super) output_events_emitted: u64,
    pub(super) metrics: Option<Metrics>,
    /// Execution contexts declared in the program.
    pub(super) context_map: ContextMap,
    /// Per-source watermark tracking, if event-time processing is enabled.
    pub(super) watermark_tracker: Option<PerSourceWatermarkTracker>,
    /// Last watermark applied to windows.
    pub(super) last_applied_watermark: Option<DateTime<Utc>>,
    /// Late-data handling configuration per stream.
    pub(super) late_data_configs: FxHashMap<String, types::LateDataConfig>,
    /// Name of the context this engine instance runs in, if any.
    pub(super) context_name: Option<String>,
    /// Optional prefix applied to sink topics.
    pub(super) topic_prefix: Option<String>,
    /// Hamlet aggregators shared across streams with common Kleene patterns.
    pub(super) shared_hamlet_aggregators:
        Vec<std::sync::Arc<std::sync::Mutex<crate::hamlet::HamletAggregator>>>,
    pub(super) checkpoint_manager: Option<crate::persistence::CheckpointManager>,
    /// External store for connector credential profiles.
    pub(super) credentials_store: Option<Arc<connector::credentials::CredentialsStore>>,
    /// Override path for the dead-letter queue file.
    pub(super) dlq_path: Option<std::path::PathBuf>,
    pub(super) dlq_config: crate::dead_letter::DlqConfig,
    /// Active dead-letter queue, created when sinks exist.
    pub(super) dlq: Option<Arc<crate::dead_letter::DeadLetterQueue>>,
    /// Summary plan built at load time (for `explain`/introspection).
    pub(super) physical_plan: Option<physical_plan::PhysicalPlan>,
    /// Registry of scalar and aggregate UDFs.
    pub(super) udf_registry: UdfRegistry,
}
119
120impl std::fmt::Debug for Engine {
121 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
122 f.debug_struct("Engine")
123 .field("streams", &self.streams.keys().collect::<Vec<_>>())
124 .field("functions", &self.functions.keys().collect::<Vec<_>>())
125 .field("patterns", &self.patterns.keys().collect::<Vec<_>>())
126 .field("configs", &self.configs.keys().collect::<Vec<_>>())
127 .field("connectors", &self.connectors.keys().collect::<Vec<_>>())
128 .field("events_processed", &self.events_processed)
129 .field("output_events_emitted", &self.output_events_emitted)
130 .field("context_map", &self.context_map)
131 .field("context_name", &self.context_name)
132 .field("topic_prefix", &self.topic_prefix)
133 .finish_non_exhaustive()
134 }
135}
136
137impl Engine {
    /// Returns a builder for configuring an [`Engine`] step by step.
    pub fn builder() -> EngineBuilder {
        EngineBuilder::new()
    }
154
155 pub fn new(output_tx: mpsc::Sender<Event>) -> Self {
156 Self {
157 streams: FxHashMap::default(),
158 router: router::EventRouter::new(),
159 functions: FxHashMap::default(),
160 patterns: FxHashMap::default(),
161 configs: FxHashMap::default(),
162 variables: FxHashMap::default(),
163 mutable_vars: FxHashSet::default(),
164 connectors: FxHashMap::default(),
165 source_bindings: Vec::new(),
166 sinks: sink_factory::SinkRegistry::new(),
167 output_channel: Some(OutputChannel::Owned(output_tx)),
168 events_processed: 0,
169 output_events_emitted: 0,
170 metrics: None,
171 context_map: ContextMap::new(),
172 watermark_tracker: None,
173 last_applied_watermark: None,
174 late_data_configs: FxHashMap::default(),
175 context_name: None,
176 topic_prefix: None,
177 shared_hamlet_aggregators: Vec::new(),
178 checkpoint_manager: None,
179 credentials_store: None,
180 dlq_path: None,
181 dlq_config: crate::dead_letter::DlqConfig::default(),
182 dlq: None,
183 physical_plan: None,
184 udf_registry: UdfRegistry::new(),
185 }
186 }
187
    /// Creates an engine with no output channel; emitted events are dropped.
    /// Intended for benchmarks that measure internal processing only.
    pub fn new_benchmark() -> Self {
        Self::new_internal(None)
    }
192
    /// Creates an engine whose output channel is present only when
    /// `output_tx` is `Some`; otherwise emitted events are dropped.
    pub fn new_with_optional_output(output_tx: Option<mpsc::Sender<Event>>) -> Self {
        Self::new_internal(output_tx.map(OutputChannel::Owned))
    }
197
    /// Creates an engine that emits `Arc`-wrapped events on `output_tx`,
    /// avoiding payload clones for multi-consumer setups.
    pub fn new_shared(output_tx: mpsc::Sender<SharedEvent>) -> Self {
        Self::new_internal(Some(OutputChannel::Shared(output_tx)))
    }
202
    /// Installs the store used to resolve connector credential profiles
    /// (consulted when a connector declares a `profile` property).
    pub fn set_credentials_store(&mut self, store: Arc<connector::credentials::CredentialsStore>) {
        self.credentials_store = Some(store);
    }
207
    /// Canonical field-by-field initializer shared by all constructors;
    /// everything starts empty/zero except the provided output channel.
    fn new_internal(output_channel: Option<OutputChannel>) -> Self {
        Self {
            streams: FxHashMap::default(),
            router: router::EventRouter::new(),
            functions: FxHashMap::default(),
            patterns: FxHashMap::default(),
            configs: FxHashMap::default(),
            variables: FxHashMap::default(),
            mutable_vars: FxHashSet::default(),
            connectors: FxHashMap::default(),
            source_bindings: Vec::new(),
            sinks: sink_factory::SinkRegistry::new(),
            output_channel,
            events_processed: 0,
            output_events_emitted: 0,
            metrics: None,
            context_map: ContextMap::new(),
            watermark_tracker: None,
            last_applied_watermark: None,
            late_data_configs: FxHashMap::default(),
            context_name: None,
            topic_prefix: None,
            shared_hamlet_aggregators: Vec::new(),
            checkpoint_manager: None,
            credentials_store: None,
            dlq_path: None,
            dlq_config: crate::dead_letter::DlqConfig::default(),
            dlq: None,
            physical_plan: None,
            udf_registry: UdfRegistry::new(),
        }
    }
241
242 fn clone_output_channel(&self) -> Option<OutputChannel> {
244 match &self.output_channel {
245 Some(OutputChannel::Owned(tx)) => Some(OutputChannel::Owned(tx.clone())),
246 Some(OutputChannel::Shared(tx)) => Some(OutputChannel::Shared(tx.clone())),
247 None => None,
248 }
249 }
250
251 #[inline]
255 pub(super) fn send_output_shared(&mut self, event: &SharedEvent) {
256 match &self.output_channel {
257 Some(OutputChannel::Shared(tx)) => {
258 if let Err(e) = tx.try_send(Arc::clone(event)) {
260 warn!("Failed to send output event: {}", e);
261 }
262 }
263 Some(OutputChannel::Owned(tx)) => {
264 let owned = (**event).clone();
266 if let Err(e) = tx.try_send(owned) {
267 warn!("Failed to send output event: {}", e);
268 }
269 }
270 None => {
271 }
273 }
274 }
275
276 #[inline]
279 pub(super) fn send_output(&mut self, event: Event) {
280 match &self.output_channel {
281 Some(OutputChannel::Shared(tx)) => {
282 if let Err(e) = tx.try_send(Arc::new(event)) {
284 warn!("Failed to send output event: {}", e);
285 }
286 }
287 Some(OutputChannel::Owned(tx)) => {
288 if let Err(e) = tx.try_send(event) {
289 warn!("Failed to send output event: {}", e);
290 }
291 }
292 None => {
293 }
295 }
296 }
297
    /// Sets the name of the context this engine instance runs in.
    pub fn set_context_name(&mut self, name: &str) {
        self.context_name = Some(name.to_string());
    }
302
    /// Sets a prefix applied to sink topics when sinks are built.
    pub fn set_topic_prefix(&mut self, prefix: &str) {
        self.topic_prefix = Some(prefix.to_string());
    }
310
    /// Overrides the dead-letter queue file path (default is
    /// `varpulis-dlq.jsonl`, chosen at load time).
    pub fn set_dlq_path(&mut self, path: std::path::PathBuf) {
        self.dlq_path = Some(path);
    }
315
    /// Replaces the dead-letter queue configuration used when the DLQ is
    /// opened during program load.
    pub const fn set_dlq_config(&mut self, config: crate::dead_letter::DlqConfig) {
        self.dlq_config = config;
    }
320
    /// Returns the active dead-letter queue, if one was opened.
    pub const fn dlq(&self) -> Option<&Arc<crate::dead_letter::DeadLetterQueue>> {
        self.dlq.as_ref()
    }
325
    /// Looks up a named SASE+ pattern by name.
    pub fn get_pattern(&self, name: &str) -> Option<&NamedPattern> {
        self.patterns.get(name)
    }
330
    /// All registered named patterns, keyed by name.
    pub const fn patterns(&self) -> &FxHashMap<String, NamedPattern> {
        &self.patterns
    }
335
    /// Looks up a (deprecated) `config` block by name.
    pub fn get_config(&self, name: &str) -> Option<&EngineConfig> {
        self.configs.get(name)
    }
340
    /// Looks up a connector declaration by name.
    pub fn get_connector(&self, name: &str) -> Option<&connector::ConnectorConfig> {
        self.connectors.get(name)
    }
345
    /// All connector declarations, keyed by name.
    pub const fn connector_configs(&self) -> &FxHashMap<String, connector::ConnectorConfig> {
        &self.connectors
    }
350
    /// Bindings from external sources to streams, in declaration order.
    pub fn source_bindings(&self) -> &[SourceBinding] {
        &self.source_bindings
    }
355
    /// Reads a top-level variable's current value.
    pub fn get_variable(&self, name: &str) -> Option<&Value> {
        self.variables.get(name)
    }
360
361 pub fn set_variable(&mut self, name: &str, value: Value) -> Result<(), error::EngineError> {
363 if self.variables.contains_key(name) && !self.mutable_vars.contains(name) {
364 return Err(error::EngineError::Compilation(format!(
365 "Cannot assign to immutable variable '{name}'. Use 'var' instead of 'let' to declare mutable variables."
366 )));
367 }
368 self.variables.insert(name.to_string(), value);
369 Ok(())
370 }
371
    /// All top-level variables, keyed by name.
    pub const fn variables(&self) -> &FxHashMap<String, Value> {
        &self.variables
    }
376
    /// The registered execution contexts.
    pub const fn context_map(&self) -> &ContextMap {
        &self.context_map
    }
381
    /// Whether the loaded program declared any execution contexts.
    pub fn has_contexts(&self) -> bool {
        self.context_map.has_contexts()
    }
386
    /// Attaches a metrics handle (builder-style, consumes and returns self).
    pub fn with_metrics(mut self, metrics: Metrics) -> Self {
        self.metrics = Some(metrics);
        self
    }
392
393 pub fn add_filter<F>(&mut self, stream_name: &str, filter: F) -> Result<(), error::EngineError>
395 where
396 F: Fn(&Event) -> bool + Send + Sync + 'static,
397 {
398 if let Some(stream) = self.streams.get_mut(stream_name) {
399 let wrapped = move |e: &SharedEvent| filter(e.as_ref());
400 stream
401 .operations
402 .insert(0, RuntimeOp::WhereClosure(Box::new(wrapped)));
403 Ok(())
404 } else {
405 Err(error::EngineError::StreamNotFound(stream_name.to_string()))
406 }
407 }
408
409 pub fn load_with_source(
411 &mut self,
412 source: &str,
413 program: &Program,
414 ) -> Result<(), error::EngineError> {
415 let validation = varpulis_core::validate::validate(source, program);
416 if validation.has_errors() {
417 return Err(error::EngineError::Compilation(validation.format(source)));
418 }
419 for warning in validation
420 .diagnostics
421 .iter()
422 .filter(|d| d.severity == varpulis_core::validate::Severity::Warning)
423 {
424 warn!("{}", warning.message);
425 }
426 self.load_program(program)
427 }
428
    /// Loads `program` into the engine without source-level validation
    /// (see [`Self::load_with_source`] for the validating variant).
    pub fn load(&mut self, program: &Program) -> Result<(), error::EngineError> {
        self.load_program(program)
    }
433
    /// Registers every declaration in `program` with the engine, then builds
    /// sinks (with DLQ + resilience wrapping), shared Hamlet aggregators, and
    /// the physical plan summary.
    ///
    /// # Errors
    /// Returns a compilation error for unevaluable variable initializers,
    /// assignments to immutable variables, or unresolved credential profiles.
    fn load_program(&mut self, program: &Program) -> Result<(), error::EngineError> {
        for stmt in &program.statements {
            match &stmt.node {
                Stmt::StreamDecl {
                    name, source, ops, ..
                } => {
                    self.register_stream(name, source, ops)?;
                }
                Stmt::EventDecl { name, fields, .. } => {
                    // Event declarations carry no runtime state; log only.
                    info!(
                        "Registered event type: {} with {} fields",
                        name,
                        fields.len()
                    );
                }
                Stmt::FnDecl {
                    name,
                    params,
                    ret,
                    body,
                } => {
                    let user_fn = UserFunction {
                        name: name.clone(),
                        params: params
                            .iter()
                            .map(|p| (p.name.clone(), p.ty.clone()))
                            .collect(),
                        return_type: ret.clone(),
                        body: body.clone(),
                    };
                    info!(
                        "Registered function: {}({} params)",
                        name,
                        user_fn.params.len()
                    );
                    self.functions.insert(name.clone(), user_fn);
                }
                Stmt::Config { name, items } => {
                    warn!(
                        "DEPRECATED: 'config {}' block syntax is deprecated. \
                         Use 'connector' declarations instead: \
                         connector MyConn = {} (...)",
                        name, name
                    );
                    // Keep only simple key/value items; other item kinds are
                    // ignored for legacy config blocks.
                    let mut values = std::collections::HashMap::new();
                    for item in items {
                        if let ConfigItem::Value(key, val) = item {
                            values.insert(key.clone(), val.clone());
                        }
                    }
                    info!(
                        "Registered config block: {} with {} items",
                        name,
                        values.len()
                    );
                    self.configs.insert(
                        name.clone(),
                        EngineConfig {
                            name: name.clone(),
                            values,
                        },
                    );
                }
                Stmt::PatternDecl {
                    name,
                    expr,
                    within,
                    partition_by,
                } => {
                    let named_pattern = NamedPattern {
                        name: name.clone(),
                        expr: expr.clone(),
                        within: within.clone(),
                        partition_by: partition_by.clone(),
                    };
                    info!(
                        "Registered SASE+ pattern: {} (within: {}, partition: {})",
                        name,
                        within.is_some(),
                        partition_by.is_some()
                    );
                    self.patterns.insert(name.clone(), named_pattern);
                }
                Stmt::Import { path, alias } => {
                    // Imports must be inlined by a pre-processing step;
                    // reaching here means that step was skipped.
                    warn!(
                        "Unresolved import '{}' (alias: {:?}) — imports must be resolved before engine.load()",
                        path, alias
                    );
                }
                Stmt::VarDecl {
                    mutable,
                    name,
                    value,
                    ..
                } => {
                    // Initializers are evaluated against a placeholder event
                    // since no real event exists at load time.
                    let dummy_event = Event::new("__init__");
                    let empty_ctx = SequenceContext::new();
                    let initial_value = evaluator::eval_expr_with_functions(
                        value,
                        &dummy_event,
                        &empty_ctx,
                        &self.functions,
                        &self.variables,
                    )
                    .ok_or_else(|| {
                        error::EngineError::Compilation(format!(
                            "Failed to evaluate initial value for variable '{name}'"
                        ))
                    })?;

                    info!(
                        "Registered {} variable: {} = {:?}",
                        if *mutable { "mutable" } else { "immutable" },
                        name,
                        initial_value
                    );

                    self.variables.insert(name.clone(), initial_value);
                    if *mutable {
                        self.mutable_vars.insert(name.clone());
                    }
                }
                Stmt::Assignment { name, value } => {
                    let dummy_event = Event::new("__assign__");
                    let empty_ctx = SequenceContext::new();
                    let new_value = evaluator::eval_expr_with_functions(
                        value,
                        &dummy_event,
                        &empty_ctx,
                        &self.functions,
                        &self.variables,
                    )
                    .ok_or_else(|| {
                        error::EngineError::Compilation(format!(
                            "Failed to evaluate assignment value for '{name}'"
                        ))
                    })?;

                    // Reassignment is only legal for 'var' declarations.
                    if self.variables.contains_key(name) && !self.mutable_vars.contains(name) {
                        return Err(error::EngineError::Compilation(format!(
                            "Cannot assign to immutable variable '{name}'. Use 'var' instead of 'let'."
                        )));
                    }

                    // Assigning to an unknown name implicitly declares it
                    // mutable.
                    if !self.variables.contains_key(name) {
                        self.mutable_vars.insert(name.clone());
                    }

                    info!("Assigned variable: {} = {:?}", name, new_value);
                    self.variables.insert(name.clone(), new_value);
                }
                Stmt::ContextDecl { name, cores } => {
                    use crate::context::ContextConfig;
                    info!("Registered context: {} (cores: {:?})", name, cores);
                    self.context_map.register_context(ContextConfig {
                        name: name.clone(),
                        cores: cores.clone(),
                    });
                }
                Stmt::ConnectorDecl {
                    name,
                    connector_type,
                    params,
                } => {
                    let mut config =
                        sink_factory::connector_params_to_config(connector_type, params);

                    // A 'profile' property pulls credentials from the
                    // external store; using one without a configured store
                    // is a hard error.
                    if let Some(profile_name) = config.properties.swap_remove("profile") {
                        if let Some(ref store) = self.credentials_store {
                            store
                                .merge_profile(&profile_name, &mut config)
                                .map_err(|e| {
                                    error::EngineError::Compilation(format!(
                                        "Failed to resolve credentials profile '{}' for connector '{}': {}",
                                        profile_name, name, e
                                    ))
                                })?;
                            info!(
                                "Registered connector: {} (type: {}, profile: {})",
                                name, connector_type, profile_name
                            );
                        } else {
                            return Err(error::EngineError::Compilation(format!(
                                "Connector '{}' references profile '{}' but no credentials file was provided. \
                                 Use --credentials or set VARPULIS_CREDENTIALS.",
                                name, profile_name
                            )));
                        }
                    } else {
                        info!("Registered connector: {} (type: {})", name, connector_type);
                    }

                    self.connectors.insert(name.clone(), config);
                }
                _ => {
                    tracing::debug!("Skipping statement: {:?}", stmt.node);
                }
            }
        }

        // Only instantiate sinks that some stream actually routes to, and
        // collect static per-stream topic overrides.
        let mut referenced_sink_keys: FxHashSet<String> = FxHashSet::default();
        let mut topic_overrides: Vec<(String, String, String)> = Vec::new();
        for stream in self.streams.values() {
            for op in &stream.operations {
                if let RuntimeOp::To(to_config) = op {
                    referenced_sink_keys.insert(to_config.sink_key.clone());
                    match &to_config.topic {
                        Some(types::TopicSpec::Static(topic)) => {
                            topic_overrides.push((
                                to_config.sink_key.clone(),
                                to_config.connector_name.clone(),
                                topic.clone(),
                            ));
                        }
                        Some(types::TopicSpec::Dynamic(_)) => {
                            // Dynamic topics resolve at runtime, so the bare
                            // connector itself must also be available.
                            referenced_sink_keys.insert(to_config.connector_name.clone());
                        }
                        None => {}
                    }
                }
            }
        }

        self.sinks.build_from_connectors(
            &self.connectors,
            &referenced_sink_keys,
            &topic_overrides,
            self.context_name.as_deref(),
            self.topic_prefix.as_deref(),
        );

        // With at least one sink, open the dead-letter queue (best-effort:
        // open failure just disables the DLQ) and wrap sinks with
        // circuit-breaker resilience.
        if !self.sinks.cache().is_empty() {
            let dlq_path = self
                .dlq_path
                .clone()
                .unwrap_or_else(|| std::path::PathBuf::from("varpulis-dlq.jsonl"));
            let dlq = crate::dead_letter::DeadLetterQueue::open_with_config(
                &dlq_path,
                self.dlq_config.clone(),
            )
            .map(Arc::new)
            .ok();
            if dlq.is_some() {
                tracing::info!(
                    "Dead letter queue enabled at {} for {} sink(s)",
                    dlq_path.display(),
                    self.sinks.cache().len()
                );
            }
            self.dlq = dlq.clone();
            self.sinks.wrap_with_resilience(
                crate::circuit_breaker::CircuitBreakerConfig::default(),
                dlq,
                self.metrics.clone(),
            );
        }

        self.setup_hamlet_sharing();

        // Build the physical plan summary: per stream, its operator chain
        // plus the event types routed to it.
        let mut plan = physical_plan::PhysicalPlan::new();
        let mut stream_event_types: FxHashMap<String, Vec<String>> = FxHashMap::default();
        for (event_type, targets) in self.router.all_routes() {
            for target in targets.iter() {
                stream_event_types
                    .entry(target.clone())
                    .or_default()
                    .push(event_type.clone());
            }
        }
        for (name, stream_def) in &self.streams {
            let op_summary = stream_def
                .operations
                .iter()
                .map(|op| op.summary_name())
                .collect::<Vec<_>>()
                .join(" → ");
            plan.add_stream(physical_plan::PhysicalStream {
                name: name.clone(),
                operation_count: stream_def.operations.len(),
                operation_summary: op_summary,
                logical_id: plan.stream_count() as u32,
                registered_event_types: stream_event_types
                    .remove(name.as_str())
                    .unwrap_or_default(),
            });
        }
        self.physical_plan = Some(plan);

        Ok(())
    }
731
    /// Detects groups of streams whose trend aggregations use the same set of
    /// Kleene event types and replaces their per-stream Hamlet aggregators
    /// with one shared aggregator per group.
    fn setup_hamlet_sharing(&mut self) {
        use crate::hamlet::template::TemplateBuilder;
        use crate::hamlet::{HamletAggregator, HamletConfig, QueryRegistration};

        let hamlet_streams: Vec<String> = self
            .streams
            .iter()
            .filter(|(_, s)| s.hamlet_aggregator.is_some())
            .map(|(name, _)| name.clone())
            .collect();

        // Sharing only pays off with at least two candidate streams.
        if hamlet_streams.len() < 2 {
            return;
        }

        // Group streams by the sorted list of Kleene event-type names that
        // their registered queries reference.
        let mut kleene_groups: FxHashMap<Vec<String>, Vec<String>> = FxHashMap::default();

        for stream_name in &hamlet_streams {
            if let Some(stream) = self.streams.get(stream_name) {
                let mut kleene_types = Vec::new();
                for op in &stream.operations {
                    if let RuntimeOp::TrendAggregate(_) = op {
                        if let Some(ref agg) = stream.hamlet_aggregator {
                            let tmpl = agg.merged_template();
                            for query in agg.registered_queries() {
                                for &kt in &query.kleene_types {
                                    let type_name =
                                        tmpl.type_name(kt).unwrap_or("unknown").to_string();
                                    if !kleene_types.contains(&type_name) {
                                        kleene_types.push(type_name);
                                    }
                                }
                            }
                        }
                    }
                }
                // Sort so the group key is order-independent.
                kleene_types.sort();
                kleene_groups
                    .entry(kleene_types)
                    .or_default()
                    .push(stream_name.clone());
            }
        }

        for (kleene_key, group_streams) in &kleene_groups {
            if group_streams.len() < 2 || kleene_key.is_empty() {
                continue;
            }

            info!(
                "Hamlet sharing detected: {} streams share Kleene patterns {:?}: {:?}",
                group_streams.len(),
                kleene_key,
                group_streams,
            );

            // Rebuild every query of the group into one merged template,
            // renumbering query ids so they are unique within the group.
            let mut builder = TemplateBuilder::new();
            type SharingEntry = (
                String,
                QueryRegistration,
                Vec<(String, crate::greta::GretaAggregate)>,
            );
            let mut all_registrations: Vec<SharingEntry> = Vec::new();
            let mut next_query_id: crate::greta::QueryId = 0;

            for stream_name in group_streams {
                if let Some(stream) = self.streams.get(stream_name) {
                    if let Some(ref agg) = stream.hamlet_aggregator {
                        let tmpl = agg.merged_template();
                        for query in agg.registered_queries() {
                            let new_id = next_query_id;
                            next_query_id += 1;

                            let event_names: Vec<String> = query
                                .event_types
                                .iter()
                                .map(|&idx| tmpl.type_name(idx).unwrap_or("unknown").to_string())
                                .collect();
                            let name_strs: Vec<&str> =
                                event_names.iter().map(String::as_str).collect();

                            let base_state = builder.num_states() as u16;
                            builder.add_sequence(new_id, &name_strs);

                            for &kt in &query.kleene_types {
                                let type_name = tmpl.type_name(kt).unwrap_or("unknown").to_string();
                                let position = event_names
                                    .iter()
                                    .position(|n| *n == type_name)
                                    .unwrap_or(0);
                                // NOTE(review): state id appears to be offset
                                // by one from the sequence position (state 0
                                // presumably the start state) — confirm
                                // against TemplateBuilder's state layout.
                                let kleene_state = base_state + position as u16 + 1;
                                builder.add_kleene(new_id, &type_name, kleene_state);
                            }

                            // Carry over the aggregate field list from the
                            // stream's TrendAggregate operation.
                            let fields: Vec<(String, crate::greta::GretaAggregate)> = stream
                                .operations
                                .iter()
                                .find_map(|op| {
                                    if let RuntimeOp::TrendAggregate(config) = op {
                                        Some(config.fields.clone())
                                    } else {
                                        None
                                    }
                                })
                                .unwrap_or_default();

                            all_registrations.push((
                                stream_name.clone(),
                                QueryRegistration {
                                    id: new_id,
                                    event_types: query.event_types.clone(),
                                    kleene_types: query.kleene_types.clone(),
                                    aggregate: query.aggregate,
                                },
                                fields,
                            ));
                        }
                    }
                }
            }

            let template = builder.build();
            let config = HamletConfig {
                window_ms: 60_000,
                incremental: true,
                ..Default::default()
            };
            let mut shared_agg = HamletAggregator::new(config, template);

            for (_, registration, _) in &all_registrations {
                shared_agg.register_query(registration.clone());
            }

            let shared_ref = std::sync::Arc::new(std::sync::Mutex::new(shared_agg));
            self.shared_hamlet_aggregators.push(shared_ref.clone());

            // Point every grouped stream at the shared aggregator, drop its
            // private one, and rewrite query ids in the operator configs.
            for (stream_name, registration, fields) in &all_registrations {
                if let Some(stream) = self.streams.get_mut(stream_name) {
                    stream.hamlet_aggregator = None;
                    stream.shared_hamlet_ref = Some(shared_ref.clone());

                    for op in &mut stream.operations {
                        if let RuntimeOp::TrendAggregate(config) = op {
                            config.query_id = registration.id;
                            config.fields = fields.clone();
                        }
                    }
                }
            }

            info!(
                "Created shared Hamlet aggregator with {} queries",
                next_query_id,
            );
        }
    }
894
    /// Connects every registered sink.
    ///
    /// # Errors
    /// Returns a pipeline error if any sink fails to connect.
    #[tracing::instrument(skip(self))]
    pub async fn connect_sinks(&self) -> Result<(), error::EngineError> {
        self.sinks
            .connect_all()
            .await
            .map_err(error::EngineError::Pipeline)
    }
903
    /// Registers `sink` under `key`, replacing any existing entry.
    pub fn inject_sink(&mut self, key: &str, sink: Arc<dyn crate::sink::Sink>) {
        self.sinks.insert(key.to_string(), sink);
    }
908
    /// Whether a sink is registered under `key`.
    pub fn has_sink(&self, key: &str) -> bool {
        self.sinks.cache().contains_key(key)
    }
913
914 pub fn sink_keys_for_connector(&self, connector_name: &str) -> Vec<String> {
916 let prefix = format!("{connector_name}::");
917 self.sinks
918 .cache()
919 .keys()
920 .filter(|k| *k == connector_name || k.starts_with(&prefix))
921 .cloned()
922 .collect()
923 }
924
925 pub fn has_sink_operations(&self) -> bool {
931 self.streams.values().any(|s| {
932 s.operations
933 .iter()
934 .any(|op| matches!(op, RuntimeOp::To(_) | RuntimeOp::Enrich(_)))
935 })
936 }
937
    /// Returns `(events_processed, output_events_emitted)`.
    pub const fn event_counters(&self) -> (u64, u64) {
        (self.events_processed, self.output_events_emitted)
    }
942
943 pub fn is_stateless(&self) -> bool {
945 self.streams.values().all(|s| {
946 s.sase_engine.is_none()
947 && s.join_buffer.is_none()
948 && s.hamlet_aggregator.is_none()
949 && s.shared_hamlet_ref.is_none()
950 && s.operations.iter().all(|op| {
951 matches!(
952 op,
953 RuntimeOp::WhereExpr(_)
954 | RuntimeOp::WhereClosure(_)
955 | RuntimeOp::Select(_)
956 | RuntimeOp::Emit(_)
957 | RuntimeOp::EmitExpr(_)
958 | RuntimeOp::Print(_)
959 | RuntimeOp::Log(_)
960 | RuntimeOp::Having(_)
961 | RuntimeOp::Process(_)
962 | RuntimeOp::Pattern(_)
963 | RuntimeOp::To(_)
964 )
965 })
966 })
967 }
968
    /// Returns the first partition key any stream requires, if one exists.
    ///
    /// Lookup order per stream: explicit SASE `partition by`, then
    /// partitioned window/aggregate operators, then (for SASE streams only)
    /// an equality join key extracted from a `where` expression.
    pub fn partition_key(&self) -> Option<String> {
        for stream in self.streams.values() {
            if let Some(ref sase) = stream.sase_engine {
                if let Some(key) = sase.partition_by() {
                    return Some(key.to_string());
                }
            }
            for op in &stream.operations {
                match op {
                    RuntimeOp::PartitionedWindow(pw) => return Some(pw.partition_key.clone()),
                    RuntimeOp::PartitionedSlidingCountWindow(pw) => {
                        return Some(pw.partition_key.clone())
                    }
                    RuntimeOp::PartitionedAggregate(pa) => return Some(pa.partition_key.clone()),
                    _ => {}
                }
            }
            // Fallback for SASE streams: derive a key from an equality join
            // condition in a where clause.
            if stream.sase_engine.is_some() {
                for op in &stream.operations {
                    if let RuntimeOp::WhereExpr(expr) = op {
                        if let Some(key) = compilation::extract_equality_join_key(expr) {
                            return Some(key);
                        }
                    }
                }
            }
        }
        None
    }
999
1000 pub fn has_session_windows(&self) -> bool {
1002 self.streams.values().any(|s| {
1003 s.operations.iter().any(|op| {
1004 matches!(
1005 op,
1006 RuntimeOp::Window(WindowType::Session(_))
1007 | RuntimeOp::Window(WindowType::PartitionedSession(_))
1008 )
1009 })
1010 })
1011 }
1012
1013 pub fn min_session_gap(&self) -> Option<chrono::Duration> {
1015 let mut min_gap: Option<chrono::Duration> = None;
1016 for stream in self.streams.values() {
1017 for op in &stream.operations {
1018 if let RuntimeOp::Window(window) = op {
1019 let gap = match window {
1020 WindowType::Session(w) => Some(w.gap()),
1021 WindowType::PartitionedSession(w) => Some(w.gap()),
1022 _ => None,
1023 };
1024 if let Some(g) = gap {
1025 min_gap = Some(match min_gap {
1026 Some(current) if g < current => g,
1027 Some(current) => current,
1028 None => g,
1029 });
1030 }
1031 }
1032 }
1033 }
1034 min_gap
1035 }
1036
    /// Snapshot of engine counters for external monitoring.
    pub fn metrics(&self) -> EngineMetrics {
        EngineMetrics {
            events_processed: self.events_processed,
            output_events_emitted: self.output_events_emitted,
            streams_count: self.streams.len(),
        }
    }
1045
1046 pub fn stream_names(&self) -> Vec<&str> {
1048 self.streams.keys().map(|s| s.as_str()).collect()
1049 }
1050
    /// Human-readable summary of the physical plan built at load time, or
    /// `None` before any program was loaded.
    pub fn physical_plan_summary(&self) -> Option<String> {
        self.physical_plan.as_ref().map(|p| p.summary())
    }
1055
    /// Builds, optimizes and pretty-prints the logical plan for `program`
    /// without loading it into the engine.
    ///
    /// # Errors
    /// Returns a compilation error if logical planning fails.
    pub fn explain(&self, program: &Program) -> Result<String, error::EngineError> {
        let logical = planner::logical_plan(program).map_err(error::EngineError::Compilation)?;
        let optimized = varpulis_parser::optimize_plan(logical);
        Ok(optimized.explain())
    }
1061
1062 pub fn topology(&self) -> topology::Topology {
1064 let mut builder = topology_builder::TopologyBuilder::new();
1065 for (name, stream_def) in &self.streams {
1066 builder = builder.add_stream(name, stream_def);
1067 }
1068 builder = builder.add_routes(self.router.all_routes());
1069 builder.build()
1070 }
1071
    /// Looks up a user-defined function by name.
    pub fn get_function(&self, name: &str) -> Option<&UserFunction> {
        self.functions.get(name)
    }
1076
    /// Names of every registered user-defined function.
    pub fn function_names(&self) -> Vec<&str> {
        self.functions.keys().map(|s| s.as_str()).collect()
    }
1081
1082 pub fn get_timers(&self) -> Vec<(u64, Option<u64>, String)> {
1084 let mut timers = Vec::new();
1085 for stream in self.streams.values() {
1086 if let RuntimeSource::Timer(config) = &stream.source {
1087 timers.push((
1088 config.interval_ns,
1089 config.initial_delay_ns,
1090 config.timer_event_type.clone(),
1091 ));
1092 }
1093 }
1094 timers
1095 }
1096
    /// Registers a scalar user-defined function with the UDF registry.
    pub fn register_scalar_udf(&mut self, udf: std::sync::Arc<dyn crate::udf::ScalarUDF>) {
        self.udf_registry.register_scalar(udf);
    }
1101
    /// Registers an aggregate user-defined function with the UDF registry.
    pub fn register_aggregate_udf(&mut self, udf: std::sync::Arc<dyn crate::udf::AggregateUDF>) {
        self.udf_registry.register_aggregate(udf);
    }
1106
    /// The UDF registry holding all registered scalar/aggregate UDFs.
    pub const fn udf_registry(&self) -> &UdfRegistry {
        &self.udf_registry
    }
1111
1112 pub fn reload(&mut self, program: &Program) -> Result<ReloadReport, error::EngineError> {
1118 let mut report = ReloadReport::default();
1119
1120 let old_streams: FxHashSet<String> = self.streams.keys().cloned().collect();
1121
1122 let mut new_engine = Self::new_internal(self.clone_output_channel());
1123 new_engine.credentials_store = self.credentials_store.clone();
1124 new_engine.load(program)?;
1125
1126 let new_streams: FxHashSet<String> = new_engine.streams.keys().cloned().collect();
1127
1128 for name in new_streams.difference(&old_streams) {
1129 report.streams_added.push(name.clone());
1130 }
1131
1132 for name in old_streams.difference(&new_streams) {
1133 report.streams_removed.push(name.clone());
1134 }
1135
1136 for name in old_streams.intersection(&new_streams) {
1137 let old_stream = self.streams.get(name).unwrap();
1138 let new_stream = new_engine.streams.get(name).unwrap();
1139
1140 let source_changed = !Self::sources_compatible(&old_stream.source, &new_stream.source);
1141 let ops_changed = old_stream.operations.len() != new_stream.operations.len();
1142
1143 if source_changed || ops_changed {
1144 report.streams_updated.push(name.clone());
1145 report.state_reset.push(name.clone());
1146 } else {
1147 report.state_preserved.push(name.clone());
1148 }
1149 }
1150
1151 for name in &report.streams_removed {
1152 self.streams.remove(name);
1153 }
1154
1155 self.router.clear();
1156
1157 for name in &report.streams_added {
1158 if let Some(stream) = new_engine.streams.remove(name) {
1159 self.streams.insert(name.clone(), stream);
1160 }
1161 }
1162
1163 for name in &report.streams_updated {
1164 if let Some(stream) = new_engine.streams.remove(name) {
1165 self.streams.insert(name.clone(), stream);
1166 }
1167 }
1168
1169 let registrations: Vec<(String, String)> = self
1170 .streams
1171 .iter()
1172 .flat_map(|(name, stream)| {
1173 let mut pairs = Vec::new();
1174 match &stream.source {
1175 RuntimeSource::EventType(et) => {
1176 pairs.push((et.clone(), name.clone()));
1177 }
1178 RuntimeSource::Stream(s) => {
1179 pairs.push((s.clone(), name.clone()));
1180 }
1181 RuntimeSource::Merge(sources) => {
1182 for ms in sources {
1183 pairs.push((ms.event_type.clone(), name.clone()));
1184 }
1185 }
1186 RuntimeSource::Join(_) => {}
1187 RuntimeSource::Timer(config) => {
1188 pairs.push((config.timer_event_type.clone(), name.clone()));
1189 }
1190 }
1191 pairs
1192 })
1193 .collect();
1194
1195 for (event_type, stream_name) in registrations {
1196 self.router.add_route(&event_type, &stream_name);
1197 }
1198
1199 self.functions = new_engine.functions;
1200 self.patterns = new_engine.patterns;
1201 self.configs = new_engine.configs;
1202 self.context_map = new_engine.context_map;
1203 self.connectors = new_engine.connectors;
1204 self.source_bindings = new_engine.source_bindings;
1205 *self.sinks.cache_mut() = std::mem::take(new_engine.sinks.cache_mut());
1206
1207 for (name, value) in new_engine.variables {
1208 if !self.variables.contains_key(&name) {
1209 self.variables.insert(name.clone(), value);
1210 self.mutable_vars
1211 .extend(new_engine.mutable_vars.iter().cloned());
1212 }
1213 }
1214
1215 info!(
1216 "Hot reload complete: +{} -{} ~{} streams",
1217 report.streams_added.len(),
1218 report.streams_removed.len(),
1219 report.streams_updated.len()
1220 );
1221
1222 Ok(report)
1223 }
1224
    /// Snapshot all recoverable engine state into an `EngineCheckpoint`.
    ///
    /// Per stream this captures: window contents (one entry per stream in
    /// `window_states`), per-partition window contents, distinct-key sets,
    /// limit counters, SASE pattern-matcher state, and join buffers. Globally
    /// it captures variables (serialized via `value_to_ser`), watermark
    /// tracker state, and the processed/emitted event counters.
    ///
    /// The counterpart is `restore_checkpoint`, which reads these maps back
    /// keyed by the same stream names.
    pub fn create_checkpoint(&self) -> crate::persistence::EngineCheckpoint {
        use crate::persistence::{EngineCheckpoint, WindowCheckpoint};

        let mut window_states = std::collections::HashMap::new();
        let mut sase_states = std::collections::HashMap::new();
        let mut join_states = std::collections::HashMap::new();
        let mut distinct_states = std::collections::HashMap::new();
        let mut limit_states = std::collections::HashMap::new();

        for (name, stream) in &self.streams {
            for op in &stream.operations {
                match op {
                    RuntimeOp::Window(wt) => {
                        // Every window flavor exposes the same `checkpoint()`
                        // shape, so the variants only differ in the receiver.
                        let cp = match wt {
                            WindowType::Tumbling(w) => w.checkpoint(),
                            WindowType::Sliding(w) => w.checkpoint(),
                            WindowType::Count(w) => w.checkpoint(),
                            WindowType::SlidingCount(w) => w.checkpoint(),
                            WindowType::Session(w) => w.checkpoint(),
                            WindowType::PartitionedSession(w) => w.checkpoint(),
                            WindowType::PartitionedTumbling(w) => w.checkpoint(),
                            WindowType::PartitionedSliding(w) => w.checkpoint(),
                        };
                        window_states.insert(name.clone(), cp);
                    }
                    RuntimeOp::PartitionedWindow(pw) => {
                        // Partitioned windows reuse `window_states` under the
                        // same stream-name key, with the per-key sub-windows
                        // stored in `partitions` and the top-level fields
                        // left empty.
                        // NOTE(review): a stream carrying both a `Window` and
                        // a `PartitionedWindow` op would overwrite this entry
                        // — TODO confirm streams have at most one window op.
                        let mut partitions = std::collections::HashMap::new();
                        for (key, cw) in &pw.windows {
                            let sub_cp = cw.checkpoint();
                            partitions.insert(
                                key.clone(),
                                crate::persistence::PartitionedWindowCheckpoint {
                                    events: sub_cp.events,
                                    window_start_ms: sub_cp.window_start_ms,
                                },
                            );
                        }
                        window_states.insert(
                            name.clone(),
                            WindowCheckpoint {
                                events: Vec::new(),
                                window_start_ms: None,
                                last_emit_ms: None,
                                partitions,
                            },
                        );
                    }
                    RuntimeOp::Distinct(state) => {
                        // Keys are written reversed; `restore_checkpoint`
                        // iterates them with `.rev()` again, so the original
                        // insertion order of `state.seen` is preserved.
                        let keys: Vec<String> =
                            state.seen.iter().rev().map(|(k, ())| k.clone()).collect();
                        distinct_states.insert(
                            name.clone(),
                            crate::persistence::DistinctCheckpoint { keys },
                        );
                    }
                    RuntimeOp::Limit(state) => {
                        limit_states.insert(
                            name.clone(),
                            crate::persistence::LimitCheckpoint {
                                max: state.max,
                                count: state.count,
                            },
                        );
                    }
                    // Stateless operators contribute nothing to a checkpoint.
                    _ => {}
                }
            }

            if let Some(ref sase) = stream.sase_engine {
                sase_states.insert(name.clone(), sase.checkpoint());
            }

            if let Some(ref jb) = stream.join_buffer {
                join_states.insert(name.clone(), jb.checkpoint());
            }
        }

        // Variables are converted to their serializable mirror type.
        let variables = self
            .variables
            .iter()
            .map(|(k, v)| (k.clone(), crate::persistence::value_to_ser(v)))
            .collect();

        let watermark_state = self.watermark_tracker.as_ref().map(|t| t.checkpoint());

        EngineCheckpoint {
            version: crate::persistence::CHECKPOINT_VERSION,
            window_states,
            sase_states,
            join_states,
            variables,
            events_processed: self.events_processed,
            output_events_emitted: self.output_events_emitted,
            watermark_state,
            distinct_states,
            limit_states,
        }
    }
1328
1329 pub fn restore_checkpoint(
1331 &mut self,
1332 cp: &crate::persistence::EngineCheckpoint,
1333 ) -> Result<(), crate::persistence::StoreError> {
1334 let mut migrated = cp.clone();
1335 migrated.validate_and_migrate()?;
1336
1337 self.events_processed = cp.events_processed;
1338 self.output_events_emitted = cp.output_events_emitted;
1339
1340 for (k, sv) in &cp.variables {
1341 self.variables
1342 .insert(k.clone(), crate::persistence::ser_to_value(sv.clone()));
1343 }
1344
1345 for (name, stream) in &mut self.streams {
1346 if let Some(wcp) = cp.window_states.get(name) {
1347 for op in &mut stream.operations {
1348 match op {
1349 RuntimeOp::Window(wt) => match wt {
1350 WindowType::Tumbling(w) => w.restore(wcp),
1351 WindowType::Sliding(w) => w.restore(wcp),
1352 WindowType::Count(w) => w.restore(wcp),
1353 WindowType::SlidingCount(w) => w.restore(wcp),
1354 WindowType::Session(w) => w.restore(wcp),
1355 WindowType::PartitionedSession(w) => w.restore(wcp),
1356 WindowType::PartitionedTumbling(w) => w.restore(wcp),
1357 WindowType::PartitionedSliding(w) => w.restore(wcp),
1358 },
1359 RuntimeOp::PartitionedWindow(pw) => {
1360 for (key, pcp) in &wcp.partitions {
1361 let sub_wcp = crate::persistence::WindowCheckpoint {
1362 events: pcp.events.clone(),
1363 window_start_ms: pcp.window_start_ms,
1364 last_emit_ms: None,
1365 partitions: std::collections::HashMap::new(),
1366 };
1367 let window = pw
1368 .windows
1369 .entry(key.clone())
1370 .or_insert_with(|| CountWindow::new(pw.window_size));
1371 window.restore(&sub_wcp);
1372 }
1373 }
1374 _ => {}
1375 }
1376 }
1377 }
1378
1379 if let Some(scp) = cp.sase_states.get(name) {
1380 if let Some(ref mut sase) = stream.sase_engine {
1381 sase.restore(scp);
1382 }
1383 }
1384
1385 if let Some(jcp) = cp.join_states.get(name) {
1386 if let Some(ref mut jb) = stream.join_buffer {
1387 jb.restore(jcp);
1388 }
1389 }
1390
1391 if let Some(dcp) = cp.distinct_states.get(name) {
1392 for op in &mut stream.operations {
1393 if let RuntimeOp::Distinct(state) = op {
1394 state.seen.clear();
1395 for key in dcp.keys.iter().rev() {
1396 state.seen.insert(key.clone(), ());
1397 }
1398 }
1399 }
1400 }
1401
1402 if let Some(lcp) = cp.limit_states.get(name) {
1403 for op in &mut stream.operations {
1404 if let RuntimeOp::Limit(state) = op {
1405 state.max = lcp.max;
1406 state.count = lcp.count;
1407 }
1408 }
1409 }
1410 }
1411
1412 if let Some(ref wcp) = cp.watermark_state {
1413 if self.watermark_tracker.is_none() {
1414 self.watermark_tracker = Some(PerSourceWatermarkTracker::new());
1415 }
1416 if let Some(ref mut tracker) = self.watermark_tracker {
1417 tracker.restore(wcp);
1418 self.last_applied_watermark = wcp
1419 .effective_watermark_ms
1420 .and_then(DateTime::from_timestamp_millis);
1421 }
1422 }
1423
1424 info!(
1425 "Engine restored: {} events processed, {} streams with state (schema v{})",
1426 cp.events_processed,
1427 cp.window_states.len() + cp.sase_states.len() + cp.join_states.len(),
1428 cp.version
1429 );
1430
1431 Ok(())
1432 }
1433
1434 pub fn enable_checkpointing(
1436 &mut self,
1437 store: std::sync::Arc<dyn crate::persistence::StateStore>,
1438 config: crate::persistence::CheckpointConfig,
1439 ) -> Result<(), crate::persistence::StoreError> {
1440 let manager = crate::persistence::CheckpointManager::new(store, config)?;
1441
1442 if let Some(cp) = manager.recover()? {
1443 if let Some(engine_cp) = cp.context_states.get("main") {
1444 self.restore_checkpoint(engine_cp)?;
1445 info!(
1446 "Auto-restored from checkpoint {} ({} events)",
1447 cp.id, cp.events_processed
1448 );
1449 }
1450 }
1451
1452 self.checkpoint_manager = Some(manager);
1453 Ok(())
1454 }
1455
1456 pub fn checkpoint_tick(&mut self) -> Result<(), crate::persistence::StoreError> {
1458 let should = self
1459 .checkpoint_manager
1460 .as_ref()
1461 .is_some_and(|m| m.should_checkpoint());
1462
1463 if should {
1464 self.force_checkpoint()?;
1465 }
1466 Ok(())
1467 }
1468
1469 pub fn force_checkpoint(&mut self) -> Result<(), crate::persistence::StoreError> {
1471 if self.checkpoint_manager.is_none() {
1472 return Ok(());
1473 }
1474
1475 let engine_cp = self.create_checkpoint();
1476 let events_processed = self.events_processed;
1477
1478 let mut context_states = std::collections::HashMap::new();
1479 context_states.insert("main".to_string(), engine_cp);
1480
1481 let cp = crate::persistence::Checkpoint {
1482 id: 0,
1483 timestamp_ms: 0,
1484 events_processed,
1485 window_states: std::collections::HashMap::new(),
1486 pattern_states: std::collections::HashMap::new(),
1487 metadata: std::collections::HashMap::new(),
1488 context_states,
1489 };
1490
1491 self.checkpoint_manager.as_mut().unwrap().checkpoint(cp)?;
1492 Ok(())
1493 }
1494
1495 pub const fn has_checkpointing(&self) -> bool {
1497 self.checkpoint_manager.is_some()
1498 }
1499
1500 pub fn enable_watermark_tracking(&mut self) {
1506 if self.watermark_tracker.is_none() {
1507 self.watermark_tracker = Some(PerSourceWatermarkTracker::new());
1508 }
1509 }
1510
1511 pub fn register_watermark_source(&mut self, source: &str, max_ooo: Duration) {
1513 if let Some(ref mut tracker) = self.watermark_tracker {
1514 tracker.register_source(source, max_ooo);
1515 }
1516 }
1517
1518 #[tracing::instrument(skip(self))]
1520 pub async fn advance_external_watermark(
1521 &mut self,
1522 source_context: &str,
1523 watermark_ms: i64,
1524 ) -> Result<(), error::EngineError> {
1525 if let Some(ref mut tracker) = self.watermark_tracker {
1526 if let Some(wm) = DateTime::from_timestamp_millis(watermark_ms) {
1527 tracker.advance_source_watermark(source_context, wm);
1528
1529 if let Some(new_wm) = tracker.effective_watermark() {
1530 if self.last_applied_watermark.is_none_or(|last| new_wm > last) {
1531 self.apply_watermark_to_windows(new_wm).await?;
1532 self.last_applied_watermark = Some(new_wm);
1533 }
1534 }
1535 }
1536 }
1537 Ok(())
1538 }
1539
1540 fn sources_compatible(a: &RuntimeSource, b: &RuntimeSource) -> bool {
1542 match (a, b) {
1543 (RuntimeSource::EventType(a), RuntimeSource::EventType(b)) => a == b,
1544 (RuntimeSource::Stream(a), RuntimeSource::Stream(b)) => a == b,
1545 (RuntimeSource::Timer(a), RuntimeSource::Timer(b)) => {
1546 a.interval_ns == b.interval_ns && a.timer_event_type == b.timer_event_type
1547 }
1548 (RuntimeSource::Merge(a), RuntimeSource::Merge(b)) => {
1549 a.len() == b.len()
1550 && a.iter()
1551 .zip(b.iter())
1552 .all(|(x, y)| x.event_type == y.event_type)
1553 }
1554 (RuntimeSource::Join(a), RuntimeSource::Join(b)) => a == b,
1555 _ => false,
1556 }
1557 }
1558}