use crate::expressions::{
BinaryExpression, BinaryOperator, ColumnName, Expression as Expr, Scalar, UnaryExpression,
UnaryOperator, VariadicExpression, VariadicOperator,
};
use crate::schema::DataType;
use std::cmp::Ordering;
use tracing::debug;
pub(crate) mod parquet_stats_skipping;
#[cfg(test)]
mod tests;
/// Recursively evaluates a predicate expression, pushing any `NOT` down towards the leaves.
///
/// Implementors supply the leaf operations and the AND/OR combination step; the provided
/// methods walk the expression tree and dispatch each node. `None` is used throughout to
/// signal that no result could be produced (the provided methods return it for unsupported
/// operands and operators).
pub(crate) trait PredicateEvaluator {
    /// The value produced by evaluating a predicate.
    type Output;

    /// Evaluates a literal used as a predicate, i.e. `[NOT] <val>`.
    fn eval_scalar(&self, val: &Scalar, inverted: bool) -> Option<Self::Output>;

    /// Evaluates `<col> IS [NOT] NULL`; `inverted` selects the `NOT` form.
    fn eval_is_null(&self, col: &ColumnName, inverted: bool) -> Option<Self::Output>;

    /// Evaluates `<col> < <val>`.
    fn eval_lt(&self, col: &ColumnName, val: &Scalar) -> Option<Self::Output>;

    /// Evaluates `<col> <= <val>`.
    fn eval_le(&self, col: &ColumnName, val: &Scalar) -> Option<Self::Output>;

    /// Evaluates `<col> > <val>`.
    fn eval_gt(&self, col: &ColumnName, val: &Scalar) -> Option<Self::Output>;

    /// Evaluates `<col> >= <val>`.
    fn eval_ge(&self, col: &ColumnName, val: &Scalar) -> Option<Self::Output>;

    /// Evaluates `<col> = <val>`, or `<col> != <val>` when `inverted` is set.
    fn eval_eq(&self, col: &ColumnName, val: &Scalar, inverted: bool) -> Option<Self::Output>;

    /// Evaluates a (possibly inverted) binary operator whose operands are both literals.
    fn eval_binary_scalars(
        &self,
        op: BinaryOperator,
        left: &Scalar,
        right: &Scalar,
        inverted: bool,
    ) -> Option<Self::Output>;

    /// Evaluates a (possibly inverted) binary operator whose operands are both columns.
    fn eval_binary_columns(
        &self,
        op: BinaryOperator,
        a: &ColumnName,
        b: &ColumnName,
        inverted: bool,
    ) -> Option<Self::Output>;

    /// Combines the evaluated operands of a variadic (AND/OR) expression.
    ///
    /// `eval_variadic` evaluates every operand with the same `inverted` flag before handing
    /// the results here, and passes that flag along unchanged.
    fn finish_eval_variadic(
        &self,
        op: VariadicOperator,
        exprs: impl IntoIterator<Item = Option<Self::Output>>,
        inverted: bool,
    ) -> Option<Self::Output>;

    /// Evaluates a column used directly as a predicate, i.e. `[NOT] <col>`.
    fn eval_column(&self, col: &ColumnName, inverted: bool) -> Option<Self::Output> {
        // `<col>` is treated as `<col> != FALSE`, and `NOT <col>` as `<col> != TRUE`.
        let rejected = Scalar::from(inverted);
        self.eval_eq(col, &rejected, true)
    }

    /// Dispatches a (possibly inverted) unary expression.
    ///
    /// `NOT` flips `inverted` and recurses; `IS NULL` is only handled for column operands and
    /// yields `None` for anything else.
    fn eval_unary(&self, op: UnaryOperator, expr: &Expr, inverted: bool) -> Option<Self::Output> {
        match (op, expr) {
            (UnaryOperator::Not, _) => self.eval_expr(expr, !inverted),
            (UnaryOperator::IsNull, Expr::Column(col)) => self.eval_is_null(col, inverted),
            (UnaryOperator::IsNull, _) => {
                debug!("Unsupported operand: IS [NOT] NULL: {expr:?}");
                None
            }
        }
    }

