1mod error;
2mod evaluated;
3mod expr;
4mod function;
5
6use {
7 self::function::BreakCase,
8 super::{context::RowContext, select::select},
9 crate::{
10 ast::{Aggregate, Expr, Function},
11 data::{CustomFunction, Interval, Row, Value},
12 mock::MockStorage,
13 result::{Error, Result},
14 store::GStore,
15 },
16 async_recursion::async_recursion,
17 chrono::prelude::Utc,
18 futures::{
19 future::{ready, try_join_all},
20 stream::{self, StreamExt, TryStreamExt},
21 },
22 im::HashMap,
23 std::{borrow::Cow, ops::ControlFlow, sync::Arc},
24};
25
26pub use {error::EvaluateError, evaluated::Evaluated};
27
28pub async fn evaluate<'a, 'b, 'c, T>(
29 storage: &'a T,
30 context: Option<Arc<RowContext<'b>>>,
31 aggregated: Option<Arc<HashMap<&'c Aggregate, Value>>>,
32 expr: &'a Expr,
33) -> Result<Evaluated<'a>>
34where
35 'b: 'a,
36 'c: 'a,
37 T: GStore,
38{
39 evaluate_inner(Some(storage), context, aggregated, expr).await
40}
41
42pub async fn evaluate_stateless<'a, 'b: 'a>(
43 context: Option<RowContext<'b>>,
44 expr: &'a Expr,
45) -> Result<Evaluated<'a>> {
46 let context = context.map(Arc::new);
47 let storage: Option<&MockStorage> = None;
48
49 evaluate_inner(storage, context, None, expr).await
50}
51
52#[async_recursion]
53async fn evaluate_inner<'a, 'b, 'c, T>(
54 storage: Option<&'a T>,
55 context: Option<Arc<RowContext<'b>>>,
56 aggregated: Option<Arc<HashMap<&'c Aggregate, Value>>>,
57 expr: &'a Expr,
58) -> Result<Evaluated<'a>>
59where
60 'b: 'a,
61 'c: 'a,
62 T: GStore,
63{
64 let eval = |expr| {
65 let context = context.as_ref().map(Arc::clone);
66 let aggregated = aggregated.as_ref().map(Arc::clone);
67
68 evaluate_inner(storage, context, aggregated, expr)
69 };
70
71 match expr {
72 Expr::Literal(literal) => Ok(expr::literal(literal)),
73 Expr::Value(value) => Ok(Evaluated::Value(Cow::Borrowed(value))),
74 Expr::TypedString { data_type, value } => expr::typed_string(data_type, value),
75 Expr::Identifier(ident) => {
76 let context = context.ok_or_else(|| {
77 EvaluateError::ContextRequiredForIdentEvaluation(Box::new(expr.clone()))
78 })?;
79
80 match context.get_value(ident) {
81 Some(value) => Ok(Evaluated::Value(Cow::Owned(value.clone()))),
82 None => Err(EvaluateError::IdentifierNotFound(ident.to_owned()).into()),
83 }
84 }
85 Expr::Nested(expr) => eval(expr).await,
86 Expr::CompoundIdentifier { alias, ident } => {
87 let context = context.ok_or_else(|| {
88 EvaluateError::ContextRequiredForIdentEvaluation(Box::new(expr.clone()))
89 })?;
90
91 match context.get_alias_value(alias, ident) {
92 Some(value) => Ok(Evaluated::Value(Cow::Owned(value.clone()))),
93 None => Err(EvaluateError::CompoundIdentifierNotFound {
94 table_alias: alias.to_owned(),
95 column_name: ident.to_owned(),
96 }
97 .into()),
98 }
99 }
100 Expr::Subquery(query) => {
101 let storage = storage
102 .ok_or_else(|| EvaluateError::UnsupportedStatelessExpr(Box::new(expr.clone())))?;
103
104 let evaluations = select(storage, query, context.as_ref().map(Arc::clone))
105 .await?
106 .map(|row| {
107 let value = match row? {
108 Row::Vec { columns, values } => {
109 if columns.len() > 1 {
110 return Err(EvaluateError::MoreThanOneColumnReturned.into());
111 }
112 values
113 }
114 Row::Map(_) => {
115 return Err(EvaluateError::SchemalessProjectionForSubQuery.into());
116 }
117 }
118 .into_iter()
119 .next();
120
121 Ok::<_, Error>(value)
122 })
123 .take(2)
124 .try_collect::<Vec<_>>()
125 .await?;
126
127 if evaluations.len() > 1 {
128 return Err(EvaluateError::MoreThanOneRowReturned.into());
129 }
130
131 let value = evaluations
132 .into_iter()
133 .next()
134 .flatten()
135 .unwrap_or(Value::Null);
136
137 Ok(Evaluated::Value(Cow::Owned(value)))
138 }
139 Expr::BinaryOp { op, left, right } => {
140 let left = eval(left).await?;
141 let right = eval(right).await?;
142
143 expr::binary_op(op, left, right)
144 }
145 Expr::UnaryOp { op, expr } => {
146 let v = eval(expr).await?;
147
148 expr::unary_op(op, v)
149 }
150 Expr::Aggregate(aggr) => match aggregated
151 .as_ref()
152 .and_then(|aggregated| aggregated.get(aggr.as_ref()))
153 {
154 Some(value) => Ok(Evaluated::Value(Cow::Owned(value.clone()))),
155 None => Err(EvaluateError::UnreachableEmptyAggregateValue(aggr.clone()).into()),
156 },
157 Expr::Function(func) => {
158 let context = context.as_ref().map(Arc::clone);
159 let aggregated = aggregated.as_ref().map(Arc::clone);
160
161 evaluate_function(storage, context, aggregated, func).await
162 }
163 Expr::InList {
164 expr,
165 list,
166 negated,
167 } => {
168 let negated = *negated;
169 let target = eval(expr).await?;
170
171 if target.is_null() {
172 return Ok(target);
173 }
174
175 let matched = try_join_all(list.iter().map(eval))
176 .await?
177 .into_iter()
178 .any(|v| v.evaluate_eq(&target).is_true());
179
180 Ok(Evaluated::Value(Cow::Owned(Value::Bool(matched ^ negated))))
181 }
182 Expr::InSubquery {
183 expr: target_expr,
184 subquery,
185 negated,
186 } => {
187 let storage = storage
188 .ok_or_else(|| EvaluateError::UnsupportedStatelessExpr(Box::new(expr.clone())))?;
189 let target = eval(target_expr).await?;
190
191 select(storage, subquery, context)
192 .await?
193 .map(|row| {
194 let value = match row? {
195 Row::Vec { values, .. } => values,
196 Row::Map(_) => {
197 return Err(EvaluateError::SchemalessProjectionForInSubQuery.into());
198 }
199 }
200 .into_iter()
201 .next()
202 .unwrap_or(Value::Null);
203
204 Ok(Evaluated::Value(Cow::Owned(value)))
205 })
206 .try_filter(|evaluated| ready(evaluated.evaluate_eq(&target).is_true()))
207 .try_next()
208 .await
209 .map(|v| v.is_some() ^ negated)
210 .map(Value::Bool)
211 .map(|v| Evaluated::Value(Cow::Owned(v)))
212 }
213 Expr::Between {
214 expr,
215 negated,
216 low,
217 high,
218 } => {
219 let target = eval(expr).await?;
220 let low = eval(low).await?;
221 let high = eval(high).await?;
222
223 Ok(expr::between(&target, *negated, &low, &high))
224 }
225 Expr::Like {
226 expr,
227 negated,
228 pattern,
229 } => {
230 let target = eval(expr).await?;
231 let pattern = eval(pattern).await?;
232 let evaluated = target.like(pattern, true)?;
233
234 Ok(match negated {
235 true => {
236 let t =
237 evaluated.evaluate_eq(&Evaluated::Value(Cow::Owned(Value::Bool(false))));
238 Evaluated::Value(Cow::Owned(Value::from(t)))
239 }
240 false => evaluated,
241 })
242 }
243 Expr::ILike {
244 expr,
245 negated,
246 pattern,
247 } => {
248 let target = eval(expr).await?;
249 let pattern = eval(pattern).await?;
250 let evaluated = target.like(pattern, false)?;
251
252 Ok(match negated {
253 true => {
254 let t =
255 evaluated.evaluate_eq(&Evaluated::Value(Cow::Owned(Value::Bool(false))));
256 Evaluated::Value(Cow::Owned(Value::from(t)))
257 }
258 false => evaluated,
259 })
260 }
261 Expr::Exists { subquery, negated } => {
262 let storage = storage
263 .ok_or_else(|| EvaluateError::UnsupportedStatelessExpr(Box::new(expr.clone())))?;
264
265 select(storage, subquery, context)
266 .await?
267 .try_next()
268 .await
269 .map(|v| v.is_some() ^ negated)
270 .map(Value::Bool)
271 .map(|v| Evaluated::Value(Cow::Owned(v)))
272 }
273 Expr::IsNull(expr) => {
274 let v = eval(expr).await?.is_null();
275
276 Ok(Evaluated::Value(Cow::Owned(Value::Bool(v))))
277 }
278 Expr::IsNotNull(expr) => {
279 let v = eval(expr).await?.is_null();
280
281 Ok(Evaluated::Value(Cow::Owned(Value::Bool(!v))))
282 }
283 Expr::Case {
284 operand,
285 when_then,
286 else_result,
287 } => {
288 let operand = match operand {
289 Some(op) => eval(op).await?,
290 None => Evaluated::Value(Cow::Owned(Value::Bool(true))),
291 };
292
293 for (when, then) in when_then {
294 let when = eval(when).await?;
295
296 if when.evaluate_eq(&operand).is_true() {
297 return eval(then).await;
298 }
299 }
300
301 match else_result {
302 Some(er) => eval(er).await,
303 None => Ok(Evaluated::Value(Cow::Owned(Value::Null))),
304 }
305 }
306 Expr::ArrayIndex { obj, indexes } => {
307 let obj = eval(obj).await?;
308 let indexes = try_join_all(indexes.iter().map(eval)).await?;
309 expr::array_index(obj, indexes)
310 }
311 Expr::Array { elem } => try_join_all(elem.iter().map(eval))
312 .await?
313 .into_iter()
314 .map(Value::try_from)
315 .collect::<Result<Vec<_>>>()
316 .map(Value::List)
317 .map(|v| Evaluated::Value(Cow::Owned(v))),
318 Expr::Interval {
319 expr,
320 leading_field,
321 last_field,
322 } => {
323 let value = eval(expr)
324 .await
325 .and_then(Value::try_from)
326 .map(String::from)?;
327
328 Interval::try_from_str(&value, *leading_field, *last_field)
329 .map(Value::Interval)
330 .map(|v| Evaluated::Value(Cow::Owned(v)))
331 }
332 }
333}
334
335async fn evaluate_function<'a, 'b: 'a, 'c: 'a, T: GStore>(
336 storage: Option<&'a T>,
337 context: Option<Arc<RowContext<'b>>>,
338 aggregated: Option<Arc<HashMap<&'c Aggregate, Value>>>,
339 func: &'b Function,
340) -> Result<Evaluated<'a>> {
341 use function as f;
342
343 let eval = |expr| {
344 let context = context.as_ref().map(Arc::clone);
345 let aggregated = aggregated.as_ref().map(Arc::clone);
346
347 evaluate_inner(storage, context, aggregated, expr)
348 };
349
350 let name = func.to_string();
351
352 let result = match func {
353 Function::Concat(exprs) => {
355 let exprs = stream::iter(exprs).then(eval).try_collect().await?;
356 f::concat(exprs)
357 }
358 Function::Custom { name, exprs } => {
359 let CustomFunction {
360 func_name,
361 args,
362 body,
363 } = storage
364 .ok_or(EvaluateError::UnsupportedCustomFunction)?
365 .fetch_function(name)
366 .await?
367 .ok_or_else(|| EvaluateError::UnsupportedFunction(name.to_string()))?;
368
369 let min = args.iter().filter(|arg| arg.default.is_none()).count();
370 let max = args.len();
371
372 if !(min..=max).contains(&exprs.len()) {
373 return Err((EvaluateError::FunctionArgsLengthNotWithinRange {
374 name: func_name.to_owned(),
375 expected_minimum: min,
376 expected_maximum: max,
377 found: exprs.len(),
378 })
379 .into());
380 }
381
382 let exprs = exprs.iter().chain(
383 args.iter()
384 .skip(exprs.len())
385 .filter_map(|arg| arg.default.as_ref()),
386 );
387
388 let context = stream::iter(args.iter().zip(exprs))
389 .then(|(arg, expr)| async {
390 eval(expr)
391 .await?
392 .try_into_value(&arg.data_type, true)
393 .map(|value| (arg.name.clone(), value))
394 })
395 .try_collect()
396 .await
397 .map(|values| {
398 let row = Cow::Owned(Row::Map(values));
399 let context = RowContext::new(name, row, None);
400 Some(Arc::new(context))
401 })?;
402
403 return evaluate_inner(storage, context, None, body).await;
404 }
405 Function::ConcatWs { separator, exprs } => {
406 let separator = eval(separator).await?;
407 let exprs = stream::iter(exprs).then(eval).try_collect().await?;
408 f::concat_ws(&name, separator, exprs)
409 }
410 Function::IfNull { expr, then } => f::ifnull(eval(expr).await?, eval(then).await?),
411 Function::NullIf { expr1, expr2 } => f::nullif(eval(expr1).await?, &eval(expr2).await?),
412 Function::Lower(expr) => f::lower(&name, eval(expr).await?),
413 Function::Initcap(expr) => f::initcap(&name, eval(expr).await?),
414 Function::Upper(expr) => f::upper(&name, eval(expr).await?),
415 Function::Left { expr, size } | Function::Right { expr, size } => {
416 let expr = eval(expr).await?;
417 let size = eval(size).await?;
418
419 f::left_or_right(&name, expr, size)
420 }
421 Function::Replace { expr, old, new } => {
422 let expr = eval(expr).await?;
423 let old = eval(old).await?;
424 let new = eval(new).await?;
425
426 f::replace(&name, expr, old, new)
427 }
428 Function::Lpad { expr, size, fill } | Function::Rpad { expr, size, fill } => {
429 let expr = eval(expr).await?;
430 let size = eval(size).await?;
431 let fill = match fill {
432 Some(v) => Some(eval(v).await?),
433 None => None,
434 };
435
436 f::lpad_or_rpad(&name, expr, size, fill)
437 }
438 Function::LastDay(expr) => {
439 let expr = eval(expr).await?;
440 f::last_day(&name, expr)
441 }
442 Function::Trim {
443 expr,
444 filter_chars,
445 trim_where_field,
446 } => {
447 let expr = eval(expr).await?;
448 let filter_chars = match filter_chars {
449 Some(v) => Some(eval(v).await?),
450 None => None,
451 };
452
453 return expr.trim(name, filter_chars, trim_where_field.as_ref());
454 }
455 Function::Ltrim { expr, chars } => {
456 let expr = eval(expr).await?;
457 let chars = match chars {
458 Some(v) => Some(eval(v).await?),
459 None => None,
460 };
461
462 return expr.ltrim(name, chars);
463 }
464 Function::Rtrim { expr, chars } => {
465 let expr = eval(expr).await?;
466 let chars = match chars {
467 Some(v) => Some(eval(v).await?),
468 None => None,
469 };
470
471 return expr.rtrim(name, chars);
472 }
473 Function::Reverse(expr) => {
474 let expr = eval(expr).await?;
475
476 f::reverse(&name, expr)
477 }
478 Function::Repeat { expr, num } => {
479 let expr = eval(expr).await?;
480 let num = eval(num).await?;
481
482 f::repeat(&name, expr, num)
483 }
484 Function::Substr { expr, start, count } => {
485 let expr = eval(expr).await?;
486 let start = eval(start).await?;
487 let count = match count {
488 Some(v) => Some(eval(v).await?),
489 None => None,
490 };
491
492 return expr.substr(name, start, count);
493 }
494 Function::Ascii(expr) => f::ascii(&name, eval(expr).await?),
495 Function::Chr(expr) => f::chr(&name, eval(expr).await?),
496 Function::Md5(expr) => f::md5(&name, eval(expr).await?),
497 Function::Hex(expr) => f::hex(&name, eval(expr).await?),
498
499 Function::Abs(expr) => f::abs(&name, eval(expr).await?),
501 Function::Sign(expr) => f::sign(&name, eval(expr).await?),
502 Function::Sqrt(expr) => f::sqrt(eval(expr).await?),
503 Function::Power { expr, power } => {
504 let expr = eval(expr).await?;
505 let power = eval(power).await?;
506
507 f::power(&name, expr, power)
508 }
509 Function::Ceil(expr) => f::ceil(&name, eval(expr).await?),
510 Function::Rand(expr) => {
511 let expr = match expr {
512 Some(v) => Some(eval(v).await?),
513 None => None,
514 };
515
516 f::rand(&name, expr)
517 }
518 Function::Round(expr) => f::round(&name, eval(expr).await?),
519 Function::Trunc(expr) => f::trunc(&name, eval(expr).await?),
520 Function::Floor(expr) => f::floor(&name, eval(expr).await?),
521 Function::Radians(expr) => f::radians(&name, eval(expr).await?),
522 Function::Degrees(expr) => f::degrees(&name, eval(expr).await?),
523 Function::Pi() => {
524 return Ok(Evaluated::Value(Cow::Owned(Value::F64(
525 std::f64::consts::PI,
526 ))));
527 }
528 Function::Exp(expr) => f::exp(&name, eval(expr).await?),
529 Function::Log { antilog, base } => {
530 let antilog = eval(antilog).await?;
531 let base = eval(base).await?;
532
533 f::log(&name, antilog, base)
534 }
535 Function::Ln(expr) => f::ln(&name, eval(expr).await?),
536 Function::Log2(expr) => f::log2(&name, eval(expr).await?),
537 Function::Log10(expr) => f::log10(&name, eval(expr).await?),
538 Function::Sin(expr) => f::sin(&name, eval(expr).await?),
539 Function::Cos(expr) => f::cos(&name, eval(expr).await?),
540 Function::Tan(expr) => f::tan(&name, eval(expr).await?),
541 Function::Asin(expr) => f::asin(&name, eval(expr).await?),
542 Function::Acos(expr) => f::acos(&name, eval(expr).await?),
543 Function::Atan(expr) => f::atan(&name, eval(expr).await?),
544
545 Function::Div { dividend, divisor } => {
547 let dividend = eval(dividend).await?;
548 let divisor = eval(divisor).await?;
549
550 f::div(&name, dividend, divisor)
551 }
552 Function::Mod { dividend, divisor } => {
553 let dividend = eval(dividend).await?;
554 let divisor = eval(divisor).await?;
555
556 return dividend.modulo(&divisor);
557 }
558 Function::Gcd { left, right } => {
559 let left = eval(left).await?;
560 let right = eval(right).await?;
561
562 f::gcd(&name, left, right)
563 }
564 Function::Lcm { left, right } => {
565 let left = eval(left).await?;
566 let right = eval(right).await?;
567
568 f::lcm(&name, left, right)
569 }
570
571 Function::Point { x, y } => {
573 let x = eval(x).await?;
574 let y = eval(y).await?;
575
576 f::point(&name, x, y)
577 }
578 Function::GetX(expr) => f::get_x(&name, eval(expr).await?),
579 Function::GetY(expr) => f::get_y(&name, eval(expr).await?),
580 Function::CalcDistance {
581 geometry1,
582 geometry2,
583 } => {
584 let geometry1 = eval(geometry1).await?;
585 let geometry2 = eval(geometry2).await?;
586
587 f::calc_distance(&name, geometry1, geometry2)
588 }
589
590 Function::Unwrap { expr, selector } => {
592 let expr = eval(expr).await?;
593 let selector = eval(selector).await?;
594
595 f::unwrap(&name, expr, selector)
596 }
597 Function::GenerateUuid() => return Ok(f::generate_uuid()),
598 Function::Greatest(exprs) => {
599 let exprs = stream::iter(exprs).then(eval).try_collect().await?;
600 return f::greatest(&name, exprs);
601 }
602 Function::Now() | Function::CurrentTimestamp() => {
603 return Ok(Evaluated::Value(Cow::Owned(Value::Timestamp(
604 Utc::now().naive_utc(),
605 ))));
606 }
607 Function::CurrentDate() => {
608 return Ok(Evaluated::Value(Cow::Owned(Value::Date(
609 Utc::now().date_naive(),
610 ))));
611 }
612 Function::CurrentTime() => {
613 return Ok(Evaluated::Value(Cow::Owned(Value::Time(Utc::now().time()))));
614 }
615 Function::Format { expr, format } => {
616 let expr = eval(expr).await?;
617 let format = eval(format).await?;
618
619 f::format(&name, expr, format)
620 }
621 Function::ToDate { expr, format } => {
622 let expr = eval(expr).await?;
623 let format = eval(format).await?;
624 f::to_date(&name, expr, format)
625 }
626 Function::ToTimestamp { expr, format } => {
627 let expr = eval(expr).await?;
628 let format = eval(format).await?;
629 f::to_timestamp(&name, expr, format)
630 }
631 Function::ToTime { expr, format } => {
632 let expr = eval(expr).await?;
633 let format = eval(format).await?;
634 f::to_time(&name, expr, format)
635 }
636 Function::Position {
637 from_expr,
638 sub_expr,
639 } => {
640 let from_expr = eval(from_expr).await?;
641 let sub_expr = eval(sub_expr).await?;
642 f::position(from_expr, sub_expr)
643 }
644 Function::FindIdx {
645 from_expr,
646 sub_expr,
647 start,
648 } => {
649 let from_expr = eval(from_expr).await?;
650 let sub_expr = eval(sub_expr).await?;
651 let start = match start {
652 Some(idx) => Some(eval(idx).await?),
653 None => None,
654 };
655 f::find_idx(&name, from_expr, sub_expr, start)
656 }
657 Function::Cast { expr, data_type } => return eval(expr).await?.cast(data_type),
658 Function::Extract { field, expr } => {
659 let expr = eval(expr).await?;
660 f::extract(*field, expr)
661 }
662 Function::Coalesce(exprs) => {
663 let exprs = stream::iter(exprs).then(eval).try_collect().await?;
664 return f::coalesce(exprs);
665 }
666
667 Function::Append { expr, value } => {
669 let expr = eval(expr).await?;
670 let value = eval(value).await?;
671 f::append(expr, value)
672 }
673 Function::Prepend { expr, value } => {
674 let expr = eval(expr).await?;
675 let value = eval(value).await?;
676 f::prepend(expr, value)
677 }
678 Function::Skip { expr, size } => {
679 let expr = eval(expr).await?;
680 let size = eval(size).await?;
681 f::skip(&name, expr, size)
682 }
683 Function::Sort { expr, order } => {
684 let expr = eval(expr).await?;
685 let order = match order {
686 Some(o) => eval(o).await?,
687 None => Evaluated::Value(Cow::Owned(Value::Str("ASC".to_owned()))),
688 };
689 f::sort(expr, order)
690 }
691 Function::Take { expr, size } => {
692 let expr = eval(expr).await?;
693 let size = eval(size).await?;
694 f::take(&name, expr, size)
695 }
696 Function::Slice {
697 expr,
698 start,
699 length,
700 } => {
701 let expr = eval(expr).await?;
702 let start = eval(start).await?;
703 let length = eval(length).await?;
704 f::slice(&name, expr, start, length)
705 }
706 Function::IsEmpty(expr) => {
707 let expr = eval(expr).await?;
708 f::is_empty(expr)
709 }
710 Function::AddMonth { expr, size } => {
711 let expr = eval(expr).await?;
712 let size = eval(size).await?;
713 f::add_month(&name, expr, size)
714 }
715 Function::Length(expr) => f::length(&name, eval(expr).await?),
716 Function::Entries(expr) => f::entries(&name, eval(expr).await?),
717 Function::Keys(expr) => f::keys(eval(expr).await?),
718 Function::Values(expr) => {
719 let expr = eval(expr).await?;
720 f::values(expr)
721 }
722 Function::Splice {
723 list_data,
724 begin_index,
725 end_index,
726 values,
727 } => {
728 let list_data = eval(list_data).await?;
729 let begin_index = eval(begin_index).await?;
730 let end_index = eval(end_index).await?;
731 let values = match values {
732 Some(v) => Some(eval(v).await?),
733 None => None,
734 };
735 f::splice(&name, list_data, begin_index, end_index, values)
736 }
737 Function::Dedup(list) => f::dedup(eval(list).await?),
738 };
739
740 match result {
741 ControlFlow::Continue(v) => Ok(v),
742 ControlFlow::Break(BreakCase::Null) => Ok(Evaluated::Value(Cow::Owned(Value::Null))),
743 ControlFlow::Break(BreakCase::Err(err)) => Err(err),
744 }
745}