partiql_eval/eval/
evaluable.rs

1use crate::error::EvaluationError;
2use crate::eval::expr::EvalExpr;
3use crate::eval::{EvalContext, EvalPlan, NestedContext};
4use itertools::Itertools;
5use partiql_value::Value::{Boolean, Missing, Null};
6use partiql_value::{
7    bag, list, tuple, Bag, List, NullSortedValue, Tuple, Value, ValueIntoIterator,
8};
9use rustc_hash::FxHashMap;
10use std::borrow::Borrow;
11use std::borrow::Cow;
12use std::cell::RefCell;
13use std::cmp::Ordering;
14use std::collections::hash_map::Entry;
15use std::collections::HashSet;
16use std::fmt::{Debug, Formatter};
17
18use crate::env::basic::MapBindings;
19use partiql_value::datum::{Datum, DatumLower, DatumLowerResult, DatumTupleRef, RefTupleView};
20use std::rc::Rc;
21
/// Unwraps an optional operator input, or records an error and bails out of the caller.
///
/// Expands to the value held by `$expr` when it is `Some`. When it is `None`, an
/// `EvaluationError::IllegalState` is added to `$ctx` and the *enclosing function* returns
/// `partiql_value::Value::Missing`; the macro is therefore only usable inside functions that
/// return a `Value`.
#[macro_export]
macro_rules! take_input {
    ($expr:expr, $ctx:expr) => {
        if let Some(val) = $expr {
            val
        } else {
            $ctx.add_error($crate::error::EvaluationError::IllegalState(
                "Error in retrieving input value".to_string(),
            ));
            return partiql_value::Value::Missing;
        }
    };
}
36
/// Whether an [`Evaluable`] takes input from the plan graph or manages its own iteration.
///
/// The derived ordering follows declaration order, i.e. `SelfManaged < GraphManaged`.
#[derive(Debug, Clone, Eq, PartialEq, Ord, PartialOrd)]
pub enum EvalType {
    /// The operator evaluates its own child operators; reported by `EvalJoin`, which calls
    /// `evaluate` on its `left` and `right` operators itself.
    SelfManaged,
    /// The operator's inputs are supplied by the plan graph; this is the default returned by
    /// [`Evaluable::eval_type`].
    GraphManaged,
}
43
/// `Evaluable` represents each evaluation operator in the evaluation plan as an evaluable entity.
pub trait Evaluable: Debug {
    /// Evaluates this operator against up to two optional input values and returns the result.
    ///
    /// Failures are reported by adding errors to `ctx` rather than through the return type; the
    /// implementations visible in this file return `Missing` after reporting an error.
    fn evaluate(&self, inputs: [Option<Value>; 2], ctx: &dyn EvalContext) -> Value;
    /// Names of the variables this operator binds in its output tuples, when it tracks them.
    ///
    /// Defaults to `None`.
    fn get_vars(&self) -> Option<&[String]> {
        None
    }
    /// How this operator obtains its input; defaults to [`EvalType::GraphManaged`].
    fn eval_type(&self) -> EvalType {
        EvalType::GraphManaged
    }
}
54
/// Represents an evaluation `Scan` operator; `Scan` operator scans the given bindings from its
/// input and the environment and outputs a bag of binding tuples for tuples/values matching the
/// scan `expr`, e.g. an SQL expression `table1` in SQL expression `FROM table1`.
pub(crate) struct EvalScan {
    /// Expression producing the collection to scan; evaluated once per input binding.
    pub(crate) expr: Box<dyn EvalExpr>,
    /// Attribute name each scanned element is bound to (the `AS` alias).
    pub(crate) as_key: String,
    /// Optional attribute name bound to the element's position (the `AT` alias).
    pub(crate) at_key: Option<String>,

    // cached values
    /// The alias names (`as_key`, then `at_key` if any) handed out by `get_vars`.
    attrs: Vec<String>,
}
66
67impl Debug for EvalScan {
68    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
69        write!(f, "SCAN ")?;
70        self.expr.fmt(f)?;
71
72        write!(f, " AS {}", self.as_key)?;
73
74        if let Some(at_key) = &self.at_key {
75            write!(f, " AT {at_key}")?;
76        }
77
78        Ok(())
79    }
80}
81
82impl EvalScan {
83    pub(crate) fn new(expr: Box<dyn EvalExpr>, as_key: &str) -> Self {
84        let attrs = vec![as_key.to_string()];
85        EvalScan {
86            expr,
87            as_key: as_key.to_string(),
88            at_key: None,
89
90            attrs,
91        }
92    }
93    pub(crate) fn new_with_at_key(expr: Box<dyn EvalExpr>, as_key: &str, at_key: &str) -> Self {
94        let attrs = vec![as_key.to_string(), at_key.to_string()];
95        EvalScan {
96            expr,
97            as_key: as_key.to_string(),
98            at_key: Some(at_key.to_string()),
99
100            attrs,
101        }
102    }
103}
104
105impl Evaluable for EvalScan {
106    fn evaluate(&self, mut inputs: [Option<Value>; 2], ctx: &dyn EvalContext) -> Value {
107        let input_value = inputs[0].take().unwrap_or(Missing);
108
109        let bindings = match input_value {
110            Value::Bag(t) => *t,
111            Value::Tuple(t) => bag![*t],
112            _ => bag![tuple![]],
113        };
114        let mut value = bag![];
115        bindings.iter().for_each(|binding| {
116            let binding_tuple = binding.as_datum_tuple_ref();
117            let v = self.expr.evaluate(&binding_tuple, ctx).into_owned();
118            let mut at_index_counter: i64 = 0;
119            if let Some(at_key) = &self.at_key {
120                let ordered = v.is_ordered();
121                for t in v {
122                    let mut out = Tuple::from([(self.as_key.as_str(), t)]);
123                    let at_id = if ordered {
124                        at_index_counter.into()
125                    } else {
126                        Missing
127                    };
128                    out.insert(at_key, at_id);
129                    value.push(Value::Tuple(Box::new(out)));
130                    at_index_counter += 1;
131                }
132            } else {
133                for t in v {
134                    let out = Tuple::from([(self.as_key.as_str(), t)]);
135                    value.push(Value::Tuple(Box::new(out)));
136                }
137            }
138        });
139
140        Value::Bag(Box::new(value))
141    }
142
143    fn get_vars(&self) -> Option<&[String]> {
144        Some(&self.attrs)
145    }
146}
147
/// Represents an evaluation `Join` operator; `Join` joins the tuples from its LHS and RHS based on a logic defined
/// by [`EvalJoinKind`]. For semantics of `PartiQL` joins and their distinction with SQL's see sections
/// 5.3 – 5.7 of [PartiQL Specification — August 1, 2019](https://partiql.org/assets/PartiQL-Specification.pdf).
pub(crate) struct EvalJoin {
    /// Which join semantics to apply.
    pub(crate) kind: EvalJoinKind,
    /// Optional `ON` condition; when absent every left/right pairing is kept.
    pub(crate) on: Option<Box<dyn EvalExpr>>,

