1use std::collections::HashMap;
6use std::sync::Arc;
7
8use indexmap::IndexMap;
9use rustc_hash::FxHashMap;
10use varpulis_core::ast::{ConfigValue, Expr, SasePatternExpr};
11use varpulis_core::Value;
12
13use crate::aggregation::Aggregator;
14use crate::event::SharedEvent;
15use crate::join::JoinBuffer;
16use crate::sase::SaseEngine;
17use crate::window::{
18 CountWindow, PartitionedSessionWindow, PartitionedSlidingWindow, PartitionedTumblingWindow,
19 SessionWindow, SlidingCountWindow, SlidingWindow, TumblingWindow,
20};
21
22#[derive(Debug, Clone)]
28pub struct EngineConfig {
29 pub name: String,
30 pub values: HashMap<String, ConfigValue>,
31}
32
33#[derive(Debug, Clone)]
35pub struct UserFunction {
36 pub name: String,
37 pub params: Vec<(String, varpulis_core::Type)>, pub return_type: Option<varpulis_core::Type>,
39 pub body: Vec<varpulis_core::span::Spanned<varpulis_core::Stmt>>,
40}
41
42#[derive(Debug, Clone)]
44pub struct EngineMetrics {
45 pub events_processed: u64,
46 pub output_events_emitted: u64,
47 pub streams_count: usize,
48}
49
50#[derive(Debug, Clone, Default)]
52pub struct ReloadReport {
53 pub streams_added: Vec<String>,
55 pub streams_removed: Vec<String>,
57 pub streams_updated: Vec<String>,
59 pub state_preserved: Vec<String>,
61 pub state_reset: Vec<String>,
63 pub warnings: Vec<String>,
65}
66
67impl ReloadReport {
68 pub const fn is_empty(&self) -> bool {
69 self.streams_added.is_empty()
70 && self.streams_removed.is_empty()
71 && self.streams_updated.is_empty()
72 }
73}
74
75#[derive(Debug, Clone)]
77pub struct SourceBinding {
78 pub connector_name: String,
79 pub event_type: String,
80 pub topic_override: Option<String>,
81 pub extra_params: HashMap<String, String>,
83}
84
85#[derive(Debug, Clone)]
87pub struct NamedPattern {
88 pub name: String,
90 pub expr: SasePatternExpr,
92 pub within: Option<Expr>,
94 pub partition_by: Option<Expr>,
96}
97
98#[allow(dead_code)]
104pub struct StreamDefinition {
105 pub name: String,
106 pub name_arc: Arc<str>,
109 pub source: RuntimeSource,
110 pub operations: Vec<RuntimeOp>,
111 pub sase_engine: Option<SaseEngine>,
113 pub join_buffer: Option<JoinBuffer>,
115 pub event_type_to_source: FxHashMap<String, String>,
117 pub hamlet_aggregator: Option<crate::hamlet::HamletAggregator>,
119 pub shared_hamlet_ref: Option<Arc<std::sync::Mutex<crate::hamlet::HamletAggregator>>>,
121 pub pst_forecaster: Option<crate::pst::PatternMarkovChain>,
123 pub last_raw_event: Option<crate::event::SharedEvent>,
126 #[cfg(feature = "async-runtime")]
128 pub enrichment: Option<(
129 Arc<dyn crate::enrichment::EnrichmentProvider>,
130 Arc<crate::enrichment::EnrichmentCache>,
131 )>,
132 #[cfg(feature = "async-runtime")]
134 #[allow(dead_code)]
135 pub buffer_config: Option<crate::backpressure::StageBufferConfig>,
136}
137
138pub enum RuntimeSource {
140 EventType(String),
141 Stream(String),
142 Join(Vec<String>),
143 Merge(Vec<MergeSource>),
145 Timer(TimerConfig),
147}
148
149impl RuntimeSource {
150 pub fn describe(&self) -> String {
152 match self {
153 Self::EventType(t) => format!("event:{t}"),
154 Self::Stream(s) => format!("stream:{s}"),
155 Self::Join(sources) => format!("join:{}", sources.join(",")),
156 Self::Merge(sources) => format!("merge:{} sources", sources.len()),
157 Self::Timer(config) => {
158 format!("timer:{}ms", config.interval_ns / 1_000_000)
159 }
160 }
161 }
162}
163
164pub struct TimerConfig {
166 pub interval_ns: u64,
168 pub initial_delay_ns: Option<u64>,
170 pub timer_event_type: String,
172}
173
174#[allow(dead_code)]
176pub struct MergeSource {
177 pub name: String,
178 pub event_type: String,
179 pub filter: Option<varpulis_core::ast::Expr>,
180}
181
182#[allow(dead_code)]
184pub enum RuntimeOp {
185 WhereClosure(Box<dyn Fn(&SharedEvent) -> bool + Send + Sync>),
187 WhereExpr(varpulis_core::ast::Expr),
189 Window(WindowType),
191 PartitionedWindow(PartitionedWindowState),
193 PartitionedSlidingCountWindow(PartitionedSlidingCountWindowState),
195 Aggregate(Aggregator),
196 PartitionedAggregate(PartitionedAggregatorState),
198 Having(varpulis_core::ast::Expr),
200 Select(SelectConfig),
202 Emit(EmitConfig),
203 EmitExpr(EmitExprConfig),
205 Print(PrintConfig),
207 Log(LogConfig),
209 Sequence,
211 Pattern(PatternConfig),
213 Process(varpulis_core::ast::Expr),
215 To(ToConfig),
217 TrendAggregate(TrendAggregateConfig),
219 #[allow(dead_code)]
221 Score(ScoreConfig),
222 Forecast(ForecastConfig),
224 Enrich(EnrichConfig),
226 Alert(AlertConfig),
228 Distinct(DistinctState),
230 Limit(LimitState),
232 #[cfg(feature = "async-runtime")]
234 Concurrent(ConcurrentConfig),
235}
236
237impl RuntimeOp {
238 pub const fn summary_name(&self) -> &'static str {
242 match self {
243 Self::WhereClosure(_) => "Filter",
244 Self::WhereExpr(_) => "Filter",
245 Self::Window(_) => "Window",
246 Self::PartitionedWindow(_) => "PartitionedWindow",
247 Self::PartitionedSlidingCountWindow(_) => "PartitionedSlidingCountWindow",
248 Self::Aggregate(_) => "Aggregate",
249 Self::PartitionedAggregate(_) => "PartitionedAggregate",
250 Self::Having(_) => "Having",
251 Self::Select(_) => "Select",
252 Self::Emit(_) => "Emit",
253 Self::EmitExpr(_) => "EmitExpr",
254 Self::Print(_) => "Print",
255 Self::Log(_) => "Log",
256 Self::Sequence => "Sequence",
257 Self::Pattern(_) => "Pattern",
258 Self::Process(_) => "Process",
259 Self::To(_) => "Sink",
260 Self::TrendAggregate(_) => "TrendAggregate",
261 Self::Score(_) => "Score",
262 Self::Forecast(_) => "Forecast",
263 Self::Enrich(_) => "Enrich",
264 Self::Alert(_) => "Alert",
265 Self::Distinct(_) => "Distinct",
266 Self::Limit(_) => "Limit",
267 #[cfg(feature = "async-runtime")]
268 Self::Concurrent(_) => "Concurrent",
269 }
270 }
271}
272
273#[cfg(feature = "async-runtime")]
279pub struct ConcurrentConfig {
280 pub workers: usize,
281 pub partition_key: Option<String>,
282 pub thread_pool: std::sync::Arc<rayon::ThreadPool>,
283}
284
285pub struct TrendAggregateConfig {
287 pub fields: Vec<(String, crate::greta::GretaAggregate)>,
289 pub query_id: crate::greta::QueryId,
291 pub field_aggregates: Vec<FieldAggregateInfo>,
293 pub type_index_to_name: Vec<String>,
295 pub accumulated: Vec<SharedEvent>,
297}
298
299pub struct FieldAggregateInfo {
302 pub output_alias: String,
304 pub func: String,
306 pub event_type: String,
308 pub field_name: String,
310}
311
312#[allow(dead_code)]
314pub struct ForecastConfig {
315 pub confidence_threshold: f64,
317 pub horizon_ns: u64,
319 pub warmup_events: u64,
321 pub max_depth: usize,
323 pub hawkes: bool,
325 pub conformal: bool,
327}
328
329pub struct AlertConfig {
331 pub webhook_url: Option<String>,
333 pub message_template: Option<String>,
335}
336
337#[allow(dead_code)]
339pub struct EnrichConfig {
340 pub connector_name: String,
342 pub key_expr: varpulis_core::ast::Expr,
344 pub fields: Vec<String>,
346 pub cache_ttl_ns: Option<u64>,
348 pub timeout_ns: u64,
350 pub fallback: Option<varpulis_core::Value>,
352}
353
354#[allow(dead_code)]
356pub struct ScoreConfig {
357 #[cfg(feature = "onnx")]
358 pub model: std::sync::Arc<crate::scoring::OnnxModel>,
359 pub input_fields: Vec<String>,
360 pub output_fields: Vec<String>,
361 pub batch_size: usize,
362}
363
364#[allow(dead_code)]
366pub enum TopicSpec {
367 Static(String),
369 Dynamic(varpulis_core::ast::Expr),
371}
372
373#[allow(dead_code)]
375pub struct ToConfig {
376 pub connector_name: String,
377 pub topic: Option<TopicSpec>,
379 pub sink_key: String,
381 #[allow(dead_code)]
383 pub extra_params: HashMap<String, String>,
384}
385
386pub const DISTINCT_LRU_CAPACITY: usize = 100_000;
388
389pub struct DistinctState {
391 pub expr: Option<varpulis_core::ast::Expr>,
393 pub seen: hashlink::LruCache<String, ()>,
395}
396
397pub struct LimitState {
399 pub max: usize,
400 pub count: usize,
401}
402
403pub struct SelectConfig {
405 pub fields: Vec<(String, varpulis_core::ast::Expr)>,
407}
408
409pub struct StreamProcessResult {
411 pub emitted_events: Vec<SharedEvent>,
413 pub output_events: Vec<SharedEvent>,
415 pub sink_events_sent: u64,
417}
418
419pub struct PartitionedWindowState {
421 pub partition_key: String,
422 pub window_size: usize, pub windows: FxHashMap<String, CountWindow>,
424}
425
426impl PartitionedWindowState {
427 pub fn new(partition_key: String, window_size: usize) -> Self {
428 Self {
429 partition_key,
430 window_size,
431 windows: FxHashMap::default(),
432 }
433 }
434
435 pub fn add(&mut self, event: SharedEvent) -> Option<Vec<SharedEvent>> {
436 let key = event.get(&self.partition_key).map_or_else(
437 || "default".to_string(),
438 |v| v.to_partition_key().into_owned(),
439 );
440
441 let window = self
442 .windows
443 .entry(key)
444 .or_insert_with(|| CountWindow::new(self.window_size));
445
446 window.add_shared(event)
447 }
448}
449
450pub struct PartitionedSlidingCountWindowState {
452 pub partition_key: String,
453 pub window_size: usize,
454 pub slide_size: usize,
455 pub windows: FxHashMap<String, SlidingCountWindow>,
456}
457
458impl PartitionedSlidingCountWindowState {
459 pub fn new(partition_key: String, window_size: usize, slide_size: usize) -> Self {
460 Self {
461 partition_key,
462 window_size,
463 slide_size,
464 windows: FxHashMap::default(),
465 }
466 }
467
468 pub fn add(&mut self, event: SharedEvent) -> Option<Vec<SharedEvent>> {
469 let key = event.get(&self.partition_key).map_or_else(
470 || "default".to_string(),
471 |v| v.to_partition_key().into_owned(),
472 );
473
474 let window = self
475 .windows
476 .entry(key)
477 .or_insert_with(|| SlidingCountWindow::new(self.window_size, self.slide_size));
478
479 window.add_shared(event)
480 }
481}
482
483pub struct PartitionedAggregatorState {
485 pub partition_key: String,
486 pub aggregator_template: Aggregator,
487}
488
489impl PartitionedAggregatorState {
490 pub const fn new(partition_key: String, aggregator: Aggregator) -> Self {
491 Self {
492 partition_key,
493 aggregator_template: aggregator,
494 }
495 }
496
497 pub fn apply(&mut self, events: &[SharedEvent]) -> Vec<(String, IndexMap<String, Value>)> {
498 let mut partitions: FxHashMap<String, Vec<SharedEvent>> = FxHashMap::default();
500
501 for event in events {
502 let key = event.get(&self.partition_key).map_or_else(
503 || "default".to_string(),
504 |v| v.to_partition_key().into_owned(),
505 );
506 partitions.entry(key).or_default().push(Arc::clone(event));
507 }
508
509 let mut results = Vec::new();
511 for (key, partition_events) in partitions {
512 let result = self.aggregator_template.apply_shared(&partition_events);
513 results.push((key, result));
514 }
515
516 results
517 }
518}
519
520#[allow(dead_code)]
522pub struct PatternConfig {
523 pub name: String,
524 pub matcher: varpulis_core::ast::Expr,
525}
526
527pub struct PrintConfig {
529 pub exprs: Vec<varpulis_core::ast::Expr>,
530}
531
532pub struct LogConfig {
534 pub level: String,
535 pub message: Option<String>,
536 pub data_field: Option<String>,
537}
538
539pub enum WindowType {
541 Tumbling(TumblingWindow),
542 Sliding(SlidingWindow),
543 Count(CountWindow),
544 SlidingCount(SlidingCountWindow),
545 PartitionedTumbling(PartitionedTumblingWindow),
546 PartitionedSliding(PartitionedSlidingWindow),
547 Session(SessionWindow),
548 PartitionedSession(PartitionedSessionWindow),
549}
550
551#[allow(dead_code)]
553pub struct EmitConfig {
554 pub fields: Vec<(String, String)>, pub target_context: Option<String>,
556}
557
558#[allow(dead_code)]
560pub struct EmitExprConfig {
561 pub fields: Vec<(String, varpulis_core::ast::Expr)>, pub target_context: Option<String>,
563}
564
565#[allow(dead_code)]
567pub struct LateDataConfig {
568 pub allowed_lateness: chrono::Duration,
571 pub side_output_stream: Option<String>,
574}