    /// Evaluates a (possibly inverted) `DISTINCT(<col>, <val>)` test.
    ///
    /// * `DISTINCT(<col>, NULL)` is evaluated as `<col> IS NOT NULL`.
    /// * `DISTINCT(<col>, <val>)` is evaluated as `OR(<col> IS NULL, <col> != <val>)`.
    fn eval_distinct(
        &self,
        col: &ColumnName,
        val: &Scalar,
        inverted: bool,
    ) -> Option<Self::Output> {
        match val {
            Scalar::Null(_) => self.eval_is_null(col, !inverted),
            _ => {
                // Both operands are evaluated in their inverted form up front, matching what
                // `finish_eval_variadic` receives from `eval_variadic`.
                let null_check = self.eval_is_null(col, inverted);
                let ne_check = self.eval_eq(col, val, !inverted);
                self.finish_eval_variadic(VariadicOperator::Or, [null_check, ne_check], inverted)
            }
        }
    }

    /// Evaluates `<col> [NOT] IN <val>`. The provided implementation always returns `None`;
    /// implementors may override it.
    fn eval_in(&self, _col: &ColumnName, _val: &Scalar, _inverted: bool) -> Option<Self::Output> {
        None
    }

    /// Dispatches a (possibly inverted) binary expression.
    ///
    /// Column/column and literal/literal operands go to `eval_binary_columns` and
    /// `eval_binary_scalars`. Column/literal operands (in either order) are normalized to
    /// `<col> <op> <literal>` and routed to the matching comparison method, folding `inverted`
    /// into the choice of comparison where possible. Any other operand shape yields `None`.
    fn eval_binary(
        &self,
        op: BinaryOperator,
        left: &Expr,
        right: &Expr,
        inverted: bool,
    ) -> Option<Self::Output> {
        use BinaryOperator::*;

        let (op, col, val) = match (left, right) {
            (Expr::Column(col), Expr::Literal(val)) => (op, col, val),
            // Literal on the left: swap the operands. `commute` yields `None` for operators
            // that cannot be swapped, which ends the evaluation here.
            (Expr::Literal(val), Expr::Column(col)) => (op.commute()?, col, val),
            (Expr::Column(a), Expr::Column(b)) => {
                return self.eval_binary_columns(op, a, b, inverted);
            }
            (Expr::Literal(a), Expr::Literal(b)) => {
                return self.eval_binary_scalars(op, a, b, inverted);
            }
            _ => {
                debug!("Unsupported binary operand(s): {left:?} {op:?} {right:?}");
                return None;
            }
        };

        match op {
            // Arithmetic is not handled as a predicate.
            Plus | Minus | Multiply | Divide => None,
            // An inverted inequality is evaluated as its complementary inequality.
            LessThan if inverted => self.eval_ge(col, val),
            LessThan => self.eval_lt(col, val),
            LessThanOrEqual if inverted => self.eval_gt(col, val),
            LessThanOrEqual => self.eval_le(col, val),
            GreaterThan if inverted => self.eval_le(col, val),
            GreaterThan => self.eval_gt(col, val),
            GreaterThanOrEqual if inverted => self.eval_lt(col, val),
            GreaterThanOrEqual => self.eval_ge(col, val),
            Equal => self.eval_eq(col, val, inverted),
            NotEqual => self.eval_eq(col, val, !inverted),
            Distinct => self.eval_distinct(col, val, inverted),
            In => self.eval_in(col, val, inverted),
            NotIn => self.eval_in(col, val, !inverted),
        }
    }

    /// Evaluates a (possibly inverted) AND/OR expression by evaluating each operand with the
    /// same `inverted` flag and passing the results to `finish_eval_variadic`.
    fn eval_variadic(
        &self,
        op: VariadicOperator,
        exprs: &[Expr],
        inverted: bool,
    ) -> Option<Self::Output> {
        // Kept lazy so that `finish_eval_variadic` decides how many operands get evaluated.
        let operands = exprs
            .iter()
            .map(|operand| self.eval_expr(operand, inverted));
        self.finish_eval_variadic(op, operands, inverted)
    }

