1#![doc = include_str!("expression.md")]
4
5mod boolean_expression;
6mod function_call;
7mod operator;
8
9pub use boolean_expression::{BinaryExpr, ComparisonFunction, LogicalFunction, NumericalFunction};
10pub use function_call::FunctionCall;
11pub use operator::{BinaryOperator, UnaryOperator};
12
13use anyhow::anyhow;
14
15use crate::{
16 api::error::{Result, SpringError},
17 pipeline::{AggregateFunctionParameter, ColumnReference},
18 stream_engine::{
19 time::{SpringDuration, SpringEventDuration},
20 NnSqlValue, SqlCompareResult, SqlValue, Tuple,
21 },
22};
23
/// Marker trait for expression types.
///
/// It declares no methods; within this file it is implemented by
/// [`ValueExpr`] and [`ValueExprPh2`].
// NOTE(review): presumably used as the bound on the type parameter of
// `BinaryExpr<E>` / `FunctionCall<E>` — confirm in the submodules.
pub trait ValueExprType {}
25
/// Expression tree that may still contain column references.
///
/// Call [`ValueExpr::resolve_colref`] with a `Tuple` to obtain a
/// [`ValueExprPh2`], in which every column reference has been replaced by a
/// constant.
#[derive(Clone, PartialEq, Hash, Debug)]
pub enum ValueExpr {
    /// A literal SQL value.
    Constant(SqlValue),
    /// A unary operator applied to a sub-expression.
    UnaryOperator(UnaryOperator, Box<Self>),
    /// A binary expression (logical, comparison or numerical) over sub-expressions.
    BinaryExpr(BinaryExpr<Self>),
    /// A function call whose arguments are sub-expressions.
    FunctionCall(FunctionCall<Self>),

