1use std::time::Duration;
7
8use tracing::warn;
9use varpulis_core::ast::{FollowedByClause, SequenceStepDecl, StreamSource};
10
11use crate::aggregation::{
12 AggBinOp, Avg, Count, CountDistinct, Ema, ExprAggregate, First, Last, Max, Median, Min,
13 Percentile, StdDev, Sum, P50, P95, P99,
14};
15use crate::sase::{CompareOp, Predicate, SasePattern};
16
17pub fn compile_agg_expr(
19 expr: &varpulis_core::ast::Expr,
20) -> Option<(Box<dyn crate::aggregation::AggregateFunc>, Option<String>)> {
21 use varpulis_core::ast::{Arg, BinOp, Expr};
22
23 match expr {
24 Expr::Call { func, args } => {
26 let func_name = match func.as_ref() {
27 Expr::Ident(s) => s.clone(),
28 _ => return None,
29 };
30
31 if func_name == "count" {
33 if let Some(Arg::Positional(Expr::Call {
34 func: inner_func,
35 args: inner_args,
36 })) = args.first()
37 {
38 if let Expr::Ident(inner_name) = inner_func.as_ref() {
39 if inner_name == "distinct" {
40 let field = inner_args.first().and_then(|a| match a {
41 Arg::Positional(Expr::Ident(s)) => Some(s.clone()),
42 _ => None,
43 });
44 return Some((Box::new(CountDistinct), field));
45 }
46 }
47 }
48 }
49
50 let field = args.first().and_then(|a| match a {
51 Arg::Positional(Expr::Ident(s)) => Some(s.clone()),
52 _ => None,
53 });
54
55 let second_int = args
57 .get(1)
58 .and_then(|a| match a {
59 Arg::Positional(Expr::Int(n)) => Some(*n as usize),
60 _ => None,
61 })
62 .unwrap_or(12);
63
64 let second_float = args.get(1).and_then(|a| match a {
65 Arg::Positional(Expr::Float(f)) => Some(*f),
66 _ => None,
67 });
68
69 let agg_func: Box<dyn crate::aggregation::AggregateFunc> = match func_name.as_str() {
70 "count" => Box::new(Count),
71 "sum" => Box::new(Sum),
72 "avg" => Box::new(Avg),
73 "min" => Box::new(Min),
74 "max" => Box::new(Max),
75 "last" => Box::new(Last),
76 "first" => Box::new(First),
77 "stddev" => Box::new(StdDev),
78 "ema" => Box::new(Ema::new(second_int)),
79 "count_distinct" => Box::new(CountDistinct),
80 "median" => Box::new(Median),
81 "p50" => Box::new(P50),
82 "p95" => Box::new(P95),
83 "p99" => Box::new(P99),
84 "percentile" => Box::new(Percentile::new(second_float.unwrap_or(0.5))),
85 other => {
86 warn!("Unknown aggregation function: {}", other);
89 return None;
90 }
91 };
92
93 Some((agg_func, field))
94 }
95
96 Expr::Binary { op, left, right } => {
98 let agg_op = match op {
99 BinOp::Add => AggBinOp::Add,
100 BinOp::Sub => AggBinOp::Sub,
101 BinOp::Mul => AggBinOp::Mul,
102 BinOp::Div => AggBinOp::Div,
103 _ => {
104 warn!("Unsupported binary operator in aggregate: {:?}", op);
105 return None;
106 }
107 };
108
109 let (left_func, left_field) = compile_agg_expr(left)?;
110 let (right_func, right_field) = compile_agg_expr(right)?;
111
112 let expr_agg =
113 ExprAggregate::new(left_func, left_field, agg_op, right_func, right_field);
114
115 Some((Box::new(expr_agg), None))
116 }
117
118 _ => {
119 warn!("Unsupported aggregate expression: {:?}", expr);
120 None
121 }
122 }
123}
124
125pub fn compile_agg_expr_with_udfs(
131 expr: &varpulis_core::ast::Expr,
132 udf_registry: &crate::udf::UdfRegistry,
133) -> Option<(Box<dyn crate::aggregation::AggregateFunc>, Option<String>)> {
134 if let Some(result) = compile_agg_expr(expr) {
136 return Some(result);
137 }
138
139 use varpulis_core::ast::{Arg, Expr};
141 if let Expr::Call { func, args } = expr {
142 if let Expr::Ident(func_name) = func.as_ref() {
143 if let Some(agg_udf) = udf_registry.get_aggregate(func_name) {
144 let field = args.first().and_then(|a| match a {
145 Arg::Positional(Expr::Ident(s)) => Some(s.clone()),
146 _ => None,
147 });
148
149 let adapter = UdfAggregateAdapter {
150 udf: agg_udf.clone(),
151 };
152 return Some((Box::new(adapter), field));
153 }
154 }
155 }
156
157 None
158}
159
160struct UdfAggregateAdapter {
162 udf: std::sync::Arc<dyn crate::udf::AggregateUDF>,
163}
164
165impl crate::aggregation::AggregateFunc for UdfAggregateAdapter {
166 fn name(&self) -> &'static str {
167 "udf_aggregate"
168 }
169
170 fn apply(&self, events: &[crate::event::Event], field: Option<&str>) -> varpulis_core::Value {
171 let mut acc = self.udf.init();
172 let field_name = field.unwrap_or("value");
173 for event in events {
174 if let Some(val) = event.get(field_name) {
175 acc.update(val);
176 }
177 }
178 acc.finish()
179 }
180}
181
182#[derive(Debug, Clone)]
188pub struct DerivedStreamInfo {
189 pub event_type: String,
191 pub filter: Option<varpulis_core::ast::Expr>,
193}
194
195pub type StreamResolver<'a> = &'a dyn Fn(&str) -> Option<DerivedStreamInfo>;
197
198pub fn compile_to_sase_pattern_with_resolver(
200 source: &StreamSource,
201 followed_by_clauses: &[FollowedByClause],
202 _negation_clauses: &[FollowedByClause],
203 within_duration: Option<Duration>,
204 stream_resolver: StreamResolver,
205) -> Option<SasePattern> {
206 let mut steps: Vec<SasePattern> = Vec::new();
207
208 match source {
210 StreamSource::Sequence(decl) => {
211 for step in &decl.steps {
213 let pattern = compile_sequence_step_to_sase(step);
214 steps.push(pattern);
215 }
216 }
217 StreamSource::Ident(name) => {
218 let (event_type, predicate) = if let Some(info) = stream_resolver(name) {
220 let pred = info.filter.as_ref().and_then(expr_to_sase_predicate);
221 (info.event_type, pred)
222 } else {
223 (name.clone(), None)
224 };
225 steps.push(SasePattern::Event {
226 event_type,
227 predicate,
228 alias: None,
229 });
230 }
231 StreamSource::IdentWithAlias { name, alias } => {
232 let (event_type, predicate) = if let Some(info) = stream_resolver(name) {
234 let pred = info.filter.as_ref().and_then(expr_to_sase_predicate);
235 (info.event_type, pred)
236 } else {
237 (name.clone(), None)
238 };
239 steps.push(SasePattern::Event {
240 event_type,
241 predicate,
242 alias: Some(alias.clone()),
243 });
244 }
245 StreamSource::IdentWithFilterAndAlias {
246 name,
247 filter,
248 alias,
249 } => {
250 let (event_type, mut predicate) = if let Some(info) = stream_resolver(name) {
252 let pred = info.filter.as_ref().and_then(expr_to_sase_predicate);
253 (info.event_type, pred)
254 } else {
255 (name.clone(), None)
256 };
257 if let Some(inline_pred) = expr_to_sase_predicate(filter) {
259 predicate = Some(inline_pred);
260 }
261 steps.push(SasePattern::Event {
262 event_type,
263 predicate,
264 alias: alias.clone(),
265 });
266 }
267 StreamSource::AllWithAlias { name, alias } => {
268 let (event_type, predicate) = if let Some(info) = stream_resolver(name) {
270 let pred = info.filter.as_ref().and_then(expr_to_sase_predicate);
271 (info.event_type, pred)
272 } else {
273 (name.clone(), None)
274 };
275 let event_pattern = SasePattern::Event {
277 event_type,
278 predicate,
279 alias: alias.clone(),
280 };
281 steps.push(SasePattern::KleenePlus(Box::new(event_pattern)));
282 }
283 _ => return None,
284 }
285
286 for clause in followed_by_clauses {
288 let (resolved_event_type, stream_predicate) =
290 if let Some(info) = stream_resolver(&clause.event_type) {
291 (info.event_type, info.filter)
292 } else {
293 (clause.event_type.clone(), None)
294 };
295
296 let clause_predicate = clause.filter.as_ref().and_then(expr_to_sase_predicate);
298 let stream_pred = stream_predicate.as_ref().and_then(expr_to_sase_predicate);
299
300 let predicate = match (stream_pred, clause_predicate) {
301 (Some(sp), Some(cp)) => Some(Predicate::And(Box::new(sp), Box::new(cp))),
302 (Some(sp), None) => Some(sp),
303 (None, Some(cp)) => Some(cp),
304 (None, None) => None,
305 };
306
307 let event_pattern = SasePattern::Event {
308 event_type: resolved_event_type,
309 predicate,
310 alias: clause.alias.clone(),
311 };
312
313 let pattern = if clause.match_all {
315 SasePattern::KleenePlus(Box::new(event_pattern))
316 } else {
317 event_pattern
318 };
319
320 steps.push(pattern);
321 }
322
323 if steps.is_empty() {
325 return None;
326 }
327
328 let pattern = if steps.len() == 1 {
329 steps.pop()?
331 } else {
332 SasePattern::Seq(steps)
333 };
334
335 match within_duration {
337 Some(duration) => Some(SasePattern::Within(Box::new(pattern), duration)),
338 None => Some(pattern),
339 }
340}
341
342fn compile_sequence_step_to_sase(step: &SequenceStepDecl) -> SasePattern {
344 let predicate = step.filter.as_ref().and_then(expr_to_sase_predicate);
345
346 SasePattern::Event {
347 event_type: step.event_type.clone(),
348 predicate,
349 alias: Some(step.alias.clone()),
350 }
351}
352
353pub fn expr_to_sase_predicate(expr: &varpulis_core::ast::Expr) -> Option<Predicate> {
355 use varpulis_core::ast::{BinOp, Expr, UnaryOp};
356
357 match expr {
358 Expr::Binary { op, left, right } => {
360 let compare_op = match op {
361 BinOp::Eq => Some(CompareOp::Eq),
362 BinOp::NotEq => Some(CompareOp::NotEq),
363 BinOp::Lt => Some(CompareOp::Lt),
364 BinOp::Le => Some(CompareOp::Le),
365 BinOp::Gt => Some(CompareOp::Gt),
366 BinOp::Ge => Some(CompareOp::Ge),
367 BinOp::And => {
368 let left_pred = expr_to_sase_predicate(left)?;
369 let right_pred = expr_to_sase_predicate(right)?;
370 return Some(Predicate::And(Box::new(left_pred), Box::new(right_pred)));
371 }
372 BinOp::Or => {
373 let left_pred = expr_to_sase_predicate(left)?;
374 let right_pred = expr_to_sase_predicate(right)?;
375 return Some(Predicate::Or(Box::new(left_pred), Box::new(right_pred)));
376 }
377 _ => None,
378 }?;
379
380 if let (
383 Expr::Ident(field),
384 Expr::Member {
385 expr: ref_expr,
386 member: ref_field,
387 },
388 ) = (left.as_ref(), right.as_ref())
389 {
390 if let Expr::Ident(ref_alias) = ref_expr.as_ref() {
391 return Some(Predicate::CompareRef {
392 field: field.clone(),
393 op: compare_op,
394 ref_alias: ref_alias.clone(),
395 ref_field: ref_field.clone(),
396 });
397 }
398 }
399
400 let field = match left.as_ref() {
402 Expr::Ident(name) => name.clone(),
403 _ => {
404 return Some(Predicate::Expr(Box::new(expr.clone())));
406 }
407 };
408
409 if let Some(value) = expr_to_value(right) {
411 Some(Predicate::Compare {
412 field,
413 op: compare_op,
414 value,
415 })
416 } else {
417 Some(Predicate::Expr(Box::new(expr.clone())))
420 }
421 }
422
423 Expr::Unary {
425 op: UnaryOp::Not,
426 expr: inner,
427 } => {
428 let inner_pred = expr_to_sase_predicate(inner)?;
429 Some(Predicate::Not(Box::new(inner_pred)))
430 }
431
432 _ => Some(Predicate::Expr(Box::new(expr.clone()))),
434 }
435}
436
437pub fn compile_sase_pattern_expr(
439 expr: &varpulis_core::ast::SasePatternExpr,
440 within: Option<Duration>,
441) -> Option<SasePattern> {
442 use varpulis_core::ast::SasePatternExpr;
443
444 let pattern = match expr {
445 SasePatternExpr::Seq(items) => {
446 let steps: Vec<SasePattern> = items.iter().map(compile_sase_pattern_item).collect();
447 if steps.len() == 1 {
448 steps.into_iter().next().unwrap()
449 } else {
450 SasePattern::Seq(steps)
451 }
452 }
453 SasePatternExpr::And(left, right) => {
454 let l = compile_sase_pattern_expr(left, None)?;
455 let r = compile_sase_pattern_expr(right, None)?;
456 SasePattern::And(Box::new(l), Box::new(r))
457 }
458 SasePatternExpr::Or(left, right) => {
459 let l = compile_sase_pattern_expr(left, None)?;
460 let r = compile_sase_pattern_expr(right, None)?;
461 SasePattern::Or(Box::new(l), Box::new(r))
462 }
463 SasePatternExpr::Not(inner) => {
464 let i = compile_sase_pattern_expr(inner, None)?;
465 SasePattern::Not(Box::new(i))
466 }
467 SasePatternExpr::Event(name) => SasePattern::Event {
468 event_type: name.clone(),
469 predicate: None,
470 alias: None,
471 },
472 SasePatternExpr::Group(inner) => {
473 return compile_sase_pattern_expr(inner, within);
474 }
475 };
476
477 if let Some(duration) = within {
479 Some(SasePattern::Within(Box::new(pattern), duration))
480 } else {
481 Some(pattern)
482 }
483}
484
485fn compile_sase_pattern_item(item: &varpulis_core::ast::SasePatternItem) -> SasePattern {
487 let predicate = item.filter.as_ref().and_then(expr_to_sase_predicate);
488 let base = SasePattern::Event {
489 event_type: item.event_type.clone(),
490 predicate,
491 alias: item.alias.clone(),
492 };
493
494 match &item.kleene {
495 Some(varpulis_core::ast::KleeneOp::Plus) => SasePattern::KleenePlus(Box::new(base)),
496 Some(varpulis_core::ast::KleeneOp::Star) => SasePattern::KleeneStar(Box::new(base)),
497 Some(varpulis_core::ast::KleeneOp::Optional) => {
498 SasePattern::KleeneStar(Box::new(base))
500 }
501 None => base,
502 }
503}
504
505pub fn extract_event_types_from_pattern_expr(
507 expr: &varpulis_core::ast::SasePatternExpr,
508) -> Vec<String> {
509 use varpulis_core::ast::SasePatternExpr;
510
511 let mut types = Vec::new();
512 match expr {
513 SasePatternExpr::Seq(items) => {
514 for item in items {
515 if !types.contains(&item.event_type) {
516 types.push(item.event_type.clone());
517 }
518 }
519 }
520 SasePatternExpr::And(left, right) | SasePatternExpr::Or(left, right) => {
521 for t in extract_event_types_from_pattern_expr(left) {
522 if !types.contains(&t) {
523 types.push(t);
524 }
525 }
526 for t in extract_event_types_from_pattern_expr(right) {
527 if !types.contains(&t) {
528 types.push(t);
529 }
530 }
531 }
532 SasePatternExpr::Not(inner) | SasePatternExpr::Group(inner) => {
533 types = extract_event_types_from_pattern_expr(inner);
534 }
535 SasePatternExpr::Event(name) => {
536 types.push(name.clone());
537 }
538 }
539 types
540}
541
542fn expr_to_value(expr: &varpulis_core::ast::Expr) -> Option<varpulis_core::Value> {
544 use varpulis_core::ast::Expr;
545 use varpulis_core::Value;
546
547 match expr {
548 Expr::Int(n) => Some(Value::Int(*n)),
549 Expr::Float(f) => Some(Value::Float(*f)),
550 Expr::Str(s) => Some(Value::Str(s.clone().into())),
551 Expr::Bool(b) => Some(Value::Bool(*b)),
552 _ => None,
553 }
554}