1use crate::aggregation::Aggregator;
6use crate::event::SharedEvent;
7use crate::join::JoinBuffer;
8use crate::sase::SaseEngine;
9use crate::window::{
10 CountWindow, PartitionedSessionWindow, PartitionedSlidingWindow, PartitionedTumblingWindow,
11 SessionWindow, SlidingCountWindow, SlidingWindow, TumblingWindow,
12};
13use indexmap::IndexMap;
14use rustc_hash::FxHashMap;
15use std::collections::HashMap;
16use std::sync::Arc;
17use varpulis_core::ast::{ConfigValue, Expr, SasePatternExpr};
18use varpulis_core::Value;
19
20#[derive(Debug, Clone)]
26pub struct EngineConfig {
27 pub name: String,
28 pub values: HashMap<String, ConfigValue>,
29}
30
31#[derive(Debug, Clone)]
33pub struct UserFunction {
34 pub name: String,
35 pub params: Vec<(String, varpulis_core::Type)>, pub return_type: Option<varpulis_core::Type>,
37 pub body: Vec<varpulis_core::span::Spanned<varpulis_core::Stmt>>,
38}
39
40#[derive(Debug, Clone)]
42pub struct EngineMetrics {
43 pub events_processed: u64,
44 pub output_events_emitted: u64,
45 pub streams_count: usize,
46}
47
48#[derive(Debug, Clone, Default)]
50pub struct ReloadReport {
51 pub streams_added: Vec<String>,
53 pub streams_removed: Vec<String>,
55 pub streams_updated: Vec<String>,
57 pub state_preserved: Vec<String>,
59 pub state_reset: Vec<String>,
61 pub warnings: Vec<String>,
63}
64
65impl ReloadReport {
66 pub const fn is_empty(&self) -> bool {
67 self.streams_added.is_empty()
68 && self.streams_removed.is_empty()
69 && self.streams_updated.is_empty()
70 }
71}
72
73#[derive(Debug, Clone)]
75pub struct SourceBinding {
76 pub connector_name: String,
77 pub event_type: String,
78 pub topic_override: Option<String>,
79 pub extra_params: HashMap<String, String>,
81}
82
83#[derive(Debug, Clone)]
85pub struct NamedPattern {
86 pub name: String,
88 pub expr: SasePatternExpr,
90 pub within: Option<Expr>,
92 pub partition_by: Option<Expr>,
94}
95
96pub struct StreamDefinition {
102 pub name: String,
103 pub name_arc: Arc<str>,
106 pub source: RuntimeSource,
107 pub operations: Vec<RuntimeOp>,
108 pub sase_engine: Option<SaseEngine>,
110 pub join_buffer: Option<JoinBuffer>,
112 pub event_type_to_source: FxHashMap<String, String>,
114 pub hamlet_aggregator: Option<crate::hamlet::HamletAggregator>,
116 pub shared_hamlet_ref: Option<Arc<std::sync::Mutex<crate::hamlet::HamletAggregator>>>,
118 pub pst_forecaster: Option<crate::pst::PatternMarkovChain>,
120 pub last_raw_event: Option<crate::event::SharedEvent>,
123 pub enrichment: Option<(
125 Arc<dyn crate::enrichment::EnrichmentProvider>,
126 Arc<crate::enrichment::EnrichmentCache>,
127 )>,
128 #[allow(dead_code)]
130 pub buffer_config: Option<crate::backpressure::StageBufferConfig>,
131}
132
133pub enum RuntimeSource {
135 EventType(String),
136 Stream(String),
137 Join(Vec<String>),
138 Merge(Vec<MergeSource>),
140 Timer(TimerConfig),
142}
143
144impl RuntimeSource {
145 pub fn describe(&self) -> String {
147 match self {
148 Self::EventType(t) => format!("event:{t}"),
149 Self::Stream(s) => format!("stream:{s}"),
150 Self::Join(sources) => format!("join:{}", sources.join(",")),
151 Self::Merge(sources) => format!("merge:{} sources", sources.len()),
152 Self::Timer(config) => {
153 format!("timer:{}ms", config.interval_ns / 1_000_000)
154 }
155 }
156 }
157}
158
159pub struct TimerConfig {
161 pub interval_ns: u64,
163 pub initial_delay_ns: Option<u64>,
165 pub timer_event_type: String,
167}
168
169pub struct MergeSource {
171 pub name: String,
172 pub event_type: String,
173 pub filter: Option<varpulis_core::ast::Expr>,
174}
175
176pub enum RuntimeOp {
178 WhereClosure(Box<dyn Fn(&SharedEvent) -> bool + Send + Sync>),
180 WhereExpr(varpulis_core::ast::Expr),
182 Window(WindowType),
184 PartitionedWindow(PartitionedWindowState),
186 PartitionedSlidingCountWindow(PartitionedSlidingCountWindowState),
188 Aggregate(Aggregator),
189 PartitionedAggregate(PartitionedAggregatorState),
191 Having(varpulis_core::ast::Expr),
193 Select(SelectConfig),
195 Emit(EmitConfig),
196 EmitExpr(EmitExprConfig),
198 Print(PrintConfig),
200 Log(LogConfig),
202 Sequence,
204 Pattern(PatternConfig),
206 Process(varpulis_core::ast::Expr),
208 To(ToConfig),
210 TrendAggregate(TrendAggregateConfig),
212 #[allow(dead_code)]
214 Score(ScoreConfig),
215 Forecast(ForecastConfig),
217 Enrich(EnrichConfig),
219 Distinct(DistinctState),
221 Limit(LimitState),
223 Concurrent(ConcurrentConfig),
225}
226
227impl RuntimeOp {
228 pub const fn summary_name(&self) -> &'static str {
232 match self {
233 Self::WhereClosure(_) => "Filter",
234 Self::WhereExpr(_) => "Filter",
235 Self::Window(_) => "Window",
236 Self::PartitionedWindow(_) => "PartitionedWindow",
237 Self::PartitionedSlidingCountWindow(_) => "PartitionedSlidingCountWindow",
238 Self::Aggregate(_) => "Aggregate",
239 Self::PartitionedAggregate(_) => "PartitionedAggregate",
240 Self::Having(_) => "Having",
241 Self::Select(_) => "Select",
242 Self::Emit(_) => "Emit",
243 Self::EmitExpr(_) => "EmitExpr",
244 Self::Print(_) => "Print",
245 Self::Log(_) => "Log",
246 Self::Sequence => "Sequence",
247 Self::Pattern(_) => "Pattern",
248 Self::Process(_) => "Process",
249 Self::To(_) => "Sink",
250 Self::TrendAggregate(_) => "TrendAggregate",
251 Self::Score(_) => "Score",
252 Self::Forecast(_) => "Forecast",
253 Self::Enrich(_) => "Enrich",
254 Self::Distinct(_) => "Distinct",
255 Self::Limit(_) => "Limit",
256 Self::Concurrent(_) => "Concurrent",
257 }
258 }
259}
260
261pub struct ConcurrentConfig {
263 pub workers: usize,
264 pub partition_key: Option<String>,
265 pub thread_pool: std::sync::Arc<rayon::ThreadPool>,
266}
267
268pub struct TrendAggregateConfig {
270 pub fields: Vec<(String, crate::greta::GretaAggregate)>,
272 pub query_id: crate::greta::QueryId,
274}
275
276#[allow(dead_code)]
278pub struct ForecastConfig {
279 pub confidence_threshold: f64,
281 pub horizon_ns: u64,
283 pub warmup_events: u64,
285 pub max_depth: usize,
287 pub hawkes: bool,
289 pub conformal: bool,
291}
292
293pub struct EnrichConfig {
295 pub connector_name: String,
297 pub key_expr: varpulis_core::ast::Expr,
299 pub fields: Vec<String>,
301 pub cache_ttl_ns: Option<u64>,
303 pub timeout_ns: u64,
305 pub fallback: Option<varpulis_core::Value>,
307}
308
309#[allow(dead_code)]
311pub struct ScoreConfig {
312 #[cfg(feature = "onnx")]
313 pub model: std::sync::Arc<crate::scoring::OnnxModel>,
314 pub input_fields: Vec<String>,
315 pub output_fields: Vec<String>,
316 pub batch_size: usize,
317}
318
319pub struct ToConfig {
321 pub connector_name: String,
322 pub topic_override: Option<String>,
324 pub sink_key: String,
326 #[allow(dead_code)]
328 pub extra_params: HashMap<String, String>,
329}
330
331pub const DISTINCT_LRU_CAPACITY: usize = 100_000;
333
334pub struct DistinctState {
336 pub expr: Option<varpulis_core::ast::Expr>,
338 pub seen: hashlink::LruCache<String, ()>,
340}
341
342pub struct LimitState {
344 pub max: usize,
345 pub count: usize,
346}
347
348pub struct SelectConfig {
350 pub fields: Vec<(String, varpulis_core::ast::Expr)>,
352}
353
354pub struct StreamProcessResult {
356 pub emitted_events: Vec<SharedEvent>,
358 pub output_events: Vec<SharedEvent>,
360 pub sink_events_sent: u64,
362}
363
364pub struct PartitionedWindowState {
366 pub partition_key: String,
367 pub window_size: usize, pub windows: FxHashMap<String, CountWindow>,
369}
370
371impl PartitionedWindowState {
372 pub fn new(partition_key: String, window_size: usize) -> Self {
373 Self {
374 partition_key,
375 window_size,
376 windows: FxHashMap::default(),
377 }
378 }
379
380 pub fn add(&mut self, event: SharedEvent) -> Option<Vec<SharedEvent>> {
381 let key = event.get(&self.partition_key).map_or_else(
382 || "default".to_string(),
383 |v| v.to_partition_key().into_owned(),
384 );
385
386 let window = self
387 .windows
388 .entry(key)
389 .or_insert_with(|| CountWindow::new(self.window_size));
390
391 window.add_shared(event)
392 }
393}
394
395pub struct PartitionedSlidingCountWindowState {
397 pub partition_key: String,
398 pub window_size: usize,
399 pub slide_size: usize,
400 pub windows: FxHashMap<String, SlidingCountWindow>,
401}
402
403impl PartitionedSlidingCountWindowState {
404 pub fn new(partition_key: String, window_size: usize, slide_size: usize) -> Self {
405 Self {
406 partition_key,
407 window_size,
408 slide_size,
409 windows: FxHashMap::default(),
410 }
411 }
412
413 pub fn add(&mut self, event: SharedEvent) -> Option<Vec<SharedEvent>> {
414 let key = event.get(&self.partition_key).map_or_else(
415 || "default".to_string(),
416 |v| v.to_partition_key().into_owned(),
417 );
418
419 let window = self
420 .windows
421 .entry(key)
422 .or_insert_with(|| SlidingCountWindow::new(self.window_size, self.slide_size));
423
424 window.add_shared(event)
425 }
426}
427
428pub struct PartitionedAggregatorState {
430 pub partition_key: String,
431 pub aggregator_template: Aggregator,
432}
433
434impl PartitionedAggregatorState {
435 pub const fn new(partition_key: String, aggregator: Aggregator) -> Self {
436 Self {
437 partition_key,
438 aggregator_template: aggregator,
439 }
440 }
441
442 pub fn apply(&mut self, events: &[SharedEvent]) -> Vec<(String, IndexMap<String, Value>)> {
443 let mut partitions: FxHashMap<String, Vec<SharedEvent>> = FxHashMap::default();
445
446 for event in events {
447 let key = event.get(&self.partition_key).map_or_else(
448 || "default".to_string(),
449 |v| v.to_partition_key().into_owned(),
450 );
451 partitions.entry(key).or_default().push(Arc::clone(event));
452 }
453
454 let mut results = Vec::new();
456 for (key, partition_events) in partitions {
457 let result = self.aggregator_template.apply_shared(&partition_events);
458 results.push((key, result));
459 }
460
461 results
462 }
463}
464
465#[allow(dead_code)]
467pub struct PatternConfig {
468 pub name: String,
469 pub matcher: varpulis_core::ast::Expr,
470}
471
472pub struct PrintConfig {
474 pub exprs: Vec<varpulis_core::ast::Expr>,
475}
476
477pub struct LogConfig {
479 pub level: String,
480 pub message: Option<String>,
481 pub data_field: Option<String>,
482}
483
484pub enum WindowType {
486 Tumbling(TumblingWindow),
487 Sliding(SlidingWindow),
488 Count(CountWindow),
489 SlidingCount(SlidingCountWindow),
490 PartitionedTumbling(PartitionedTumblingWindow),
491 PartitionedSliding(PartitionedSlidingWindow),
492 Session(SessionWindow),
493 PartitionedSession(PartitionedSessionWindow),
494}
495
496#[allow(dead_code)]
498pub struct EmitConfig {
499 pub fields: Vec<(String, String)>, pub target_context: Option<String>,
501}
502
503#[allow(dead_code)]
505pub struct EmitExprConfig {
506 pub fields: Vec<(String, varpulis_core::ast::Expr)>, pub target_context: Option<String>,
508}
509
510pub struct LateDataConfig {
512 pub allowed_lateness: chrono::Duration,
515 pub side_output_stream: Option<String>,
518}