1use std::collections::HashMap;
6use std::sync::Arc;
7
8use indexmap::IndexMap;
9use rustc_hash::FxHashMap;
10use varpulis_core::ast::{ConfigValue, Expr, SasePatternExpr};
11use varpulis_core::Value;
12
13use crate::aggregation::Aggregator;
14use crate::event::SharedEvent;
15use crate::join::JoinBuffer;
16use crate::sase::SaseEngine;
17use crate::window::{
18 CountWindow, PartitionedSessionWindow, PartitionedSlidingWindow, PartitionedTumblingWindow,
19 SessionWindow, SlidingCountWindow, SlidingWindow, TumblingWindow,
20};
21
22#[derive(Debug, Clone)]
28pub struct EngineConfig {
29 pub name: String,
30 pub values: HashMap<String, ConfigValue>,
31}
32
33#[derive(Debug, Clone)]
35pub struct UserFunction {
36 pub name: String,
37 pub params: Vec<(String, varpulis_core::Type)>, pub return_type: Option<varpulis_core::Type>,
39 pub body: Vec<varpulis_core::span::Spanned<varpulis_core::Stmt>>,
40}
41
42#[derive(Debug, Clone)]
44pub struct EngineMetrics {
45 pub events_processed: u64,
46 pub output_events_emitted: u64,
47 pub streams_count: usize,
48}
49
50#[derive(Debug, Clone, Default)]
52pub struct ReloadReport {
53 pub streams_added: Vec<String>,
55 pub streams_removed: Vec<String>,
57 pub streams_updated: Vec<String>,
59 pub state_preserved: Vec<String>,
61 pub state_reset: Vec<String>,
63 pub warnings: Vec<String>,
65}
66
67impl ReloadReport {
68 pub const fn is_empty(&self) -> bool {
69 self.streams_added.is_empty()
70 && self.streams_removed.is_empty()
71 && self.streams_updated.is_empty()
72 }
73}
74
75#[derive(Debug, Clone)]
77pub struct SourceBinding {
78 pub connector_name: String,
79 pub event_type: String,
80 pub topic_override: Option<String>,
81 pub extra_params: HashMap<String, String>,
83}
84
85#[derive(Debug, Clone)]
87pub struct NamedPattern {
88 pub name: String,
90 pub expr: SasePatternExpr,
92 pub within: Option<Expr>,
94 pub partition_by: Option<Expr>,
96}
97
98pub struct StreamDefinition {
104 pub name: String,
105 pub name_arc: Arc<str>,
108 pub source: RuntimeSource,
109 pub operations: Vec<RuntimeOp>,
110 pub sase_engine: Option<SaseEngine>,
112 pub join_buffer: Option<JoinBuffer>,
114 pub event_type_to_source: FxHashMap<String, String>,
116 pub hamlet_aggregator: Option<crate::hamlet::HamletAggregator>,
118 pub shared_hamlet_ref: Option<Arc<std::sync::Mutex<crate::hamlet::HamletAggregator>>>,
120 pub pst_forecaster: Option<crate::pst::PatternMarkovChain>,
122 pub last_raw_event: Option<crate::event::SharedEvent>,
125 pub enrichment: Option<(
127 Arc<dyn crate::enrichment::EnrichmentProvider>,
128 Arc<crate::enrichment::EnrichmentCache>,
129 )>,
130 #[allow(dead_code)]
132 pub buffer_config: Option<crate::backpressure::StageBufferConfig>,
133}
134
135pub enum RuntimeSource {
137 EventType(String),
138 Stream(String),
139 Join(Vec<String>),
140 Merge(Vec<MergeSource>),
142 Timer(TimerConfig),
144}
145
146impl RuntimeSource {
147 pub fn describe(&self) -> String {
149 match self {
150 Self::EventType(t) => format!("event:{t}"),
151 Self::Stream(s) => format!("stream:{s}"),
152 Self::Join(sources) => format!("join:{}", sources.join(",")),
153 Self::Merge(sources) => format!("merge:{} sources", sources.len()),
154 Self::Timer(config) => {
155 format!("timer:{}ms", config.interval_ns / 1_000_000)
156 }
157 }
158 }
159}
160
161pub struct TimerConfig {
163 pub interval_ns: u64,
165 pub initial_delay_ns: Option<u64>,
167 pub timer_event_type: String,
169}
170
171pub struct MergeSource {
173 pub name: String,
174 pub event_type: String,
175 pub filter: Option<varpulis_core::ast::Expr>,
176}
177
178pub enum RuntimeOp {
180 WhereClosure(Box<dyn Fn(&SharedEvent) -> bool + Send + Sync>),
182 WhereExpr(varpulis_core::ast::Expr),
184 Window(WindowType),
186 PartitionedWindow(PartitionedWindowState),
188 PartitionedSlidingCountWindow(PartitionedSlidingCountWindowState),
190 Aggregate(Aggregator),
191 PartitionedAggregate(PartitionedAggregatorState),
193 Having(varpulis_core::ast::Expr),
195 Select(SelectConfig),
197 Emit(EmitConfig),
198 EmitExpr(EmitExprConfig),
200 Print(PrintConfig),
202 Log(LogConfig),
204 Sequence,
206 Pattern(PatternConfig),
208 Process(varpulis_core::ast::Expr),
210 To(ToConfig),
212 TrendAggregate(TrendAggregateConfig),
214 #[allow(dead_code)]
216 Score(ScoreConfig),
217 Forecast(ForecastConfig),
219 Enrich(EnrichConfig),
221 Distinct(DistinctState),
223 Limit(LimitState),
225 Concurrent(ConcurrentConfig),
227}
228
229impl RuntimeOp {
230 pub const fn summary_name(&self) -> &'static str {
234 match self {
235 Self::WhereClosure(_) => "Filter",
236 Self::WhereExpr(_) => "Filter",
237 Self::Window(_) => "Window",
238 Self::PartitionedWindow(_) => "PartitionedWindow",
239 Self::PartitionedSlidingCountWindow(_) => "PartitionedSlidingCountWindow",
240 Self::Aggregate(_) => "Aggregate",
241 Self::PartitionedAggregate(_) => "PartitionedAggregate",
242 Self::Having(_) => "Having",
243 Self::Select(_) => "Select",
244 Self::Emit(_) => "Emit",
245 Self::EmitExpr(_) => "EmitExpr",
246 Self::Print(_) => "Print",
247 Self::Log(_) => "Log",
248 Self::Sequence => "Sequence",
249 Self::Pattern(_) => "Pattern",
250 Self::Process(_) => "Process",
251 Self::To(_) => "Sink",
252 Self::TrendAggregate(_) => "TrendAggregate",
253 Self::Score(_) => "Score",
254 Self::Forecast(_) => "Forecast",
255 Self::Enrich(_) => "Enrich",
256 Self::Distinct(_) => "Distinct",
257 Self::Limit(_) => "Limit",
258 Self::Concurrent(_) => "Concurrent",
259 }
260 }
261}
262
263pub struct ConcurrentConfig {
265 pub workers: usize,
266 pub partition_key: Option<String>,
267 pub thread_pool: std::sync::Arc<rayon::ThreadPool>,
268}
269
270pub struct TrendAggregateConfig {
272 pub fields: Vec<(String, crate::greta::GretaAggregate)>,
274 pub query_id: crate::greta::QueryId,
276 pub field_aggregates: Vec<FieldAggregateInfo>,
278 pub type_index_to_name: Vec<String>,
280 pub accumulated: Vec<SharedEvent>,
282}
283
284pub struct FieldAggregateInfo {
287 pub output_alias: String,
289 pub func: String,
291 pub event_type: String,
293 pub field_name: String,
295}
296
297#[allow(dead_code)]
299pub struct ForecastConfig {
300 pub confidence_threshold: f64,
302 pub horizon_ns: u64,
304 pub warmup_events: u64,
306 pub max_depth: usize,
308 pub hawkes: bool,
310 pub conformal: bool,
312}
313
314pub struct EnrichConfig {
316 pub connector_name: String,
318 pub key_expr: varpulis_core::ast::Expr,
320 pub fields: Vec<String>,
322 pub cache_ttl_ns: Option<u64>,
324 pub timeout_ns: u64,
326 pub fallback: Option<varpulis_core::Value>,
328}
329
330#[allow(dead_code)]
332pub struct ScoreConfig {
333 #[cfg(feature = "onnx")]
334 pub model: std::sync::Arc<crate::scoring::OnnxModel>,
335 pub input_fields: Vec<String>,
336 pub output_fields: Vec<String>,
337 pub batch_size: usize,
338}
339
340pub enum TopicSpec {
342 Static(String),
344 Dynamic(varpulis_core::ast::Expr),
346}
347
348pub struct ToConfig {
350 pub connector_name: String,
351 pub topic: Option<TopicSpec>,
353 pub sink_key: String,
355 #[allow(dead_code)]
357 pub extra_params: HashMap<String, String>,
358}
359
360pub const DISTINCT_LRU_CAPACITY: usize = 100_000;
362
363pub struct DistinctState {
365 pub expr: Option<varpulis_core::ast::Expr>,
367 pub seen: hashlink::LruCache<String, ()>,
369}
370
371pub struct LimitState {
373 pub max: usize,
374 pub count: usize,
375}
376
377pub struct SelectConfig {
379 pub fields: Vec<(String, varpulis_core::ast::Expr)>,
381}
382
383pub struct StreamProcessResult {
385 pub emitted_events: Vec<SharedEvent>,
387 pub output_events: Vec<SharedEvent>,
389 pub sink_events_sent: u64,
391}
392
393pub struct PartitionedWindowState {
395 pub partition_key: String,
396 pub window_size: usize, pub windows: FxHashMap<String, CountWindow>,
398}
399
400impl PartitionedWindowState {
401 pub fn new(partition_key: String, window_size: usize) -> Self {
402 Self {
403 partition_key,
404 window_size,
405 windows: FxHashMap::default(),
406 }
407 }
408
409 pub fn add(&mut self, event: SharedEvent) -> Option<Vec<SharedEvent>> {
410 let key = event.get(&self.partition_key).map_or_else(
411 || "default".to_string(),
412 |v| v.to_partition_key().into_owned(),
413 );
414
415 let window = self
416 .windows
417 .entry(key)
418 .or_insert_with(|| CountWindow::new(self.window_size));
419
420 window.add_shared(event)
421 }
422}
423
424pub struct PartitionedSlidingCountWindowState {
426 pub partition_key: String,
427 pub window_size: usize,
428 pub slide_size: usize,
429 pub windows: FxHashMap<String, SlidingCountWindow>,
430}
431
432impl PartitionedSlidingCountWindowState {
433 pub fn new(partition_key: String, window_size: usize, slide_size: usize) -> Self {
434 Self {
435 partition_key,
436 window_size,
437 slide_size,
438 windows: FxHashMap::default(),
439 }
440 }
441
442 pub fn add(&mut self, event: SharedEvent) -> Option<Vec<SharedEvent>> {
443 let key = event.get(&self.partition_key).map_or_else(
444 || "default".to_string(),
445 |v| v.to_partition_key().into_owned(),
446 );
447
448 let window = self
449 .windows
450 .entry(key)
451 .or_insert_with(|| SlidingCountWindow::new(self.window_size, self.slide_size));
452
453 window.add_shared(event)
454 }
455}
456
457pub struct PartitionedAggregatorState {
459 pub partition_key: String,
460 pub aggregator_template: Aggregator,
461}
462
463impl PartitionedAggregatorState {
464 pub const fn new(partition_key: String, aggregator: Aggregator) -> Self {
465 Self {
466 partition_key,
467 aggregator_template: aggregator,
468 }
469 }
470
471 pub fn apply(&mut self, events: &[SharedEvent]) -> Vec<(String, IndexMap<String, Value>)> {
472 let mut partitions: FxHashMap<String, Vec<SharedEvent>> = FxHashMap::default();
474
475 for event in events {
476 let key = event.get(&self.partition_key).map_or_else(
477 || "default".to_string(),
478 |v| v.to_partition_key().into_owned(),
479 );
480 partitions.entry(key).or_default().push(Arc::clone(event));
481 }
482
483 let mut results = Vec::new();
485 for (key, partition_events) in partitions {
486 let result = self.aggregator_template.apply_shared(&partition_events);
487 results.push((key, result));
488 }
489
490 results
491 }
492}
493
494#[allow(dead_code)]
496pub struct PatternConfig {
497 pub name: String,
498 pub matcher: varpulis_core::ast::Expr,
499}
500
501pub struct PrintConfig {
503 pub exprs: Vec<varpulis_core::ast::Expr>,
504}
505
506pub struct LogConfig {
508 pub level: String,
509 pub message: Option<String>,
510 pub data_field: Option<String>,
511}
512
513pub enum WindowType {
515 Tumbling(TumblingWindow),
516 Sliding(SlidingWindow),
517 Count(CountWindow),
518 SlidingCount(SlidingCountWindow),
519 PartitionedTumbling(PartitionedTumblingWindow),
520 PartitionedSliding(PartitionedSlidingWindow),
521 Session(SessionWindow),
522 PartitionedSession(PartitionedSessionWindow),
523}
524
525#[allow(dead_code)]
527pub struct EmitConfig {
528 pub fields: Vec<(String, String)>, pub target_context: Option<String>,
530}
531
532#[allow(dead_code)]
534pub struct EmitExprConfig {
535 pub fields: Vec<(String, varpulis_core::ast::Expr)>, pub target_context: Option<String>,
537}
538
539pub struct LateDataConfig {
541 pub allowed_lateness: chrono::Duration,
544 pub side_output_stream: Option<String>,
547}