1mod aggregate;
2mod logical;
3mod named;
4mod ops;
5mod projection;
6mod schedule;
7mod select;
8
9use crate::{
10 AnyValue, DataType, Field,
11 expression::schedule::{EveryState, ScheduleExpr},
12};
13
14use aggregate::{AggExpr, BufferExpr, Rollup};
15use logical::When;
16pub use named::NamedExpr;
17use ops::{BinaryExpr, BinaryOp, TrinaryExpr, TrinaryOp, UnaryExpr, UnaryOp};
18pub use projection::*;
19use radiate_error::RadiateError;
20use radiate_utils::{SmallStr, WindowBuffer};
21pub use select::SelectExpr;
22#[cfg(feature = "serde")]
23use serde::{Deserialize, Serialize};
24use std::fmt::Debug;
25
/// Predefined [`Field`] descriptors, one per statistic name used by the
/// aggregate builder methods on `Expr` (e.g. `mean()`, `stddev()`), which use
/// them to rename a field selector.
mod expr_fields {
    use super::*;
    use crate::DataType;

    // Every statistic below is declared as `Float32`, except `COUNT`, which is
    // declared as `UInt64`.
    pub static STD_DEV: Field = Field::new_const("std_dev", DataType::Float32);
    pub static MEAN: Field = Field::new_const("mean", DataType::Float32);
    pub static MIN: Field = Field::new_const("min", DataType::Float32);
    pub static MAX: Field = Field::new_const("max", DataType::Float32);
    pub static SUM: Field = Field::new_const("sum", DataType::Float32);
    pub static VAR: Field = Field::new_const("var", DataType::Float32);
    pub static SKEW: Field = Field::new_const("skew", DataType::Float32);
    pub static COUNT: Field = Field::new_const("count", DataType::UInt64);
    // Also the field that `expr::select` attaches to a freshly built selector.
    pub static LAST_VALUE: Field = Field::new_const("last_value", DataType::Float32);
}
40
/// Result of evaluating an expression: an [`AnyValue`] that may borrow for
/// `'a`, or a [`RadiateError`] on failure.
pub(crate) type ExprResult<'a> = Result<AnyValue<'a>, RadiateError>;
42
/// Implemented by types that can be run against a mutable [`Expr`] to produce
/// a value whose lifetime is tied to that expression borrow.
pub trait ApplyExpr<'a> {
    /// Applies `self` to `expr` and returns the resulting value.
    fn apply(&self, expr: &'a mut Expr) -> AnyValue<'a>;
}
46
/// Evaluation entry point for expression nodes over an input of type `I`.
pub trait ExprQuery<I> {
    /// Evaluates `self` against `input`. Takes `&mut self`, so an
    /// implementation may update internal state while evaluating; the returned
    /// value may borrow from `self` for `'a`.
    fn dispatch<'a>(&'a mut self, input: &I) -> ExprResult<'a>;
}
50
/// A node in an expression tree. Each variant wraps the payload that
/// `ExprQuery::dispatch` delegates to when the tree is evaluated.
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
#[derive(Clone, Debug, PartialEq)]
pub enum Expr {
    /// A constant value; evaluation returns a clone of it.
    Literal(AnyValue<'static>),
    /// A selector over the input (see [`SelectExpr`]).
    Selector(SelectExpr),
    /// An aggregate node holding a child expression, a rollup and an optional buffer.
    Aggregate(AggExpr),
    /// A buffer node built from a child expression and a window size.
    Buffer(BufferExpr),
    /// A schedule node (see `ScheduleExpr`).
    Schedule(ScheduleExpr),
    /// A two-operand operation.
    Binary(BinaryExpr),
    /// A one-operand operation.
    Unary(UnaryExpr),
    /// A three-operand operation.
    Trinary(TrinaryExpr),
}
63
64impl Expr {
65 fn try_swap_select_dtype(&mut self, to: DataType) -> bool {
66 match self {
67 Expr::Selector(SelectExpr::Field(value, field)) => {
68 let new_field = field.with_dtype(to);
69 *self = Expr::Selector(SelectExpr::Field(value.clone(), new_field));
70 true
71 }
72 _ => false,
73 }
74 }
75
76 fn try_swap_select_name(&mut self, to: &Field) -> bool {
77 match self {
78 Expr::Selector(SelectExpr::Field(value, field)) => {
79 let new_field = field.with_name(to.name().clone());
80 *self = Expr::Selector(SelectExpr::Field(value.clone(), new_field));
81 true
82 }
83 _ => false,
84 }
85 }
86
87 fn try_swap_select_field_or(mut self, to: &Field, func: impl FnOnce(Self) -> Expr) -> Expr {
88 if self.try_swap_select_name(to) {
89 return self;
90 }
91
92 func(self)
93 }
94
95 fn try_swap_agg_rollup_or(mut self, to: Rollup, func: impl FnOnce(Self) -> Expr) -> Expr {
96 match self {
97 Expr::Aggregate(mut agg) => {
98 if agg.rollup != Rollup::Unique {
99 agg.rollup = to;
100 self = Expr::Aggregate(agg);
101 return self;
102 }
103
104 func(Expr::Aggregate(agg))
105 }
106 _ => func(self),
107 }
108 }
109
110 fn try_reduce_select_agg_rollup_or(
111 self,
112 field: &Field,
113 to: Rollup,
114 func: impl FnOnce(Self) -> Expr,
115 ) -> Expr {
116 self.try_swap_select_field_or(field, |outer| {
117 outer.try_swap_agg_rollup_or(to, |inner| func(inner))
118 })
119 }
120
121 pub fn time(mut self) -> Expr {
122 self.try_swap_select_dtype(DataType::Duration);
123 self
124 }
125
126 pub fn value(mut self) -> Expr {
127 self.try_swap_select_dtype(DataType::Float32);
128 self
129 }
130
131 pub fn debug(self) -> Expr {
132 Expr::Unary(UnaryExpr::new(self, UnaryOp::Debug))
133 }
134
135 pub fn rolling(self, window_size: usize) -> Expr {
136 match self {
137 Expr::Aggregate(agg) => Expr::Aggregate(AggExpr {
138 child: agg.child,
139 rollup: agg.rollup,
140 buffer: Some(WindowBuffer::with_window(window_size)),
141 }),
142 Expr::Selector(select) => Expr::Aggregate(AggExpr {
143 child: Box::new(Expr::Selector(select)),
144 rollup: Rollup::Last,
145 buffer: Some(WindowBuffer::with_window(window_size)),
146 }),
147 _ => Expr::Buffer(BufferExpr::new(self, window_size)),
148 }
149 }
150
151 pub fn first(self) -> Expr {
153 self.try_reduce_select_agg_rollup_or(&expr_fields::LAST_VALUE, Rollup::First, |expr| {
154 Expr::Aggregate(AggExpr::new(expr, Rollup::First))
155 })
156 }
157
158 pub fn last(self) -> Expr {
159 self.try_reduce_select_agg_rollup_or(&expr_fields::LAST_VALUE, Rollup::Last, |expr| {
160 Expr::Aggregate(AggExpr::new(expr, Rollup::Last))
161 })
162 }
163
164 pub fn sum(self) -> Expr {
165 self.try_reduce_select_agg_rollup_or(&expr_fields::SUM, Rollup::Sum, |expr| {
166 Expr::Aggregate(AggExpr::new(expr, Rollup::Sum))
167 })
168 }
169
170 pub fn mean(self) -> Expr {
171 self.try_reduce_select_agg_rollup_or(&expr_fields::MEAN, Rollup::Mean, |expr| {
172 Expr::Aggregate(AggExpr::new(expr, Rollup::Mean))
173 })
174 }
175
176 pub fn stddev(self) -> Expr {
177 self.try_reduce_select_agg_rollup_or(&expr_fields::STD_DEV, Rollup::StdDev, |expr| {
178 Expr::Aggregate(AggExpr::new(expr, Rollup::StdDev))
179 })
180 }
181
182 pub fn min(self) -> Expr {
183 self.try_reduce_select_agg_rollup_or(&expr_fields::MIN, Rollup::Min, |expr| {
184 Expr::Aggregate(AggExpr::new(expr, Rollup::Min))
185 })
186 }
187
188 pub fn max(self) -> Expr {
189 self.try_reduce_select_agg_rollup_or(&expr_fields::MAX, Rollup::Max, |expr| {
190 Expr::Aggregate(AggExpr::new(expr, Rollup::Max))
191 })
192 }
193
194 pub fn var(self) -> Expr {
195 self.try_reduce_select_agg_rollup_or(&expr_fields::VAR, Rollup::Var, |expr| {
196 Expr::Aggregate(AggExpr::new(expr, Rollup::Var))
197 })
198 }
199
200 pub fn skew(self) -> Expr {
201 self.try_reduce_select_agg_rollup_or(&expr_fields::SKEW, Rollup::Skew, |expr| {
202 Expr::Aggregate(AggExpr::new(expr, Rollup::Skew))
203 })
204 }
205
206 pub fn count(self) -> Expr {
207 self.try_reduce_select_agg_rollup_or(&expr_fields::COUNT, Rollup::Count, |expr| {
208 Expr::Aggregate(AggExpr::new(expr, Rollup::Count))
209 })
210 }
211
212 pub fn slope(self) -> Expr {
213 self.try_swap_agg_rollup_or(Rollup::Slope, |expr| {
214 Expr::Aggregate(AggExpr::new(expr, Rollup::Slope))
215 })
216 }
217
218 pub fn unique(self) -> Expr {
219 self.try_swap_agg_rollup_or(Rollup::Unique, |expr| {
220 Expr::Aggregate(AggExpr::new(expr, Rollup::Unique))
221 })
222 }
223
224 pub fn pow(self, exp: impl Into<Expr>) -> Expr {
225 Expr::Binary(BinaryExpr::new(self, exp.into(), BinaryOp::Pow))
226 }
227
228 pub fn lt(self, rhs: impl Into<Expr>) -> Expr {
230 Expr::Binary(BinaryExpr::new(self, rhs.into(), BinaryOp::Lt))
231 }
232
233 pub fn lte(self, rhs: impl Into<Expr>) -> Expr {
234 Expr::Binary(BinaryExpr::new(self, rhs.into(), BinaryOp::Lte))
235 }
236
237 pub fn gt(self, rhs: impl Into<Expr>) -> Expr {
238 Expr::Binary(BinaryExpr::new(self, rhs.into(), BinaryOp::Gt))
239 }
240
241 pub fn gte(self, rhs: impl Into<Expr>) -> Expr {
242 Expr::Binary(BinaryExpr::new(self, rhs.into(), BinaryOp::Gte))
243 }
244
245 pub fn eq(self, rhs: impl Into<Expr>) -> Expr {
246 Expr::Binary(BinaryExpr::new(self, rhs.into(), BinaryOp::Eq))
247 }
248
249 pub fn ne(self, rhs: impl Into<Expr>) -> Expr {
250 Expr::Binary(BinaryExpr::new(self, rhs.into(), BinaryOp::Ne))
251 }
252
253 pub fn between(self, low: impl Into<Expr>, high: impl Into<Expr>) -> Expr {
254 let low = low.into();
255 let high = high.into();
256
257 self.clone().gte(low).and(self.lte(high))
258 }
259
260 pub fn and(self, rhs: impl Into<Expr>) -> Expr {
262 Expr::Binary(BinaryExpr::new(self, rhs.into(), BinaryOp::And))
263 }
264
265 pub fn or(self, rhs: impl Into<Expr>) -> Expr {
266 Expr::Binary(BinaryExpr::new(self, rhs.into(), BinaryOp::Or))
267 }
268
269 pub fn not(self) -> Expr {
270 Expr::Unary(UnaryExpr::new(self, UnaryOp::Not))
271 }
272
273 pub fn neg(self) -> Expr {
275 Expr::Unary(UnaryExpr::new(self, UnaryOp::Neg))
276 }
277
278 pub fn abs(self) -> Expr {
279 Expr::Unary(UnaryExpr::new(self, UnaryOp::Abs))
280 }
281
282 pub fn add(self, rhs: impl Into<Expr>) -> Expr {
283 Expr::Binary(BinaryExpr::new(self, rhs.into(), BinaryOp::Add))
284 }
285
286 pub fn sub(self, rhs: impl Into<Expr>) -> Expr {
287 Expr::Binary(BinaryExpr::new(self, rhs.into(), BinaryOp::Sub))
288 }
289
290 pub fn mul(self, rhs: impl Into<Expr>) -> Expr {
291 Expr::Binary(BinaryExpr::new(self, rhs.into(), BinaryOp::Mul))
292 }
293
294 pub fn div(self, rhs: impl Into<Expr>) -> Expr {
295 Expr::Binary(BinaryExpr::new(self, rhs.into(), BinaryOp::Div))
296 }
297
298 pub fn clamp(self, min: impl Into<Expr>, max: impl Into<Expr>) -> Expr {
299 Expr::Trinary(TrinaryExpr::new(
300 self,
301 min.into(),
302 max.into(),
303 TrinaryOp::Clamp,
304 ))
305 }
306
307 pub fn every(self, interval: usize) -> When {
309 When::new(Expr::Schedule(ScheduleExpr::Every(EveryState::new(
310 interval,
311 ))))
312 }
313
314 pub fn cast(self, to: DataType) -> Expr {
315 Expr::Unary(UnaryExpr::new(self, UnaryOp::Cast(to)))
316 }
317}
318
319impl<I> ExprQuery<I> for Expr
320where
321 I: ExprProjection,
322{
323 fn dispatch<'a>(&'a mut self, input: &I) -> ExprResult<'a> {
324 match self {
325 Expr::Literal(value) => Ok(value.clone()),
326 Expr::Selector(selector) => selector.dispatch(input),
327 Expr::Aggregate(child) => child.dispatch(input),
328 Expr::Buffer(child) => child.dispatch(input),
329 Expr::Trinary(child) => child.dispatch(input),
330 Expr::Binary(child) => child.dispatch(input),
331 Expr::Unary(child) => child.dispatch(input),
332 Expr::Schedule(child) => child.dispatch(input),
333 }
334 }
335}
336
337impl From<f32> for Expr {
338 fn from(value: f32) -> Self {
339 Expr::Literal(AnyValue::Float32(value))
340 }
341}
342
343pub mod expr {
344 use super::*;
345 use crate::expression::{expr_fields::LAST_VALUE, select::PathBuilder};
346
347 pub fn lit(value: impl Into<AnyValue<'static>>) -> Expr {
348 Expr::Literal(value.into())
349 }
350
351 pub fn select(name: impl Into<SmallStr>) -> Expr {
352 let small_name = name.into();
353 Expr::Selector(SelectExpr::Field(
354 AnyValue::StrOwned(small_name.clone().into_string()),
355 LAST_VALUE.clone(),
356 ))
357 }
358
359 pub fn select_with_dtype(name: impl Into<SmallStr>, dtype: DataType) -> Expr {
360 let small_name = name.into();
361 Expr::Selector(SelectExpr::Field(
362 AnyValue::StrOwned(small_name.clone().into_string()),
363 LAST_VALUE.clone().with_dtype(dtype),
364 ))
365 }
366
367 pub fn when(cond: impl Into<Expr>) -> When {
368 When::new(cond.into())
369 }
370
371 pub fn path(name: impl Into<AnyValue<'static>>) -> PathBuilder {
372 PathBuilder::default().key(name.into())
373 }
374
375 pub fn nth(n: usize) -> Expr {
376 Expr::Selector(SelectExpr::Nth(n))
377 }
378
379 pub fn every(interval: usize) -> When {
380 When::new(Expr::Schedule(ScheduleExpr::Every(EveryState::new(
381 interval,
382 ))))
383 }
384
385 pub fn element() -> Expr {
386 Expr::Selector(SelectExpr::Element)
387 }
388}