    /// Operator producing the left-hand bindings.
    pub(crate) left: Box<dyn Evaluable>,
    /// Operator producing the right-hand bindings; re-evaluated for every left binding.
    pub(crate) right: Box<dyn Evaluable>,
}
158
/// The flavor of join performed by `EvalJoin`.
#[derive(Debug)]
pub(crate) enum EvalJoinKind {
    /// Keep only left/right pairings that satisfy the join condition.
    Inner,
    /// Like `Inner`, but a left binding without any match is still emitted, with the right
    /// side's variables set to `NULL`.
    Left,
    /// Not yet implemented: evaluating it reports `NotYetImplemented` and yields `Missing`.
    Right,
    /// Not yet implemented: evaluating it reports `NotYetImplemented` and yields `Missing`.
    Full,
}
166
167impl Debug for EvalJoin {
168    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
169        write!(f, "{:#?} JOIN", &self.kind)?;
170        if let Some(on) = &self.on {
171            write!(f, " ON ")?;
172            on.fmt(f)?;
173        }
174        Ok(())
175    }
176}
177
178impl EvalJoin {
179    pub(crate) fn new(
180        kind: EvalJoinKind,
181        left: Box<dyn Evaluable>,
182        right: Box<dyn Evaluable>,
183        on: Option<Box<dyn EvalExpr>>,
184    ) -> Self {
185        EvalJoin {
186            kind,
187            on,
188
189            left,
190            right,
191        }
192    }
193}
194
impl Evaluable for EvalJoin {
    /// Evaluates the join and returns a bag of concatenated left/right binding tuples.
    ///
    /// `inputs[0]` is the outer environment (an empty tuple when absent). The left operator is
    /// evaluated once against it; the right operator is evaluated once per left binding against
    /// the environment extended with that binding. Reports an error on `ctx` and returns
    /// `Missing` when the left side does not produce a bag, or when the join kind is `RIGHT` or
    /// `FULL` (not yet implemented).
    fn evaluate(&self, mut inputs: [Option<Value>; 2], ctx: &dyn EvalContext) -> Value {
        /// Creates a `Tuple` with attributes `attrs`, each with value `Null`
        #[inline]
        fn tuple_with_null_vals<I, S>(attrs: I) -> Tuple
        where
            S: Into<String>,
            I: IntoIterator<Item = S>,
        {
            attrs.into_iter().map(|k| (k.into(), Null)).collect()
        }

        let mut output_bag = bag![];
        // Outer environment `p`; defaults to an empty tuple when no input was supplied.
        let input_env = inputs[0].take().unwrap_or_else(|| Value::from(tuple![]));
        // `input_env` is cloned because it is reused below to build per-binding environments.
        let lhs_values = self.left.evaluate([Some(input_env.clone()), None], ctx);
        let left_bindings = match lhs_values {
            Value::Bag(t) => *t,
            _ => {
                ctx.add_error(EvaluationError::IllegalState(
                    "Left side of FROM source should result in a bag of bindings".to_string(),
                ));
                return Missing;
            }
        };

        // Current implementations follow pseudocode defined in section 5.6 of spec
        // https://partiql.org/assets/PartiQL-Specification.pdf#subsection.5.6
        match self.kind {
            EvalJoinKind::Inner => {
                // for each binding b_l in eval(p0, p, l)
                left_bindings.iter().for_each(|b_l| {
                    // The right side is evaluated in the environment `p || b_l`.
                    let env_b_l = input_env
                        .as_tuple_ref()
                        .as_ref()
                        .tuple_concat(b_l.as_tuple_ref().borrow());
                    let rhs_values = self.right.evaluate([Some(Value::from(env_b_l)), None], ctx);

                    // A non-bag right-hand result is treated as a single empty binding.
                    let right_bindings = match rhs_values {
                        Value::Bag(t) => *t,
                        _ => bag![tuple![]],
                    };

                    // for each binding b_r in eval (p0, (p || b_l), r)
                    for b_r in &right_bindings {
                        match &self.on {
                            // No condition: every pairing b_l || b_r is emitted.
                            None => {
                                let b_l_b_r = b_l
                                    .as_tuple_ref()
                                    .as_ref()
                                    .tuple_concat(b_r.as_tuple_ref().borrow());
                                output_bag.push(Value::from(b_l_b_r));
                            }
                            // if eval(p0, (p || b_l || b_r), c) is true, add b_l || b_r to output bag
                            Some(condition) => {
                                let b_l_b_r = b_l
                                    .as_tuple_ref()
                                    .as_ref()
                                    .tuple_concat(b_r.as_tuple_ref().borrow());
                                // The condition sees the outer environment as well as the
                                // joined binding; only the joined binding is emitted.
                                let env_b_l_b_r =
                                    &input_env.as_tuple_ref().as_ref().tuple_concat(&b_l_b_r);
                                let tuple_ref = DatumTupleRef::Tuple(env_b_l_b_r);
                                let cond = condition.evaluate(&tuple_ref, ctx);
                                if let Value::Boolean(true) = cond.as_ref() {
                                    output_bag.push(Value::Tuple(Box::new(b_l_b_r)));
                                }
                            }
                        }
                    }
                });
            }
            EvalJoinKind::Left => {
                // for each binding b_l in eval(p0, p, l)
                left_bindings.iter().for_each(|b_l| {
                    // define empty bag q_r
                    let mut output_bag_left = bag![];
                    // The right side is evaluated in the environment `p || b_l`.
                    let env_b_l = input_env
                        .as_tuple_ref()
                        .as_ref()
                        .tuple_concat(b_l.as_tuple_ref().borrow());
                    let rhs_values = self.right.evaluate([Some(Value::from(env_b_l)), None], ctx);

                    // A non-bag right-hand result is treated as a single empty binding.
                    let right_bindings = match rhs_values {
                        Value::Bag(t) => *t,
                        _ => bag![tuple![]],
                    };

                    // for each binding b_r in eval (p0, (p || b_l), r)
                    for b_r in &right_bindings {
                        match &self.on {
                            // No condition: every pairing b_l || b_r goes into q_r.
                            None => {
                                let b_l_b_r = b_l
                                    .as_tuple_ref()
                                    .as_ref()
                                    .tuple_concat(b_r.as_tuple_ref().borrow());
                                output_bag_left.push(Value::from(b_l_b_r));
                            }
                            // if eval(p0, (p || b_l || b_r), c) is true, add b_l || b_r to q_r
                            Some(condition) => {
                                let b_l_b_r = b_l
                                    .as_tuple_ref()
                                    .as_ref()
                                    .tuple_concat(b_r.as_tuple_ref().borrow());
                                // The condition sees the outer environment as well as the
                                // joined binding; only the joined binding is kept.
                                let env_b_l_b_r =
                                    &input_env.as_tuple_ref().as_ref().tuple_concat(&b_l_b_r);
                                let tuple_ref = DatumTupleRef::Tuple(env_b_l_b_r);
                                let cond = condition.evaluate(&tuple_ref, ctx);
                                if cond.as_ref() == &Value::Boolean(true) {
                                    output_bag_left.push(Value::Tuple(Box::new(b_l_b_r)));
                                }
                            }
                        }
                    }

                    // if q_r is the empty bag
                    if output_bag_left.is_empty() {
                        // Pad with NULLs for the right operator's variables; if the right
                        // operator reports no variables, b_l is emitted unchanged.
                        let attrs = self.right.get_vars().unwrap_or(&[]);
                        let new_binding = b_l
                            .as_tuple_ref()
                            .as_ref()
                            .tuple_concat(&tuple_with_null_vals(attrs));
                        // add b_l || <v_1_r: NULL, ..., v_n_r: NULL> to output bag
                        output_bag.push(Value::from(new_binding));
                    } else {
                        // otherwise for each binding b_r in q_r, add b_l || b_r to output bag
                        for elem in output_bag_left {
                            output_bag.push(elem);
                        }
                    }
                });
            }
            EvalJoinKind::Full | EvalJoinKind::Right => {
                ctx.add_error(EvaluationError::NotYetImplemented(
                    "FULL and RIGHT JOIN".to_string(),
                ));
                return Missing;
            }
        };
        Value::Bag(Box::new(output_bag))
    }