    /// Entry point: dispatches a (possibly inverted) expression to the method for its node
    /// kind. Struct expressions yield `None`.
    fn eval_expr(&self, expr: &Expr, inverted: bool) -> Option<Self::Output> {
        match expr {
            Expr::Literal(val) => self.eval_scalar(val, inverted),
            Expr::Column(col) => self.eval_column(col, inverted),
            Expr::Struct(_) => None,
            Expr::Unary(UnaryExpression { op, expr: operand }) => {
                self.eval_unary(*op, operand, inverted)
            }
            Expr::Binary(BinaryExpression { op, left, right }) => {
                self.eval_binary(*op, left, right, inverted)
            }
            Expr::Variadic(VariadicExpression { op, exprs }) => {
                self.eval_variadic(*op, exprs, inverted)
            }
        }
    }
}
/// Namespace for boolean-valued helper functions that predicate evaluators can delegate to.
pub(crate) struct PredicateEvaluatorDefaults;

impl PredicateEvaluatorDefaults {
    /// Interprets a boolean literal as a predicate result, flipped when `inverted` is set.
    /// Any non-boolean scalar yields `None`.
    pub(crate) fn eval_scalar(val: &Scalar, inverted: bool) -> Option<bool> {
        if let Scalar::Boolean(flag) = val {
            Some(*flag != inverted)
        } else {
            None
        }
    }

    /// Tests whether comparing `a` with `b` yields exactly `ord`, flipping the answer when
    /// `inverted` is set. Returns `None` when `partial_cmp` reports the two scalars as
    /// incomparable.
    pub(crate) fn partial_cmp_scalars(
        ord: Ordering,
        a: &Scalar,
        b: &Scalar,
        inverted: bool,
    ) -> Option<bool> {
        a.partial_cmp(b)
            .map(|actual| (actual == ord) != inverted)
    }

    /// Evaluates a (possibly inverted) comparison between two scalars.
    ///
    /// Only the six comparison operators are handled; any other operator is logged and
    /// yields `None`.
    pub(crate) fn eval_binary_scalars(
        op: BinaryOperator,
        left: &Scalar,
        right: &Scalar,
        inverted: bool,
    ) -> Option<bool> {
        use BinaryOperator::*;

        // Each operator is expressed as "the comparison yields `ord`", optionally negated:
        // e.g. `a <= b` is `NOT(a > b)`.
        let (ord, negate) = match op {
            Equal => (Ordering::Equal, false),
            NotEqual => (Ordering::Equal, true),
            LessThan => (Ordering::Less, false),
            LessThanOrEqual => (Ordering::Greater, true),
            GreaterThan => (Ordering::Greater, false),
            GreaterThanOrEqual => (Ordering::Less, true),
            _ => {
                debug!("Unsupported binary operator: {left:?} {op:?} {right:?}");
                return None;
            }
        };
        Self::partial_cmp_scalars(ord, left, right, inverted != negate)
    }

