Skip to main content

radiate_core/stats/expression/
builder.rs

1use super::{
2    Expr, MetricField, MetricKind,
3    aggregate::{AggExpr, Rollup},
4    ops::{BinaryExpr, BinaryOp, TrinaryExpr, TrinaryOp, UnaryExpr, UnaryOp, fuse_affine},
5};
6use radiate_utils::{DataType, Quantile};
7use std::ops::{Add, Div, Mul, Neg, Not, Sub};
8
9impl Expr {
10    pub fn time(mut self) -> Expr {
11        self.try_swap_select_kind(MetricKind::Duration);
12        self
13    }
14
15    pub fn value(mut self) -> Expr {
16        self.try_swap_select_kind(MetricKind::Value);
17        self
18    }
19
20    pub fn debug(self) -> Expr {
21        Expr::Unary(UnaryExpr::new(self, UnaryOp::Debug))
22    }
23
24    pub fn rolling(self, window_size: usize) -> Expr {
25        match self {
26            Expr::Aggregate(agg) => {
27                Expr::Aggregate(AggExpr::new(*agg.child, agg.rollup).rolling(window_size))
28            }
29            Expr::Selector(select) => Expr::Aggregate(
30                AggExpr::new(Expr::Selector(select), Rollup::Last).rolling(window_size),
31            ),
32            _ => Expr::Aggregate(AggExpr::new(self, Rollup::Last).rolling(window_size)),
33        }
34    }
35
36    pub fn first(self) -> Expr {
37        self.try_reduce_select_agg_rollup_or(MetricField::LastValue, Rollup::First, |expr| {
38            Expr::Aggregate(AggExpr::new(expr, Rollup::First))
39        })
40    }
41
42    pub fn last(self) -> Expr {
43        self.try_reduce_select_agg_rollup_or(MetricField::LastValue, Rollup::Last, |expr| {
44            Expr::Aggregate(AggExpr::new(expr, Rollup::Last))
45        })
46    }
47
48    pub fn sum(self) -> Expr {
49        self.try_reduce_select_agg_rollup_or(MetricField::Sum, Rollup::Sum, |expr| {
50            Expr::Aggregate(AggExpr::new(expr, Rollup::Sum))
51        })
52    }
53
54    pub fn mean(self) -> Expr {
55        self.try_reduce_select_agg_rollup_or(MetricField::Mean, Rollup::Mean, |expr| {
56            Expr::Aggregate(AggExpr::new(expr, Rollup::Mean))
57        })
58    }
59
60    pub fn stddev(self) -> Expr {
61        self.try_reduce_select_agg_rollup_or(MetricField::StdDev, Rollup::StdDev, |expr| {
62            Expr::Aggregate(AggExpr::new(expr, Rollup::StdDev))
63        })
64    }
65
66    pub fn min(self) -> Expr {
67        self.try_reduce_select_agg_rollup_or(MetricField::Min, Rollup::Min, |expr| {
68            Expr::Aggregate(AggExpr::new(expr, Rollup::Min))
69        })
70    }
71
72    pub fn max(self) -> Expr {
73        self.try_reduce_select_agg_rollup_or(MetricField::Max, Rollup::Max, |expr| {
74            Expr::Aggregate(AggExpr::new(expr, Rollup::Max))
75        })
76    }
77
78    pub fn var(self) -> Expr {
79        self.try_reduce_select_agg_rollup_or(MetricField::Var, Rollup::Var, |expr| {
80            Expr::Aggregate(AggExpr::new(expr, Rollup::Var))
81        })
82    }
83
84    pub fn skew(self) -> Expr {
85        self.try_reduce_select_agg_rollup_or(MetricField::Skew, Rollup::Skew, |expr| {
86            Expr::Aggregate(AggExpr::new(expr, Rollup::Skew))
87        })
88    }
89
90    pub fn count(self) -> Expr {
91        self.try_reduce_select_agg_rollup_or(MetricField::Count, Rollup::Count, |expr| {
92            Expr::Aggregate(AggExpr::new(expr, Rollup::Count))
93        })
94    }
95
96    pub fn slope(self) -> Expr {
97        self.try_swap_agg_rollup_or(Rollup::Slope, |expr| {
98            Expr::Aggregate(AggExpr::new(expr, Rollup::Slope))
99        })
100    }
101
102    pub fn unique(self) -> Expr {
103        self.try_swap_agg_rollup_or(Rollup::Unique, |expr| {
104            Expr::Aggregate(AggExpr::new(expr, Rollup::Unique))
105        })
106    }
107
108    pub fn pow(self, exp: impl Into<Expr>) -> Expr {
109        Expr::Binary(BinaryExpr::new(self, exp.into(), BinaryOp::Pow))
110    }
111
112    pub fn lt(self, rhs: impl Into<Expr>) -> Expr {
113        Expr::Binary(BinaryExpr::new(self, rhs.into(), BinaryOp::Lt))
114    }
115
116    pub fn lte(self, rhs: impl Into<Expr>) -> Expr {
117        Expr::Binary(BinaryExpr::new(self, rhs.into(), BinaryOp::Lte))
118    }
119
120    pub fn gt(self, rhs: impl Into<Expr>) -> Expr {
121        Expr::Binary(BinaryExpr::new(self, rhs.into(), BinaryOp::Gt))
122    }
123
124    pub fn gte(self, rhs: impl Into<Expr>) -> Expr {
125        Expr::Binary(BinaryExpr::new(self, rhs.into(), BinaryOp::Gte))
126    }
127
128    pub fn eq(self, rhs: impl Into<Expr>) -> Expr {
129        Expr::Binary(BinaryExpr::new(self, rhs.into(), BinaryOp::Eq))
130    }
131
132    pub fn ne(self, rhs: impl Into<Expr>) -> Expr {
133        Expr::Binary(BinaryExpr::new(self, rhs.into(), BinaryOp::Ne))
134    }
135
136    pub fn between(self, low: impl Into<Expr>, high: impl Into<Expr>) -> Expr {
137        let low = low.into();
138        let high = high.into();
139        self.clone().gte(low).and(self.lte(high))
140    }
141
142    pub fn and(self, rhs: impl Into<Expr>) -> Expr {
143        Expr::Binary(BinaryExpr::new(self, rhs.into(), BinaryOp::And))
144    }
145
146    pub fn or(self, rhs: impl Into<Expr>) -> Expr {
147        Expr::Binary(BinaryExpr::new(self, rhs.into(), BinaryOp::Or))
148    }
149
150    #[allow(clippy::should_implement_trait)]
151    pub fn not(self) -> Expr {
152        Expr::Unary(UnaryExpr::new(self, UnaryOp::Not))
153    }
154
155    #[allow(clippy::should_implement_trait)]
156    pub fn neg(self) -> Expr {
157        Expr::Unary(UnaryExpr::new(self, UnaryOp::Neg))
158    }
159
160    pub fn abs(self) -> Expr {
161        Expr::Unary(UnaryExpr::new(self, UnaryOp::Abs))
162    }
163
164    #[allow(clippy::should_implement_trait)]
165    pub fn add(self, rhs: impl Into<Expr>) -> Expr {
166        Expr::Binary(BinaryExpr::new(self, rhs.into(), BinaryOp::Add))
167    }
168
169    #[allow(clippy::should_implement_trait)]
170    pub fn sub(self, rhs: impl Into<Expr>) -> Expr {
171        Expr::Binary(BinaryExpr::new(self, rhs.into(), BinaryOp::Sub))
172    }
173
174    #[allow(clippy::should_implement_trait)]
175    pub fn mul(self, rhs: impl Into<Expr>) -> Expr {
176        Expr::Binary(BinaryExpr::new(self, rhs.into(), BinaryOp::Mul))
177    }
178
179    #[allow(clippy::should_implement_trait)]
180    pub fn div(self, rhs: impl Into<Expr>) -> Expr {
181        Expr::Binary(BinaryExpr::new(self, rhs.into(), BinaryOp::Div))
182    }
183
184    pub fn clamp(self, min: impl Into<Expr>, max: impl Into<Expr>) -> Expr {
185        Expr::Trinary(TrinaryExpr::new(
186            self,
187            min.into(),
188            max.into(),
189            TrinaryOp::Clamp,
190        ))
191    }
192
193    /// Returns `self` if it evaluates to a finite number, otherwise `rhs`.
194    /// Triggers fallback on Null, NaN, and ±Inf. Short-circuits — `rhs` is only
195    /// evaluated when needed, so it's safe to use as a non-trivial default.
196    pub fn or_else(self, rhs: impl Into<Expr>) -> Expr {
197        Expr::Binary(BinaryExpr::new(self, rhs.into(), BinaryOp::Coalesce))
198    }
199
200    /// Elementwise min: `min(self, rhs)`. NaN on one side returns the other.
201    /// Use as a ceiling: `expr.min_with(2.0)`.
202    pub fn min_with(self, rhs: impl Into<Expr>) -> Expr {
203        Expr::Binary(BinaryExpr::new(self, rhs.into(), BinaryOp::Min))
204    }
205
206    /// Elementwise max: `max(self, rhs)`. NaN on one side returns the other.
207    /// Use as a floor: `expr.max_with(0.05)`.
208    pub fn max_with(self, rhs: impl Into<Expr>) -> Expr {
209        Expr::Binary(BinaryExpr::new(self, rhs.into(), BinaryOp::Max))
210    }
211
212    /// Quantile at `q ∈ [0, 1]` via linear interpolation between adjacent ranks.
213    /// On an empty buffer returns 0.0. Filters non-finite samples before sorting.
214    /// O(n log n) per evaluation — fine for window sizes ≤ ~1000.
215    pub fn quantile(self, q: f32) -> Expr {
216        self.try_swap_agg_rollup_or(Rollup::Quantile(Quantile::new(q)), |expr| {
217            Expr::Aggregate(AggExpr::new(expr, Rollup::Quantile(Quantile::new(q))))
218        })
219    }
220
221    pub fn stagnation(self, epsilon: f32) -> Expr {
222        Expr::Unary(UnaryExpr::new(
223            self,
224            UnaryOp::Stagnation {
225                epsilon,
226                last_value: None,
227                count: 0,
228            },
229        ))
230    }
231
232    pub fn cast(self, to: DataType) -> Expr {
233        Expr::Unary(UnaryExpr::new(self, UnaryOp::Cast(to)))
234    }
235
236    /// Relative error from a target: `(self - target) / target`. Fuses into
237    /// a single Affine node. `target == 0` produces a degenerate expression
238    /// (division by zero shows up as a NaN/Inf at eval time, then propagates
239    /// to the outer Clamp).
240    pub fn error(self, target: f32) -> Expr {
241        // (x - target) / target == x * (1/target) + (-1)
242        fuse_affine(self, 1.0 / target, -1.0)
243    }
244
245    // Rewrites a Selector's kind in-place. Returns true if the rewrite happened.
246    fn try_swap_select_kind(&mut self, to: MetricKind) -> bool {
247        if let Expr::Selector(sel) = self {
248            sel.kind = to;
249            return true;
250        }
251        false
252    }
253
254    // Rewrites a Selector's field in-place. Returns true if the rewrite happened.
255    fn try_swap_select_field(&mut self, to: MetricField) -> bool {
256        if let Expr::Selector(sel) = self {
257            sel.field = to;
258            return true;
259        }
260        false
261    }
262
263    // If this is a Selector, rewrites its field to `to`; otherwise calls `func`.
264    fn try_swap_select_field_or(
265        mut self,
266        to: MetricField,
267        func: impl FnOnce(Self) -> Expr,
268    ) -> Expr {
269        if self.try_swap_select_field(to) {
270            return self;
271        }
272        func(self)
273    }
274
275    // If this is an Aggregate (non-Unique), rewrites its rollup to `to`; otherwise calls `func`.
276    fn try_swap_agg_rollup_or(mut self, to: Rollup, func: impl FnOnce(Self) -> Expr) -> Expr {
277        match self {
278            Expr::Aggregate(mut agg) => {
279                if agg.rollup != Rollup::Unique {
280                    agg.rollup = to;
281                    self = Expr::Aggregate(agg);
282                    return self;
283                }
284                func(Expr::Aggregate(agg))
285            }
286            _ => func(self),
287        }
288    }
289
290    // Fuses select("x").agg() into a single Selector node when possible, avoiding a wrapping
291    // Aggregate. Falls back to `func` for any other shape.
292    fn try_reduce_select_agg_rollup_or(
293        self,
294        field: MetricField,
295        to: Rollup,
296        func: impl FnOnce(Self) -> Expr,
297    ) -> Expr {
298        self.try_swap_select_field_or(field, |outer| outer.try_swap_agg_rollup_or(to, func))
299    }
300}
301
302macro_rules! impl_from_literal {
303    ($($ty:ty => $variant:ident),*) => {
304        $(
305            impl From<$ty> for Expr {
306                fn from(value: $ty) -> Self {
307                    Expr::Literal(value.into())
308                }
309            }
310        )*
311    };
312}
313
314impl_from_literal!(
315    u8 => UInt8,
316    u16 => UInt16,
317    u32 => UInt32,
318    u64 => UInt64,
319    u128 => UInt128,
320
321    i8 => Int8,
322    i16 => Int16,
323    i32 => Int32,
324    i64 => Int64,
325    i128 => Int128,
326
327    f32 => Float32,
328    f64 => Float64,
329
330    bool => Bool,
331    char => Char,
332    String => Str
333);
334
335impl<T> Add<T> for Expr
336where
337    T: Into<Expr>,
338{
339    type Output = Expr;
340    fn add(self, rhs: T) -> Expr {
341        let rhs = rhs.into();
342        Expr::Binary(BinaryExpr::new(self, rhs, BinaryOp::Add))
343    }
344}
345
346impl<T> Sub<T> for Expr
347where
348    T: Into<Expr>,
349{
350    type Output = Expr;
351    fn sub(self, rhs: T) -> Expr {
352        let rhs = rhs.into();
353        Expr::Binary(BinaryExpr::new(self, rhs, BinaryOp::Sub))
354    }
355}
356
357impl<T> Mul<T> for Expr
358where
359    T: Into<Expr>,
360{
361    type Output = Expr;
362    fn mul(self, rhs: T) -> Expr {
363        let rhs = rhs.into();
364        Expr::Binary(BinaryExpr::new(self, rhs, BinaryOp::Mul))
365    }
366}
367
368impl<T> Div<T> for Expr
369where
370    T: Into<Expr>,
371{
372    type Output = Expr;
373    fn div(self, rhs: T) -> Expr {
374        let rhs = rhs.into();
375        Expr::Binary(BinaryExpr::new(self, rhs, BinaryOp::Div))
376    }
377}
378
379impl Neg for Expr {
380    type Output = Expr;
381    fn neg(self) -> Expr {
382        Expr::Unary(UnaryExpr::new(self, UnaryOp::Neg))
383    }
384}
385
386impl Not for Expr {
387    type Output = Expr;
388    fn not(self) -> Expr {
389        Expr::Unary(UnaryExpr::new(self, UnaryOp::Not))
390    }
391}