    /// The join evaluates its own `left` and `right` operators, so it is self-managed.
    fn eval_type(&self) -> EvalType {
        EvalType::SelfManaged
    }
}
339
/// An SQL aggregation function call that has been rewritten to be evaluated with the `GROUP BY`
/// clause. `name` is the string (generated in the AST lowering step) that replaces the
/// aggregation call expression; it is used as the field name in the binding tuple output by
/// `GROUP BY`. `expr` corresponds to the expression within the aggregation function, and
/// `func` corresponds to the aggregation function that's being called (e.g. sum, count, avg).
///
/// For example, `SELECT a AS a, SUM(b) AS b FROM t GROUP BY a` is rewritten to the following form
///              `SELECT a AS a, $__agg_1 AS b FROM t GROUP BY a`
/// In the above example, `name` corresponds to `$__agg_1`, `expr` refers to the expression within
/// the aggregation function, `b`, and `func` corresponds to the sum aggregation function,
/// `Sum`.
#[derive(Debug)]
pub(crate) struct AggregateExpression {
    /// Output attribute name under which the finalized aggregate is stored.
    pub(crate) name: String,
    /// Argument expression, evaluated once per input binding.
    pub(crate) expr: Box<dyn EvalExpr>,
    /// The aggregation function that accumulates the evaluated argument values.
    pub(crate) func: Box<dyn AggregateFunction>,
}
357
358impl AggregateFunction for AggregateExpression {
359    #[inline]
360    fn next_distinct(
361        &self,
362        input_value: &Value,
363        state: &mut Option<Value>,
364        seen: &mut FxHashMap<Value, ()>,
365    ) {
366        if input_value.is_present() {
367            self.func.next_distinct(input_value, state, seen);
368        }
369    }
370
371    #[inline]
372    fn next_value(&self, input_value: &Value, state: &mut Option<Value>) {
373        if input_value.is_present() {
374            self.func.next_value(input_value, state);
375        }
376    }
377
378    #[inline]
379    fn finalize(&self, state: Option<Value>) -> Result<Value, EvaluationError> {
380        self.func.finalize(state)
381    }
382}
383
384/// Represents an SQL aggregation function computed on a collection of input values.
385pub trait AggregateFunction: Debug {
386    #[inline]
387    fn next_distinct(
388        &self,
389        input_value: &Value,
390        state: &mut Option<Value>,
391        seen: &mut FxHashMap<Value, ()>,
392    ) {
393        match seen.entry(input_value.clone()) {
394            Entry::Occupied(_) => {}
395            Entry::Vacant(v) => {
396                v.insert(());
397                self.next_value(input_value, state);
398            }
399        }
400    }
401    /// Provides the next value for the given `group`.
402    fn next_value(&self, input_value: &Value, state: &mut Option<Value>);
403    /// Returns the result of the aggregation function for a given `group`.
404    fn finalize(&self, state: Option<Value>) -> Result<Value, EvaluationError>;
405}
406
407/// Represents SQL's `AVG` aggregation function
408#[derive(Debug)]
409pub(crate) struct Avg {}
410
411impl AggregateFunction for Avg {
412    fn next_value(&self, input_value: &Value, state: &mut Option<Value>) {
413        match state {
414            None => *state = Some(Value::from(list![Value::from(1), input_value.clone()])),
415            Some(Value::List(list)) => {
416                if let Some(count) = list.get_mut(0) {
417                    *count += &Value::from(1);
418                }
419                if let Some(sum) = list.get_mut(1) {
420                    *sum += input_value;
421                }
422            }
423            _ => unreachable!(),
424        };
425    }
426
427    fn finalize(&self, state: Option<Value>) -> Result<Value, EvaluationError> {
428        match state {
429            None => Ok(Null),
430            Some(Value::List(list)) => {
431                let vals = list.to_vec();
432                if let [count, sum] = &vals[..] {
433                    if let Value::Integer(n) = sum {
434                        // Avg does not do integer division; convert to decimal
435                        let sum = Value::from(rust_decimal::Decimal::from(*n));
436                        Ok(&sum / count)
437                    } else {
438                        Ok(sum / count)
439                    }
440                } else {
441                    Err(EvaluationError::IllegalState(
442                        "Bad finalize state for Avg".to_string(),
443                    ))
444                }
445            }
446            _ => unreachable!(),
447        }
448    }
449}
450
451/// Represents SQL's `COUNT` aggregation function
452#[derive(Debug)]
453pub(crate) struct Count {}
454
455impl AggregateFunction for Count {
456    fn next_value(&self, _: &Value, state: &mut Option<Value>) {
457        match state {
458            None => *state = Some(Value::from(1)),
459            Some(Value::Integer(i)) => {
460                *i += 1;
461            }
462            _ => unreachable!(),
463        };
464    }
465
466    fn finalize(&self, state: Option<Value>) -> Result<Value, EvaluationError> {
467        Ok(state.unwrap_or_else(|| Value::from(0)))
468    }
469}
470
471/// Represents SQL's `MAX` aggregation function
472#[derive(Debug)]
473pub(crate) struct Max {}
474
475impl AggregateFunction for Max {
476    fn next_value(&self, input_value: &Value, state: &mut Option<Value>) {
477        match state {
478            None => *state = Some(input_value.clone()),
479            Some(max) => {
480                if &*max < input_value {
481                    *max = input_value.clone();
482                }
483            }
484        };
485    }
486
487    fn finalize(&self, state: Option<Value>) -> Result<Value, EvaluationError> {
488        Ok(state.unwrap_or(Null))
489    }
490}
491
492/// Represents SQL's `MIN` aggregation function
493#[derive(Debug)]
494pub(crate) struct Min {}
495
496impl AggregateFunction for Min {
497    fn next_value(&self, input_value: &Value, state: &mut Option<Value>) {
498        match state {
499            None => *state = Some(input_value.clone()),
500            Some(min) => {
501                if &*min > input_value {
502                    *min = input_value.clone();
503                }
504            }
505        };
506    }
507
508    fn finalize(&self, state: Option<Value>) -> Result<Value, EvaluationError> {
509        Ok(state.unwrap_or(Null))
510    }
511}
512
513/// Represents SQL's `SUM` aggregation function
514#[derive(Debug)]
515pub(crate) struct Sum {}
516
517impl AggregateFunction for Sum {
518    fn next_value(&self, input_value: &Value, state: &mut Option<Value>) {
519        match state {
520            None => *state = Some(input_value.clone()),
521            Some(ref mut sum) => *sum += input_value,
522        };
523    }
524
525    fn finalize(&self, state: Option<Value>) -> Result<Value, EvaluationError> {
526        Ok(state.unwrap_or(Null))
527    }
528}
529
530/// Represents SQL's `ANY`/`SOME` aggregation function
531#[derive(Debug)]
532pub(crate) struct Any {}
533
534impl AggregateFunction for Any {
535    fn next_value(&self, input_value: &Value, state: &mut Option<Value>) {
536        match state {
537            None => {
538                *state = Some(match input_value {
539                    Boolean(b) => Value::Boolean(*b),
540                    _ => Missing,
541                });
542            }
543            Some(ref mut acc) => {
544                *acc = match (&acc, input_value) {
545                    (Boolean(acc), Boolean(new)) => Boolean(*acc || *new),
546                    _ => Missing,
547                }
548            }
549        };
550    }
551
552    fn finalize(&self, state: Option<Value>) -> Result<Value, EvaluationError> {
553        Ok(state.unwrap_or(Null))
554    }
555}
556
557/// Represents SQL's `EVERY` aggregation function
558#[derive(Debug)]
559pub(crate) struct Every {}
560
561impl AggregateFunction for Every {
562    fn next_value(&self, input_value: &Value, state: &mut Option<Value>) {
563        match state {
564            None => {
565                *state = Some(match input_value {
566                    Boolean(b) => Value::Boolean(*b),
567                    _ => Missing,
568                });
569            }
570            Some(ref mut acc) => {
571                *acc = match (&acc, input_value) {
572                    (Boolean(acc), Boolean(new)) => Boolean(*acc && *new),
573                    _ => Missing,
574                }
575            }
576        };
577    }
578
579    fn finalize(&self, state: Option<Value>) -> Result<Value, EvaluationError> {
580        Ok(state.unwrap_or(Null))
581    }
582}
583
/// Represents an evaluation `GROUP BY` operator. For `GROUP BY` operational semantics, see section
/// `11` of
/// [PartiQL Specification — August 1, 2019](https://partiql.org/assets/PartiQL-Specification.pdf).
/// `aggs` and `distinct_aggs` represent the set of aggregate expressions to compute.
#[derive(Debug)]
pub(crate) struct EvalGroupBy {
    /// Grouping qualifier; only `GroupFull` is currently evaluated.
    pub(crate) strategy: EvalGroupingStrategy,
    /// Grouping key expressions, evaluated per input binding to form the group key.
    pub(crate) group: Vec<Box<dyn EvalExpr>>,
    /// Output attribute names for the group key values; paired positionally with `group`.
    pub(crate) aliases: Vec<String>,
    /// Aggregates fed every input value of the group.
    pub(crate) aggs: Vec<AggregateExpression>,
    /// Aggregates fed each distinct input value of the group only once.
    pub(crate) distinct_aggs: Vec<AggregateExpression>,
    /// Attribute name for the `GROUP AS` bag of grouped tuples, when requested.
    pub(crate) group_as_alias: Option<String>,
}
597
/// Evaluated `GROUP BY` key: one value per grouping expression.
type GroupKey = Vec<Value>;
/// Per-group accumulator state: one slot per non-distinct aggregate.
type AggState = Vec<Option<Value>>;
/// Per-group state for distinct aggregates: the accumulator plus the values already seen.
type DAggState = Vec<(Option<Value>, FxHashMap<Value, ()>)>;
/// Everything tracked for one group: aggregate state, distinct-aggregate state, and the tuples
/// collected for `GROUP AS` (`None` when no `GROUP AS` alias was requested).
#[derive(Clone)]
struct CombinedState(AggState, DAggState, Option<Vec<Value>>);
603
/// Represents the grouping qualifier: ALL or PARTIAL.
#[derive(Debug)]
pub(crate) enum EvalGroupingStrategy {
    /// Full grouping; the strategy `EvalGroupBy` actually evaluates.
    GroupFull,
    /// Partial grouping; not yet implemented — evaluating it reports `NotYetImplemented` and
    /// yields `Missing`.
    GroupPartial,
}
610
611impl EvalGroupBy {
612    #[inline]
613    pub(crate) fn new(
614        strategy: EvalGroupingStrategy,
615        group: Vec<Box<dyn EvalExpr>>,
616        aliases: Vec<String>,
617        aggs: Vec<AggregateExpression>,
618        distinct_aggs: Vec<AggregateExpression>,
619        group_as_alias: Option<String>,
620    ) -> Self {
621        Self {
622            strategy,
623            group,
624            aliases,
625            aggs,
626            distinct_aggs,
627            group_as_alias,
628        }
629    }
630
631    #[inline]
632    fn group_key<'a, 'c>(
633        &'a self,
634        bindings: &'a DatumTupleRef<'a>,
635        ctx: &'c dyn EvalContext,
636    ) -> GroupKey
637    where
638        'c: 'a,
639    {
640        self.group
641            .iter()
642            .map(|expr| match expr.evaluate(bindings, ctx).as_ref() {
643                Missing => Value::Null,
644                val => val.clone(),
645            })
646            .collect()
647    }
648}
649
impl Evaluable for EvalGroupBy {
    /// Groups the input bindings by the evaluated group key and returns a bag holding one
    /// tuple per group.
    ///
    /// Each output tuple contains the aliased group key values, the finalized value of every
    /// aggregate (under the aggregate's `name`), and — when a `GROUP AS` alias is set — a bag
    /// of the group's input tuples. Reports an error on `ctx` and returns `Missing` when the
    /// input is absent or the strategy is `GroupPartial`.
    fn evaluate(&self, mut inputs: [Option<Value>; 2], ctx: &dyn EvalContext) -> Value {
        let group_as_alias = &self.group_as_alias;
        let input_value = take_input!(inputs[0].take(), ctx);

        match self.strategy {
            EvalGroupingStrategy::GroupPartial => {
                ctx.add_error(EvaluationError::NotYetImplemented(
                    "GROUP PARTIAL".to_string(),
                ));
                Missing
            }
            EvalGroupingStrategy::GroupFull => {
                let mut grouped: FxHashMap<GroupKey, CombinedState> = FxHashMap::default();
                // Template for a fresh group's state: empty accumulators, empty `seen` sets,
                // and an empty `GROUP AS` buffer only when an alias was requested. It is
                // cloned the first time each group key is encountered.
                let state = std::iter::repeat_n(None, self.aggs.len()).collect_vec();
                let distinct_state = std::iter::repeat_with(|| (None, FxHashMap::default()))
                    .take(self.distinct_aggs.len())
                    .collect_vec();
                let group_as = group_as_alias.as_ref().map(|_| vec![]);

                let combined = CombinedState(state, distinct_state, group_as);

                for v in input_value {
                    let v_as_tuple = v.as_datum_tuple_ref();
                    let group_key = self.group_key(&v_as_tuple, ctx);
                    let CombinedState(state, distinct_state, group_as) =
                        grouped.entry(group_key).or_insert_with(|| combined.clone());

                    // Compute next aggregation result for each of the aggregation expressions
                    for (agg_expr, state) in self.aggs.iter().zip(state.iter_mut()) {
                        let evaluated = agg_expr.expr.evaluate(&v_as_tuple, ctx);
                        agg_expr.next_value(evaluated.as_ref(), state);
                    }

                    // Compute next aggregation result for each of the distinct aggregation expressions
                    for (distinct_expr, (state, seen)) in
                        self.distinct_aggs.iter().zip(distinct_state.iter_mut())
                    {
                        let evaluated = distinct_expr.expr.evaluate(&v_as_tuple, ctx);
                        distinct_expr.next_distinct(evaluated.as_ref(), state, seen);
                    }

                    // Add tuple to `GROUP AS` if applicable
                    if let Some(ref mut tuples) = group_as {
                        tuples.push(Value::from(v.coerce_into_tuple()));
                    }
                }

                // Turn every (key, state) pair into one output tuple.
                let vals = grouped
                    .into_iter()
                    .map(|(group_key, state)| {
                        let CombinedState(agg_state, distinct_state, group_as) = state;
                        // Pair each key value with its output alias, positionally.
                        let group = self.aliases.iter().cloned().zip(group_key);

                        // finalize all aggregates
                        let aggs_with_state = self.aggs.iter().zip(agg_state);
                        // The `seen` sets are no longer needed once accumulation is done.
                        let daggs_with_state = self
                            .distinct_aggs
                            .iter()
                            .zip(distinct_state.into_iter().map(|(state, _)| state));
                        let agg_data = aggs_with_state.chain(daggs_with_state).map(
                            |(aggregate_expr, state)| {
                                // A failed finalize is reported on `ctx` and yields `Missing`.
                                let val = match aggregate_expr.finalize(state) {
                                    Ok(agg_result) => agg_result,
                                    Err(err) => {
                                        ctx.add_error(err);
                                        Missing
                                    }
                                };

                                (aggregate_expr.name.to_string(), val)
                            },
                        );

                        let mut tuple = Tuple::from_iter(group.chain(agg_data));

                        // insert `GROUP AS` if applicable
                        if let Some(tuples) = group_as {
                            // `group_as` is only `Some` when `group_as_alias` is `Some`
                            // (see the state template above), so this `unwrap` cannot fail.
                            tuple.insert(
                                group_as_alias.as_ref().unwrap(),
                                Value::from(Bag::from(tuples)),
                            );
                        }

                        Value::from(tuple)
                    })
                    .collect_vec();

                Value::from(Bag::from(vals))
            }
        }
    }
}
743
/// Represents an evaluation `Pivot` operator; the `Pivot` enables turning a collection into a
/// tuple. For `Pivot` operational semantics, see section `6.2` of
/// [PartiQL Specification — August 1, 2019](https://partiql.org/assets/PartiQL-Specification.pdf).
#[derive(Debug)]
pub(crate) struct EvalPivot {
    /// Expression producing the attribute name for each input binding.
    pub(crate) key: Box<dyn EvalExpr>,
    /// Expression producing the attribute value for each input binding.
    pub(crate) value: Box<dyn EvalExpr>,
}
752
753impl EvalPivot {
754    pub(crate) fn new(key: Box<dyn EvalExpr>, value: Box<dyn EvalExpr>) -> Self {
755        EvalPivot { key, value }
756    }
757}
758
759impl Evaluable for EvalPivot {
760    fn evaluate(&self, mut inputs: [Option<Value>; 2], ctx: &dyn EvalContext) -> Value {
761        let input_value = take_input!(inputs[0].take(), ctx);
762
763        let tuple: Tuple = input_value
764            .into_iter()
765            .filter_map(|binding| {
766                let binding = binding.as_datum_tuple_ref();
767                let key = self.key.evaluate(&binding, ctx);
768                if let Value::String(s) = key.as_ref() {
769                    let value = self.value.evaluate(&binding, ctx);
770                    Some((s.to_string(), value.into_owned()))
771                } else {
772                    None
773                }
774            })
775            .collect();
776        Value::from(tuple)
777    }
778}
779
/// Represents an evaluation `Unpivot` operator; the `Unpivot` enables ranging over the
/// attribute-value pairs of a tuple. For `Unpivot` operational semantics, see section `5.2` of
/// [PartiQL Specification — August 1, 2019](https://partiql.org/assets/PartiQL-Specification.pdf).
#[derive(Debug)]
pub(crate) struct EvalUnpivot {
    /// Expression producing the tuple to unpivot; non-tuple results are coerced into a tuple.
    pub(crate) expr: Box<dyn EvalExpr>,
    /// Attribute name each attribute *value* is bound to (the `AS` alias).
    pub(crate) as_key: String,
    /// Optional attribute name each attribute *name* is bound to (the `AT` alias).
    pub(crate) at_key: Option<String>,