    /// Combines the operands of a (possibly inverted) AND/OR with three-valued logic, where
    /// `None` plays the role of NULL.
    ///
    /// The "dominant" value is the one that decides the result on its own: `inverted` for
    /// AND and `!inverted` for OR. The first dominant operand ends the evaluation (later
    /// operands are not consumed). Without one, any `None` operand makes the result `None`;
    /// otherwise the result is the non-dominant value.
    pub(crate) fn finish_eval_variadic(
        op: VariadicOperator,
        exprs: impl IntoIterator<Item = Option<bool>>,
        inverted: bool,
    ) -> Option<bool> {
        let dominator = match op {
            VariadicOperator::And => inverted,
            VariadicOperator::Or => !inverted,
        };

        let mut saw_null = false;
        for operand in exprs {
            match operand {
                // Dominant value found: nothing that follows can change the outcome.
                Some(value) if value == dominator => return Some(dominator),
                Some(_) => {}
                // Remember the NULL, but keep looking for a dominant value.
                None => saw_null = true,
            }
        }

        if saw_null {
            None
        } else {
            Some(!dominator)
        }
    }
}
/// Looks up the scalar value that a column reference stands for.
pub(crate) trait ResolveColumnAsScalar {
/// Returns the scalar associated with `col`, or `None` when the implementation has no value
/// to offer for it.
fn resolve_column(&self, col: &ColumnName) -> Option<Scalar>;
}
/// Test-only resolver with no lookup logic of its own.
#[cfg(test)]
pub(crate) struct UnimplementedColumnResolver;
#[cfg(test)]
impl ResolveColumnAsScalar for UnimplementedColumnResolver {
// Any lookup panics via `unimplemented!()`.
// NOTE(review): presumably intended for tests whose predicates never reference a column — confirm.
fn resolve_column(&self, _col: &ColumnName) -> Option<Scalar> {
unimplemented!()
}
}
/// Test-only: lets a map from column name to scalar act as a column resolver.
#[cfg(test)]
impl ResolveColumnAsScalar for std::collections::HashMap<ColumnName, Scalar> {
// Returns a clone of the stored scalar; a column missing from the map resolves to `None`.
fn resolve_column(&self, col: &ColumnName) -> Option<Scalar> {
self.get(col).cloned()
}
}
/// Predicate evaluator that obtains column values from a [`ResolveColumnAsScalar`]
/// implementation.
pub(crate) struct DefaultPredicateEvaluator<R: ResolveColumnAsScalar> {
// Source of column values; consulted through `resolve_column`.
resolver: R,
}
impl<R: ResolveColumnAsScalar> DefaultPredicateEvaluator<R> {
/// Resolves `col` to a scalar by forwarding the request to the wrapped resolver.
fn resolve_column(&self, col: &ColumnName) -> Option<Scalar> {
self.resolver.resolve_column(col)
}
}
/// Wraps any column resolver in a [`DefaultPredicateEvaluator`].
///
/// No `'static` bound is placed on `R`: the evaluator only takes ownership of `resolver`, so
/// resolvers that borrow shorter-lived data are accepted as well.
impl<R: ResolveColumnAsScalar> From<R> for DefaultPredicateEvaluator<R> {
    fn from(resolver: R) -> Self {
        Self { resolver }
    }
}
/// Boolean evaluation: every column is replaced by the scalar its resolver returns, after
/// which the comparison is carried out by [`PredicateEvaluatorDefaults`]. A column the
/// resolver cannot supply makes the affected operation evaluate to `None`.
impl<R: ResolveColumnAsScalar> PredicateEvaluator for DefaultPredicateEvaluator<R> {
    type Output = bool;

    fn eval_scalar(&self, val: &Scalar, inverted: bool) -> Option<bool> {
        PredicateEvaluatorDefaults::eval_scalar(val, inverted)
    }

    /// `IS [NOT] NULL` holds when the resolved value is (is not) a `Scalar::Null`.
    fn eval_is_null(&self, col: &ColumnName, inverted: bool) -> Option<bool> {
        self.resolve_column(col)
            .map(|resolved| matches!(resolved, Scalar::Null(_)) != inverted)
    }

    fn eval_lt(&self, col: &ColumnName, val: &Scalar) -> Option<bool> {
        self.resolve_column(col).and_then(|resolved| {
            self.eval_binary_scalars(BinaryOperator::LessThan, &resolved, val, false)
        })
    }

    fn eval_le(&self, col: &ColumnName, val: &Scalar) -> Option<bool> {
        self.resolve_column(col).and_then(|resolved| {
            self.eval_binary_scalars(BinaryOperator::LessThanOrEqual, &resolved, val, false)
        })
    }

    fn eval_gt(&self, col: &ColumnName, val: &Scalar) -> Option<bool> {
        self.resolve_column(col).and_then(|resolved| {
            self.eval_binary_scalars(BinaryOperator::GreaterThan, &resolved, val, false)
        })
    }

    fn eval_ge(&self, col: &ColumnName, val: &Scalar) -> Option<bool> {
        self.resolve_column(col).and_then(|resolved| {
            self.eval_binary_scalars(BinaryOperator::GreaterThanOrEqual, &resolved, val, false)
        })
    }

    fn eval_eq(&self, col: &ColumnName, val: &Scalar, inverted: bool) -> Option<bool> {
        self.resolve_column(col).and_then(|resolved| {
            self.eval_binary_scalars(BinaryOperator::Equal, &resolved, val, inverted)
        })
    }

