use crate::env::basic::MapBindings;
use crate::error::EvaluationError;
use crate::eval::expr::EvalExpr;
use crate::eval::{EvalContext, EvalPlan};
use itertools::Itertools;
use partiql_value::Value::{Boolean, Missing, Null};
use partiql_value::{partiql_bag, partiql_tuple, Bag, List, Tuple, Value};
use std::borrow::{Borrow, Cow};
use std::cell::RefCell;
use std::cmp::{max, min, Ordering};
use std::collections::{HashMap, HashSet};
use std::fmt::Debug;
use std::rc::Rc;
#[macro_export]
macro_rules! take_input {
($expr:expr, $ctx:expr) => {
match $expr {
None => {
$ctx.add_error(EvaluationError::IllegalState(
"Error in retrieving input value".to_string(),
));
return Missing;
}
Some(val) => val,
}
};
}
pub enum EvalType {
SelfManaged,
GraphManaged,
}
pub trait Evaluable: Debug {
fn evaluate(&mut self, ctx: &dyn EvalContext) -> Value;
fn update_input(&mut self, input: Value, branch_num: u8);
fn get_vars(&self) -> Option<&[String]> {
None
}
fn eval_type(&self) -> EvalType {
EvalType::GraphManaged
}
}
#[derive(Debug)]
pub(crate) struct EvalScan {
pub(crate) expr: Box<dyn EvalExpr>,
pub(crate) as_key: String,
pub(crate) at_key: Option<String>,
pub(crate) input: Option<Value>,
attrs: Vec<String>,
}
impl EvalScan {
pub(crate) fn new(expr: Box<dyn EvalExpr>, as_key: &str) -> Self {
let attrs = vec![as_key.to_string()];
EvalScan {
expr,
as_key: as_key.to_string(),
at_key: None,
input: None,
attrs,
}
}
pub(crate) fn new_with_at_key(expr: Box<dyn EvalExpr>, as_key: &str, at_key: &str) -> Self {
let attrs = vec![as_key.to_string(), at_key.to_string()];
EvalScan {
expr,
as_key: as_key.to_string(),
at_key: Some(at_key.to_string()),
input: None,
attrs,
}
}
}
impl Evaluable for EvalScan {
fn evaluate(&mut self, ctx: &dyn EvalContext) -> Value {
let input_value = self.input.take().unwrap_or(Missing);
let bindings = match input_value {
Value::Bag(t) => *t,
Value::Tuple(t) => partiql_bag![*t],
_ => partiql_bag![partiql_tuple![]],
};
let mut value = partiql_bag![];
bindings.iter().for_each(|binding| {
let binding_tuple = binding.as_tuple_ref();
let v = self.expr.evaluate(&binding_tuple, ctx).into_owned();
let ordered = &v.is_ordered();
let mut at_index_counter: i64 = 0;
if let Some(at_key) = &self.at_key {
for t in v.into_iter() {
let mut out = Tuple::from([(self.as_key.as_str(), t)]);
let at_id = if *ordered {
at_index_counter.into()
} else {
Missing
};
out.insert(at_key, at_id);
value.push(Value::Tuple(Box::new(out)));
at_index_counter += 1;
}
} else {
for t in v.into_iter() {
let out = Tuple::from([(self.as_key.as_str(), t)]);
value.push(Value::Tuple(Box::new(out)));
}
}
});
Value::Bag(Box::new(value))
}
fn update_input(&mut self, input: Value, _branch_num: u8) {
self.input = Some(input);
}
fn get_vars(&self) -> Option<&[String]> {
Some(&self.attrs)
}
}
#[derive(Debug)]
pub(crate) struct EvalJoin {
pub(crate) kind: EvalJoinKind,
pub(crate) on: Option<Box<dyn EvalExpr>>,
pub(crate) input: Option<Value>,
pub(crate) left: Box<dyn Evaluable>,
pub(crate) right: Box<dyn Evaluable>,
}
#[derive(Debug)]
pub(crate) enum EvalJoinKind {
Inner,
Left,
Right,
Full,
}
impl EvalJoin {
pub(crate) fn new(
kind: EvalJoinKind,
left: Box<dyn Evaluable>,
right: Box<dyn Evaluable>,
on: Option<Box<dyn EvalExpr>>,
) -> Self {
EvalJoin {
kind,
on,
input: None,
left,
right,
}
}
}
impl Evaluable for EvalJoin {
fn evaluate(&mut self, ctx: &dyn EvalContext) -> Value {
#[inline]
fn tuple_with_null_vals<I, S>(attrs: I) -> Tuple
where
S: Into<String>,
I: IntoIterator<Item = S>,
{
attrs.into_iter().map(|k| (k.into(), Null)).collect()
}
let mut output_bag = partiql_bag![];
let input_env = self
.input
.take()
.unwrap_or_else(|| Value::from(partiql_tuple![]));
self.left.update_input(input_env.clone(), 0);
let lhs_values = self.left.evaluate(ctx);
let left_bindings = match lhs_values {
Value::Bag(t) => *t,
_ => {
ctx.add_error(EvaluationError::IllegalState(
"Left side of FROM source should result in a bag of bindings".to_string(),
));
return Missing;
}
};
match self.kind {
EvalJoinKind::Inner => {
left_bindings.iter().for_each(|b_l| {
let env_b_l = input_env
.as_tuple_ref()
.as_ref()
.tuple_concat(b_l.as_tuple_ref().borrow());
self.right.update_input(Value::from(env_b_l), 0);
let rhs_values = self.right.evaluate(ctx);
let right_bindings = match rhs_values {
Value::Bag(t) => *t,
_ => partiql_bag![partiql_tuple![]],
};
for b_r in right_bindings.iter() {
match &self.on {
None => {
let b_l_b_r = b_l
.as_tuple_ref()
.as_ref()
.tuple_concat(b_r.as_tuple_ref().borrow());
output_bag.push(Value::from(b_l_b_r));
}
Some(condition) => {
let b_l_b_r = b_l
.as_tuple_ref()
.as_ref()
.tuple_concat(b_r.as_tuple_ref().borrow());
let env_b_l_b_r =
&input_env.as_tuple_ref().as_ref().tuple_concat(&b_l_b_r);
let cond = condition.evaluate(env_b_l_b_r, ctx);
if cond.as_ref() == &Value::Boolean(true) {
output_bag.push(Value::Tuple(Box::new(b_l_b_r)));
}
}
}
}
});
}
EvalJoinKind::Left => {
left_bindings.iter().for_each(|b_l| {
let mut output_bag_left = partiql_bag![];
let env_b_l = input_env
.as_tuple_ref()
.as_ref()
.tuple_concat(b_l.as_tuple_ref().borrow());
self.right.update_input(Value::from(env_b_l), 0);
let rhs_values = self.right.evaluate(ctx);
let right_bindings = match rhs_values {
Value::Bag(t) => *t,
_ => partiql_bag![partiql_tuple![]],
};
for b_r in right_bindings.iter() {
match &self.on {
None => {
let b_l_b_r = b_l
.as_tuple_ref()
.as_ref()
.tuple_concat(b_r.as_tuple_ref().borrow());
output_bag_left.push(Value::from(b_l_b_r));
}
Some(condition) => {
let b_l_b_r = b_l
.as_tuple_ref()
.as_ref()
.tuple_concat(b_r.as_tuple_ref().borrow());
let env_b_l_b_r =
&input_env.as_tuple_ref().as_ref().tuple_concat(&b_l_b_r);
let cond = condition.evaluate(env_b_l_b_r, ctx);
if cond.as_ref() == &Value::Boolean(true) {
output_bag_left.push(Value::Tuple(Box::new(b_l_b_r)));
}
}
}
}
if output_bag_left.is_empty() {
let attrs = self.right.get_vars().unwrap_or(&[]);
let new_binding = b_l
.as_tuple_ref()
.as_ref()
.tuple_concat(&tuple_with_null_vals(attrs));
output_bag.push(Value::from(new_binding));
} else {
for elem in output_bag_left.into_iter() {
output_bag.push(elem)
}
}
});
}
EvalJoinKind::Full | EvalJoinKind::Right => {
ctx.add_error(EvaluationError::NotYetImplemented(
"FULL and RIGHT JOIN".to_string(),
));
return Missing;
}
};
Value::Bag(Box::new(output_bag))
}
fn update_input(&mut self, input: Value, _branch_num: u8) {
self.input = Some(input);
}
fn eval_type(&self) -> EvalType {
EvalType::SelfManaged
}
}
#[derive(Debug)]
pub(crate) struct AggregateExpression {
pub(crate) name: String,
pub(crate) expr: Box<dyn EvalExpr>,
pub(crate) func: AggFunc,
}
pub trait AggregateFunction {
fn next_value(&mut self, input_value: &Value, group: &Tuple);
fn compute(&self, group: &Tuple) -> Result<Value, EvaluationError>;
}
#[derive(Debug)]
pub(crate) enum AggFunc {
Avg(Avg),
Count(Count),
Max(Max),
Min(Min),
Sum(Sum),
}
impl AggregateFunction for AggFunc {
fn next_value(&mut self, input_value: &Value, group: &Tuple) {
match self {
AggFunc::Avg(v) => v.next_value(input_value, group),
AggFunc::Count(v) => v.next_value(input_value, group),
AggFunc::Max(v) => v.next_value(input_value, group),
AggFunc::Min(v) => v.next_value(input_value, group),
AggFunc::Sum(v) => v.next_value(input_value, group),
}
}
fn compute(&self, group: &Tuple) -> Result<Value, EvaluationError> {
match self {
AggFunc::Avg(v) => v.compute(group),
AggFunc::Count(v) => v.compute(group),
AggFunc::Max(v) => v.compute(group),
AggFunc::Min(v) => v.compute(group),
AggFunc::Sum(v) => v.compute(group),
}
}
}
#[derive(Debug, Default)]
pub(crate) enum AggFilterFn {
Distinct(AggFilterDistinct),
#[default]
All,
}
impl AggFilterFn {
fn filter_value(&mut self, input_value: Value, group: &Tuple) -> bool {
match self {
AggFilterFn::Distinct(d) => d.filter_value(input_value, group),
AggFilterFn::All => true,
}
}
}
#[derive(Debug)]
pub(crate) struct AggFilterDistinct {
seen_vals: HashMap<Tuple, HashSet<Value>>,
}
impl AggFilterDistinct {
pub(crate) fn new() -> Self {
AggFilterDistinct {
seen_vals: HashMap::new(),
}
}
}
impl Default for AggFilterDistinct {
fn default() -> Self {
Self::new()
}
}
impl AggFilterDistinct {
fn filter_value(&mut self, input_value: Value, group: &Tuple) -> bool {
if let Some(seen_vals_in_group) = self.seen_vals.get_mut(group) {
seen_vals_in_group.insert(input_value)
} else {
let mut new_seen_vals = HashSet::new();
new_seen_vals.insert(input_value);
self.seen_vals
.insert(group.clone(), new_seen_vals)
.is_none()
}
}
}
#[derive(Debug)]
pub(crate) struct Avg {
avgs: HashMap<Tuple, (usize, Value)>,
aggregator: AggFilterFn,
}
impl Avg {
pub(crate) fn new_distinct() -> Self {
Avg {
avgs: HashMap::new(),
aggregator: AggFilterFn::Distinct(AggFilterDistinct::new()),
}
}
pub(crate) fn new_all() -> Self {
Avg {
avgs: HashMap::new(),
aggregator: AggFilterFn::default(),
}
}
}
impl AggregateFunction for Avg {
fn next_value(&mut self, input_value: &Value, group: &Tuple) {
if !input_value.is_null_or_missing()
&& self.aggregator.filter_value(input_value.clone(), group)
{
match self.avgs.get_mut(group) {
None => {
self.avgs.insert(group.clone(), (1, input_value.clone()));
}
Some((count, sum)) => {
*count += 1;
*sum = &sum.clone() + input_value;
}
}
}
}
fn compute(&self, group: &Tuple) -> Result<Value, EvaluationError> {
match self.avgs.get(group) {
None => Err(EvaluationError::IllegalState(
"Expect group to exist in avgs".to_string(),
)),
Some((0, _)) => Ok(Null),
Some((c, s)) => Ok(s / &Value::Decimal(rust_decimal::Decimal::from(*c))),
}
}
}
#[derive(Debug)]
pub(crate) struct Count {
counts: HashMap<Tuple, usize>,
aggregator: AggFilterFn,
}
impl Count {
pub(crate) fn new_distinct() -> Self {
Count {
counts: HashMap::new(),
aggregator: AggFilterFn::Distinct(AggFilterDistinct::new()),
}
}
pub(crate) fn new_all() -> Self {
Count {
counts: HashMap::new(),
aggregator: AggFilterFn::default(),
}
}
}
impl AggregateFunction for Count {
fn next_value(&mut self, input_value: &Value, group: &Tuple) {
if !input_value.is_null_or_missing()
&& self.aggregator.filter_value(input_value.clone(), group)
{
match self.counts.get_mut(group) {
None => {
self.counts.insert(group.clone(), 1);
}
Some(count) => {
*count += 1;
}
};
}
}
fn compute(&self, group: &Tuple) -> Result<Value, EvaluationError> {
match self.counts.get(group) {
None => Err(EvaluationError::IllegalState(
"Expect group to exist in counts".to_string(),
)),
Some(val) => Ok(Value::from(val)),
}
}
}
#[derive(Debug)]
pub(crate) struct Max {
maxes: HashMap<Tuple, Value>,
aggregator: AggFilterFn,
}
impl Max {
pub(crate) fn new_distinct() -> Self {
Max {
maxes: HashMap::new(),
aggregator: AggFilterFn::Distinct(AggFilterDistinct::new()),
}
}
pub(crate) fn new_all() -> Self {
Max {
maxes: HashMap::new(),
aggregator: AggFilterFn::default(),
}
}
}
impl AggregateFunction for Max {
fn next_value(&mut self, input_value: &Value, group: &Tuple) {
if !input_value.is_null_or_missing()
&& self.aggregator.filter_value(input_value.clone(), group)
{
match self.maxes.get_mut(group) {
None => {
self.maxes.insert(group.clone(), input_value.clone());
}
Some(m) => {
*m = max(m.clone(), input_value.clone());
}
}
}
}
fn compute(&self, group: &Tuple) -> Result<Value, EvaluationError> {
match self.maxes.get(group) {
None => Err(EvaluationError::IllegalState(
"Expect group to exist in maxes".to_string(),
)),
Some(val) => Ok(val.clone()),
}
}
}
#[derive(Debug)]
pub(crate) struct Min {
mins: HashMap<Tuple, Value>,
aggregator: AggFilterFn,
}
impl Min {
pub(crate) fn new_distinct() -> Self {
Min {
mins: HashMap::new(),
aggregator: AggFilterFn::Distinct(AggFilterDistinct::new()),
}
}
pub(crate) fn new_all() -> Self {
Min {
mins: HashMap::new(),
aggregator: AggFilterFn::default(),
}
}
}
impl AggregateFunction for Min {
fn next_value(&mut self, input_value: &Value, group: &Tuple) {
if !input_value.is_null_or_missing()
&& self.aggregator.filter_value(input_value.clone(), group)
{
match self.mins.get_mut(group) {
None => {
self.mins.insert(group.clone(), input_value.clone());
}
Some(m) => {
*m = min(m.clone(), input_value.clone());
}
}
}
}
fn compute(&self, group: &Tuple) -> Result<Value, EvaluationError> {
match self.mins.get(group) {
None => Err(EvaluationError::IllegalState(
"Expect group to exist in mins".to_string(),
)),
Some(val) => Ok(val.clone()),
}
}
}
#[derive(Debug)]
pub(crate) struct Sum {
sums: HashMap<Tuple, Value>,
aggregator: AggFilterFn,
}
impl Sum {
pub(crate) fn new_distinct() -> Self {
Sum {
sums: HashMap::new(),
aggregator: AggFilterFn::Distinct(AggFilterDistinct::new()),
}
}
pub(crate) fn new_all() -> Self {
Sum {
sums: HashMap::new(),
aggregator: AggFilterFn::default(),
}
}
}
impl AggregateFunction for Sum {
fn next_value(&mut self, input_value: &Value, group: &Tuple) {
if !input_value.is_null_or_missing()
&& self.aggregator.filter_value(input_value.clone(), group)
{
match self.sums.get_mut(group) {
None => {
self.sums.insert(group.clone(), input_value.clone());
}
Some(s) => {
*s = &s.clone() + input_value;
}
}
}
}
fn compute(&self, group: &Tuple) -> Result<Value, EvaluationError> {
match self.sums.get(group) {
None => Err(EvaluationError::IllegalState(
"Expect group to exist in sums".to_string(),
)),
Some(val) => Ok(val.clone()),
}
}
}
#[derive(Debug)]
pub(crate) struct EvalGroupBy {
pub(crate) strategy: EvalGroupingStrategy,
pub(crate) exprs: HashMap<String, Box<dyn EvalExpr>>,
pub(crate) aggregate_exprs: Vec<AggregateExpression>,
pub(crate) group_as_alias: Option<String>,
pub(crate) input: Option<Value>,
}
#[derive(Debug)]
pub(crate) enum EvalGroupingStrategy {
GroupFull,
GroupPartial,
}
impl EvalGroupBy {
#[inline]
fn eval_group(&self, bindings: &Tuple, ctx: &dyn EvalContext) -> Tuple {
self.exprs
.iter()
.map(
|(alias, expr)| match expr.evaluate(bindings, ctx).into_owned() {
Missing => (alias.as_str(), Value::Null),
val => (alias.as_str(), val),
},
)
.collect::<Tuple>()
}
}
impl Evaluable for EvalGroupBy {
fn evaluate(&mut self, ctx: &dyn EvalContext) -> Value {
let group_as_alias = &self.group_as_alias;
let input_value = take_input!(self.input.take(), ctx);
match self.strategy {
EvalGroupingStrategy::GroupPartial => {
ctx.add_error(EvaluationError::NotYetImplemented(
"GROUP PARTIAL".to_string(),
));
Missing
}
EvalGroupingStrategy::GroupFull => {
let mut groups: HashMap<Tuple, Vec<Value>> = HashMap::new();
for v in input_value.into_iter() {
let v_as_tuple = v.coerce_to_tuple();
let group = self.eval_group(&v_as_tuple, ctx);
for aggregate_expr in self.aggregate_exprs.iter_mut() {
let evaluated_val =
aggregate_expr.expr.evaluate(&v_as_tuple, ctx).into_owned();
aggregate_expr.func.next_value(&evaluated_val, &group);
}
groups
.entry(group)
.or_insert(vec![])
.push(Value::Tuple(Box::new(v_as_tuple.clone())));
}
let bag = groups
.into_iter()
.map(|(mut k, v)| {
let mut agg_results: Vec<(&str, Value)> = vec![];
for aggregate_expr in &self.aggregate_exprs {
match aggregate_expr.func.compute(&k) {
Ok(agg_result) => {
agg_results.push((aggregate_expr.name.as_str(), agg_result))
}
Err(err) => {
ctx.add_error(err);
return Missing;
}
}
}
agg_results
.into_iter()
.for_each(|(agg_name, agg_result)| k.insert(agg_name, agg_result));
match group_as_alias {
None => Value::from(k),
Some(alias) => {
let mut tuple_with_group = k;
tuple_with_group.insert(alias, Value::Bag(Box::new(Bag::from(v))));
Value::from(tuple_with_group)
}
}
})
.collect::<Bag>();
Value::from(bag)
}
}
}
fn update_input(&mut self, input: Value, _branch_num: u8) {
self.input = Some(input);
}
}
#[derive(Debug)]
pub(crate) struct EvalPivot {
pub(crate) input: Option<Value>,
pub(crate) key: Box<dyn EvalExpr>,
pub(crate) value: Box<dyn EvalExpr>,
}
impl EvalPivot {
pub(crate) fn new(key: Box<dyn EvalExpr>, value: Box<dyn EvalExpr>) -> Self {
EvalPivot {
input: None,
key,
value,
}
}
}
impl Evaluable for EvalPivot {
fn evaluate(&mut self, ctx: &dyn EvalContext) -> Value {
let input_value = take_input!(self.input.take(), ctx);
let tuple: Tuple = input_value
.into_iter()
.filter_map(|binding| {
let binding = binding.coerce_to_tuple();
let key = self.key.evaluate(&binding, ctx);
if let Value::String(s) = key.as_ref() {
let value = self.value.evaluate(&binding, ctx);
Some((s.to_string(), value.into_owned()))
} else {
None
}
})
.collect();
Value::from(tuple)
}
fn update_input(&mut self, input: Value, _branch_num: u8) {
self.input = Some(input);
}
}
#[derive(Debug)]
pub(crate) struct EvalUnpivot {
pub(crate) expr: Box<dyn EvalExpr>,
pub(crate) as_key: String,
pub(crate) at_key: Option<String>,
pub(crate) input: Option<Value>,
attrs: Vec<String>,
}
impl EvalUnpivot {
pub(crate) fn new(expr: Box<dyn EvalExpr>, as_key: &str, at_key: Option<String>) -> Self {
let attrs = if let Some(at_key) = &at_key {
vec![as_key.to_string(), at_key.clone()]
} else {
vec![as_key.to_string()]
};
EvalUnpivot {
expr,
as_key: as_key.to_string(),
at_key,
input: None,
attrs,
}
}
}
impl Evaluable for EvalUnpivot {
fn evaluate(&mut self, ctx: &dyn EvalContext) -> Value {
let tuple = match self.expr.evaluate(&Tuple::new(), ctx).into_owned() {
Value::Tuple(tuple) => *tuple,
other => other.coerce_to_tuple(),
};
let as_key = self.as_key.as_str();
let pairs = tuple;
let unpivoted = if let Some(at_key) = &self.at_key {
pairs
.map(|(k, v)| Tuple::from([(as_key, v), (at_key.as_str(), k.into())]))
.collect::<Bag>()
} else {
pairs
.map(|(_, v)| Tuple::from([(as_key, v)]))
.collect::<Bag>()
};
Value::from(unpivoted)
}
fn update_input(&mut self, input: Value, _branch_num: u8) {
self.input = Some(input);
}
fn get_vars(&self) -> Option<&[String]> {
Some(&self.attrs)
}
}
#[derive(Debug)]
pub(crate) struct EvalFilter {
pub(crate) expr: Box<dyn EvalExpr>,
pub(crate) input: Option<Value>,
}
impl EvalFilter {
pub(crate) fn new(expr: Box<dyn EvalExpr>) -> Self {
EvalFilter { expr, input: None }
}
#[inline]
fn eval_filter(&self, bindings: &Tuple, ctx: &dyn EvalContext) -> bool {
let result = self.expr.evaluate(bindings, ctx);
match result.as_ref() {
Boolean(bool_val) => *bool_val,
_ => false,
}
}
}
impl Evaluable for EvalFilter {
fn evaluate(&mut self, ctx: &dyn EvalContext) -> Value {
let input_value = take_input!(self.input.take(), ctx);
let filtered = input_value
.into_iter()
.map(Value::coerce_to_tuple)
.filter_map(|v| self.eval_filter(&v, ctx).then_some(v));
Value::from(filtered.collect::<Bag>())
}
fn update_input(&mut self, input: Value, _branch_num: u8) {
self.input = Some(input);
}
}
#[derive(Debug)]
pub(crate) struct EvalHaving {
pub(crate) expr: Box<dyn EvalExpr>,
pub(crate) input: Option<Value>,
}
impl EvalHaving {
pub(crate) fn new(expr: Box<dyn EvalExpr>) -> Self {
EvalHaving { expr, input: None }
}
#[inline]
fn eval_having(&self, bindings: &Tuple, ctx: &dyn EvalContext) -> bool {
let result = self.expr.evaluate(bindings, ctx);
match result.as_ref() {
Boolean(bool_val) => *bool_val,
_ => false,
}
}
}
impl Evaluable for EvalHaving {
fn evaluate(&mut self, ctx: &dyn EvalContext) -> Value {
let input_value = take_input!(self.input.take(), ctx);
let filtered = input_value
.into_iter()
.map(Value::coerce_to_tuple)
.filter_map(|v| self.eval_having(&v, ctx).then_some(v));
Value::from(filtered.collect::<Bag>())
}
fn update_input(&mut self, input: Value, _branch_num: u8) {
self.input = Some(input);
}
}
#[derive(Debug)]
pub(crate) struct EvalOrderBySortCondition {
pub(crate) expr: Box<dyn EvalExpr>,
pub(crate) spec: EvalOrderBySortSpec,
}
#[derive(Debug)]
pub(crate) enum EvalOrderBySortSpec {
AscNullsFirst,
AscNullsLast,
DescNullsFirst,
DescNullsLast,
}
#[derive(Debug)]
pub(crate) struct EvalOrderBy {
pub(crate) cmp: Vec<EvalOrderBySortCondition>,
pub(crate) input: Option<Value>,
}
impl EvalOrderBy {
#[inline]
fn compare(&self, l: &Value, r: &Value, ctx: &dyn EvalContext) -> Ordering {
let l = l.as_tuple_ref();
let r = r.as_tuple_ref();
self.cmp
.iter()
.map(|spec| {
let l = spec.expr.evaluate(&l, ctx);
let r = spec.expr.evaluate(&r, ctx);
match spec.spec {
EvalOrderBySortSpec::AscNullsFirst => l.as_ref().cmp(r.as_ref()),
EvalOrderBySortSpec::AscNullsLast => match (l.as_ref(), r.as_ref()) {
(Null, Null) => Ordering::Equal,
(Null, Missing) => Ordering::Less,
(Missing, Missing) => Ordering::Equal,
(Missing, Null) => Ordering::Greater,
(Null, _) => Ordering::Greater,
(Missing, _) => Ordering::Greater,
(_, Null) => Ordering::Less,
(_, Missing) => Ordering::Less,
(l, r) => l.cmp(r),
},
EvalOrderBySortSpec::DescNullsFirst => match (l.as_ref(), r.as_ref()) {
(Null, Null) => Ordering::Equal,
(Null, Missing) => Ordering::Less,
(Missing, Missing) => Ordering::Equal,
(Missing, Null) => Ordering::Greater,
(Null, _) => Ordering::Less,
(Missing, _) => Ordering::Less,
(_, Null) => Ordering::Greater,
(_, Missing) => Ordering::Greater,
(l, r) => r.cmp(l),
},
EvalOrderBySortSpec::DescNullsLast => r.as_ref().cmp(l.as_ref()),
}
})
.find_or_last(|o| o != &Ordering::Equal)
.unwrap_or(Ordering::Equal)
}
}
impl Evaluable for EvalOrderBy {
fn evaluate(&mut self, ctx: &dyn EvalContext) -> Value {
let input_value = take_input!(self.input.take(), ctx);
let mut values = input_value.into_iter().collect_vec();
values.sort_by(|l, r| self.compare(l, r, ctx));
Value::from(List::from(values))
}
fn update_input(&mut self, input: Value, _branch_num: u8) {
self.input = Some(input);
}
}
#[derive(Debug)]
pub(crate) struct EvalLimitOffset {
pub(crate) limit: Option<Box<dyn EvalExpr>>,
pub(crate) offset: Option<Box<dyn EvalExpr>>,
pub(crate) input: Option<Value>,
}
impl Evaluable for EvalLimitOffset {
fn evaluate(&mut self, ctx: &dyn EvalContext) -> Value {
let input_value = take_input!(self.input.take(), ctx);
let empty_bindings = Tuple::new();
let offset = match &self.offset {
None => 0,
Some(expr) => match expr.evaluate(&empty_bindings, ctx).as_ref() {
Value::Integer(i) => {
if *i >= 0 {
*i as usize
} else {
0
}
}
_ => 0,
},
};
let limit = match &self.limit {
None => None,
Some(expr) => match expr.evaluate(&empty_bindings, ctx).as_ref() {
Value::Integer(i) => {
if *i >= 0 {
Some(*i as usize)
} else {
None
}
}
_ => None,
},
};
let ordered = input_value.is_ordered();
fn collect(values: impl Iterator<Item = Value>, ordered: bool) -> Value {
match ordered {
true => Value::from(values.collect::<List>()),
false => Value::from(values.collect::<Bag>()),
}
}
let offsetted = input_value.into_iter().skip(offset);
match limit {
Some(n) => collect(offsetted.take(n), ordered),
None => collect(offsetted, ordered),
}
}
fn update_input(&mut self, input: Value, _branch_num: u8) {
self.input = Some(input);
}
}
#[derive(Debug)]
pub(crate) struct EvalSelectValue {
pub(crate) expr: Box<dyn EvalExpr>,
pub(crate) input: Option<Value>,
}
impl EvalSelectValue {
pub(crate) fn new(expr: Box<dyn EvalExpr>) -> Self {
EvalSelectValue { expr, input: None }
}
}
impl Evaluable for EvalSelectValue {
fn evaluate(&mut self, ctx: &dyn EvalContext) -> Value {
let input_value = take_input!(self.input.take(), ctx);
let ordered = input_value.is_ordered();
let values = input_value.into_iter().map(|v| {
let v_as_tuple = v.coerce_to_tuple();
self.expr.evaluate(&v_as_tuple, ctx).into_owned()
});
match ordered {
true => Value::from(values.collect::<List>()),
false => Value::from(values.collect::<Bag>()),
}
}
fn update_input(&mut self, input: Value, _branch_num: u8) {
self.input = Some(input);
}
}
#[derive(Debug)]
pub(crate) struct EvalSelect {
pub(crate) exprs: HashMap<String, Box<dyn EvalExpr>>,
pub(crate) input: Option<Value>,
}
impl EvalSelect {
pub(crate) fn new(exprs: HashMap<String, Box<dyn EvalExpr>>) -> Self {
EvalSelect { exprs, input: None }
}
}
impl Evaluable for EvalSelect {
fn evaluate(&mut self, ctx: &dyn EvalContext) -> Value {
let input_value = take_input!(self.input.take(), ctx);
let ordered = input_value.is_ordered();
let values = input_value.into_iter().map(|v| {
let v_as_tuple = v.coerce_to_tuple();
let tuple_pairs = self.exprs.iter().filter_map(|(alias, expr)| {
let evaluated_val = expr.evaluate(&v_as_tuple, ctx);
match evaluated_val.as_ref() {
Missing => None,
_ => Some((alias.as_str(), evaluated_val.into_owned())),
}
});
tuple_pairs.collect::<Tuple>()
});
match ordered {
true => Value::from(values.collect::<List>()),
false => Value::from(values.collect::<Bag>()),
}
}
fn update_input(&mut self, input: Value, _branch_num: u8) {
self.input = Some(input);
}
}
#[derive(Debug, Default)]
pub(crate) struct EvalSelectAll {
pub(crate) input: Option<Value>,
}
impl EvalSelectAll {
pub(crate) fn new() -> Self {
Self::default()
}
}
impl Evaluable for EvalSelectAll {
fn evaluate(&mut self, ctx: &dyn EvalContext) -> Value {
let input_value = take_input!(self.input.take(), ctx);
let ordered = input_value.is_ordered();
let values = input_value.into_iter().map(|val| {
val.coerce_to_tuple()
.into_values()
.flat_map(|v| v.coerce_to_tuple().into_pairs())
.collect::<Tuple>()
});
match ordered {
true => Value::from(values.collect::<List>()),
false => Value::from(values.collect::<Bag>()),
}
}
fn update_input(&mut self, input: Value, _branch_num: u8) {
self.input = Some(input);
}
}
#[derive(Debug)]
pub(crate) struct EvalExprQuery {
pub(crate) expr: Box<dyn EvalExpr>,
pub(crate) input: Option<Value>,
}
impl EvalExprQuery {
pub(crate) fn new(expr: Box<dyn EvalExpr>) -> Self {
EvalExprQuery { expr, input: None }
}
}
impl Evaluable for EvalExprQuery {
fn evaluate(&mut self, ctx: &dyn EvalContext) -> Value {
let input_value = self.input.take().unwrap_or(Value::Null).coerce_to_tuple();
self.expr.evaluate(&input_value, ctx).into_owned()
}
fn update_input(&mut self, input: Value, _branch_num: u8) {
self.input = Some(input);
}
}
#[derive(Debug, Default)]
pub(crate) struct EvalDistinct {
pub(crate) input: Option<Value>,
}
impl EvalDistinct {
pub(crate) fn new() -> Self {
Self::default()
}
}
impl Evaluable for EvalDistinct {
fn evaluate(&mut self, ctx: &dyn EvalContext) -> Value {
let input_value = take_input!(self.input.take(), ctx);
let ordered = input_value.is_ordered();
let values = input_value.into_iter().unique();
match ordered {
true => Value::from(values.collect::<List>()),
false => Value::from(values.collect::<Bag>()),
}
}
fn update_input(&mut self, input: Value, _branch_num: u8) {
self.input = Some(input);
}
}
#[derive(Debug)]
pub(crate) struct EvalSink {
pub(crate) input: Option<Value>,
}
impl Evaluable for EvalSink {
fn evaluate(&mut self, _ctx: &dyn EvalContext) -> Value {
self.input.take().unwrap_or_else(|| Missing)
}
fn update_input(&mut self, input: Value, _branch_num: u8) {
self.input = Some(input);
}
}
#[derive(Debug)]
pub(crate) struct EvalSubQueryExpr {
pub(crate) plan: Rc<RefCell<EvalPlan>>,
}
impl EvalSubQueryExpr {
pub(crate) fn new(plan: EvalPlan) -> Self {
EvalSubQueryExpr {
plan: Rc::new(RefCell::new(plan)),
}
}
}
impl EvalExpr for EvalSubQueryExpr {
fn evaluate<'a>(&'a self, bindings: &'a Tuple, _ctx: &'a dyn EvalContext) -> Cow<'a, Value> {
let value = if let Ok(evaluated) = self
.plan
.borrow_mut()
.execute_mut(MapBindings::from(bindings))
{
evaluated.result
} else {
Missing
};
Cow::Owned(value)
}
}
#[derive(Debug)]
pub(crate) enum SetQuantifier {
All,
Distinct,
}