1use std::time::Duration;
7
8use tracing::warn;
9use varpulis_core::ast::{FollowedByClause, SequenceStepDecl, StreamSource};
10
11use crate::aggregation::{
12 AggBinOp, Avg, Count, CountDistinct, Ema, ExprAggregate, First, Last, Max, Median, Min,
13 Percentile, StdDev, Sum, P50, P95, P99,
14};
15use crate::sase::{CompareOp, Predicate, SasePattern};
16
17pub fn compile_agg_expr(
19 expr: &varpulis_core::ast::Expr,
20) -> Option<(Box<dyn crate::aggregation::AggregateFunc>, Option<String>)> {
21 use varpulis_core::ast::{Arg, BinOp, Expr};
22
23 match expr {
24 Expr::Call { func, args } => {
26 let func_name = match func.as_ref() {
27 Expr::Ident(s) => s.clone(),
28 _ => return None,
29 };
30
31 if func_name == "count" {
33 if let Some(Arg::Positional(Expr::Call {
34 func: inner_func,
35 args: inner_args,
36 })) = args.first()
37 {
38 if let Expr::Ident(inner_name) = inner_func.as_ref() {
39 if inner_name == "distinct" {
40 let field = inner_args.first().and_then(|a| match a {
41 Arg::Positional(Expr::Ident(s)) => Some(s.clone()),
42 _ => None,
43 });
44 return Some((Box::new(CountDistinct), field));
45 }
46 }
47 }
48 }
49
50 let field = args.first().and_then(|a| match a {
51 Arg::Positional(Expr::Ident(s)) => Some(s.clone()),
52 _ => None,
53 });
54
55 let second_int = args
57 .get(1)
58 .and_then(|a| match a {
59 Arg::Positional(Expr::Int(n)) => Some(*n as usize),
60 _ => None,
61 })
62 .unwrap_or(12);
63
64 let second_float = args.get(1).and_then(|a| match a {
65 Arg::Positional(Expr::Float(f)) => Some(*f),
66 _ => None,
67 });
68
69 let agg_func: Box<dyn crate::aggregation::AggregateFunc> = match func_name.as_str() {
70 "count" => Box::new(Count),
71 "sum" => Box::new(Sum),
72 "avg" => Box::new(Avg),
73 "min" => Box::new(Min),
74 "max" => Box::new(Max),
75 "last" => Box::new(Last),
76 "first" => Box::new(First),
77 "stddev" => Box::new(StdDev),
78 "ema" => Box::new(Ema::new(second_int)),
79 "count_distinct" => Box::new(CountDistinct),
80 "median" => Box::new(Median),
81 "p50" => Box::new(P50),
82 "p95" => Box::new(P95),
83 "p99" => Box::new(P99),
84 "percentile" => Box::new(Percentile::new(second_float.unwrap_or(0.5))),
85 other => {
86 warn!("Unknown aggregation function: {}", other);
89 return None;
90 }
91 };
92
93 Some((agg_func, field))
94 }
95
96 Expr::Binary { op, left, right } => {
98 let agg_op = match op {
99 BinOp::Add => AggBinOp::Add,
100 BinOp::Sub => AggBinOp::Sub,
101 BinOp::Mul => AggBinOp::Mul,
102 BinOp::Div => AggBinOp::Div,
103 _ => {
104 warn!("Unsupported binary operator in aggregate: {:?}", op);
105 return None;
106 }
107 };
108
109 let (left_func, left_field) = compile_agg_expr(left)?;
110 let (right_func, right_field) = compile_agg_expr(right)?;
111
112 let expr_agg =
113 ExprAggregate::new(left_func, left_field, agg_op, right_func, right_field);
114
115 Some((Box::new(expr_agg), None))
116 }
117
118 _ => {
119 warn!("Unsupported aggregate expression: {:?}", expr);
120 None
121 }
122 }
123}
124
125pub fn compile_agg_expr_with_udfs(
131 expr: &varpulis_core::ast::Expr,
132 udf_registry: &crate::udf::UdfRegistry,
133) -> Option<(Box<dyn crate::aggregation::AggregateFunc>, Option<String>)> {
134 if let Some(result) = compile_agg_expr(expr) {
136 return Some(result);
137 }
138
139 use varpulis_core::ast::{Arg, Expr};
141 if let Expr::Call { func, args } = expr {
142 if let Expr::Ident(func_name) = func.as_ref() {
143 if let Some(agg_udf) = udf_registry.get_aggregate(func_name) {
144 let field = args.first().and_then(|a| match a {
145 Arg::Positional(Expr::Ident(s)) => Some(s.clone()),
146 _ => None,
147 });
148
149 let adapter = UdfAggregateAdapter {
150 udf: agg_udf.clone(),
151 };
152 return Some((Box::new(adapter), field));
153 }
154 }
155 }
156
157 None
158}
159
160struct UdfAggregateAdapter {
162 udf: std::sync::Arc<dyn crate::udf::AggregateUDF>,
163}
164
165impl crate::aggregation::AggregateFunc for UdfAggregateAdapter {
166 fn name(&self) -> &'static str {
167 "udf_aggregate"
168 }
169
170 fn apply(&self, events: &[crate::event::Event], field: Option<&str>) -> varpulis_core::Value {
171 let mut acc = self.udf.init();
172 let field_name = field.unwrap_or("value");
173 for event in events {
174 if let Some(val) = event.get(field_name) {
175 acc.update(val);
176 }
177 }
178 acc.finish()
179 }
180}
181
182#[derive(Debug, Clone)]
188pub struct DerivedStreamInfo {
189 pub event_type: String,
191 pub filter: Option<varpulis_core::ast::Expr>,
193}
194
195pub type StreamResolver<'a> = &'a dyn Fn(&str) -> Option<DerivedStreamInfo>;
197
198pub fn compile_to_sase_pattern_with_resolver(
200 source: &StreamSource,
201 followed_by_clauses: &[FollowedByClause],
202 _negation_clauses: &[FollowedByClause],
203 within_duration: Option<Duration>,
204 stream_resolver: StreamResolver,
205) -> Option<SasePattern> {
206 let mut steps: Vec<SasePattern> = Vec::new();
207
208 match source {
210 StreamSource::Sequence(decl) => {
211 for step in &decl.steps {
213 let pattern = compile_sequence_step_to_sase(step);
214 steps.push(pattern);
215 }
216 }
217 StreamSource::Ident(name) => {
218 let (event_type, predicate) = if let Some(info) = stream_resolver(name) {
220 let pred = info.filter.as_ref().and_then(expr_to_sase_predicate);
221 (info.event_type, pred)
222 } else {
223 (name.clone(), None)
224 };
225 steps.push(SasePattern::Event {
226 event_type,
227 predicate,
228 alias: None,
229 });
230 }
231 StreamSource::IdentWithAlias { name, alias } => {
232 let (event_type, predicate) = if let Some(info) = stream_resolver(name) {
234 let pred = info.filter.as_ref().and_then(expr_to_sase_predicate);
235 (info.event_type, pred)
236 } else {
237 (name.clone(), None)
238 };
239 steps.push(SasePattern::Event {
240 event_type,
241 predicate,
242 alias: Some(alias.clone()),
243 });
244 }
245 StreamSource::AllWithAlias { name, alias } => {
246 let (event_type, predicate) = if let Some(info) = stream_resolver(name) {
248 let pred = info.filter.as_ref().and_then(expr_to_sase_predicate);
249 (info.event_type, pred)
250 } else {
251 (name.clone(), None)
252 };
253 let event_pattern = SasePattern::Event {
255 event_type,
256 predicate,
257 alias: alias.clone(),
258 };
259 steps.push(SasePattern::KleenePlus(Box::new(event_pattern)));
260 }
261 _ => return None,
262 }
263
264 for clause in followed_by_clauses {
266 let (resolved_event_type, stream_predicate) =
268 if let Some(info) = stream_resolver(&clause.event_type) {
269 (info.event_type, info.filter)
270 } else {
271 (clause.event_type.clone(), None)
272 };
273
274 let clause_predicate = clause.filter.as_ref().and_then(expr_to_sase_predicate);
276 let stream_pred = stream_predicate.as_ref().and_then(expr_to_sase_predicate);
277
278 let predicate = match (stream_pred, clause_predicate) {
279 (Some(sp), Some(cp)) => Some(Predicate::And(Box::new(sp), Box::new(cp))),
280 (Some(sp), None) => Some(sp),
281 (None, Some(cp)) => Some(cp),
282 (None, None) => None,
283 };
284
285 let event_pattern = SasePattern::Event {
286 event_type: resolved_event_type,
287 predicate,
288 alias: clause.alias.clone(),
289 };
290
291 let pattern = if clause.match_all {
293 SasePattern::KleenePlus(Box::new(event_pattern))
294 } else {
295 event_pattern
296 };
297
298 steps.push(pattern);
299 }
300
301 if steps.is_empty() {
303 return None;
304 }
305
306 let pattern = if steps.len() == 1 {
307 steps.pop()?
309 } else {
310 SasePattern::Seq(steps)
311 };
312
313 match within_duration {
315 Some(duration) => Some(SasePattern::Within(Box::new(pattern), duration)),
316 None => Some(pattern),
317 }
318}
319
320fn compile_sequence_step_to_sase(step: &SequenceStepDecl) -> SasePattern {
322 let predicate = step.filter.as_ref().and_then(expr_to_sase_predicate);
323
324 SasePattern::Event {
325 event_type: step.event_type.clone(),
326 predicate,
327 alias: Some(step.alias.clone()),
328 }
329}
330
331pub fn expr_to_sase_predicate(expr: &varpulis_core::ast::Expr) -> Option<Predicate> {
333 use varpulis_core::ast::{BinOp, Expr, UnaryOp};
334
335 match expr {
336 Expr::Binary { op, left, right } => {
338 let compare_op = match op {
339 BinOp::Eq => Some(CompareOp::Eq),
340 BinOp::NotEq => Some(CompareOp::NotEq),
341 BinOp::Lt => Some(CompareOp::Lt),
342 BinOp::Le => Some(CompareOp::Le),
343 BinOp::Gt => Some(CompareOp::Gt),
344 BinOp::Ge => Some(CompareOp::Ge),
345 BinOp::And => {
346 let left_pred = expr_to_sase_predicate(left)?;
347 let right_pred = expr_to_sase_predicate(right)?;
348 return Some(Predicate::And(Box::new(left_pred), Box::new(right_pred)));
349 }
350 BinOp::Or => {
351 let left_pred = expr_to_sase_predicate(left)?;
352 let right_pred = expr_to_sase_predicate(right)?;
353 return Some(Predicate::Or(Box::new(left_pred), Box::new(right_pred)));
354 }
355 _ => None,
356 }?;
357
358 if let (
361 Expr::Ident(field),
362 Expr::Member {
363 expr: ref_expr,
364 member: ref_field,
365 },
366 ) = (left.as_ref(), right.as_ref())
367 {
368 if let Expr::Ident(ref_alias) = ref_expr.as_ref() {
369 return Some(Predicate::CompareRef {
370 field: field.clone(),
371 op: compare_op,
372 ref_alias: ref_alias.clone(),
373 ref_field: ref_field.clone(),
374 });
375 }
376 }
377
378 let field = match left.as_ref() {
380 Expr::Ident(name) => name.clone(),
381 _ => {
382 return Some(Predicate::Expr(Box::new(expr.clone())));
384 }
385 };
386
387 if let Some(value) = expr_to_value(right) {
389 Some(Predicate::Compare {
390 field,
391 op: compare_op,
392 value,
393 })
394 } else {
395 Some(Predicate::Expr(Box::new(expr.clone())))
398 }
399 }
400
401 Expr::Unary {
403 op: UnaryOp::Not,
404 expr: inner,
405 } => {
406 let inner_pred = expr_to_sase_predicate(inner)?;
407 Some(Predicate::Not(Box::new(inner_pred)))
408 }
409
410 _ => Some(Predicate::Expr(Box::new(expr.clone()))),
412 }
413}
414
415pub fn compile_sase_pattern_expr(
417 expr: &varpulis_core::ast::SasePatternExpr,
418 within: Option<Duration>,
419) -> Option<SasePattern> {
420 use varpulis_core::ast::SasePatternExpr;
421
422 let pattern = match expr {
423 SasePatternExpr::Seq(items) => {
424 let steps: Vec<SasePattern> = items.iter().map(compile_sase_pattern_item).collect();
425 if steps.len() == 1 {
426 steps.into_iter().next().unwrap()
427 } else {
428 SasePattern::Seq(steps)
429 }
430 }
431 SasePatternExpr::And(left, right) => {
432 let l = compile_sase_pattern_expr(left, None)?;
433 let r = compile_sase_pattern_expr(right, None)?;
434 SasePattern::And(Box::new(l), Box::new(r))
435 }
436 SasePatternExpr::Or(left, right) => {
437 let l = compile_sase_pattern_expr(left, None)?;
438 let r = compile_sase_pattern_expr(right, None)?;
439 SasePattern::Or(Box::new(l), Box::new(r))
440 }
441 SasePatternExpr::Not(inner) => {
442 let i = compile_sase_pattern_expr(inner, None)?;
443 SasePattern::Not(Box::new(i))
444 }
445 SasePatternExpr::Event(name) => SasePattern::Event {
446 event_type: name.clone(),
447 predicate: None,
448 alias: None,
449 },
450 SasePatternExpr::Group(inner) => {
451 return compile_sase_pattern_expr(inner, within);
452 }
453 };
454
455 if let Some(duration) = within {
457 Some(SasePattern::Within(Box::new(pattern), duration))
458 } else {
459 Some(pattern)
460 }
461}
462
463fn compile_sase_pattern_item(item: &varpulis_core::ast::SasePatternItem) -> SasePattern {
465 let predicate = item.filter.as_ref().and_then(expr_to_sase_predicate);
466 let base = SasePattern::Event {
467 event_type: item.event_type.clone(),
468 predicate,
469 alias: item.alias.clone(),
470 };
471
472 match &item.kleene {
473 Some(varpulis_core::ast::KleeneOp::Plus) => SasePattern::KleenePlus(Box::new(base)),
474 Some(varpulis_core::ast::KleeneOp::Star) => SasePattern::KleeneStar(Box::new(base)),
475 Some(varpulis_core::ast::KleeneOp::Optional) => {
476 SasePattern::KleeneStar(Box::new(base))
478 }
479 None => base,
480 }
481}
482
483pub fn extract_event_types_from_pattern_expr(
485 expr: &varpulis_core::ast::SasePatternExpr,
486) -> Vec<String> {
487 use varpulis_core::ast::SasePatternExpr;
488
489 let mut types = Vec::new();
490 match expr {
491 SasePatternExpr::Seq(items) => {
492 for item in items {
493 if !types.contains(&item.event_type) {
494 types.push(item.event_type.clone());
495 }
496 }
497 }
498 SasePatternExpr::And(left, right) | SasePatternExpr::Or(left, right) => {
499 for t in extract_event_types_from_pattern_expr(left) {
500 if !types.contains(&t) {
501 types.push(t);
502 }
503 }
504 for t in extract_event_types_from_pattern_expr(right) {
505 if !types.contains(&t) {
506 types.push(t);
507 }
508 }
509 }
510 SasePatternExpr::Not(inner) | SasePatternExpr::Group(inner) => {
511 types = extract_event_types_from_pattern_expr(inner);
512 }
513 SasePatternExpr::Event(name) => {
514 types.push(name.clone());
515 }
516 }
517 types
518}
519
520fn expr_to_value(expr: &varpulis_core::ast::Expr) -> Option<varpulis_core::Value> {
522 use varpulis_core::ast::Expr;
523 use varpulis_core::Value;
524
525 match expr {
526 Expr::Int(n) => Some(Value::Int(*n)),
527 Expr::Float(f) => Some(Value::Float(*f)),
528 Expr::Str(s) => Some(Value::Str(s.clone().into())),
529 Expr::Bool(b) => Some(Value::Bool(*b)),
530 _ => None,
531 }
532}