// radiate_expr/expression/mod.rs

1mod aggregate;
2mod logical;
3mod named;
4mod ops;
5mod projection;
6mod schedule;
7mod select;
8
9use crate::{
10    AnyValue, DataType, Field,
11    expression::schedule::{EveryState, ScheduleExpr},
12};
13
14use aggregate::{AggExpr, BufferExpr, Rollup};
15use logical::When;
16pub use named::NamedExpr;
17use ops::{BinaryExpr, BinaryOp, TrinaryExpr, TrinaryOp, UnaryExpr, UnaryOp};
18pub use projection::*;
19use radiate_error::RadiateError;
20use radiate_utils::{SmallStr, WindowBuffer};
21pub use select::SelectExpr;
22#[cfg(feature = "serde")]
23use serde::{Deserialize, Serialize};
24use std::fmt::Debug;
25
26mod expr_fields {
27    use super::*;
28    use crate::DataType;
29
30    pub static STD_DEV: Field = Field::new_const("std_dev", DataType::Float32);
31    pub static MEAN: Field = Field::new_const("mean", DataType::Float32);
32    pub static MIN: Field = Field::new_const("min", DataType::Float32);
33    pub static MAX: Field = Field::new_const("max", DataType::Float32);
34    pub static SUM: Field = Field::new_const("sum", DataType::Float32);
35    pub static VAR: Field = Field::new_const("var", DataType::Float32);
36    pub static SKEW: Field = Field::new_const("skew", DataType::Float32);
37    pub static COUNT: Field = Field::new_const("count", DataType::UInt64);
38    pub static LAST_VALUE: Field = Field::new_const("last_value", DataType::Float32);
39}
40
41pub(crate) type ExprResult<'a> = Result<AnyValue<'a>, RadiateError>;
42
43pub trait ApplyExpr<'a> {
44    fn apply(&self, expr: &'a mut Expr) -> AnyValue<'a>;
45}
46
47pub trait ExprQuery<I> {
48    fn dispatch<'a>(&'a mut self, input: &I) -> ExprResult<'a>;
49}
50
51#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
52#[derive(Clone, Debug, PartialEq)]
53pub enum Expr {
54    Literal(AnyValue<'static>),
55    Selector(SelectExpr),
56    Aggregate(AggExpr),
57    Buffer(BufferExpr),
58    Schedule(ScheduleExpr),
59    Binary(BinaryExpr),
60    Unary(UnaryExpr),
61    Trinary(TrinaryExpr),
62}
63
64impl Expr {
65    fn try_swap_select_dtype(&mut self, to: DataType) -> bool {
66        match self {
67            Expr::Selector(SelectExpr::Field(value, field)) => {
68                let new_field = field.with_dtype(to);
69                *self = Expr::Selector(SelectExpr::Field(value.clone(), new_field));
70                true
71            }
72            _ => false,
73        }
74    }
75
76    fn try_swap_select_name(&mut self, to: &Field) -> bool {
77        match self {
78            Expr::Selector(SelectExpr::Field(value, field)) => {
79                let new_field = field.with_name(to.name().clone());
80                *self = Expr::Selector(SelectExpr::Field(value.clone(), new_field));
81                true
82            }
83            _ => false,
84        }
85    }
86
87    fn try_swap_select_field_or(mut self, to: &Field, func: impl FnOnce(Self) -> Expr) -> Expr {
88        if self.try_swap_select_name(to) {
89            return self;
90        }
91
92        func(self)
93    }
94
95    fn try_swap_agg_rollup_or(mut self, to: Rollup, func: impl FnOnce(Self) -> Expr) -> Expr {
96        match self {
97            Expr::Aggregate(mut agg) => {
98                if agg.rollup != Rollup::Unique {
99                    agg.rollup = to;
100                    self = Expr::Aggregate(agg);
101                    return self;
102                }
103
104                func(Expr::Aggregate(agg))
105            }
106            _ => func(self),
107        }
108    }
109
110    fn try_reduce_select_agg_rollup_or(
111        self,
112        field: &Field,
113        to: Rollup,
114        func: impl FnOnce(Self) -> Expr,
115    ) -> Expr {
116        self.try_swap_select_field_or(field, |outer| {
117            outer.try_swap_agg_rollup_or(to, |inner| func(inner))
118        })
119    }
120
121    pub fn time(mut self) -> Expr {
122        self.try_swap_select_dtype(DataType::Duration);
123        self
124    }
125
126    pub fn value(mut self) -> Expr {
127        self.try_swap_select_dtype(DataType::Float32);
128        self
129    }
130
131    pub fn debug(self) -> Expr {
132        Expr::Unary(UnaryExpr::new(self, UnaryOp::Debug))
133    }
134
135    pub fn rolling(self, window_size: usize) -> Expr {
136        match self {
137            Expr::Aggregate(agg) => Expr::Aggregate(AggExpr {
138                child: agg.child,
139                rollup: agg.rollup,
140                buffer: Some(WindowBuffer::with_window(window_size)),
141            }),
142            Expr::Selector(select) => Expr::Aggregate(AggExpr {
143                child: Box::new(Expr::Selector(select)),
144                rollup: Rollup::Last,
145                buffer: Some(WindowBuffer::with_window(window_size)),
146            }),
147            _ => Expr::Buffer(BufferExpr::new(self, window_size)),
148        }
149    }
150
151    /// Aggregates
152    pub fn first(self) -> Expr {
153        self.try_reduce_select_agg_rollup_or(&expr_fields::LAST_VALUE, Rollup::First, |expr| {
154            Expr::Aggregate(AggExpr::new(expr, Rollup::First))
155        })
156    }
157
158    pub fn last(self) -> Expr {
159        self.try_reduce_select_agg_rollup_or(&expr_fields::LAST_VALUE, Rollup::Last, |expr| {
160            Expr::Aggregate(AggExpr::new(expr, Rollup::Last))
161        })
162    }
163
164    pub fn sum(self) -> Expr {
165        self.try_reduce_select_agg_rollup_or(&expr_fields::SUM, Rollup::Sum, |expr| {
166            Expr::Aggregate(AggExpr::new(expr, Rollup::Sum))
167        })
168    }
169
170    pub fn mean(self) -> Expr {
171        self.try_reduce_select_agg_rollup_or(&expr_fields::MEAN, Rollup::Mean, |expr| {
172            Expr::Aggregate(AggExpr::new(expr, Rollup::Mean))
173        })
174    }
175
176    pub fn stddev(self) -> Expr {
177        self.try_reduce_select_agg_rollup_or(&expr_fields::STD_DEV, Rollup::StdDev, |expr| {
178            Expr::Aggregate(AggExpr::new(expr, Rollup::StdDev))
179        })
180    }
181
182    pub fn min(self) -> Expr {
183        self.try_reduce_select_agg_rollup_or(&expr_fields::MIN, Rollup::Min, |expr| {
184            Expr::Aggregate(AggExpr::new(expr, Rollup::Min))
185        })
186    }
187
188    pub fn max(self) -> Expr {
189        self.try_reduce_select_agg_rollup_or(&expr_fields::MAX, Rollup::Max, |expr| {
190            Expr::Aggregate(AggExpr::new(expr, Rollup::Max))
191        })
192    }
193
194    pub fn var(self) -> Expr {
195        self.try_reduce_select_agg_rollup_or(&expr_fields::VAR, Rollup::Var, |expr| {
196            Expr::Aggregate(AggExpr::new(expr, Rollup::Var))
197        })
198    }
199
200    pub fn skew(self) -> Expr {
201        self.try_reduce_select_agg_rollup_or(&expr_fields::SKEW, Rollup::Skew, |expr| {
202            Expr::Aggregate(AggExpr::new(expr, Rollup::Skew))
203        })
204    }
205
206    pub fn count(self) -> Expr {
207        self.try_reduce_select_agg_rollup_or(&expr_fields::COUNT, Rollup::Count, |expr| {
208            Expr::Aggregate(AggExpr::new(expr, Rollup::Count))
209        })
210    }
211
212    pub fn slope(self) -> Expr {
213        self.try_swap_agg_rollup_or(Rollup::Slope, |expr| {
214            Expr::Aggregate(AggExpr::new(expr, Rollup::Slope))
215        })
216    }
217
218    pub fn unique(self) -> Expr {
219        self.try_swap_agg_rollup_or(Rollup::Unique, |expr| {
220            Expr::Aggregate(AggExpr::new(expr, Rollup::Unique))
221        })
222    }
223
224    pub fn pow(self, exp: impl Into<Expr>) -> Expr {
225        Expr::Binary(BinaryExpr::new(self, exp.into(), BinaryOp::Pow))
226    }
227
228    /// Comparisons
229    pub fn lt(self, rhs: impl Into<Expr>) -> Expr {
230        Expr::Binary(BinaryExpr::new(self, rhs.into(), BinaryOp::Lt))
231    }
232
233    pub fn lte(self, rhs: impl Into<Expr>) -> Expr {
234        Expr::Binary(BinaryExpr::new(self, rhs.into(), BinaryOp::Lte))
235    }
236
237    pub fn gt(self, rhs: impl Into<Expr>) -> Expr {
238        Expr::Binary(BinaryExpr::new(self, rhs.into(), BinaryOp::Gt))
239    }
240
241    pub fn gte(self, rhs: impl Into<Expr>) -> Expr {
242        Expr::Binary(BinaryExpr::new(self, rhs.into(), BinaryOp::Gte))
243    }
244
245    pub fn eq(self, rhs: impl Into<Expr>) -> Expr {
246        Expr::Binary(BinaryExpr::new(self, rhs.into(), BinaryOp::Eq))
247    }
248
249    pub fn ne(self, rhs: impl Into<Expr>) -> Expr {
250        Expr::Binary(BinaryExpr::new(self, rhs.into(), BinaryOp::Ne))
251    }
252
253    pub fn between(self, low: impl Into<Expr>, high: impl Into<Expr>) -> Expr {
254        let low = low.into();
255        let high = high.into();
256
257        self.clone().gte(low).and(self.lte(high))
258    }
259
260    /// Logic
261    pub fn and(self, rhs: impl Into<Expr>) -> Expr {
262        Expr::Binary(BinaryExpr::new(self, rhs.into(), BinaryOp::And))
263    }
264
265    pub fn or(self, rhs: impl Into<Expr>) -> Expr {
266        Expr::Binary(BinaryExpr::new(self, rhs.into(), BinaryOp::Or))
267    }
268
269    pub fn not(self) -> Expr {
270        Expr::Unary(UnaryExpr::new(self, UnaryOp::Not))
271    }
272
273    /// Arithmetic
274    pub fn neg(self) -> Expr {
275        Expr::Unary(UnaryExpr::new(self, UnaryOp::Neg))
276    }
277
278    pub fn abs(self) -> Expr {
279        Expr::Unary(UnaryExpr::new(self, UnaryOp::Abs))
280    }
281
282    pub fn add(self, rhs: impl Into<Expr>) -> Expr {
283        Expr::Binary(BinaryExpr::new(self, rhs.into(), BinaryOp::Add))
284    }
285
286    pub fn sub(self, rhs: impl Into<Expr>) -> Expr {
287        Expr::Binary(BinaryExpr::new(self, rhs.into(), BinaryOp::Sub))
288    }
289
290    pub fn mul(self, rhs: impl Into<Expr>) -> Expr {
291        Expr::Binary(BinaryExpr::new(self, rhs.into(), BinaryOp::Mul))
292    }
293
294    pub fn div(self, rhs: impl Into<Expr>) -> Expr {
295        Expr::Binary(BinaryExpr::new(self, rhs.into(), BinaryOp::Div))
296    }
297
298    pub fn clamp(self, min: impl Into<Expr>, max: impl Into<Expr>) -> Expr {
299        Expr::Trinary(TrinaryExpr::new(
300            self,
301            min.into(),
302            max.into(),
303            TrinaryOp::Clamp,
304        ))
305    }
306
307    // scheduling
308    pub fn every(self, interval: usize) -> When {
309        When::new(Expr::Schedule(ScheduleExpr::Every(EveryState::new(
310            interval,
311        ))))
312    }
313
314    pub fn cast(self, to: DataType) -> Expr {
315        Expr::Unary(UnaryExpr::new(self, UnaryOp::Cast(to)))
316    }
317}
318
319impl<I> ExprQuery<I> for Expr
320where
321    I: ExprProjection,
322{
323    fn dispatch<'a>(&'a mut self, input: &I) -> ExprResult<'a> {
324        match self {
325            Expr::Literal(value) => Ok(value.clone()),
326            Expr::Selector(selector) => selector.dispatch(input),
327            Expr::Aggregate(child) => child.dispatch(input),
328            Expr::Buffer(child) => child.dispatch(input),
329            Expr::Trinary(child) => child.dispatch(input),
330            Expr::Binary(child) => child.dispatch(input),
331            Expr::Unary(child) => child.dispatch(input),
332            Expr::Schedule(child) => child.dispatch(input),
333        }
334    }
335}
336
337impl From<f32> for Expr {
338    fn from(value: f32) -> Self {
339        Expr::Literal(AnyValue::Float32(value))
340    }
341}
342
343pub mod expr {
344    use super::*;
345    use crate::expression::{expr_fields::LAST_VALUE, select::PathBuilder};
346
347    pub fn lit(value: impl Into<AnyValue<'static>>) -> Expr {
348        Expr::Literal(value.into())
349    }
350
351    pub fn select(name: impl Into<SmallStr>) -> Expr {
352        let small_name = name.into();
353        Expr::Selector(SelectExpr::Field(
354            AnyValue::StrOwned(small_name.clone().into_string()),
355            LAST_VALUE.clone(),
356        ))
357    }
358
359    pub fn select_with_dtype(name: impl Into<SmallStr>, dtype: DataType) -> Expr {
360        let small_name = name.into();
361        Expr::Selector(SelectExpr::Field(
362            AnyValue::StrOwned(small_name.clone().into_string()),
363            LAST_VALUE.clone().with_dtype(dtype),
364        ))
365    }
366
367    pub fn when(cond: impl Into<Expr>) -> When {
368        When::new(cond.into())
369    }
370
371    pub fn path(name: impl Into<AnyValue<'static>>) -> PathBuilder {
372        PathBuilder::default().key(name.into())
373    }
374
375    pub fn nth(n: usize) -> Expr {
376        Expr::Selector(SelectExpr::Nth(n))
377    }
378
379    pub fn every(interval: usize) -> When {
380        When::new(Expr::Schedule(ScheduleExpr::Every(EveryState::new(
381            interval,
382        ))))
383    }
384
385    pub fn element() -> Expr {
386        Expr::Selector(SelectExpr::Element)
387    }
388}