    fn eval_binary_scalars(
        &self,
        op: BinaryOperator,
        left: &Scalar,
        right: &Scalar,
        inverted: bool,
    ) -> Option<Self::Output> {
        PredicateEvaluatorDefaults::eval_binary_scalars(op, left, right, inverted)
    }

    /// Resolves both columns (left first, stopping at the first one that cannot be resolved)
    /// and compares the resulting scalars.
    fn eval_binary_columns(
        &self,
        op: BinaryOperator,
        left: &ColumnName,
        right: &ColumnName,
        inverted: bool,
    ) -> Option<Self::Output> {
        let lhs = self.resolve_column(left)?;
        let rhs = self.resolve_column(right)?;
        self.eval_binary_scalars(op, &lhs, &rhs, inverted)
    }

    fn finish_eval_variadic(
        &self,
        op: VariadicOperator,
        exprs: impl IntoIterator<Item = Option<bool>>,
        inverted: bool,
    ) -> Option<bool> {
        PredicateEvaluatorDefaults::finish_eval_variadic(op, exprs, inverted)
    }
}
/// Evaluates predicates in terms of per-column statistics (min, max, null count, row count).
///
/// Implementors provide access to the stats plus a few primitive operations; the provided
/// methods express each column-vs-literal comparison as a test on the column's min and/or
/// max stat.
pub(crate) trait DataSkippingPredicateEvaluator {
    /// Result type of a predicate evaluation.
    type Output;
    /// Representation of a min/max stat value.
    type TypedStat;
    /// Representation of a count stat value.
    type IntStat;

    /// Fetches the min stat of `col`. The provided methods pass the data type of the literal
    /// being compared against as `data_type`.
    fn get_min_stat(&self, col: &ColumnName, data_type: &DataType) -> Option<Self::TypedStat>;

    /// Fetches the max stat of `col`. The provided methods pass the data type of the literal
    /// being compared against as `data_type`.
    fn get_max_stat(&self, col: &ColumnName, data_type: &DataType) -> Option<Self::TypedStat>;

    /// Fetches the null-count stat of `col`.
    fn get_nullcount_stat(&self, col: &ColumnName) -> Option<Self::IntStat>;

    /// Fetches the row-count stat.
    fn get_rowcount_stat(&self) -> Option<Self::IntStat>;

    /// Evaluates a literal used as a predicate, i.e. `[NOT] <val>`.
    fn eval_scalar(&self, val: &Scalar, inverted: bool) -> Option<Self::Output>;

    /// Evaluates `<col> IS [NOT] NULL`; `inverted` selects the `NOT` form.
    fn eval_is_null(&self, col: &ColumnName, inverted: bool) -> Option<Self::Output>;

    /// Evaluates a (possibly inverted) binary operator whose operands are both literals.
    fn eval_binary_scalars(
        &self,
        op: BinaryOperator,
        left: &Scalar,
        right: &Scalar,
        inverted: bool,
    ) -> Option<Self::Output>;

    /// Combines the evaluated operands of a variadic (AND/OR) expression.
    fn finish_eval_variadic(
        &self,
        op: VariadicOperator,
        exprs: impl IntoIterator<Item = Option<Self::Output>>,
        inverted: bool,
    ) -> Option<Self::Output>;

    /// Primitive stat-vs-literal comparison used by the provided methods.
    ///
    /// NOTE(review): from its call sites this is expected to test whether comparing the stat
    /// `col` with `val` yields `ord`, negated when `inverted` is set — confirm against the
    /// implementations.
    fn eval_partial_cmp(
        &self,
        ord: Ordering,
        col: Self::TypedStat,
        val: &Scalar,
        inverted: bool,
    ) -> Option<Self::Output>;

    /// Runs `eval_partial_cmp` on the min stat of `col`; `None` if that stat is unavailable.
    fn partial_cmp_min_stat(
        &self,
        col: &ColumnName,
        val: &Scalar,
        ord: Ordering,
        inverted: bool,
    ) -> Option<Self::Output> {
        self.get_min_stat(col, &val.data_type())
            .and_then(|min| self.eval_partial_cmp(ord, min, val, inverted))
    }