    // cached values
    /// The alias names (`as_key`, then `at_key` if any) handed out by `get_vars`.
    attrs: Vec<String>,
}
792
793impl EvalUnpivot {
794    pub(crate) fn new(expr: Box<dyn EvalExpr>, as_key: &str, at_key: Option<String>) -> Self {
795        let attrs = if let Some(at_key) = &at_key {
796            vec![as_key.to_string(), at_key.clone()]
797        } else {
798            vec![as_key.to_string()]
799        };
800
801        EvalUnpivot {
802            expr,
803            as_key: as_key.to_string(),
804            at_key,
805
806            attrs,
807        }
808    }
809}
810
811impl Evaluable for EvalUnpivot {
812    fn evaluate(&self, _inputs: [Option<Value>; 2], ctx: &dyn EvalContext) -> Value {
813        let tuple = match self.expr.evaluate(&DatumTupleRef::Empty, ctx).into_owned() {
814            Value::Tuple(tuple) => *tuple,
815            other => other.coerce_into_tuple(),
816        };
817
818        let as_key = self.as_key.as_str();
819        let pairs = tuple;
820        let unpivoted = if let Some(at_key) = &self.at_key {
821            pairs
822                .map(|(k, v)| Tuple::from([(as_key, v), (at_key.as_str(), k.into())]))
823                .collect::<Bag>()
824        } else {
825            pairs
826                .map(|(_, v)| Tuple::from([(as_key, v)]))
827                .collect::<Bag>()
828        };
829        Value::from(unpivoted)
830    }
831
832    fn get_vars(&self) -> Option<&[String]> {
833        Some(&self.attrs)
834    }
835}
836
/// Represents an evaluation `Filter` operator; for an input bag of binding tuples the `Filter`
/// operator filters out the binding tuples that do not meet the condition expressed as `expr`,
/// e.g. `a > 2` in the `WHERE a > 2` expression.
#[derive(Debug)]
pub(crate) struct EvalFilter {
    /// Predicate evaluated against each binding; only a Boolean `true` keeps the binding.
    pub(crate) expr: Box<dyn EvalExpr>,
}
844
845impl EvalFilter {
846    pub(crate) fn new(expr: Box<dyn EvalExpr>) -> Self {
847        EvalFilter { expr }
848    }
849
850    #[inline]
851    fn eval_filter<'a, 'c>(
852        &'a self,
853        bindings: &'a DatumTupleRef<'a>,
854        ctx: &'c dyn EvalContext,
855    ) -> bool
856    where
857        'c: 'a,
858    {
859        let result = self.expr.evaluate(bindings, ctx);
860        match result.as_ref() {
861            Boolean(bool_val) => *bool_val,
862            // Alike SQL, when the expression of the WHERE clause expression evaluates to
863            // absent value or a value that is not a Boolean, PartiQL eliminates the corresponding
864            // binding. PartiQL Specification August 1, 2019 Draft, Section 8. `WHERE clause`
865            _ => false,
866        }
867    }
868}
869
870impl Evaluable for EvalFilter {
871    fn evaluate(&self, mut inputs: [Option<Value>; 2], ctx: &dyn EvalContext) -> Value {
872        let input_value = take_input!(inputs[0].take(), ctx);
873
874        let filtered = input_value
875            .into_iter()
876            .filter(|v| self.eval_filter(&v.as_datum_tuple_ref(), ctx));
877        Value::from(filtered.collect::<Bag>())
878    }
879}
880
881/// Represents an evaluation `Having` operator; for an input bag of binding tuples the `Having`
882/// operator filters out the binding tuples that does not meet the condition expressed as `expr`,
883/// e.g. `a = 10` in `HAVING a = 10` expression.
884#[derive(Debug)]
885pub(crate) struct EvalHaving {
886    pub(crate) expr: Box<dyn EvalExpr>,
887}
888
889impl EvalHaving {
890    pub(crate) fn new(expr: Box<dyn EvalExpr>) -> Self {
891        EvalHaving { expr }
892    }
893
894    #[inline]
895    fn eval_having<'a, 'c>(
896        &'a self,
897        bindings: &'a DatumTupleRef<'a>,
898        ctx: &'c dyn EvalContext,
899    ) -> bool
900    where
901        'c: 'a,
902    {
903        let result = self.expr.evaluate(bindings, ctx);
904        match result.as_ref() {
905            Boolean(bool_val) => *bool_val,
906            // Alike SQL, when the expression of the HAVING clause expression evaluates to
907            // absent value or a value that is not a Boolean, PartiQL eliminates the corresponding
908            // binding. PartiQL Specification August 1, 2019 Draft, Section 11.1.
909            // > HAVING behaves identical to a WHERE, once groups are already formulated earlier
910            // See Section 8 on WHERE semantics
911            _ => false,
912        }
913    }
914}
915
916impl Evaluable for EvalHaving {
917    fn evaluate(&self, mut inputs: [Option<Value>; 2], ctx: &dyn EvalContext) -> Value {
918        let input_value = take_input!(inputs[0].take(), ctx);
919
920        let filtered = input_value
921            .into_iter()
922            .filter(|v| self.eval_having(&v.as_datum_tuple_ref(), ctx));
923        Value::from(filtered.collect::<Bag>())
924    }
925}
926
927#[derive(Debug)]
928pub(crate) struct EvalOrderBySortCondition {
929    pub(crate) expr: Box<dyn EvalExpr>,
930    pub(crate) spec: EvalOrderBySortSpec,
931}
932
933#[derive(Debug)]
934pub(crate) enum EvalOrderBySortSpec {
935    AscNullsFirst,
936    AscNullsLast,
937    DescNullsFirst,
938    DescNullsLast,
939}
940
941/// Represents an evaluation `Order By` operator; e.g. `ORDER BY a DESC NULLS LAST` in `SELECT a FROM t ORDER BY a DESC NULLS LAST`.
942#[derive(Debug)]
943pub(crate) struct EvalOrderBy {
944    pub(crate) cmp: Vec<EvalOrderBySortCondition>,
945}
946
947impl EvalOrderBy {
948    #[inline]
949    fn compare(&self, l: &Value, r: &Value, ctx: &dyn EvalContext) -> Ordering {
950        let l = l.as_datum_tuple_ref();
951        let r = r.as_datum_tuple_ref();
952        self.cmp
953            .iter()
954            .map(|spec| {
955                let l = spec.expr.evaluate(&l, ctx);
956                let r = spec.expr.evaluate(&r, ctx);
957
958                match spec.spec {
959                    EvalOrderBySortSpec::AscNullsFirst => {
960                        let wrap = NullSortedValue::<true, Value>;
961                        let (l, r) = (wrap(l.as_ref()), wrap(r.as_ref()));
962                        l.cmp(&r)
963                    }
964                    EvalOrderBySortSpec::AscNullsLast => {
965                        let wrap = NullSortedValue::<false, Value>;
966                        let (l, r) = (wrap(l.as_ref()), wrap(r.as_ref()));
967                        l.cmp(&r)
968                    }
969                    EvalOrderBySortSpec::DescNullsFirst => {
970                        let wrap = NullSortedValue::<false, Value>;
971                        let (l, r) = (wrap(l.as_ref()), wrap(r.as_ref()));
972                        r.cmp(&l)
973                    }
974                    EvalOrderBySortSpec::DescNullsLast => {
975                        let wrap = NullSortedValue::<true, Value>;
976                        let (l, r) = (wrap(l.as_ref()), wrap(r.as_ref()));
977                        r.cmp(&l)
978                    }
979                }
980            })
981            .find_or_last(|o| o != &Ordering::Equal)
982            .unwrap_or(Ordering::Equal)
983    }
984}
985
986impl Evaluable for EvalOrderBy {
987    fn evaluate(&self, mut inputs: [Option<Value>; 2], ctx: &dyn EvalContext) -> Value {
988        let input_value = take_input!(inputs[0].take(), ctx);
989
990        let values: DatumLowerResult<Vec<_>> =
991            input_value.into_iter().map(|v| v.into_lower()).collect();
992        // TODO handle lowering error
993        let mut values = values.expect("lower");
994        values.sort_by(|l, r| self.compare(l, r, ctx));
995        Value::from(List::from(values))
996    }
997}
998
999/// Represents an evaluation `LIMIT` and/or `OFFSET` operator.
1000#[derive(Debug)]
1001pub(crate) struct EvalLimitOffset {
1002    pub(crate) limit: Option<Box<dyn EvalExpr>>,
1003    pub(crate) offset: Option<Box<dyn EvalExpr>>,
1004}
1005
1006impl Evaluable for EvalLimitOffset {
1007    fn evaluate(&self, mut inputs: [Option<Value>; 2], ctx: &dyn EvalContext) -> Value {
1008        let input_value = take_input!(inputs[0].take(), ctx);
1009
1010        let empty_bindings = DatumTupleRef::Empty;
1011
1012        let offset = match &self.offset {
1013            None => 0,
1014            Some(expr) => match expr.evaluate(&empty_bindings, ctx).as_ref() {
1015                Value::Integer(i) => {
1016                    if *i >= 0 {
1017                        *i as usize
1018                    } else {
1019                        0
1020                    }
1021                }
1022                _ => 0,
1023            },
1024        };
1025
1026        let limit = match &self.limit {
1027            None => None,
1028            Some(expr) => match expr.evaluate(&empty_bindings, ctx).as_ref() {
1029                Value::Integer(i) => {
1030                    if *i >= 0 {
1031                        Some(*i as usize)
1032                    } else {
1033                        None
1034                    }
1035                }
1036                _ => None,
1037            },
1038        };
1039
1040        let ordered = input_value.is_ordered();
1041        fn collect(values: impl Iterator<Item = Value>, ordered: bool) -> Value {
1042            match ordered {
1043                true => Value::from(values.collect::<List>()),
1044                false => Value::from(values.collect::<Bag>()),
1045            }
1046        }
1047
1048        let offsetted = input_value.into_iter().skip(offset);
1049        match limit {
1050            Some(n) => collect(offsetted.take(n), ordered),
1051            None => collect(offsetted, ordered),
1052        }
1053    }
1054}
1055
1056/// Represents an evaluation `SelectValue` operator; `SelectValue` implements `PartiQL` Core's
1057/// `SELECT VALUE` clause semantics. For `SelectValue` operational semantics, see section `6.1` of
1058/// [PartiQL Specification — August 1, 2019](https://partiql.org/assets/PartiQL-Specification.pdf).
1059#[derive(Debug)]
1060pub(crate) struct EvalSelectValue {
1061    pub(crate) expr: Box<dyn EvalExpr>,
1062}
1063
1064impl EvalSelectValue {
1065    pub(crate) fn new(expr: Box<dyn EvalExpr>) -> Self {
1066        EvalSelectValue { expr }
1067    }
1068}
1069
1070impl Evaluable for EvalSelectValue {
1071    fn evaluate(&self, mut inputs: [Option<Value>; 2], ctx: &dyn EvalContext) -> Value {
1072        let input_value = take_input!(inputs[0].take(), ctx);
1073
1074        let ordered = input_value.is_ordered();
1075
1076        let values = input_value.into_iter().map(|v| {
1077            let v_as_tuple = v.as_datum_tuple_ref();
1078            self.expr.evaluate(&v_as_tuple, ctx).into_owned()
1079        });
1080
1081        match ordered {
1082            true => Value::from(values.collect::<List>()),
1083            false => Value::from(values.collect::<Bag>()),
1084        }
1085    }
1086}
1087
1088/// Represents an evaluation `Project` operator; for a given bag of input binding tuples as input
1089/// the `Project` selects attributes as specified by expressions in `exprs`. For `Project`
1090/// operational semantics, see section `6` of
1091/// [PartiQL Specification — August 1, 2019](https://partiql.org/assets/PartiQL-Specification.pdf).
1092pub(crate) struct EvalSelect {
1093    pub(crate) exprs: Vec<(String, Box<dyn EvalExpr>)>,
1094}
1095
1096impl EvalSelect {
1097    pub(crate) fn new(exprs: Vec<(String, Box<dyn EvalExpr>)>) -> Self {
1098        EvalSelect { exprs }
1099    }
1100}
1101
1102impl Debug for EvalSelect {
1103    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
1104        write!(f, "SELECT ")?;
1105        let mut sep = "";
1106        for (alias, expr) in &self.exprs {
1107            write!(f, "{sep}")?;
1108            expr.fmt(f)?;
1109            write!(f, " AS {alias}")?;
1110            sep = ", ";
1111        }
1112
1113        Ok(())
1114    }
1115}
1116
1117impl Evaluable for EvalSelect {
1118    fn evaluate(&self, mut inputs: [Option<Value>; 2], ctx: &dyn EvalContext) -> Value {
1119        let input_value = take_input!(inputs[0].take(), ctx);
1120
1121        let ordered = input_value.is_ordered();
1122
1123        let values = input_value.into_iter().map(|v| {
1124            let v_as_tuple = v.as_datum_tuple_ref();
1125
1126            let tuple_pairs = self.exprs.iter().filter_map(|(alias, expr)| {
1127                let evaluated_val = expr.evaluate(&v_as_tuple, ctx);
1128                match evaluated_val.as_ref() {
1129                    Missing => None,
1130                    _ => Some((alias.as_str(), evaluated_val.into_owned())),
1131                }
1132            });
1133
1134            tuple_pairs.collect::<Tuple>()
1135        });
1136
1137        match ordered {
1138            true => Value::from(values.collect::<List>()),
1139            false => Value::from(values.collect::<Bag>()),
1140        }
1141    }
1142}
1143
1144/// Represents an evaluation `ProjectAll` operator; `ProjectAll` implements SQL's `SELECT *`
1145/// semantics.
1146#[derive(Debug, Default)]
1147pub(crate) struct EvalSelectAll {
1148    pub(crate) passthrough: bool,
1149}
1150
1151impl EvalSelectAll {
1152    pub(crate) fn new(passthrough: bool) -> Self {
1153        Self { passthrough }
1154    }
1155}
1156
1157impl Evaluable for EvalSelectAll {
1158    fn evaluate(&self, mut inputs: [Option<Value>; 2], ctx: &dyn EvalContext) -> Value {
1159        let input_value = take_input!(inputs[0].take(), ctx);
1160
1161        let ordered = input_value.is_ordered();
1162
1163        let values = input_value.into_iter().map(|val| {
1164            if self.passthrough {
1165                val.coerce_into_tuple()
1166            } else {
1167                val.coerce_into_tuple()
1168                    .into_values()
1169                    .flat_map(|v| v.coerce_into_tuple().into_pairs())
1170                    .collect::<Tuple>()
1171            }
1172        });
1173
1174        match ordered {
1175            true => Value::from(values.collect::<List>()),
1176            false => Value::from(values.collect::<Bag>()),
1177        }
1178    }
1179}
1180
1181/// Represents an evaluation `ExprQuery` operator; in `PartiQL` as opposed to SQL, the following
1182/// expression by its own is valid: `2 * 2`. Considering this, evaluation plan designates an operator
1183/// for evaluating such stand-alone expressions.
1184#[derive(Debug)]
1185pub(crate) struct EvalExprQuery {
1186    pub(crate) expr: Box<dyn EvalExpr>,
1187}
1188
1189impl EvalExprQuery {
1190    pub(crate) fn new(expr: Box<dyn EvalExpr>) -> Self {
1191        EvalExprQuery { expr }
1192    }
1193}
1194
1195impl Evaluable for EvalExprQuery {
1196    fn evaluate(&self, mut inputs: [Option<Value>; 2], ctx: &dyn EvalContext) -> Value {
1197        let input = inputs[0].take().unwrap_or(Value::Null);
1198        let input = input.as_tuple_ref();
1199        let input = input.as_ref();
1200        let input = DatumTupleRef::Tuple(input);
1201        self.expr.evaluate(&input, ctx).into_owned()
1202    }
1203}
1204
1205/// Represents an SQL `DISTINCT` operator, e.g. in `SELECT DISTINCT a FROM t`.
1206#[derive(Debug, Default)]
1207pub(crate) struct EvalDistinct {}
1208
1209impl EvalDistinct {
1210    pub(crate) fn new() -> Self {
1211        Self::default()
1212    }
1213}
1214
1215impl Evaluable for EvalDistinct {
1216    fn evaluate(&self, mut inputs: [Option<Value>; 2], ctx: &dyn EvalContext) -> Value {
1217        let input_value = take_input!(inputs[0].take(), ctx);
1218        let ordered = input_value.is_ordered();
1219
1220        let values = input_value.into_iter().unique();
1221        match ordered {
1222            true => Value::from(values.collect::<List>()),
1223            false => Value::from(values.collect::<Bag>()),
1224        }
1225    }
1226}
1227
1228/// Represents an operator that captures the output of a (sub)query in the plan.
1229pub(crate) struct EvalSink {}
1230
1231impl Evaluable for EvalSink {
1232    fn evaluate(&self, mut inputs: [Option<Value>; 2], _ctx: &dyn EvalContext) -> Value {
1233        inputs[0].take().unwrap_or(Missing)
1234    }
1235}
1236
1237impl Debug for EvalSink {
1238    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
1239        write!(f, "SINK")
1240    }
1241}
1242
1243/// Represents an evaluation operator for sub-queries, e.g. `SELECT a FROM b` in
1244/// `SELECT b.c, (SELECT a FROM b) FROM books AS b`.
1245#[derive(Debug)]
1246pub(crate) struct EvalSubQueryExpr {
1247    pub(crate) plan: Rc<RefCell<EvalPlan>>,
1248}
1249
1250impl EvalSubQueryExpr {
1251    pub(crate) fn new(plan: EvalPlan) -> Self {
1252        EvalSubQueryExpr {
1253            plan: Rc::new(RefCell::new(plan)),
1254        }
1255    }
1256}
1257
1258impl EvalExpr for EvalSubQueryExpr {
1259    fn evaluate<'a, 'c, 'o>(
1260        &'a self,
1261        bindings: &'a dyn RefTupleView<'a, Value>,
1262        ctx: &'c dyn EvalContext,
1263    ) -> Cow<'o, Value>
1264    where
1265        'c: 'a,
1266        'a: 'o,
1267    {
1268        let value = {
1269            let bindings = MapBindings::from(bindings);
1270            let nested_ctx = NestedContext::new(bindings, ctx);
1271
1272            let plan = RefCell::borrow(&self.plan);
1273
1274            let value = match plan.execute(&nested_ctx) {
1275                Ok(evaluated) => evaluated.result,
1276                Err(err) => {
1277                    for e in err.errors {
1278                        ctx.add_error(e);
1279                    }
1280                    Missing
1281                }
1282            };
1283
1284            value.clone()
1285        };
1286        Cow::Owned(value)
1287    }
1288}
1289
1290///
1291/// Coercion function F for bag operators described in RFC-0007
1292/// - `F(absent_value`) -> << >>
1293/// - `F(scalar_value`) -> << `scalar_value` >> # singleton bag
1294/// - `F(tuple_value`)  -> << `tuple_value` >>  # singleton bag, see future extensions
1295/// - `F(array_value`)  -> `bag_value`          # discard ordering
1296/// - `F(bag_value`)    -> `bag_value`          # identity
1297///
1298#[inline]
1299fn bagop_iter(v: Value) -> ValueIntoIterator {
1300    match v {
1301        Value::Null | Value::Missing => ValueIntoIterator::Single(None),
1302        other => other.into_iter(),
1303    }
1304}
1305
1306/// Represents the `OUTER UNION` bag operator.
1307#[derive(Debug, PartialEq)]
1308pub(crate) struct EvalOuterUnion {
1309    pub(crate) setq: SetQuantifier,
1310}
1311
1312impl EvalOuterUnion {
1313    pub(crate) fn new(setq: SetQuantifier) -> Self {
1314        EvalOuterUnion { setq }
1315    }
1316}
1317
1318impl Evaluable for EvalOuterUnion {
1319    fn evaluate(&self, mut inputs: [Option<Value>; 2], _ctx: &dyn EvalContext) -> Value {
1320        let lhs = bagop_iter(inputs[0].take().unwrap_or(Missing));
1321        let rhs = bagop_iter(inputs[1].take().unwrap_or(Missing));
1322        let chained = lhs.chain(rhs);
1323        let vals = match self.setq {
1324            SetQuantifier::All => chained.collect_vec(),
1325            SetQuantifier::Distinct => chained.unique().collect_vec(),
1326        };
1327        Value::from(Bag::from(vals))
1328    }
1329}
1330
1331/// Represents the `OUTER INTERSECT` bag operator.
1332#[derive(Debug, PartialEq)]
1333pub(crate) struct EvalOuterIntersect {
1334    pub(crate) setq: SetQuantifier,
1335}
1336
1337impl EvalOuterIntersect {
1338    pub(crate) fn new(setq: SetQuantifier) -> Self {
1339        EvalOuterIntersect { setq }
1340    }
1341}
1342
1343impl Evaluable for EvalOuterIntersect {
1344    fn evaluate(&self, mut inputs: [Option<Value>; 2], _ctx: &dyn EvalContext) -> Value {
1345        let lhs = bagop_iter(inputs[0].take().unwrap_or(Missing));
1346        let rhs = bagop_iter(inputs[1].take().unwrap_or(Missing));
1347
1348        let bag: Bag = match self.setq {
1349            SetQuantifier::All => {
1350                let mut lhs = lhs.counts();
1351                Bag::from_iter(rhs.filter(|elem| match lhs.get_mut(elem) {
1352                    Some(count) if *count > 0 => {
1353                        *count -= 1;
1354                        true
1355                    }
1356                    _ => false,
1357                }))
1358            }
1359            SetQuantifier::Distinct => {
1360                let lhs: HashSet<Value> = lhs.collect();
1361                Bag::from_iter(
1362                    rhs.filter(|elem| lhs.contains(elem))
1363                        .collect::<HashSet<_>>(),
1364                )
1365            }
1366        };
1367        Value::from(bag)
1368    }
1369}
1370
1371/// Represents the `OUTER EXCEPT` bag operator.
1372#[derive(Debug, PartialEq)]
1373pub(crate) struct EvalOuterExcept {
1374    pub(crate) setq: SetQuantifier,
1375}
1376
1377impl EvalOuterExcept {
1378    pub(crate) fn new(setq: SetQuantifier) -> Self {
1379        EvalOuterExcept { setq }
1380    }
1381}
1382
1383impl Evaluable for EvalOuterExcept {
1384    fn evaluate(&self, mut inputs: [Option<Value>; 2], _ctx: &dyn EvalContext) -> Value {
1385        let lhs = bagop_iter(inputs[0].take().unwrap_or(Missing));
1386        let rhs = bagop_iter(inputs[1].take().unwrap_or(Missing));
1387
1388        let mut exclude = rhs.counts();
1389        let excepted = lhs.filter(|elem| match exclude.get_mut(elem) {
1390            Some(count) if *count > 0 => {
1391                *count -= 1;
1392                false
1393            }
1394            _ => true,
1395        });
1396        let vals = match self.setq {
1397            SetQuantifier::All => excepted.collect_vec(),
1398            SetQuantifier::Distinct => excepted.unique().collect_vec(),
1399        };
1400        Value::from(Bag::from(vals))
1401    }
1402}
1403
/// Indicates whether a set should be reduced to its distinct elements or not.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub(crate) enum SetQuantifier {
    /// Keep every element, duplicates included.
    All,
    /// Keep only the distinct elements.
    Distinct,
}