    /// A reference to a column; looked up in a `Tuple` by `resolve_colref`.
    ColumnReference(ColumnReference),
}
// Marks `ValueExpr` as an expression type; the trait has no methods to implement.
impl ValueExprType for ValueExpr {}
40
41impl ValueExpr {
42 pub fn resolve_colref(self, tuple: &Tuple) -> Result<ValueExprPh2> {
43 match self {
44 Self::Constant(value) => Ok(ValueExprPh2::Constant(value)),
45
46 Self::ColumnReference(colref) => {
47 let value = tuple.get_value(&colref)?;
48 Ok(ValueExprPh2::Constant(value))
49 }
50
51 Self::FunctionCall(function_call) => match function_call {
52 FunctionCall::DurationMillis { duration_millis } => {
53 let duration_millis_ph2 = duration_millis.resolve_colref(tuple)?;
54 Ok(ValueExprPh2::FunctionCall(FunctionCall::DurationMillis {
55 duration_millis: Box::new(duration_millis_ph2),
56 }))
57 }
58 FunctionCall::DurationSecs { duration_secs } => {
59 let duration_secs_ph2 = duration_secs.resolve_colref(tuple)?;
60 Ok(ValueExprPh2::FunctionCall(FunctionCall::DurationSecs {
61 duration_secs: Box::new(duration_secs_ph2),
62 }))
63 }
64 FunctionCall::FloorTime { target, resolution } => {
65 let target_ph2 = target.resolve_colref(tuple)?;
66 let resolution_ph2 = resolution.resolve_colref(tuple)?;
67 Ok(ValueExprPh2::FunctionCall(FunctionCall::FloorTime {
68 target: Box::new(target_ph2),
69 resolution: Box::new(resolution_ph2),
70 }))
71 }
72 },
73 Self::UnaryOperator(op, expr_ph1) => {
74 let expr_ph2 = expr_ph1.resolve_colref(tuple)?;
75 Ok(ValueExprPh2::UnaryOperator(op, Box::new(expr_ph2)))
76 }
77 Self::BinaryExpr(bool_expr) => match bool_expr {
78 BinaryExpr::LogicalFunctionVariant(logical_function) => match logical_function {
79 LogicalFunction::AndVariant { left, right } => {
80 let left_ph2 = left.resolve_colref(tuple)?;
81 let right_ph2 = right.resolve_colref(tuple)?;
82 Ok(ValueExprPh2::BinaryExpr(
83 BinaryExpr::LogicalFunctionVariant(LogicalFunction::AndVariant {
84 left: Box::new(left_ph2),
85 right: Box::new(right_ph2),
86 }),
87 ))
88 }
89 },
90 BinaryExpr::ComparisonFunctionVariant(comparison_function) => {
91 match comparison_function {
92 ComparisonFunction::EqualVariant { left, right } => {
93 let left_ph2 = left.resolve_colref(tuple)?;
94 let right_ph2 = right.resolve_colref(tuple)?;
95 Ok(ValueExprPh2::BinaryExpr(
96 BinaryExpr::ComparisonFunctionVariant(
97 ComparisonFunction::EqualVariant {
98 left: Box::new(left_ph2),
99 right: Box::new(right_ph2),
100 },
101 ),
102 ))
103 }
104 }
105 }
106 BinaryExpr::NumericalFunctionVariant(numerical_function) => {
107 match numerical_function {
108 NumericalFunction::AddVariant { left, right } => {
109 let left_ph2 = left.resolve_colref(tuple)?;
110 let right_ph2 = right.resolve_colref(tuple)?;
111 Ok(ValueExprPh2::BinaryExpr(
112 BinaryExpr::NumericalFunctionVariant(
113 NumericalFunction::AddVariant {
114 left: Box::new(left_ph2),
115 right: Box::new(right_ph2),
116 },
117 ),
118 ))
119 }
120 NumericalFunction::MulVariant { left, right } => {
121 let left_ph2 = left.resolve_colref(tuple)?;
122 let right_ph2 = right.resolve_colref(tuple)?;
123 Ok(ValueExprPh2::BinaryExpr(
124 BinaryExpr::NumericalFunctionVariant(
125 NumericalFunction::MulVariant {
126 left: Box::new(left_ph2),
127 right: Box::new(right_ph2),
128 },
129 ),
130 ))
131 }
132 }
133 }
134 },
135 }
136 }
137}
138
/// Expression tree without column references.
///
/// It has the same variants as [`ValueExpr`] except `ColumnReference`, and is
/// what [`ValueExpr::resolve_colref`] returns. Use [`ValueExprPh2::eval`] to
/// reduce it to a single `SqlValue`.
#[derive(Clone, PartialEq, Hash, Debug)]
pub enum ValueExprPh2 {
    /// A literal SQL value.
    Constant(SqlValue),
    /// A unary operator applied to a sub-expression.
    UnaryOperator(UnaryOperator, Box<Self>),
    /// A binary expression (logical, comparison or numerical) over sub-expressions.
    BinaryExpr(BinaryExpr<Self>),
    /// A function call whose arguments are sub-expressions.
    FunctionCall(FunctionCall<Self>),
}
// Marks `ValueExprPh2` as an expression type; the trait has no methods to implement.
impl ValueExprType for ValueExprPh2 {}
150
151impl ValueExprPh2 {
152 pub fn eval(self) -> Result<SqlValue> {
153 match self {
154 Self::Constant(sql_value) => Ok(sql_value),
155 Self::UnaryOperator(uni_op, child) => {
156 let child_sql_value = child.eval()?;
157 match (uni_op, child_sql_value) {
158 (UnaryOperator::Minus, SqlValue::Null) => Ok(SqlValue::Null),
159 (UnaryOperator::Minus, SqlValue::NotNull(nn_sql_value)) => {
160 Ok(SqlValue::NotNull(nn_sql_value.negate()?))
161 }
162 }
163 }
164 Self::BinaryExpr(bool_expr) => match bool_expr {
165 BinaryExpr::ComparisonFunctionVariant(comparison_function) => {
166 match comparison_function {
167 ComparisonFunction::EqualVariant { left, right } => {
168 let left_sql_value = left.eval()?;
169 let right_sql_value = right.eval()?;
170 left_sql_value
171 .sql_compare(&right_sql_value)
172 .map(|sql_compare_result| {
173 SqlValue::NotNull(NnSqlValue::Boolean(matches!(
174 sql_compare_result,
175 SqlCompareResult::Eq
176 )))
177 })
178 }
179 }
180 }
181 BinaryExpr::LogicalFunctionVariant(logical_function) => match logical_function {
182 LogicalFunction::AndVariant { left, right } => {
183 let left_sql_value = left.eval()?;
184 let right_sql_value = right.eval()?;
185
186 let b = left_sql_value.to_bool()? && right_sql_value.to_bool()?;
187 Ok(SqlValue::NotNull(NnSqlValue::Boolean(b)))
188 }
189 },
190 BinaryExpr::NumericalFunctionVariant(numerical_function) => {
191 Self::eval_numerical_function(numerical_function)
192 }
193 },
194 Self::FunctionCall(function_call) => Self::eval_function_call(function_call),
195 }
196 }
197 fn eval_numerical_function(numerical_function: NumericalFunction<Self>) -> Result<SqlValue> {
198 match numerical_function {
199 NumericalFunction::AddVariant { left, right } => {
200 let left_sql_value = left.eval()?;
201 let right_sql_value = right.eval()?;
202 left_sql_value + right_sql_value
203 }
204 NumericalFunction::MulVariant { left, right } => {
205 let left_sql_value = left.eval()?;
206 let right_sql_value = right.eval()?;
207 left_sql_value * right_sql_value
208 }
209 }
210 }
211
212 fn eval_function_call(function_call: FunctionCall<Self>) -> Result<SqlValue> {
213 match function_call {
214 FunctionCall::FloorTime { target, resolution } => {
215 Self::eval_function_floor_time(*target, *resolution)
216 }
217 FunctionCall::DurationMillis { duration_millis } => {
218 Self::eval_function_duration_millis(*duration_millis)
219 }
220 FunctionCall::DurationSecs { duration_secs } => {
221 Self::eval_function_duration_secs(*duration_secs)
222 }
223 }
224 }
225
226 fn eval_function_floor_time(target: Self, resolution: Self) -> Result<SqlValue> {
227 let target_value = target.eval()?;
228 let resolution_value = resolution.eval()?;
229
230 match (&target_value, &resolution_value) {
231 (
232 SqlValue::NotNull(NnSqlValue::Timestamp(ts)),
233 SqlValue::NotNull(NnSqlValue::Duration(resolution)),
234 ) => {
235 let ts_floor = ts.floor(resolution.to_duration())?;
236 Ok(SqlValue::NotNull(NnSqlValue::Timestamp(ts_floor)))
237 }
238 _ => Err(SpringError::Sql(anyhow!(
239 "invalid parameter to FLOOR_TIME: `({}, {})`",
240 target_value,
241 resolution_value
242 ))),
243 }
244 }
245
246 fn eval_function_duration_millis(duration_millis: Self) -> Result<SqlValue> {
247 let duration_value = duration_millis.eval()?;
248 let duration_millis = duration_value.to_i64()?;
249 if duration_millis >= 0 {
250 let duration = SpringEventDuration::from_millis(duration_millis as u64);
251 Ok(SqlValue::NotNull(NnSqlValue::Duration(duration)))
252 } else {
253 Err(SpringError::Sql(anyhow!(
254 "DURATION_MILLIS should take positive integer but got `{}`",
255 duration_millis
256 )))
257 }
258 }
259 fn eval_function_duration_secs(duration_secs: Self) -> Result<SqlValue> {
260 let duration_value = duration_secs.eval()?;
261 let duration_secs = duration_value.to_i64()?;
262 if duration_secs >= 0 {
263 let duration = SpringEventDuration::from_secs(duration_secs as u64);
264 Ok(SqlValue::NotNull(NnSqlValue::Duration(duration)))
265 } else {
266 Err(SpringError::Sql(anyhow!(
267 "DURATION_SECS should take positive integer but got `{}`",
268 duration_secs
269 )))
270 }
271 }
272}
273
/// An aggregate expression: an aggregate function parameter together with the
/// value expression it is applied to.
///
/// Unlike the expression enums in this file, this type does not derive `Hash`.
#[derive(Clone, PartialEq, Debug)]
pub struct AggrExpr {
    /// The aggregate function parameter.
    // NOTE(review): presumably selects which aggregate (e.g. AVG) is applied — confirm in `pipeline`.
    pub func: AggregateFunctionParameter,
    /// The expression whose values are aggregated; may contain column references.
    pub aggregated: ValueExpr,
}