    /// Runs `eval_partial_cmp` on the max stat of `col`; `None` if that stat is unavailable.
    fn partial_cmp_max_stat(
        &self,
        col: &ColumnName,
        val: &Scalar,
        ord: Ordering,
        inverted: bool,
    ) -> Option<Self::Output> {
        self.get_max_stat(col, &val.data_type())
            .and_then(|max| self.eval_partial_cmp(ord, max, val, inverted))
    }

    /// `<col> < <val>` is tested as `min < val`.
    fn eval_lt(&self, col: &ColumnName, val: &Scalar) -> Option<Self::Output> {
        self.partial_cmp_min_stat(col, val, Ordering::Less, false)
    }

    /// `<col> <= <val>` is tested as `NOT(min > val)`.
    fn eval_le(&self, col: &ColumnName, val: &Scalar) -> Option<Self::Output> {
        self.partial_cmp_min_stat(col, val, Ordering::Greater, true)
    }

    /// `<col> > <val>` is tested as `max > val`.
    fn eval_gt(&self, col: &ColumnName, val: &Scalar) -> Option<Self::Output> {
        self.partial_cmp_max_stat(col, val, Ordering::Greater, false)
    }

    /// `<col> >= <val>` is tested as `NOT(max < val)`.
    fn eval_ge(&self, col: &ColumnName, val: &Scalar) -> Option<Self::Output> {
        self.partial_cmp_max_stat(col, val, Ordering::Less, true)
    }

    /// Evaluates `<col> = <val>`, or `<col> != <val>` when `inverted` is set.
    ///
    /// * Equality is tested as `AND(NOT(min > val), NOT(max < val))`.
    /// * Inequality is tested as `OR(NOT(min = val), NOT(max = val))`.
    fn eval_eq(&self, col: &ColumnName, val: &Scalar, inverted: bool) -> Option<Self::Output> {
        // Pick the combining operator and the ordering each stat is tested against; every
        // individual test below is issued in its negated form.
        let (op, min_ord, max_ord) = if inverted {
            (VariadicOperator::Or, Ordering::Equal, Ordering::Equal)
        } else {
            (VariadicOperator::And, Ordering::Greater, Ordering::Less)
        };
        let bounds = [
            self.partial_cmp_min_stat(col, val, min_ord, true),
            self.partial_cmp_max_stat(col, val, max_ord, true),
        ];
        // The inversion has already been folded into `op` and `bounds`.
        self.finish_eval_variadic(op, bounds, false)
    }
}
impl<T: DataSkippingPredicateEvaluator> PredicateEvaluator for T {
type Output = T::Output;
fn eval_scalar(&self, val: &Scalar, inverted: bool) -> Option<Self::Output> {
self.eval_scalar(val, inverted)
}
fn eval_is_null(&self, col: &ColumnName, inverted: bool) -> Option<Self::Output> {
self.eval_is_null(col, inverted)
}
fn eval_lt(&self, col: &ColumnName, val: &Scalar) -> Option<Self::Output> {
self.eval_lt(col, val)
}
fn eval_le(&self, col: &ColumnName, val: &Scalar) -> Option<Self::Output> {
self.eval_le(col, val)
}
fn eval_gt(&self, col: &ColumnName, val: &Scalar) -> Option<Self::Output> {
self.eval_gt(col, val)
}
fn eval_ge(&self, col: &ColumnName, val: &Scalar) -> Option<Self::Output> {
self.eval_ge(col, val)
}
fn eval_eq(&self, col: &ColumnName, val: &Scalar, inverted: bool) -> Option<Self::Output> {
self.eval_eq(col, val, inverted)
}
fn eval_binary_scalars(
&self,
op: BinaryOperator,
left: &Scalar,
right: &Scalar,
inverted: bool,
) -> Option<Self::Output> {
self.eval_binary_scalars(op, left, right, inverted)
}
fn eval_binary_columns(
&self,
_op: BinaryOperator,
_a: &ColumnName,
_b: &ColumnName,
_inverted: bool,
) -> Option<Self::Output> {
None }
fn finish_eval_variadic(
&self,
op: VariadicOperator,
exprs: impl IntoIterator<Item = Option<Self::Output>>,
inverted: bool,
) -> Option<Self::Output> {
self.finish_eval_variadic(op, exprs, inverted)
}
}