Skip to main content

varpulis_runtime/engine/
compiler.rs

1//! Compilation functions for the Varpulis engine
2//!
3//! This module contains functions for converting VPL AST elements into
4//! runtime structures (aggregators, SASE+ patterns, sequence filters).
5
6use std::time::Duration;
7
8use tracing::warn;
9use varpulis_core::ast::{FollowedByClause, SequenceStepDecl, StreamSource};
10
11use crate::aggregation::{
12    AggBinOp, Avg, Count, CountDistinct, Ema, ExprAggregate, First, Last, Max, Median, Min,
13    Percentile, StdDev, Sum, P50, P95, P99,
14};
15use crate::sase::{CompareOp, Predicate, SasePattern};
16
17/// Compile an aggregate expression into an AggregateFunc
18pub fn compile_agg_expr(
19    expr: &varpulis_core::ast::Expr,
20) -> Option<(Box<dyn crate::aggregation::AggregateFunc>, Option<String>)> {
21    use varpulis_core::ast::{Arg, BinOp, Expr};
22
23    match expr {
24        // Simple function call: func(field) or func(field, param)
25        Expr::Call { func, args } => {
26            let func_name = match func.as_ref() {
27                Expr::Ident(s) => s.clone(),
28                _ => return None,
29            };
30
31            // Handle count(distinct(field)) pattern
32            if func_name == "count" {
33                if let Some(Arg::Positional(Expr::Call {
34                    func: inner_func,
35                    args: inner_args,
36                })) = args.first()
37                {
38                    if let Expr::Ident(inner_name) = inner_func.as_ref() {
39                        if inner_name == "distinct" {
40                            let field = inner_args.first().and_then(|a| match a {
41                                Arg::Positional(Expr::Ident(s)) => Some(s.clone()),
42                                _ => None,
43                            });
44                            return Some((Box::new(CountDistinct), field));
45                        }
46                    }
47                }
48            }
49
50            let field = args.first().and_then(|a| match a {
51                Arg::Positional(Expr::Ident(s)) => Some(s.clone()),
52                _ => None,
53            });
54
55            // Extract second argument as int (period for EMA) or float (quantile for percentile)
56            let second_int = args
57                .get(1)
58                .and_then(|a| match a {
59                    Arg::Positional(Expr::Int(n)) => Some(*n as usize),
60                    _ => None,
61                })
62                .unwrap_or(12);
63
64            let second_float = args.get(1).and_then(|a| match a {
65                Arg::Positional(Expr::Float(f)) => Some(*f),
66                _ => None,
67            });
68
69            let agg_func: Box<dyn crate::aggregation::AggregateFunc> = match func_name.as_str() {
70                "count" => Box::new(Count),
71                "sum" => Box::new(Sum),
72                "avg" => Box::new(Avg),
73                "min" => Box::new(Min),
74                "max" => Box::new(Max),
75                "last" => Box::new(Last),
76                "first" => Box::new(First),
77                "stddev" => Box::new(StdDev),
78                "ema" => Box::new(Ema::new(second_int)),
79                "count_distinct" => Box::new(CountDistinct),
80                "median" => Box::new(Median),
81                "p50" => Box::new(P50),
82                "p95" => Box::new(P95),
83                "p99" => Box::new(P99),
84                "percentile" => Box::new(Percentile::new(second_float.unwrap_or(0.5))),
85                other => {
86                    // Fallback: check UdfRegistry for custom aggregate UDFs
87                    // (checked by compile_agg_expr_with_udfs; standard path logs warning)
88                    warn!("Unknown aggregation function: {}", other);
89                    return None;
90                }
91            };
92
93            Some((agg_func, field))
94        }
95
96        // Binary expression: left op right (e.g., last(x) - ema(x, 9))
97        Expr::Binary { op, left, right } => {
98            let agg_op = match op {
99                BinOp::Add => AggBinOp::Add,
100                BinOp::Sub => AggBinOp::Sub,
101                BinOp::Mul => AggBinOp::Mul,
102                BinOp::Div => AggBinOp::Div,
103                _ => {
104                    warn!("Unsupported binary operator in aggregate: {:?}", op);
105                    return None;
106                }
107            };
108
109            let (left_func, left_field) = compile_agg_expr(left)?;
110            let (right_func, right_field) = compile_agg_expr(right)?;
111
112            let expr_agg =
113                ExprAggregate::new(left_func, left_field, agg_op, right_func, right_field);
114
115            Some((Box::new(expr_agg), None))
116        }
117
118        _ => {
119            warn!("Unsupported aggregate expression: {:?}", expr);
120            None
121        }
122    }
123}
124
125/// Compile an aggregate expression, checking the UDF registry for custom aggregates.
126///
127/// Falls through to [`compile_agg_expr`] for built-in functions. When a name
128/// is not recognized as a built-in but exists in the UDF registry as an aggregate,
129/// it is wrapped in an adapter that delegates to the [`Accumulator`](crate::udf::Accumulator).
130pub fn compile_agg_expr_with_udfs(
131    expr: &varpulis_core::ast::Expr,
132    udf_registry: &crate::udf::UdfRegistry,
133) -> Option<(Box<dyn crate::aggregation::AggregateFunc>, Option<String>)> {
134    // Try built-in first
135    if let Some(result) = compile_agg_expr(expr) {
136        return Some(result);
137    }
138
139    // Fallback: check UDF registry for custom aggregate
140    use varpulis_core::ast::{Arg, Expr};
141    if let Expr::Call { func, args } = expr {
142        if let Expr::Ident(func_name) = func.as_ref() {
143            if let Some(agg_udf) = udf_registry.get_aggregate(func_name) {
144                let field = args.first().and_then(|a| match a {
145                    Arg::Positional(Expr::Ident(s)) => Some(s.clone()),
146                    _ => None,
147                });
148
149                let adapter = UdfAggregateAdapter {
150                    udf: agg_udf.clone(),
151                };
152                return Some((Box::new(adapter), field));
153            }
154        }
155    }
156
157    None
158}
159
160/// Adapter that wraps a UDF [`Accumulator`](crate::udf::Accumulator) as an [`AggregateFunc`](crate::aggregation::AggregateFunc).
161struct UdfAggregateAdapter {
162    udf: std::sync::Arc<dyn crate::udf::AggregateUDF>,
163}
164
165impl crate::aggregation::AggregateFunc for UdfAggregateAdapter {
166    fn name(&self) -> &'static str {
167        "udf_aggregate"
168    }
169
170    fn apply(&self, events: &[crate::event::Event], field: Option<&str>) -> varpulis_core::Value {
171        let mut acc = self.udf.init();
172        let field_name = field.unwrap_or("value");
173        for event in events {
174            if let Some(val) = event.get(field_name) {
175                acc.update(val);
176            }
177        }
178        acc.finish()
179    }
180}
181
182// =============================================================================
183// SASE+ Pattern Compilation
184// =============================================================================
185
/// Information about a derived stream, used when compiling SASE+ patterns.
///
/// Produced by a [`StreamResolver`]. The pattern compiler matches on `event_type` rather
/// than the derived stream's own name, and turns `filter` into a predicate on that step.
#[derive(Debug, Clone)]
pub struct DerivedStreamInfo {
    /// The underlying event type (e.g., "Transaction")
    pub event_type: String,
    /// Optional filter expression from the stream definition. When a `followed_by` clause
    /// also has its own filter, the compiler ANDs this one (first) with the clause filter.
    pub filter: Option<varpulis_core::ast::Expr>,
}
194
/// Stream resolver function type: given a stream name, returns derived stream info if found.
///
/// Pattern compilation calls it for the source stream and for every `followed_by` event
/// type. `None` means the name is not a derived stream and is used verbatim as the event type.
pub type StreamResolver<'a> = &'a dyn Fn(&str) -> Option<DerivedStreamInfo>;
197
198/// Compile a sequence source and operations into a SASE+ pattern with stream resolution
199pub fn compile_to_sase_pattern_with_resolver(
200    source: &StreamSource,
201    followed_by_clauses: &[FollowedByClause],
202    _negation_clauses: &[FollowedByClause],
203    within_duration: Option<Duration>,
204    stream_resolver: StreamResolver,
205) -> Option<SasePattern> {
206    let mut steps: Vec<SasePattern> = Vec::new();
207
208    // Handle source
209    match source {
210        StreamSource::Sequence(decl) => {
211            // sequence() construct with explicit steps
212            for step in &decl.steps {
213                let pattern = compile_sequence_step_to_sase(step);
214                steps.push(pattern);
215            }
216        }
217        StreamSource::Ident(name) => {
218            // Check if this is a derived stream
219            let (event_type, predicate) = if let Some(info) = stream_resolver(name) {
220                let pred = info.filter.as_ref().and_then(expr_to_sase_predicate);
221                (info.event_type, pred)
222            } else {
223                (name.clone(), None)
224            };
225            steps.push(SasePattern::Event {
226                event_type,
227                predicate,
228                alias: None,
229            });
230        }
231        StreamSource::IdentWithAlias { name, alias } => {
232            // Check if this is a derived stream
233            let (event_type, predicate) = if let Some(info) = stream_resolver(name) {
234                let pred = info.filter.as_ref().and_then(expr_to_sase_predicate);
235                (info.event_type, pred)
236            } else {
237                (name.clone(), None)
238            };
239            steps.push(SasePattern::Event {
240                event_type,
241                predicate,
242                alias: Some(alias.clone()),
243            });
244        }
245        StreamSource::AllWithAlias { name, alias } => {
246            // Check if this is a derived stream
247            let (event_type, predicate) = if let Some(info) = stream_resolver(name) {
248                let pred = info.filter.as_ref().and_then(expr_to_sase_predicate);
249                (info.event_type, pred)
250            } else {
251                (name.clone(), None)
252            };
253            // match_all -> Kleene+
254            let event_pattern = SasePattern::Event {
255                event_type,
256                predicate,
257                alias: alias.clone(),
258            };
259            steps.push(SasePattern::KleenePlus(Box::new(event_pattern)));
260        }
261        _ => return None,
262    }
263
264    // Add followed_by clauses
265    for clause in followed_by_clauses {
266        // Check if event_type is a derived stream
267        let (resolved_event_type, stream_predicate) =
268            if let Some(info) = stream_resolver(&clause.event_type) {
269                (info.event_type, info.filter)
270            } else {
271                (clause.event_type.clone(), None)
272            };
273
274        // Combine stream filter with clause filter
275        let clause_predicate = clause.filter.as_ref().and_then(expr_to_sase_predicate);
276        let stream_pred = stream_predicate.as_ref().and_then(expr_to_sase_predicate);
277
278        let predicate = match (stream_pred, clause_predicate) {
279            (Some(sp), Some(cp)) => Some(Predicate::And(Box::new(sp), Box::new(cp))),
280            (Some(sp), None) => Some(sp),
281            (None, Some(cp)) => Some(cp),
282            (None, None) => None,
283        };
284
285        let event_pattern = SasePattern::Event {
286            event_type: resolved_event_type,
287            predicate,
288            alias: clause.alias.clone(),
289        };
290
291        // Handle match_all
292        let pattern = if clause.match_all {
293            SasePattern::KleenePlus(Box::new(event_pattern))
294        } else {
295            event_pattern
296        };
297
298        steps.push(pattern);
299    }
300
301    // Build the final pattern
302    if steps.is_empty() {
303        return None;
304    }
305
306    let pattern = if steps.len() == 1 {
307        // Safe: we just checked steps is not empty
308        steps.pop()?
309    } else {
310        SasePattern::Seq(steps)
311    };
312
313    // Apply within constraint if specified
314    match within_duration {
315        Some(duration) => Some(SasePattern::Within(Box::new(pattern), duration)),
316        None => Some(pattern),
317    }
318}
319
320/// Compile a sequence step declaration to a SASE pattern
321fn compile_sequence_step_to_sase(step: &SequenceStepDecl) -> SasePattern {
322    let predicate = step.filter.as_ref().and_then(expr_to_sase_predicate);
323
324    SasePattern::Event {
325        event_type: step.event_type.clone(),
326        predicate,
327        alias: Some(step.alias.clone()),
328    }
329}
330
331/// Convert a VPL expression to a SASE predicate
332pub fn expr_to_sase_predicate(expr: &varpulis_core::ast::Expr) -> Option<Predicate> {
333    use varpulis_core::ast::{BinOp, Expr, UnaryOp};
334
335    match expr {
336        // Binary comparison: field == value
337        Expr::Binary { op, left, right } => {
338            let compare_op = match op {
339                BinOp::Eq => Some(CompareOp::Eq),
340                BinOp::NotEq => Some(CompareOp::NotEq),
341                BinOp::Lt => Some(CompareOp::Lt),
342                BinOp::Le => Some(CompareOp::Le),
343                BinOp::Gt => Some(CompareOp::Gt),
344                BinOp::Ge => Some(CompareOp::Ge),
345                BinOp::And => {
346                    let left_pred = expr_to_sase_predicate(left)?;
347                    let right_pred = expr_to_sase_predicate(right)?;
348                    return Some(Predicate::And(Box::new(left_pred), Box::new(right_pred)));
349                }
350                BinOp::Or => {
351                    let left_pred = expr_to_sase_predicate(left)?;
352                    let right_pred = expr_to_sase_predicate(right)?;
353                    return Some(Predicate::Or(Box::new(left_pred), Box::new(right_pred)));
354                }
355                _ => None,
356            }?;
357
358            // Handle cross-event reference comparisons (e.g., order_id == order.id)
359            // Left: current event field, Right: reference to captured event
360            if let (
361                Expr::Ident(field),
362                Expr::Member {
363                    expr: ref_expr,
364                    member: ref_field,
365                },
366            ) = (left.as_ref(), right.as_ref())
367            {
368                if let Expr::Ident(ref_alias) = ref_expr.as_ref() {
369                    return Some(Predicate::CompareRef {
370                        field: field.clone(),
371                        op: compare_op,
372                        ref_alias: ref_alias.clone(),
373                        ref_field: ref_field.clone(),
374                    });
375                }
376            }
377
378            // Extract field name from left side for simple comparisons
379            let field = match left.as_ref() {
380                Expr::Ident(name) => name.clone(),
381                _ => {
382                    // Fall back to runtime expression evaluation for complex left-side
383                    return Some(Predicate::Expr(Box::new(expr.clone())));
384                }
385            };
386
387            // Extract value from right side
388            if let Some(value) = expr_to_value(right) {
389                Some(Predicate::Compare {
390                    field,
391                    op: compare_op,
392                    value,
393                })
394            } else {
395                // Right side is complex (e.g., another field or expression)
396                // Fall back to runtime expression evaluation
397                Some(Predicate::Expr(Box::new(expr.clone())))
398            }
399        }
400
401        // Unary not
402        Expr::Unary {
403            op: UnaryOp::Not,
404            expr: inner,
405        } => {
406            let inner_pred = expr_to_sase_predicate(inner)?;
407            Some(Predicate::Not(Box::new(inner_pred)))
408        }
409
410        // Fall back to storing the expression for runtime evaluation
411        _ => Some(Predicate::Expr(Box::new(expr.clone()))),
412    }
413}
414
415/// Compile a `SasePatternExpr` (from a named `pattern` declaration) into a runtime `SasePattern`.
416pub fn compile_sase_pattern_expr(
417    expr: &varpulis_core::ast::SasePatternExpr,
418    within: Option<Duration>,
419) -> Option<SasePattern> {
420    use varpulis_core::ast::SasePatternExpr;
421
422    let pattern = match expr {
423        SasePatternExpr::Seq(items) => {
424            let steps: Vec<SasePattern> = items.iter().map(compile_sase_pattern_item).collect();
425            if steps.len() == 1 {
426                steps.into_iter().next().unwrap()
427            } else {
428                SasePattern::Seq(steps)
429            }
430        }
431        SasePatternExpr::And(left, right) => {
432            let l = compile_sase_pattern_expr(left, None)?;
433            let r = compile_sase_pattern_expr(right, None)?;
434            SasePattern::And(Box::new(l), Box::new(r))
435        }
436        SasePatternExpr::Or(left, right) => {
437            let l = compile_sase_pattern_expr(left, None)?;
438            let r = compile_sase_pattern_expr(right, None)?;
439            SasePattern::Or(Box::new(l), Box::new(r))
440        }
441        SasePatternExpr::Not(inner) => {
442            let i = compile_sase_pattern_expr(inner, None)?;
443            SasePattern::Not(Box::new(i))
444        }
445        SasePatternExpr::Event(name) => SasePattern::Event {
446            event_type: name.clone(),
447            predicate: None,
448            alias: None,
449        },
450        SasePatternExpr::Group(inner) => {
451            return compile_sase_pattern_expr(inner, within);
452        }
453    };
454
455    // Wrap with Within if specified
456    if let Some(duration) = within {
457        Some(SasePattern::Within(Box::new(pattern), duration))
458    } else {
459        Some(pattern)
460    }
461}
462
463/// Compile a single `SasePatternItem` to a `SasePattern`, handling Kleene operators.
464fn compile_sase_pattern_item(item: &varpulis_core::ast::SasePatternItem) -> SasePattern {
465    let predicate = item.filter.as_ref().and_then(expr_to_sase_predicate);
466    let base = SasePattern::Event {
467        event_type: item.event_type.clone(),
468        predicate,
469        alias: item.alias.clone(),
470    };
471
472    match &item.kleene {
473        Some(varpulis_core::ast::KleeneOp::Plus) => SasePattern::KleenePlus(Box::new(base)),
474        Some(varpulis_core::ast::KleeneOp::Star) => SasePattern::KleeneStar(Box::new(base)),
475        Some(varpulis_core::ast::KleeneOp::Optional) => {
476            // Optional is equivalent to Or(base, empty match) — use KleeneStar for now
477            SasePattern::KleeneStar(Box::new(base))
478        }
479        None => base,
480    }
481}
482
483/// Extract all event type names from a `SasePatternExpr`.
484pub fn extract_event_types_from_pattern_expr(
485    expr: &varpulis_core::ast::SasePatternExpr,
486) -> Vec<String> {
487    use varpulis_core::ast::SasePatternExpr;
488
489    let mut types = Vec::new();
490    match expr {
491        SasePatternExpr::Seq(items) => {
492            for item in items {
493                if !types.contains(&item.event_type) {
494                    types.push(item.event_type.clone());
495                }
496            }
497        }
498        SasePatternExpr::And(left, right) | SasePatternExpr::Or(left, right) => {
499            for t in extract_event_types_from_pattern_expr(left) {
500                if !types.contains(&t) {
501                    types.push(t);
502                }
503            }
504            for t in extract_event_types_from_pattern_expr(right) {
505                if !types.contains(&t) {
506                    types.push(t);
507                }
508            }
509        }
510        SasePatternExpr::Not(inner) | SasePatternExpr::Group(inner) => {
511            types = extract_event_types_from_pattern_expr(inner);
512        }
513        SasePatternExpr::Event(name) => {
514            types.push(name.clone());
515        }
516    }
517    types
518}
519
520/// Convert an AST expression to a Value (for predicates)
521fn expr_to_value(expr: &varpulis_core::ast::Expr) -> Option<varpulis_core::Value> {
522    use varpulis_core::ast::Expr;
523    use varpulis_core::Value;
524
525    match expr {
526        Expr::Int(n) => Some(Value::Int(*n)),
527        Expr::Float(f) => Some(Value::Float(*f)),
528        Expr::Str(s) => Some(Value::Str(s.clone().into())),
529        Expr::Bool(b) => Some(Value::Bool(*b)),
530        _ => None,
531    }
532}