Skip to main content

varpulis_runtime/engine/
compiler.rs

//! Compilation functions for the Varpulis engine
//!
//! This module contains functions for converting VPL AST elements into
//! runtime structures (aggregators, SASE+ patterns, sequence filters).
5
6use std::time::Duration;
7
8use tracing::warn;
9use varpulis_core::ast::{FollowedByClause, SequenceStepDecl, StreamSource};
10
11use crate::aggregation::{
12    AggBinOp, Avg, Count, CountDistinct, Ema, ExprAggregate, First, Last, Max, Median, Min,
13    Percentile, StdDev, Sum, P50, P95, P99,
14};
15use crate::sase::{CompareOp, Predicate, SasePattern};
16
17/// Compile an aggregate expression into an AggregateFunc
18pub fn compile_agg_expr(
19    expr: &varpulis_core::ast::Expr,
20) -> Option<(Box<dyn crate::aggregation::AggregateFunc>, Option<String>)> {
21    use varpulis_core::ast::{Arg, BinOp, Expr};
22
23    match expr {
24        // Simple function call: func(field) or func(field, param)
25        Expr::Call { func, args } => {
26            let func_name = match func.as_ref() {
27                Expr::Ident(s) => s.clone(),
28                _ => return None,
29            };
30
31            // Handle count(distinct(field)) pattern
32            if func_name == "count" {
33                if let Some(Arg::Positional(Expr::Call {
34                    func: inner_func,
35                    args: inner_args,
36                })) = args.first()
37                {
38                    if let Expr::Ident(inner_name) = inner_func.as_ref() {
39                        if inner_name == "distinct" {
40                            let field = inner_args.first().and_then(|a| match a {
41                                Arg::Positional(Expr::Ident(s)) => Some(s.clone()),
42                                _ => None,
43                            });
44                            return Some((Box::new(CountDistinct), field));
45                        }
46                    }
47                }
48            }
49
50            let field = args.first().and_then(|a| match a {
51                Arg::Positional(Expr::Ident(s)) => Some(s.clone()),
52                _ => None,
53            });
54
55            // Extract second argument as int (period for EMA) or float (quantile for percentile)
56            let second_int = args
57                .get(1)
58                .and_then(|a| match a {
59                    Arg::Positional(Expr::Int(n)) => Some(*n as usize),
60                    _ => None,
61                })
62                .unwrap_or(12);
63
64            let second_float = args.get(1).and_then(|a| match a {
65                Arg::Positional(Expr::Float(f)) => Some(*f),
66                _ => None,
67            });
68
69            let agg_func: Box<dyn crate::aggregation::AggregateFunc> = match func_name.as_str() {
70                "count" => Box::new(Count),
71                "sum" => Box::new(Sum),
72                "avg" => Box::new(Avg),
73                "min" => Box::new(Min),
74                "max" => Box::new(Max),
75                "last" => Box::new(Last),
76                "first" => Box::new(First),
77                "stddev" => Box::new(StdDev),
78                "ema" => Box::new(Ema::new(second_int)),
79                "count_distinct" => Box::new(CountDistinct),
80                "median" => Box::new(Median),
81                "p50" => Box::new(P50),
82                "p95" => Box::new(P95),
83                "p99" => Box::new(P99),
84                "percentile" => Box::new(Percentile::new(second_float.unwrap_or(0.5))),
85                other => {
86                    // Fallback: check UdfRegistry for custom aggregate UDFs
87                    // (checked by compile_agg_expr_with_udfs; standard path logs warning)
88                    warn!("Unknown aggregation function: {}", other);
89                    return None;
90                }
91            };
92
93            Some((agg_func, field))
94        }
95
96        // Binary expression: left op right (e.g., last(x) - ema(x, 9))
97        Expr::Binary { op, left, right } => {
98            let agg_op = match op {
99                BinOp::Add => AggBinOp::Add,
100                BinOp::Sub => AggBinOp::Sub,
101                BinOp::Mul => AggBinOp::Mul,
102                BinOp::Div => AggBinOp::Div,
103                _ => {
104                    warn!("Unsupported binary operator in aggregate: {:?}", op);
105                    return None;
106                }
107            };
108
109            let (left_func, left_field) = compile_agg_expr(left)?;
110            let (right_func, right_field) = compile_agg_expr(right)?;
111
112            let expr_agg =
113                ExprAggregate::new(left_func, left_field, agg_op, right_func, right_field);
114
115            Some((Box::new(expr_agg), None))
116        }
117
118        _ => {
119            warn!("Unsupported aggregate expression: {:?}", expr);
120            None
121        }
122    }
123}
124
125/// Compile an aggregate expression, checking the UDF registry for custom aggregates.
126///
127/// Falls through to [`compile_agg_expr`] for built-in functions. When a name
128/// is not recognized as a built-in but exists in the UDF registry as an aggregate,
129/// it is wrapped in an adapter that delegates to the [`Accumulator`](crate::udf::Accumulator).
130pub fn compile_agg_expr_with_udfs(
131    expr: &varpulis_core::ast::Expr,
132    udf_registry: &crate::udf::UdfRegistry,
133) -> Option<(Box<dyn crate::aggregation::AggregateFunc>, Option<String>)> {
134    // Try built-in first
135    if let Some(result) = compile_agg_expr(expr) {
136        return Some(result);
137    }
138
139    // Fallback: check UDF registry for custom aggregate
140    use varpulis_core::ast::{Arg, Expr};
141    if let Expr::Call { func, args } = expr {
142        if let Expr::Ident(func_name) = func.as_ref() {
143            if let Some(agg_udf) = udf_registry.get_aggregate(func_name) {
144                let field = args.first().and_then(|a| match a {
145                    Arg::Positional(Expr::Ident(s)) => Some(s.clone()),
146                    _ => None,
147                });
148
149                let adapter = UdfAggregateAdapter {
150                    udf: agg_udf.clone(),
151                };
152                return Some((Box::new(adapter), field));
153            }
154        }
155    }
156
157    None
158}
159
160/// Adapter that wraps a UDF [`Accumulator`](crate::udf::Accumulator) as an [`AggregateFunc`](crate::aggregation::AggregateFunc).
161struct UdfAggregateAdapter {
162    udf: std::sync::Arc<dyn crate::udf::AggregateUDF>,
163}
164
165impl crate::aggregation::AggregateFunc for UdfAggregateAdapter {
166    fn name(&self) -> &'static str {
167        "udf_aggregate"
168    }
169
170    fn apply(&self, events: &[crate::event::Event], field: Option<&str>) -> varpulis_core::Value {
171        let mut acc = self.udf.init();
172        let field_name = field.unwrap_or("value");
173        for event in events {
174            if let Some(val) = event.get(field_name) {
175                acc.update(val);
176            }
177        }
178        acc.finish()
179    }
180}
181
// =============================================================================
// SASE+ Pattern Compilation
// =============================================================================
185
186/// Information about a derived stream for pattern compilation
187#[derive(Debug, Clone)]
188pub struct DerivedStreamInfo {
189    /// The underlying event type (e.g., "Transaction")
190    pub event_type: String,
191    /// Optional filter expression from the stream definition
192    pub filter: Option<varpulis_core::ast::Expr>,
193}
194
195/// Stream resolver function type: given a stream name, returns derived stream info if found
196pub type StreamResolver<'a> = &'a dyn Fn(&str) -> Option<DerivedStreamInfo>;
197
198/// Compile a sequence source and operations into a SASE+ pattern with stream resolution
199pub fn compile_to_sase_pattern_with_resolver(
200    source: &StreamSource,
201    followed_by_clauses: &[FollowedByClause],
202    _negation_clauses: &[FollowedByClause],
203    within_duration: Option<Duration>,
204    stream_resolver: StreamResolver,
205) -> Option<SasePattern> {
206    let mut steps: Vec<SasePattern> = Vec::new();
207
208    // Handle source
209    match source {
210        StreamSource::Sequence(decl) => {
211            // sequence() construct with explicit steps
212            for step in &decl.steps {
213                let pattern = compile_sequence_step_to_sase(step);
214                steps.push(pattern);
215            }
216        }
217        StreamSource::Ident(name) => {
218            // Check if this is a derived stream
219            let (event_type, predicate) = if let Some(info) = stream_resolver(name) {
220                let pred = info.filter.as_ref().and_then(expr_to_sase_predicate);
221                (info.event_type, pred)
222            } else {
223                (name.clone(), None)
224            };
225            steps.push(SasePattern::Event {
226                event_type,
227                predicate,
228                alias: None,
229            });
230        }
231        StreamSource::IdentWithAlias { name, alias } => {
232            // Check if this is a derived stream
233            let (event_type, predicate) = if let Some(info) = stream_resolver(name) {
234                let pred = info.filter.as_ref().and_then(expr_to_sase_predicate);
235                (info.event_type, pred)
236            } else {
237                (name.clone(), None)
238            };
239            steps.push(SasePattern::Event {
240                event_type,
241                predicate,
242                alias: Some(alias.clone()),
243            });
244        }
245        StreamSource::IdentWithFilterAndAlias {
246            name,
247            filter,
248            alias,
249        } => {
250            // Inline filter becomes a SASE predicate
251            let (event_type, mut predicate) = if let Some(info) = stream_resolver(name) {
252                let pred = info.filter.as_ref().and_then(expr_to_sase_predicate);
253                (info.event_type, pred)
254            } else {
255                (name.clone(), None)
256            };
257            // The inline filter takes precedence / merges with derived stream filter
258            if let Some(inline_pred) = expr_to_sase_predicate(filter) {
259                predicate = Some(inline_pred);
260            }
261            steps.push(SasePattern::Event {
262                event_type,
263                predicate,
264                alias: alias.clone(),
265            });
266        }
267        StreamSource::AllWithAlias { name, alias } => {
268            // Check if this is a derived stream
269            let (event_type, predicate) = if let Some(info) = stream_resolver(name) {
270                let pred = info.filter.as_ref().and_then(expr_to_sase_predicate);
271                (info.event_type, pred)
272            } else {
273                (name.clone(), None)
274            };
275            // match_all -> Kleene+
276            let event_pattern = SasePattern::Event {
277                event_type,
278                predicate,
279                alias: alias.clone(),
280            };
281            steps.push(SasePattern::KleenePlus(Box::new(event_pattern)));
282        }
283        _ => return None,
284    }
285
286    // Add followed_by clauses
287    for clause in followed_by_clauses {
288        // Check if event_type is a derived stream
289        let (resolved_event_type, stream_predicate) =
290            if let Some(info) = stream_resolver(&clause.event_type) {
291                (info.event_type, info.filter)
292            } else {
293                (clause.event_type.clone(), None)
294            };
295
296        // Combine stream filter with clause filter
297        let clause_predicate = clause.filter.as_ref().and_then(expr_to_sase_predicate);
298        let stream_pred = stream_predicate.as_ref().and_then(expr_to_sase_predicate);
299
300        let predicate = match (stream_pred, clause_predicate) {
301            (Some(sp), Some(cp)) => Some(Predicate::And(Box::new(sp), Box::new(cp))),
302            (Some(sp), None) => Some(sp),
303            (None, Some(cp)) => Some(cp),
304            (None, None) => None,
305        };
306
307        let event_pattern = SasePattern::Event {
308            event_type: resolved_event_type,
309            predicate,
310            alias: clause.alias.clone(),
311        };
312
313        // Handle match_all
314        let pattern = if clause.match_all {
315            SasePattern::KleenePlus(Box::new(event_pattern))
316        } else {
317            event_pattern
318        };
319
320        steps.push(pattern);
321    }
322
323    // Build the final pattern
324    if steps.is_empty() {
325        return None;
326    }
327
328    let pattern = if steps.len() == 1 {
329        // Safe: we just checked steps is not empty
330        steps.pop()?
331    } else {
332        SasePattern::Seq(steps)
333    };
334
335    // Apply within constraint if specified
336    match within_duration {
337        Some(duration) => Some(SasePattern::Within(Box::new(pattern), duration)),
338        None => Some(pattern),
339    }
340}
341
342/// Compile a sequence step declaration to a SASE pattern
343fn compile_sequence_step_to_sase(step: &SequenceStepDecl) -> SasePattern {
344    let predicate = step.filter.as_ref().and_then(expr_to_sase_predicate);
345
346    SasePattern::Event {
347        event_type: step.event_type.clone(),
348        predicate,
349        alias: Some(step.alias.clone()),
350    }
351}
352
353/// Convert a VPL expression to a SASE predicate
354pub fn expr_to_sase_predicate(expr: &varpulis_core::ast::Expr) -> Option<Predicate> {
355    use varpulis_core::ast::{BinOp, Expr, UnaryOp};
356
357    match expr {
358        // Binary comparison: field == value
359        Expr::Binary { op, left, right } => {
360            let compare_op = match op {
361                BinOp::Eq => Some(CompareOp::Eq),
362                BinOp::NotEq => Some(CompareOp::NotEq),
363                BinOp::Lt => Some(CompareOp::Lt),
364                BinOp::Le => Some(CompareOp::Le),
365                BinOp::Gt => Some(CompareOp::Gt),
366                BinOp::Ge => Some(CompareOp::Ge),
367                BinOp::And => {
368                    let left_pred = expr_to_sase_predicate(left)?;
369                    let right_pred = expr_to_sase_predicate(right)?;
370                    return Some(Predicate::And(Box::new(left_pred), Box::new(right_pred)));
371                }
372                BinOp::Or => {
373                    let left_pred = expr_to_sase_predicate(left)?;
374                    let right_pred = expr_to_sase_predicate(right)?;
375                    return Some(Predicate::Or(Box::new(left_pred), Box::new(right_pred)));
376                }
377                _ => None,
378            }?;
379
380            // Handle cross-event reference comparisons (e.g., order_id == order.id)
381            // Left: current event field, Right: reference to captured event
382            if let (
383                Expr::Ident(field),
384                Expr::Member {
385                    expr: ref_expr,
386                    member: ref_field,
387                },
388            ) = (left.as_ref(), right.as_ref())
389            {
390                if let Expr::Ident(ref_alias) = ref_expr.as_ref() {
391                    return Some(Predicate::CompareRef {
392                        field: field.clone(),
393                        op: compare_op,
394                        ref_alias: ref_alias.clone(),
395                        ref_field: ref_field.clone(),
396                    });
397                }
398            }
399
400            // Extract field name from left side for simple comparisons
401            let field = match left.as_ref() {
402                Expr::Ident(name) => name.clone(),
403                _ => {
404                    // Fall back to runtime expression evaluation for complex left-side
405                    return Some(Predicate::Expr(Box::new(expr.clone())));
406                }
407            };
408
409            // Extract value from right side
410            if let Some(value) = expr_to_value(right) {
411                Some(Predicate::Compare {
412                    field,
413                    op: compare_op,
414                    value,
415                })
416            } else {
417                // Right side is complex (e.g., another field or expression)
418                // Fall back to runtime expression evaluation
419                Some(Predicate::Expr(Box::new(expr.clone())))
420            }
421        }
422
423        // Unary not
424        Expr::Unary {
425            op: UnaryOp::Not,
426            expr: inner,
427        } => {
428            let inner_pred = expr_to_sase_predicate(inner)?;
429            Some(Predicate::Not(Box::new(inner_pred)))
430        }
431
432        // Fall back to storing the expression for runtime evaluation
433        _ => Some(Predicate::Expr(Box::new(expr.clone()))),
434    }
435}
436
437/// Compile a `SasePatternExpr` (from a named `pattern` declaration) into a runtime `SasePattern`.
438pub fn compile_sase_pattern_expr(
439    expr: &varpulis_core::ast::SasePatternExpr,
440    within: Option<Duration>,
441) -> Option<SasePattern> {
442    use varpulis_core::ast::SasePatternExpr;
443
444    let pattern = match expr {
445        SasePatternExpr::Seq(items) => {
446            let steps: Vec<SasePattern> = items.iter().map(compile_sase_pattern_item).collect();
447            if steps.len() == 1 {
448                steps.into_iter().next().unwrap()
449            } else {
450                SasePattern::Seq(steps)
451            }
452        }
453        SasePatternExpr::And(left, right) => {
454            let l = compile_sase_pattern_expr(left, None)?;
455            let r = compile_sase_pattern_expr(right, None)?;
456            SasePattern::And(Box::new(l), Box::new(r))
457        }
458        SasePatternExpr::Or(left, right) => {
459            let l = compile_sase_pattern_expr(left, None)?;
460            let r = compile_sase_pattern_expr(right, None)?;
461            SasePattern::Or(Box::new(l), Box::new(r))
462        }
463        SasePatternExpr::Not(inner) => {
464            let i = compile_sase_pattern_expr(inner, None)?;
465            SasePattern::Not(Box::new(i))
466        }
467        SasePatternExpr::Event(name) => SasePattern::Event {
468            event_type: name.clone(),
469            predicate: None,
470            alias: None,
471        },
472        SasePatternExpr::Group(inner) => {
473            return compile_sase_pattern_expr(inner, within);
474        }
475    };
476
477    // Wrap with Within if specified
478    if let Some(duration) = within {
479        Some(SasePattern::Within(Box::new(pattern), duration))
480    } else {
481        Some(pattern)
482    }
483}
484
485/// Compile a single `SasePatternItem` to a `SasePattern`, handling Kleene operators.
486fn compile_sase_pattern_item(item: &varpulis_core::ast::SasePatternItem) -> SasePattern {
487    let predicate = item.filter.as_ref().and_then(expr_to_sase_predicate);
488    let base = SasePattern::Event {
489        event_type: item.event_type.clone(),
490        predicate,
491        alias: item.alias.clone(),
492    };
493
494    match &item.kleene {
495        Some(varpulis_core::ast::KleeneOp::Plus) => SasePattern::KleenePlus(Box::new(base)),
496        Some(varpulis_core::ast::KleeneOp::Star) => SasePattern::KleeneStar(Box::new(base)),
497        Some(varpulis_core::ast::KleeneOp::Optional) => {
498            // Optional is equivalent to Or(base, empty match) — use KleeneStar for now
499            SasePattern::KleeneStar(Box::new(base))
500        }
501        None => base,
502    }
503}
504
505/// Extract all event type names from a `SasePatternExpr`.
506pub fn extract_event_types_from_pattern_expr(
507    expr: &varpulis_core::ast::SasePatternExpr,
508) -> Vec<String> {
509    use varpulis_core::ast::SasePatternExpr;
510
511    let mut types = Vec::new();
512    match expr {
513        SasePatternExpr::Seq(items) => {
514            for item in items {
515                if !types.contains(&item.event_type) {
516                    types.push(item.event_type.clone());
517                }
518            }
519        }
520        SasePatternExpr::And(left, right) | SasePatternExpr::Or(left, right) => {
521            for t in extract_event_types_from_pattern_expr(left) {
522                if !types.contains(&t) {
523                    types.push(t);
524                }
525            }
526            for t in extract_event_types_from_pattern_expr(right) {
527                if !types.contains(&t) {
528                    types.push(t);
529                }
530            }
531        }
532        SasePatternExpr::Not(inner) | SasePatternExpr::Group(inner) => {
533            types = extract_event_types_from_pattern_expr(inner);
534        }
535        SasePatternExpr::Event(name) => {
536            types.push(name.clone());
537        }
538    }
539    types
540}
541
542/// Convert an AST expression to a Value (for predicates)
543fn expr_to_value(expr: &varpulis_core::ast::Expr) -> Option<varpulis_core::Value> {
544    use varpulis_core::ast::Expr;
545    use varpulis_core::Value;
546
547    match expr {
548        Expr::Int(n) => Some(Value::Int(*n)),
549        Expr::Float(f) => Some(Value::Float(*f)),
550        Expr::Str(s) => Some(Value::Str(s.clone().into())),
551        Expr::Bool(b) => Some(Value::Bool(*b)),
552        _ => None,
553    }
554}