1use crate::error::EvaluationError;
2use crate::eval::expr::EvalExpr;
3use crate::eval::{EvalContext, EvalPlan, NestedContext};
4use itertools::Itertools;
5use partiql_value::Value::{Boolean, Missing, Null};
6use partiql_value::{
7 bag, list, tuple, Bag, List, NullSortedValue, Tuple, Value, ValueIntoIterator,
8};
9use rustc_hash::FxHashMap;
10use std::borrow::Borrow;
11use std::borrow::Cow;
12use std::cell::RefCell;
13use std::cmp::Ordering;
14use std::collections::hash_map::Entry;
15use std::collections::HashSet;
16use std::fmt::{Debug, Formatter};
17
18use crate::env::basic::MapBindings;
19use partiql_value::datum::{Datum, DatumLower, DatumLowerResult, DatumTupleRef, RefTupleView};
20use std::rc::Rc;
21
22#[macro_export]
23macro_rules! take_input {
24 ($expr:expr, $ctx:expr) => {
25 match $expr {
26 None => {
27 $ctx.add_error($crate::error::EvaluationError::IllegalState(
28 "Error in retrieving input value".to_string(),
29 ));
30 return partiql_value::Value::Missing;
31 }
32 Some(val) => val,
33 }
34 };
35}
36
37#[derive(Debug, Clone, Eq, PartialEq, Ord, PartialOrd)]
39pub enum EvalType {
40 SelfManaged,
41 GraphManaged,
42}
43
44pub trait Evaluable: Debug {
46 fn evaluate(&self, inputs: [Option<Value>; 2], ctx: &dyn EvalContext) -> Value;
47 fn get_vars(&self) -> Option<&[String]> {
48 None
49 }
50 fn eval_type(&self) -> EvalType {
51 EvalType::GraphManaged
52 }
53}
54
55pub(crate) struct EvalScan {
59 pub(crate) expr: Box<dyn EvalExpr>,
60 pub(crate) as_key: String,
61 pub(crate) at_key: Option<String>,
62
63 attrs: Vec<String>,
65}
66
67impl Debug for EvalScan {
68 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
69 write!(f, "SCAN ")?;
70 self.expr.fmt(f)?;
71
72 write!(f, " AS {}", self.as_key)?;
73
74 if let Some(at_key) = &self.at_key {
75 write!(f, " AT {at_key}")?;
76 }
77
78 Ok(())
79 }
80}
81
82impl EvalScan {
83 pub(crate) fn new(expr: Box<dyn EvalExpr>, as_key: &str) -> Self {
84 let attrs = vec![as_key.to_string()];
85 EvalScan {
86 expr,
87 as_key: as_key.to_string(),
88 at_key: None,
89
90 attrs,
91 }
92 }
93 pub(crate) fn new_with_at_key(expr: Box<dyn EvalExpr>, as_key: &str, at_key: &str) -> Self {
94 let attrs = vec![as_key.to_string(), at_key.to_string()];
95 EvalScan {
96 expr,
97 as_key: as_key.to_string(),
98 at_key: Some(at_key.to_string()),
99
100 attrs,
101 }
102 }
103}
104
105impl Evaluable for EvalScan {
106 fn evaluate(&self, mut inputs: [Option<Value>; 2], ctx: &dyn EvalContext) -> Value {
107 let input_value = inputs[0].take().unwrap_or(Missing);
108
109 let bindings = match input_value {
110 Value::Bag(t) => *t,
111 Value::Tuple(t) => bag![*t],
112 _ => bag![tuple![]],
113 };
114 let mut value = bag![];
115 bindings.iter().for_each(|binding| {
116 let binding_tuple = binding.as_datum_tuple_ref();
117 let v = self.expr.evaluate(&binding_tuple, ctx).into_owned();
118 let mut at_index_counter: i64 = 0;
119 if let Some(at_key) = &self.at_key {
120 let ordered = v.is_ordered();
121 for t in v {
122 let mut out = Tuple::from([(self.as_key.as_str(), t)]);
123 let at_id = if ordered {
124 at_index_counter.into()
125 } else {
126 Missing
127 };
128 out.insert(at_key, at_id);
129 value.push(Value::Tuple(Box::new(out)));
130 at_index_counter += 1;
131 }
132 } else {
133 for t in v {
134 let out = Tuple::from([(self.as_key.as_str(), t)]);
135 value.push(Value::Tuple(Box::new(out)));
136 }
137 }
138 });
139
140 Value::Bag(Box::new(value))
141 }
142
143 fn get_vars(&self) -> Option<&[String]> {
144 Some(&self.attrs)
145 }
146}
147
148pub(crate) struct EvalJoin {
152 pub(crate) kind: EvalJoinKind,
153 pub(crate) on: Option<Box<dyn EvalExpr>>,
154
155 pub(crate) left: Box<dyn Evaluable>,
156 pub(crate) right: Box<dyn Evaluable>,
157}
158
159#[derive(Debug)]
160pub(crate) enum EvalJoinKind {
161 Inner,
162 Left,
163 Right,
164 Full,
165}
166
167impl Debug for EvalJoin {
168 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
169 write!(f, "{:#?} JOIN", &self.kind)?;
170 if let Some(on) = &self.on {
171 write!(f, " ON ")?;
172 on.fmt(f)?;
173 }
174 Ok(())
175 }
176}
177
178impl EvalJoin {
179 pub(crate) fn new(
180 kind: EvalJoinKind,
181 left: Box<dyn Evaluable>,
182 right: Box<dyn Evaluable>,
183 on: Option<Box<dyn EvalExpr>>,
184 ) -> Self {
185 EvalJoin {
186 kind,
187 on,
188
189 left,
190 right,
191 }
192 }
193}
194
195impl Evaluable for EvalJoin {
196 fn evaluate(&self, mut inputs: [Option<Value>; 2], ctx: &dyn EvalContext) -> Value {
197 #[inline]
199 fn tuple_with_null_vals<I, S>(attrs: I) -> Tuple
200 where
201 S: Into<String>,
202 I: IntoIterator<Item = S>,
203 {
204 attrs.into_iter().map(|k| (k.into(), Null)).collect()
205 }
206
207 let mut output_bag = bag![];
208 let input_env = inputs[0].take().unwrap_or_else(|| Value::from(tuple![]));
209 let lhs_values = self.left.evaluate([Some(input_env.clone()), None], ctx);
210 let left_bindings = match lhs_values {
211 Value::Bag(t) => *t,
212 _ => {
213 ctx.add_error(EvaluationError::IllegalState(
214 "Left side of FROM source should result in a bag of bindings".to_string(),
215 ));
216 return Missing;
217 }
218 };
219
220 match self.kind {
223 EvalJoinKind::Inner => {
224 left_bindings.iter().for_each(|b_l| {
226 let env_b_l = input_env
227 .as_tuple_ref()
228 .as_ref()
229 .tuple_concat(b_l.as_tuple_ref().borrow());
230 let rhs_values = self.right.evaluate([Some(Value::from(env_b_l)), None], ctx);
231
232 let right_bindings = match rhs_values {
233 Value::Bag(t) => *t,
234 _ => bag![tuple![]],
235 };
236
237 for b_r in &right_bindings {
239 match &self.on {
240 None => {
241 let b_l_b_r = b_l
242 .as_tuple_ref()
243 .as_ref()
244 .tuple_concat(b_r.as_tuple_ref().borrow());
245 output_bag.push(Value::from(b_l_b_r));
246 }
247 Some(condition) => {
249 let b_l_b_r = b_l
250 .as_tuple_ref()
251 .as_ref()
252 .tuple_concat(b_r.as_tuple_ref().borrow());
253 let env_b_l_b_r =
254 &input_env.as_tuple_ref().as_ref().tuple_concat(&b_l_b_r);
255 let tuple_ref = DatumTupleRef::Tuple(env_b_l_b_r);
256 let cond = condition.evaluate(&tuple_ref, ctx);
257 if let Value::Boolean(true) = cond.as_ref() {
258 output_bag.push(Value::Tuple(Box::new(b_l_b_r)));
259 }
260 }
261 }
262 }
263 });
264 }
265 EvalJoinKind::Left => {
266 left_bindings.iter().for_each(|b_l| {
268 let mut output_bag_left = bag![];
270 let env_b_l = input_env
271 .as_tuple_ref()
272 .as_ref()
273 .tuple_concat(b_l.as_tuple_ref().borrow());
274 let rhs_values = self.right.evaluate([Some(Value::from(env_b_l)), None], ctx);
275
276 let right_bindings = match rhs_values {
277 Value::Bag(t) => *t,
278 _ => bag![tuple![]],
279 };
280
281 for b_r in &right_bindings {
283 match &self.on {
284 None => {
285 let b_l_b_r = b_l
286 .as_tuple_ref()
287 .as_ref()
288 .tuple_concat(b_r.as_tuple_ref().borrow());
289 output_bag_left.push(Value::from(b_l_b_r));
290 }
291 Some(condition) => {
293 let b_l_b_r = b_l
294 .as_tuple_ref()
295 .as_ref()
296 .tuple_concat(b_r.as_tuple_ref().borrow());
297 let env_b_l_b_r =
298 &input_env.as_tuple_ref().as_ref().tuple_concat(&b_l_b_r);
299 let tuple_ref = DatumTupleRef::Tuple(env_b_l_b_r);
300 let cond = condition.evaluate(&tuple_ref, ctx);
301 if cond.as_ref() == &Value::Boolean(true) {
302 output_bag_left.push(Value::Tuple(Box::new(b_l_b_r)));
303 }
304 }
305 }
306 }
307
308 if output_bag_left.is_empty() {
310 let attrs = self.right.get_vars().unwrap_or(&[]);
311 let new_binding = b_l
312 .as_tuple_ref()
313 .as_ref()
314 .tuple_concat(&tuple_with_null_vals(attrs));
315 output_bag.push(Value::from(new_binding));
317 } else {
318 for elem in output_bag_left {
320 output_bag.push(elem);
321 }
322 }
323 });
324 }
325 EvalJoinKind::Full | EvalJoinKind::Right => {
326 ctx.add_error(EvaluationError::NotYetImplemented(
327 "FULL and RIGHT JOIN".to_string(),
328 ));
329 return Missing;
330 }
331 };
332 Value::Bag(Box::new(output_bag))
333 }
334
335 fn eval_type(&self) -> EvalType {
336 EvalType::SelfManaged
337 }
338}
339
340#[derive(Debug)]
352pub(crate) struct AggregateExpression {
353 pub(crate) name: String,
354 pub(crate) expr: Box<dyn EvalExpr>,
355 pub(crate) func: Box<dyn AggregateFunction>,
356}
357
358impl AggregateFunction for AggregateExpression {
359 #[inline]
360 fn next_distinct(
361 &self,
362 input_value: &Value,
363 state: &mut Option<Value>,
364 seen: &mut FxHashMap<Value, ()>,
365 ) {
366 if input_value.is_present() {
367 self.func.next_distinct(input_value, state, seen);
368 }
369 }
370
371 #[inline]
372 fn next_value(&self, input_value: &Value, state: &mut Option<Value>) {
373 if input_value.is_present() {
374 self.func.next_value(input_value, state);
375 }
376 }
377
378 #[inline]
379 fn finalize(&self, state: Option<Value>) -> Result<Value, EvaluationError> {
380 self.func.finalize(state)
381 }
382}
383
384pub trait AggregateFunction: Debug {
386 #[inline]
387 fn next_distinct(
388 &self,
389 input_value: &Value,
390 state: &mut Option<Value>,
391 seen: &mut FxHashMap<Value, ()>,
392 ) {
393 match seen.entry(input_value.clone()) {
394 Entry::Occupied(_) => {}
395 Entry::Vacant(v) => {
396 v.insert(());
397 self.next_value(input_value, state);
398 }
399 }
400 }
401 fn next_value(&self, input_value: &Value, state: &mut Option<Value>);
403 fn finalize(&self, state: Option<Value>) -> Result<Value, EvaluationError>;
405}
406
407#[derive(Debug)]
409pub(crate) struct Avg {}
410
411impl AggregateFunction for Avg {
412 fn next_value(&self, input_value: &Value, state: &mut Option<Value>) {
413 match state {
414 None => *state = Some(Value::from(list![Value::from(1), input_value.clone()])),
415 Some(Value::List(list)) => {
416 if let Some(count) = list.get_mut(0) {
417 *count += &Value::from(1);
418 }
419 if let Some(sum) = list.get_mut(1) {
420 *sum += input_value;
421 }
422 }
423 _ => unreachable!(),
424 };
425 }
426
427 fn finalize(&self, state: Option<Value>) -> Result<Value, EvaluationError> {
428 match state {
429 None => Ok(Null),
430 Some(Value::List(list)) => {
431 let vals = list.to_vec();
432 if let [count, sum] = &vals[..] {
433 if let Value::Integer(n) = sum {
434 let sum = Value::from(rust_decimal::Decimal::from(*n));
436 Ok(&sum / count)
437 } else {
438 Ok(sum / count)
439 }
440 } else {
441 Err(EvaluationError::IllegalState(
442 "Bad finalize state for Avg".to_string(),
443 ))
444 }
445 }
446 _ => unreachable!(),
447 }
448 }
449}
450
451#[derive(Debug)]
453pub(crate) struct Count {}
454
455impl AggregateFunction for Count {
456 fn next_value(&self, _: &Value, state: &mut Option<Value>) {
457 match state {
458 None => *state = Some(Value::from(1)),
459 Some(Value::Integer(i)) => {
460 *i += 1;
461 }
462 _ => unreachable!(),
463 };
464 }
465
466 fn finalize(&self, state: Option<Value>) -> Result<Value, EvaluationError> {
467 Ok(state.unwrap_or_else(|| Value::from(0)))
468 }
469}
470
471#[derive(Debug)]
473pub(crate) struct Max {}
474
475impl AggregateFunction for Max {
476 fn next_value(&self, input_value: &Value, state: &mut Option<Value>) {
477 match state {
478 None => *state = Some(input_value.clone()),
479 Some(max) => {
480 if &*max < input_value {
481 *max = input_value.clone();
482 }
483 }
484 };
485 }
486
487 fn finalize(&self, state: Option<Value>) -> Result<Value, EvaluationError> {
488 Ok(state.unwrap_or(Null))
489 }
490}
491
492#[derive(Debug)]
494pub(crate) struct Min {}
495
496impl AggregateFunction for Min {
497 fn next_value(&self, input_value: &Value, state: &mut Option<Value>) {
498 match state {
499 None => *state = Some(input_value.clone()),
500 Some(min) => {
501 if &*min > input_value {
502 *min = input_value.clone();
503 }
504 }
505 };
506 }
507
508 fn finalize(&self, state: Option<Value>) -> Result<Value, EvaluationError> {
509 Ok(state.unwrap_or(Null))
510 }
511}
512
513#[derive(Debug)]
515pub(crate) struct Sum {}
516
517impl AggregateFunction for Sum {
518 fn next_value(&self, input_value: &Value, state: &mut Option<Value>) {
519 match state {
520 None => *state = Some(input_value.clone()),
521 Some(ref mut sum) => *sum += input_value,
522 };
523 }
524
525 fn finalize(&self, state: Option<Value>) -> Result<Value, EvaluationError> {
526 Ok(state.unwrap_or(Null))
527 }
528}
529
530#[derive(Debug)]
532pub(crate) struct Any {}
533
534impl AggregateFunction for Any {
535 fn next_value(&self, input_value: &Value, state: &mut Option<Value>) {
536 match state {
537 None => {
538 *state = Some(match input_value {
539 Boolean(b) => Value::Boolean(*b),
540 _ => Missing,
541 });
542 }
543 Some(ref mut acc) => {
544 *acc = match (&acc, input_value) {
545 (Boolean(acc), Boolean(new)) => Boolean(*acc || *new),
546 _ => Missing,
547 }
548 }
549 };
550 }
551
552 fn finalize(&self, state: Option<Value>) -> Result<Value, EvaluationError> {
553 Ok(state.unwrap_or(Null))
554 }
555}
556
557#[derive(Debug)]
559pub(crate) struct Every {}
560
561impl AggregateFunction for Every {
562 fn next_value(&self, input_value: &Value, state: &mut Option<Value>) {
563 match state {
564 None => {
565 *state = Some(match input_value {
566 Boolean(b) => Value::Boolean(*b),
567 _ => Missing,
568 });
569 }
570 Some(ref mut acc) => {
571 *acc = match (&acc, input_value) {
572 (Boolean(acc), Boolean(new)) => Boolean(*acc && *new),
573 _ => Missing,
574 }
575 }
576 };
577 }
578
579 fn finalize(&self, state: Option<Value>) -> Result<Value, EvaluationError> {
580 Ok(state.unwrap_or(Null))
581 }
582}
583
584#[derive(Debug)]
589pub(crate) struct EvalGroupBy {
590 pub(crate) strategy: EvalGroupingStrategy,
591 pub(crate) group: Vec<Box<dyn EvalExpr>>,
592 pub(crate) aliases: Vec<String>,
593 pub(crate) aggs: Vec<AggregateExpression>,
594 pub(crate) distinct_aggs: Vec<AggregateExpression>,
595 pub(crate) group_as_alias: Option<String>,
596}
597
598type GroupKey = Vec<Value>;
599type AggState = Vec<Option<Value>>;
600type DAggState = Vec<(Option<Value>, FxHashMap<Value, ()>)>;
601#[derive(Clone)]
602struct CombinedState(AggState, DAggState, Option<Vec<Value>>);
603
604#[derive(Debug)]
606pub(crate) enum EvalGroupingStrategy {
607 GroupFull,
608 GroupPartial,
609}
610
611impl EvalGroupBy {
612 #[inline]
613 pub(crate) fn new(
614 strategy: EvalGroupingStrategy,
615 group: Vec<Box<dyn EvalExpr>>,
616 aliases: Vec<String>,
617 aggs: Vec<AggregateExpression>,
618 distinct_aggs: Vec<AggregateExpression>,
619 group_as_alias: Option<String>,
620 ) -> Self {
621 Self {
622 strategy,
623 group,
624 aliases,
625 aggs,
626 distinct_aggs,
627 group_as_alias,
628 }
629 }
630
631 #[inline]
632 fn group_key<'a, 'c>(
633 &'a self,
634 bindings: &'a DatumTupleRef<'a>,
635 ctx: &'c dyn EvalContext,
636 ) -> GroupKey
637 where
638 'c: 'a,
639 {
640 self.group
641 .iter()
642 .map(|expr| match expr.evaluate(bindings, ctx).as_ref() {
643 Missing => Value::Null,
644 val => val.clone(),
645 })
646 .collect()
647 }
648}
649
650impl Evaluable for EvalGroupBy {
651 fn evaluate(&self, mut inputs: [Option<Value>; 2], ctx: &dyn EvalContext) -> Value {
652 let group_as_alias = &self.group_as_alias;
653 let input_value = take_input!(inputs[0].take(), ctx);
654
655 match self.strategy {
656 EvalGroupingStrategy::GroupPartial => {
657 ctx.add_error(EvaluationError::NotYetImplemented(
658 "GROUP PARTIAL".to_string(),
659 ));
660 Missing
661 }
662 EvalGroupingStrategy::GroupFull => {
663 let mut grouped: FxHashMap<GroupKey, CombinedState> = FxHashMap::default();
664 let state = std::iter::repeat_n(None, self.aggs.len()).collect_vec();
665 let distinct_state = std::iter::repeat_with(|| (None, FxHashMap::default()))
666 .take(self.distinct_aggs.len())
667 .collect_vec();
668 let group_as = group_as_alias.as_ref().map(|_| vec![]);
669
670 let combined = CombinedState(state, distinct_state, group_as);
671
672 for v in input_value {
673 let v_as_tuple = v.as_datum_tuple_ref();
674 let group_key = self.group_key(&v_as_tuple, ctx);
675 let CombinedState(state, distinct_state, group_as) =
676 grouped.entry(group_key).or_insert_with(|| combined.clone());
677
678 for (agg_expr, state) in self.aggs.iter().zip(state.iter_mut()) {
680 let evaluated = agg_expr.expr.evaluate(&v_as_tuple, ctx);
681 agg_expr.next_value(evaluated.as_ref(), state);
682 }
683
684 for (distinct_expr, (state, seen)) in
686 self.distinct_aggs.iter().zip(distinct_state.iter_mut())
687 {
688 let evaluated = distinct_expr.expr.evaluate(&v_as_tuple, ctx);
689 distinct_expr.next_distinct(evaluated.as_ref(), state, seen);
690 }
691
692 if let Some(ref mut tuples) = group_as {
694 tuples.push(Value::from(v.coerce_into_tuple()));
695 }
696 }
697
698 let vals = grouped
699 .into_iter()
700 .map(|(group_key, state)| {
701 let CombinedState(agg_state, distinct_state, group_as) = state;
702 let group = self.aliases.iter().cloned().zip(group_key);
703
704 let aggs_with_state = self.aggs.iter().zip(agg_state);
706 let daggs_with_state = self
707 .distinct_aggs
708 .iter()
709 .zip(distinct_state.into_iter().map(|(state, _)| state));
710 let agg_data = aggs_with_state.chain(daggs_with_state).map(
711 |(aggregate_expr, state)| {
712 let val = match aggregate_expr.finalize(state) {
713 Ok(agg_result) => agg_result,
714 Err(err) => {
715 ctx.add_error(err);
716 Missing
717 }
718 };
719
720 (aggregate_expr.name.to_string(), val)
721 },
722 );
723
724 let mut tuple = Tuple::from_iter(group.chain(agg_data));
725
726 if let Some(tuples) = group_as {
728 tuple.insert(
729 group_as_alias.as_ref().unwrap(),
730 Value::from(Bag::from(tuples)),
731 );
732 }
733
734 Value::from(tuple)
735 })
736 .collect_vec();
737
738 Value::from(Bag::from(vals))
739 }
740 }
741 }
742}
743
744#[derive(Debug)]
748pub(crate) struct EvalPivot {
749 pub(crate) key: Box<dyn EvalExpr>,
750 pub(crate) value: Box<dyn EvalExpr>,
751}
752
753impl EvalPivot {
754 pub(crate) fn new(key: Box<dyn EvalExpr>, value: Box<dyn EvalExpr>) -> Self {
755 EvalPivot { key, value }
756 }
757}
758
759impl Evaluable for EvalPivot {
760 fn evaluate(&self, mut inputs: [Option<Value>; 2], ctx: &dyn EvalContext) -> Value {
761 let input_value = take_input!(inputs[0].take(), ctx);
762
763 let tuple: Tuple = input_value
764 .into_iter()
765 .filter_map(|binding| {
766 let binding = binding.as_datum_tuple_ref();
767 let key = self.key.evaluate(&binding, ctx);
768 if let Value::String(s) = key.as_ref() {
769 let value = self.value.evaluate(&binding, ctx);
770 Some((s.to_string(), value.into_owned()))
771 } else {
772 None
773 }
774 })
775 .collect();
776 Value::from(tuple)
777 }
778}
779
780#[derive(Debug)]
784pub(crate) struct EvalUnpivot {
785 pub(crate) expr: Box<dyn EvalExpr>,
786 pub(crate) as_key: String,
787 pub(crate) at_key: Option<String>,
788
789 attrs: Vec<String>,
791}
792
793impl EvalUnpivot {
794 pub(crate) fn new(expr: Box<dyn EvalExpr>, as_key: &str, at_key: Option<String>) -> Self {
795 let attrs = if let Some(at_key) = &at_key {
796 vec![as_key.to_string(), at_key.clone()]
797 } else {
798 vec![as_key.to_string()]
799 };
800
801 EvalUnpivot {
802 expr,
803 as_key: as_key.to_string(),
804 at_key,
805
806 attrs,
807 }
808 }
809}
810
811impl Evaluable for EvalUnpivot {
812 fn evaluate(&self, _inputs: [Option<Value>; 2], ctx: &dyn EvalContext) -> Value {
813 let tuple = match self.expr.evaluate(&DatumTupleRef::Empty, ctx).into_owned() {
814 Value::Tuple(tuple) => *tuple,
815 other => other.coerce_into_tuple(),
816 };
817
818 let as_key = self.as_key.as_str();
819 let pairs = tuple;
820 let unpivoted = if let Some(at_key) = &self.at_key {
821 pairs
822 .map(|(k, v)| Tuple::from([(as_key, v), (at_key.as_str(), k.into())]))
823 .collect::<Bag>()
824 } else {
825 pairs
826 .map(|(_, v)| Tuple::from([(as_key, v)]))
827 .collect::<Bag>()
828 };
829 Value::from(unpivoted)
830 }
831
832 fn get_vars(&self) -> Option<&[String]> {
833 Some(&self.attrs)
834 }
835}
836
837#[derive(Debug)]
841pub(crate) struct EvalFilter {
842 pub(crate) expr: Box<dyn EvalExpr>,
843}
844
845impl EvalFilter {
846 pub(crate) fn new(expr: Box<dyn EvalExpr>) -> Self {
847 EvalFilter { expr }
848 }
849
850 #[inline]
851 fn eval_filter<'a, 'c>(
852 &'a self,
853 bindings: &'a DatumTupleRef<'a>,
854 ctx: &'c dyn EvalContext,
855 ) -> bool
856 where
857 'c: 'a,
858 {
859 let result = self.expr.evaluate(bindings, ctx);
860 match result.as_ref() {
861 Boolean(bool_val) => *bool_val,
862 _ => false,
866 }
867 }
868}
869
870impl Evaluable for EvalFilter {
871 fn evaluate(&self, mut inputs: [Option<Value>; 2], ctx: &dyn EvalContext) -> Value {
872 let input_value = take_input!(inputs[0].take(), ctx);
873
874 let filtered = input_value
875 .into_iter()
876 .filter(|v| self.eval_filter(&v.as_datum_tuple_ref(), ctx));
877 Value::from(filtered.collect::<Bag>())
878 }
879}
880
881#[derive(Debug)]
885pub(crate) struct EvalHaving {
886 pub(crate) expr: Box<dyn EvalExpr>,
887}
888
889impl EvalHaving {
890 pub(crate) fn new(expr: Box<dyn EvalExpr>) -> Self {
891 EvalHaving { expr }
892 }
893
894 #[inline]
895 fn eval_having<'a, 'c>(
896 &'a self,
897 bindings: &'a DatumTupleRef<'a>,
898 ctx: &'c dyn EvalContext,
899 ) -> bool
900 where
901 'c: 'a,
902 {
903 let result = self.expr.evaluate(bindings, ctx);
904 match result.as_ref() {
905 Boolean(bool_val) => *bool_val,
906 _ => false,
912 }
913 }
914}
915
916impl Evaluable for EvalHaving {
917 fn evaluate(&self, mut inputs: [Option<Value>; 2], ctx: &dyn EvalContext) -> Value {
918 let input_value = take_input!(inputs[0].take(), ctx);
919
920 let filtered = input_value
921 .into_iter()
922 .filter(|v| self.eval_having(&v.as_datum_tuple_ref(), ctx));
923 Value::from(filtered.collect::<Bag>())
924 }
925}
926
927#[derive(Debug)]
928pub(crate) struct EvalOrderBySortCondition {
929 pub(crate) expr: Box<dyn EvalExpr>,
930 pub(crate) spec: EvalOrderBySortSpec,
931}
932
933#[derive(Debug)]
934pub(crate) enum EvalOrderBySortSpec {
935 AscNullsFirst,
936 AscNullsLast,
937 DescNullsFirst,
938 DescNullsLast,
939}
940
941#[derive(Debug)]
943pub(crate) struct EvalOrderBy {
944 pub(crate) cmp: Vec<EvalOrderBySortCondition>,
945}
946
947impl EvalOrderBy {
948 #[inline]
949 fn compare(&self, l: &Value, r: &Value, ctx: &dyn EvalContext) -> Ordering {
950 let l = l.as_datum_tuple_ref();
951 let r = r.as_datum_tuple_ref();
952 self.cmp
953 .iter()
954 .map(|spec| {
955 let l = spec.expr.evaluate(&l, ctx);
956 let r = spec.expr.evaluate(&r, ctx);
957
958 match spec.spec {
959 EvalOrderBySortSpec::AscNullsFirst => {
960 let wrap = NullSortedValue::<true, Value>;
961 let (l, r) = (wrap(l.as_ref()), wrap(r.as_ref()));
962 l.cmp(&r)
963 }
964 EvalOrderBySortSpec::AscNullsLast => {
965 let wrap = NullSortedValue::<false, Value>;
966 let (l, r) = (wrap(l.as_ref()), wrap(r.as_ref()));
967 l.cmp(&r)
968 }
969 EvalOrderBySortSpec::DescNullsFirst => {
970 let wrap = NullSortedValue::<false, Value>;
971 let (l, r) = (wrap(l.as_ref()), wrap(r.as_ref()));
972 r.cmp(&l)
973 }
974 EvalOrderBySortSpec::DescNullsLast => {
975 let wrap = NullSortedValue::<true, Value>;
976 let (l, r) = (wrap(l.as_ref()), wrap(r.as_ref()));
977 r.cmp(&l)
978 }
979 }
980 })
981 .find_or_last(|o| o != &Ordering::Equal)
982 .unwrap_or(Ordering::Equal)
983 }
984}
985
986impl Evaluable for EvalOrderBy {
987 fn evaluate(&self, mut inputs: [Option<Value>; 2], ctx: &dyn EvalContext) -> Value {
988 let input_value = take_input!(inputs[0].take(), ctx);
989
990 let values: DatumLowerResult<Vec<_>> =
991 input_value.into_iter().map(|v| v.into_lower()).collect();
992 let mut values = values.expect("lower");
994 values.sort_by(|l, r| self.compare(l, r, ctx));
995 Value::from(List::from(values))
996 }
997}
998
999#[derive(Debug)]
1001pub(crate) struct EvalLimitOffset {
1002 pub(crate) limit: Option<Box<dyn EvalExpr>>,
1003 pub(crate) offset: Option<Box<dyn EvalExpr>>,
1004}
1005
1006impl Evaluable for EvalLimitOffset {
1007 fn evaluate(&self, mut inputs: [Option<Value>; 2], ctx: &dyn EvalContext) -> Value {
1008 let input_value = take_input!(inputs[0].take(), ctx);
1009
1010 let empty_bindings = DatumTupleRef::Empty;
1011
1012 let offset = match &self.offset {
1013 None => 0,
1014 Some(expr) => match expr.evaluate(&empty_bindings, ctx).as_ref() {
1015 Value::Integer(i) => {
1016 if *i >= 0 {
1017 *i as usize
1018 } else {
1019 0
1020 }
1021 }
1022 _ => 0,
1023 },
1024 };
1025
1026 let limit = match &self.limit {
1027 None => None,
1028 Some(expr) => match expr.evaluate(&empty_bindings, ctx).as_ref() {
1029 Value::Integer(i) => {
1030 if *i >= 0 {
1031 Some(*i as usize)
1032 } else {
1033 None
1034 }
1035 }
1036 _ => None,
1037 },
1038 };
1039
1040 let ordered = input_value.is_ordered();
1041 fn collect(values: impl Iterator<Item = Value>, ordered: bool) -> Value {
1042 match ordered {
1043 true => Value::from(values.collect::<List>()),
1044 false => Value::from(values.collect::<Bag>()),
1045 }
1046 }
1047
1048 let offsetted = input_value.into_iter().skip(offset);
1049 match limit {
1050 Some(n) => collect(offsetted.take(n), ordered),
1051 None => collect(offsetted, ordered),
1052 }
1053 }
1054}
1055
1056#[derive(Debug)]
1060pub(crate) struct EvalSelectValue {
1061 pub(crate) expr: Box<dyn EvalExpr>,
1062}
1063
1064impl EvalSelectValue {
1065 pub(crate) fn new(expr: Box<dyn EvalExpr>) -> Self {
1066 EvalSelectValue { expr }
1067 }
1068}
1069
1070impl Evaluable for EvalSelectValue {
1071 fn evaluate(&self, mut inputs: [Option<Value>; 2], ctx: &dyn EvalContext) -> Value {
1072 let input_value = take_input!(inputs[0].take(), ctx);
1073
1074 let ordered = input_value.is_ordered();
1075
1076 let values = input_value.into_iter().map(|v| {
1077 let v_as_tuple = v.as_datum_tuple_ref();
1078 self.expr.evaluate(&v_as_tuple, ctx).into_owned()
1079 });
1080
1081 match ordered {
1082 true => Value::from(values.collect::<List>()),
1083 false => Value::from(values.collect::<Bag>()),
1084 }
1085 }
1086}
1087
1088pub(crate) struct EvalSelect {
1093 pub(crate) exprs: Vec<(String, Box<dyn EvalExpr>)>,
1094}
1095
1096impl EvalSelect {
1097 pub(crate) fn new(exprs: Vec<(String, Box<dyn EvalExpr>)>) -> Self {
1098 EvalSelect { exprs }
1099 }
1100}
1101
1102impl Debug for EvalSelect {
1103 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
1104 write!(f, "SELECT ")?;
1105 let mut sep = "";
1106 for (alias, expr) in &self.exprs {
1107 write!(f, "{sep}")?;
1108 expr.fmt(f)?;
1109 write!(f, " AS {alias}")?;
1110 sep = ", ";
1111 }
1112
1113 Ok(())
1114 }
1115}
1116
1117impl Evaluable for EvalSelect {
1118 fn evaluate(&self, mut inputs: [Option<Value>; 2], ctx: &dyn EvalContext) -> Value {
1119 let input_value = take_input!(inputs[0].take(), ctx);
1120
1121 let ordered = input_value.is_ordered();
1122
1123 let values = input_value.into_iter().map(|v| {
1124 let v_as_tuple = v.as_datum_tuple_ref();
1125
1126 let tuple_pairs = self.exprs.iter().filter_map(|(alias, expr)| {
1127 let evaluated_val = expr.evaluate(&v_as_tuple, ctx);
1128 match evaluated_val.as_ref() {
1129 Missing => None,
1130 _ => Some((alias.as_str(), evaluated_val.into_owned())),
1131 }
1132 });
1133
1134 tuple_pairs.collect::<Tuple>()
1135 });
1136
1137 match ordered {
1138 true => Value::from(values.collect::<List>()),
1139 false => Value::from(values.collect::<Bag>()),
1140 }
1141 }
1142}
1143
1144#[derive(Debug, Default)]
1147pub(crate) struct EvalSelectAll {
1148 pub(crate) passthrough: bool,
1149}
1150
1151impl EvalSelectAll {
1152 pub(crate) fn new(passthrough: bool) -> Self {
1153 Self { passthrough }
1154 }
1155}
1156
1157impl Evaluable for EvalSelectAll {
1158 fn evaluate(&self, mut inputs: [Option<Value>; 2], ctx: &dyn EvalContext) -> Value {
1159 let input_value = take_input!(inputs[0].take(), ctx);
1160
1161 let ordered = input_value.is_ordered();
1162
1163 let values = input_value.into_iter().map(|val| {
1164 if self.passthrough {
1165 val.coerce_into_tuple()
1166 } else {
1167 val.coerce_into_tuple()
1168 .into_values()
1169 .flat_map(|v| v.coerce_into_tuple().into_pairs())
1170 .collect::<Tuple>()
1171 }
1172 });
1173
1174 match ordered {
1175 true => Value::from(values.collect::<List>()),
1176 false => Value::from(values.collect::<Bag>()),
1177 }
1178 }
1179}
1180
1181#[derive(Debug)]
1185pub(crate) struct EvalExprQuery {
1186 pub(crate) expr: Box<dyn EvalExpr>,
1187}
1188
1189impl EvalExprQuery {
1190 pub(crate) fn new(expr: Box<dyn EvalExpr>) -> Self {
1191 EvalExprQuery { expr }
1192 }
1193}
1194
1195impl Evaluable for EvalExprQuery {
1196 fn evaluate(&self, mut inputs: [Option<Value>; 2], ctx: &dyn EvalContext) -> Value {
1197 let input = inputs[0].take().unwrap_or(Value::Null);
1198 let input = input.as_tuple_ref();
1199 let input = input.as_ref();
1200 let input = DatumTupleRef::Tuple(input);
1201 self.expr.evaluate(&input, ctx).into_owned()
1202 }
1203}
1204
1205#[derive(Debug, Default)]
1207pub(crate) struct EvalDistinct {}
1208
1209impl EvalDistinct {
1210 pub(crate) fn new() -> Self {
1211 Self::default()
1212 }
1213}
1214
1215impl Evaluable for EvalDistinct {
1216 fn evaluate(&self, mut inputs: [Option<Value>; 2], ctx: &dyn EvalContext) -> Value {
1217 let input_value = take_input!(inputs[0].take(), ctx);
1218 let ordered = input_value.is_ordered();
1219
1220 let values = input_value.into_iter().unique();
1221 match ordered {
1222 true => Value::from(values.collect::<List>()),
1223 false => Value::from(values.collect::<Bag>()),
1224 }
1225 }
1226}
1227
1228pub(crate) struct EvalSink {}
1230
1231impl Evaluable for EvalSink {
1232 fn evaluate(&self, mut inputs: [Option<Value>; 2], _ctx: &dyn EvalContext) -> Value {
1233 inputs[0].take().unwrap_or(Missing)
1234 }
1235}
1236
1237impl Debug for EvalSink {
1238 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
1239 write!(f, "SINK")
1240 }
1241}
1242
1243#[derive(Debug)]
1246pub(crate) struct EvalSubQueryExpr {
1247 pub(crate) plan: Rc<RefCell<EvalPlan>>,
1248}
1249
1250impl EvalSubQueryExpr {
1251 pub(crate) fn new(plan: EvalPlan) -> Self {
1252 EvalSubQueryExpr {
1253 plan: Rc::new(RefCell::new(plan)),
1254 }
1255 }
1256}
1257
1258impl EvalExpr for EvalSubQueryExpr {
1259 fn evaluate<'a, 'c, 'o>(
1260 &'a self,
1261 bindings: &'a dyn RefTupleView<'a, Value>,
1262 ctx: &'c dyn EvalContext,
1263 ) -> Cow<'o, Value>
1264 where
1265 'c: 'a,
1266 'a: 'o,
1267 {
1268 let value = {
1269 let bindings = MapBindings::from(bindings);
1270 let nested_ctx = NestedContext::new(bindings, ctx);
1271
1272 let plan = RefCell::borrow(&self.plan);
1273
1274 let value = match plan.execute(&nested_ctx) {
1275 Ok(evaluated) => evaluated.result,
1276 Err(err) => {
1277 for e in err.errors {
1278 ctx.add_error(e);
1279 }
1280 Missing
1281 }
1282 };
1283
1284 value.clone()
1285 };
1286 Cow::Owned(value)
1287 }
1288}
1289
1290#[inline]
1299fn bagop_iter(v: Value) -> ValueIntoIterator {
1300 match v {
1301 Value::Null | Value::Missing => ValueIntoIterator::Single(None),
1302 other => other.into_iter(),
1303 }
1304}
1305
1306#[derive(Debug, PartialEq)]
1308pub(crate) struct EvalOuterUnion {
1309 pub(crate) setq: SetQuantifier,
1310}
1311
1312impl EvalOuterUnion {
1313 pub(crate) fn new(setq: SetQuantifier) -> Self {
1314 EvalOuterUnion { setq }
1315 }
1316}
1317
1318impl Evaluable for EvalOuterUnion {
1319 fn evaluate(&self, mut inputs: [Option<Value>; 2], _ctx: &dyn EvalContext) -> Value {
1320 let lhs = bagop_iter(inputs[0].take().unwrap_or(Missing));
1321 let rhs = bagop_iter(inputs[1].take().unwrap_or(Missing));
1322 let chained = lhs.chain(rhs);
1323 let vals = match self.setq {
1324 SetQuantifier::All => chained.collect_vec(),
1325 SetQuantifier::Distinct => chained.unique().collect_vec(),
1326 };
1327 Value::from(Bag::from(vals))
1328 }
1329}
1330
1331#[derive(Debug, PartialEq)]
1333pub(crate) struct EvalOuterIntersect {
1334 pub(crate) setq: SetQuantifier,
1335}
1336
1337impl EvalOuterIntersect {
1338 pub(crate) fn new(setq: SetQuantifier) -> Self {
1339 EvalOuterIntersect { setq }
1340 }
1341}
1342
1343impl Evaluable for EvalOuterIntersect {
1344 fn evaluate(&self, mut inputs: [Option<Value>; 2], _ctx: &dyn EvalContext) -> Value {
1345 let lhs = bagop_iter(inputs[0].take().unwrap_or(Missing));
1346 let rhs = bagop_iter(inputs[1].take().unwrap_or(Missing));
1347
1348 let bag: Bag = match self.setq {
1349 SetQuantifier::All => {
1350 let mut lhs = lhs.counts();
1351 Bag::from_iter(rhs.filter(|elem| match lhs.get_mut(elem) {
1352 Some(count) if *count > 0 => {
1353 *count -= 1;
1354 true
1355 }
1356 _ => false,
1357 }))
1358 }
1359 SetQuantifier::Distinct => {
1360 let lhs: HashSet<Value> = lhs.collect();
1361 Bag::from_iter(
1362 rhs.filter(|elem| lhs.contains(elem))
1363 .collect::<HashSet<_>>(),
1364 )
1365 }
1366 };
1367 Value::from(bag)
1368 }
1369}
1370
1371#[derive(Debug, PartialEq)]
1373pub(crate) struct EvalOuterExcept {
1374 pub(crate) setq: SetQuantifier,
1375}
1376
1377impl EvalOuterExcept {
1378 pub(crate) fn new(setq: SetQuantifier) -> Self {
1379 EvalOuterExcept { setq }
1380 }
1381}
1382
1383impl Evaluable for EvalOuterExcept {
1384 fn evaluate(&self, mut inputs: [Option<Value>; 2], _ctx: &dyn EvalContext) -> Value {
1385 let lhs = bagop_iter(inputs[0].take().unwrap_or(Missing));
1386 let rhs = bagop_iter(inputs[1].take().unwrap_or(Missing));
1387
1388 let mut exclude = rhs.counts();
1389 let excepted = lhs.filter(|elem| match exclude.get_mut(elem) {
1390 Some(count) if *count > 0 => {
1391 *count -= 1;
1392 false
1393 }
1394 _ => true,
1395 });
1396 let vals = match self.setq {
1397 SetQuantifier::All => excepted.collect_vec(),
1398 SetQuantifier::Distinct => excepted.unique().collect_vec(),
1399 };
1400 Value::from(Bag::from(vals))
1401 }
1402}
1403
1404#[derive(Debug, Clone, Copy, Eq, PartialEq)]
1406pub(crate) enum SetQuantifier {
1407 All,
1408 Distinct,
1409}