use crate::expressions::{
BinaryExpression, BinaryExpressionOp, BinaryPredicate, BinaryPredicateOp, ColumnName,
Expression as Expr, JunctionPredicate, JunctionPredicateOp, OpaqueExpression,
OpaqueExpressionOpRef, OpaquePredicate, OpaquePredicateOpRef, Predicate as Pred, Scalar,
UnaryPredicate, UnaryPredicateOp,
};
use crate::schema::DataType;
use std::cmp::Ordering;
use tracing::{debug, warn};
pub(crate) mod parquet_stats_skipping;
#[cfg(test)]
mod tests;
/// Trait-object alias for a [`KernelPredicateEvaluator`] whose `Output` is `bool`.
pub type DirectPredicateEvaluator<'a> = dyn KernelPredicateEvaluator<Output = bool> + 'a;
/// Trait-object alias for a [`DataSkippingPredicateEvaluator`] whose `Output` is `bool` and
/// whose column statistics are [`Scalar`] values.
pub type DirectDataSkippingPredicateEvaluator<'a> =
dyn DataSkippingPredicateEvaluator<Output = bool, ColumnStat = Scalar> + 'a;
/// Trait-object alias for a [`DataSkippingPredicateEvaluator`] whose `Output` is a predicate
/// (`Pred`) and whose column statistics are expressions (`Expr`).
pub type IndirectDataSkippingPredicateEvaluator<'a> =
dyn DataSkippingPredicateEvaluator<Output = Pred, ColumnStat = Expr> + 'a;
pub trait KernelPredicateEvaluator {
type Output;
fn eval_pred_scalar(&self, val: &Scalar, inverted: bool) -> Option<Self::Output>;
fn eval_pred_scalar_is_null(&self, val: &Scalar, inverted: bool) -> Option<Self::Output>;
fn eval_pred_is_null(&self, col: &ColumnName, inverted: bool) -> Option<Self::Output>;
fn eval_pred_lt(&self, col: &ColumnName, val: &Scalar, inverted: bool) -> Option<Self::Output>;
fn eval_pred_gt(&self, col: &ColumnName, val: &Scalar, inverted: bool) -> Option<Self::Output>;
fn eval_pred_eq(&self, col: &ColumnName, val: &Scalar, inverted: bool) -> Option<Self::Output>;
fn eval_pred_binary_scalars(
&self,
op: BinaryPredicateOp,
left: &Scalar,
right: &Scalar,
inverted: bool,
) -> Option<Self::Output>;
fn eval_pred_binary_columns(
&self,
op: BinaryPredicateOp,
a: &ColumnName,
b: &ColumnName,
inverted: bool,
) -> Option<Self::Output>;
fn eval_pred_opaque(
&self,
op: &OpaquePredicateOpRef,
exprs: &[Expr],
inverted: bool,
) -> Option<Self::Output>;
fn eval_pred_expr_opaque(
&self,
op: &OpaqueExpressionOpRef,
exprs: &[Expr],
inverted: bool,
) -> Option<Self::Output>;
fn finish_eval_pred_junction(
&self,
op: JunctionPredicateOp,
preds: &mut dyn Iterator<Item = Option<Self::Output>>,
inverted: bool,
) -> Option<Self::Output>;
fn eval_pred_column(&self, col: &ColumnName, inverted: bool) -> Option<Self::Output> {
self.eval_pred_eq(col, &Scalar::from(inverted), true)
}
fn eval_pred_not(&self, pred: &Pred, inverted: bool) -> Option<Self::Output> {
self.eval_pred(pred, !inverted)
}
fn eval_pred_expr(&self, expr: &Expr, inverted: bool) -> Option<Self::Output> {
match expr {
Expr::Literal(val) => self.eval_pred_scalar(val, inverted),
Expr::Column(col) => self.eval_pred_column(col, inverted),
Expr::Predicate(pred) => self.eval_pred(pred, inverted),
Expr::Opaque(OpaqueExpression { op, exprs }) => {
self.eval_pred_expr_opaque(op, exprs, inverted)
}
Expr::Struct(..)
| Expr::Transform(_)
| Expr::Unary(_)
| Expr::Binary(_)
| Expr::Variadic(_)
| Expr::ParseJson(_)
| Expr::MapToStruct(_)
| Expr::Unknown(_) => None,
}
}
fn eval_pred_unary(
&self,
op: UnaryPredicateOp,
expr: &Expr,
inverted: bool,
) -> Option<Self::Output> {
match op {
UnaryPredicateOp::IsNull => match expr {
Expr::Literal(val) => self.eval_pred_scalar_is_null(val, inverted),
Expr::Column(col) => self.eval_pred_is_null(col, inverted),
Expr::Predicate(_)
| Expr::Struct(..)
| Expr::Transform(_)
| Expr::Unary(_)
| Expr::Binary(_)
| Expr::Variadic(_)
| Expr::Opaque(_)
| Expr::ParseJson { .. }
| Expr::MapToStruct(_)
| Expr::Unknown(_) => {
debug!("Unsupported operand: IS [NOT] NULL: {expr:?}");
None
}
},
}
}
fn eval_pred_distinct(
&self,
col: &ColumnName,
val: &Scalar,
inverted: bool,
) -> Option<Self::Output> {
if let Scalar::Null(_) = val {
self.eval_pred_is_null(col, !inverted)
} else {
let mut args = [
self.eval_pred_is_null(col, inverted),
self.eval_pred_eq(col, val, !inverted),
]
.into_iter();
self.finish_eval_pred_junction(JunctionPredicateOp::Or, &mut args, inverted)
}
}
fn eval_pred_in(
&self,
_col: &ColumnName,
_val: &Scalar,
_inverted: bool,
) -> Option<Self::Output> {
None }
fn eval_pred_binary(
&self,
op: BinaryPredicateOp,
left: &Expr,
right: &Expr,
inverted: bool,
) -> Option<Self::Output> {
use BinaryPredicateOp::*;
use Expr::{Column, Literal};
match (left, right) {
(Column(a), Column(b)) => self.eval_pred_binary_columns(op, a, b, inverted),
(Literal(a), Literal(b)) => self.eval_pred_binary_scalars(op, a, b, inverted),
(Column(col), Literal(val)) => match op {
LessThan => self.eval_pred_lt(col, val, inverted),
GreaterThan => self.eval_pred_gt(col, val, inverted),
Equal => self.eval_pred_eq(col, val, inverted),
Distinct => self.eval_pred_distinct(col, val, inverted),
In => self.eval_pred_in(col, val, inverted),
},
(Literal(val), Column(col)) => match op {
LessThan => self.eval_pred_gt(col, val, inverted),
GreaterThan => self.eval_pred_lt(col, val, inverted),
Equal => self.eval_pred_eq(col, val, inverted),
Distinct => self.eval_pred_distinct(col, val, inverted),
In => None, },
_ => {
debug!("Unsupported binary operand(s): {left:?} {op:?} {right:?}");
None
}
}
}
fn eval_pred_junction(
&self,
op: JunctionPredicateOp,
preds: &[Pred],
inverted: bool,
) -> Option<Self::Output> {
let mut preds = preds.iter().map(|pred| self.eval_pred(pred, inverted));
self.finish_eval_pred_junction(op, &mut preds, inverted)
}
fn eval_pred(&self, pred: &Pred, inverted: bool) -> Option<Self::Output> {
use Pred::*;
match pred {
BooleanExpression(expr) => self.eval_pred_expr(expr, inverted),
Not(pred) => self.eval_pred_not(pred, inverted),
Unary(UnaryPredicate { op, expr }) => self.eval_pred_unary(*op, expr, inverted),
Binary(BinaryPredicate { op, left, right }) => {
self.eval_pred_binary(*op, left, right, inverted)
}
Junction(JunctionPredicate { op, preds }) => {
self.eval_pred_junction(*op, preds, inverted)
}
Opaque(OpaquePredicate { op, exprs }) => self.eval_pred_opaque(op, exprs, inverted),
Unknown(_) => None, }
}
fn eval_pred_sql_where(&self, pred: &Pred, inverted: bool) -> Option<Self::Output> {
use Pred::*;
match pred {
Junction(JunctionPredicate { op, preds }) => {
let mut preds = preds
.iter()
.map(|pred| self.eval_pred_sql_where(pred, inverted));
self.finish_eval_pred_junction(*op, &mut preds, inverted)
}
Binary(BinaryPredicate { op, left, right }) if op.is_null_intolerant() => {
let mut preds = [
self.eval_pred_unary(UnaryPredicateOp::IsNull, left, true),
self.eval_pred_unary(UnaryPredicateOp::IsNull, right, true),
self.eval_pred_binary(*op, left, right, inverted),
]
.into_iter();
self.finish_eval_pred_junction(JunctionPredicateOp::And, &mut preds, false)
}
Not(pred) => self.eval_pred_sql_where(pred, !inverted),
BooleanExpression(Expr::Column(col)) => {
let mut preds = [
self.eval_pred_is_null(col, true),
self.eval_pred_column(col, inverted),
]
.into_iter();
self.finish_eval_pred_junction(JunctionPredicateOp::And, &mut preds, false)
}
BooleanExpression(Expr::Predicate(pred)) => self.eval_pred_sql_where(pred, inverted),
_ => self.eval_pred(pred, inverted),
}
}
#[allow(unused)]
fn eval(&self, pred: &Pred) -> Option<Self::Output> {
self.eval_pred(pred, false)
}
fn eval_sql_where(&self, pred: &Pred) -> Option<Self::Output> {
self.eval_pred_sql_where(pred, false)
}
}
pub struct KernelPredicateEvaluatorDefaults;
impl KernelPredicateEvaluatorDefaults {
pub fn eval_pred_scalar(val: &Scalar, inverted: bool) -> Option<bool> {
match val {
Scalar::Boolean(val) => Some(*val != inverted),
_ => None,
}
}
pub fn eval_pred_scalar_is_null(val: &Scalar, inverted: bool) -> Option<bool> {
Some(val.is_null() != inverted)
}
pub fn partial_cmp_scalars(
ord: Ordering,
a: &Scalar,
b: &Scalar,
inverted: bool,
) -> Option<bool> {
let cmp = a.logical_partial_cmp(b)?;
let matched = cmp == ord;
Some(matched != inverted)
}
pub fn eval_pred_binary_scalars(
op: BinaryPredicateOp,
left: &Scalar,
right: &Scalar,
inverted: bool,
) -> Option<bool> {
use BinaryPredicateOp::*;
match op {
Equal => Self::partial_cmp_scalars(Ordering::Equal, left, right, inverted),
LessThan => Self::partial_cmp_scalars(Ordering::Less, left, right, inverted),
GreaterThan => Self::partial_cmp_scalars(Ordering::Greater, left, right, inverted),
Distinct | In => {
debug!("Unsupported binary operator: {left:?} {op:?} {right:?}");
None
}
}
}
pub fn finish_eval_pred_junction(
op: JunctionPredicateOp,
preds: &mut dyn Iterator<Item = Option<bool>>,
inverted: bool,
) -> Option<bool> {
let dominator = match op {
JunctionPredicateOp::And => inverted,
JunctionPredicateOp::Or => !inverted,
};
let mut found_null = false;
for val in preds {
match val {
Some(val) if val == dominator => return Some(dominator), None => found_null = true,
Some(_) => (), }
}
(!found_null).then_some(!dominator)
}
}
/// Resolves column references to scalar values.
pub(crate) trait ResolveColumnAsScalar {
/// Returns the scalar that `col` resolves to, or `None` if no value is available for it.
fn resolve_column(&self, col: &ColumnName) -> Option<Scalar>;
}
/// Test-only resolver that panics whenever a column is resolved.
#[cfg(test)]
pub(crate) struct UnimplementedColumnResolver;
#[cfg(test)]
impl ResolveColumnAsScalar for UnimplementedColumnResolver {
/// Always panics via `unimplemented!()`.
fn resolve_column(&self, _col: &ColumnName) -> Option<Scalar> {
unimplemented!()
}
}
/// Resolver that knows no columns: every lookup yields `None`.
pub(crate) struct EmptyColumnResolver;
impl ResolveColumnAsScalar for EmptyColumnResolver {
/// Always returns `None`, regardless of the requested column.
fn resolve_column(&self, _col: &ColumnName) -> Option<Scalar> {
None
}
}
/// A map from column name to value acts as a resolver: a column resolves to a clone of its
/// entry, and a column without an entry resolves to `None`.
impl ResolveColumnAsScalar for std::collections::HashMap<ColumnName, Scalar> {
    fn resolve_column(&self, col: &ColumnName) -> Option<Scalar> {
        let value = self.get(col)?;
        Some(value.clone())
    }
}
/// A predicate evaluator that resolves columns to scalar values through a
/// [`ResolveColumnAsScalar`] implementation.
pub(crate) struct DefaultKernelPredicateEvaluator<R: ResolveColumnAsScalar> {
    resolver: R,
}

impl<R: ResolveColumnAsScalar> DefaultKernelPredicateEvaluator<R> {
    /// Looks up `col` through the wrapped resolver.
    fn resolve_column(&self, col: &ColumnName) -> Option<Scalar> {
        self.resolver.resolve_column(col)
    }

    /// Evaluates `expr` to a scalar value, if possible.
    ///
    /// * Literals evaluate to a clone of themselves and columns to whatever the resolver
    ///   returns.
    /// * Predicates are evaluated (without inversion) and their result converted to a scalar.
    /// * Binary arithmetic evaluates the left operand, then the right one, and applies the
    ///   matching `Scalar::try_*` operation; a missing operand yields `None`.
    /// * Opaque expressions are delegated to the operator; a failure is logged as a warning and
    ///   yields `None`.
    /// * Every other expression kind yields `None`.
    pub(crate) fn eval_expr(&self, expr: &Expr) -> Option<Scalar> {
        match expr {
            Expr::Literal(value) => Some(value.clone()),
            Expr::Column(name) => self.resolve_column(name),
            Expr::Predicate(pred) => {
                let outcome = self.eval_pred(pred, false)?;
                Some(Scalar::from(outcome))
            }
            Expr::Binary(BinaryExpression { op, left, right }) => {
                let lhs = self.eval_expr(left)?;
                let rhs = self.eval_expr(right)?;
                match op {
                    BinaryExpressionOp::Plus => Scalar::try_add(&lhs, &rhs),
                    BinaryExpressionOp::Minus => Scalar::try_sub(&lhs, &rhs),
                    BinaryExpressionOp::Multiply => Scalar::try_mul(&lhs, &rhs),
                    BinaryExpressionOp::Divide => Scalar::try_div(&lhs, &rhs),
                }
            }
            Expr::Opaque(OpaqueExpression { op, exprs }) => {
                match op.eval_expr_scalar(&|expr| self.eval_expr(expr), exprs) {
                    Ok(value) => Some(value),
                    Err(err) => {
                        warn!("Failed to evaluate {:?}: {err:?}", op.as_ref());
                        None
                    }
                }
            }
            Expr::Struct(..)
            | Expr::Transform(_)
            | Expr::Unary(_)
            | Expr::Variadic(_)
            | Expr::ParseJson(_)
            | Expr::MapToStruct(_)
            | Expr::Unknown(_) => None,
        }
    }
}
/// Wraps a resolver in a [`DefaultKernelPredicateEvaluator`].
///
/// The resolver is only stored, so no `'static` bound is required: resolvers that borrow
/// shorter-lived data can be converted with `From`/`Into` as well.
impl<R: ResolveColumnAsScalar> From<R> for DefaultKernelPredicateEvaluator<R> {
    fn from(resolver: R) -> Self {
        Self { resolver }
    }
}
/// Evaluates predicates to `bool` by resolving columns to scalars and then applying the
/// literal-only helpers of [`KernelPredicateEvaluatorDefaults`]. A column that cannot be
/// resolved makes the enclosing comparison evaluate to `None`.
impl<R: ResolveColumnAsScalar> KernelPredicateEvaluator for DefaultKernelPredicateEvaluator<R> {
    type Output = bool;

    fn eval_pred_scalar(&self, val: &Scalar, inverted: bool) -> Option<bool> {
        KernelPredicateEvaluatorDefaults::eval_pred_scalar(val, inverted)
    }

    fn eval_pred_scalar_is_null(&self, val: &Scalar, inverted: bool) -> Option<bool> {
        KernelPredicateEvaluatorDefaults::eval_pred_scalar_is_null(val, inverted)
    }

    fn eval_pred_is_null(&self, col: &ColumnName, inverted: bool) -> Option<bool> {
        self.resolve_column(col)
            .and_then(|value| self.eval_pred_scalar_is_null(&value, inverted))
    }

    fn eval_pred_lt(&self, col: &ColumnName, val: &Scalar, inverted: bool) -> Option<bool> {
        self.resolve_column(col).and_then(|value| {
            self.eval_pred_binary_scalars(BinaryPredicateOp::LessThan, &value, val, inverted)
        })
    }

    fn eval_pred_gt(&self, col: &ColumnName, val: &Scalar, inverted: bool) -> Option<bool> {
        self.resolve_column(col).and_then(|value| {
            self.eval_pred_binary_scalars(BinaryPredicateOp::GreaterThan, &value, val, inverted)
        })
    }

    fn eval_pred_eq(&self, col: &ColumnName, val: &Scalar, inverted: bool) -> Option<bool> {
        self.resolve_column(col).and_then(|value| {
            self.eval_pred_binary_scalars(BinaryPredicateOp::Equal, &value, val, inverted)
        })
    }

    fn eval_pred_binary_scalars(
        &self,
        op: BinaryPredicateOp,
        left: &Scalar,
        right: &Scalar,
        inverted: bool,
    ) -> Option<bool> {
        KernelPredicateEvaluatorDefaults::eval_pred_binary_scalars(op, left, right, inverted)
    }

    fn eval_pred_binary_columns(
        &self,
        op: BinaryPredicateOp,
        left: &ColumnName,
        right: &ColumnName,
        inverted: bool,
    ) -> Option<bool> {
        // Resolve left before right; the right column is not resolved if the left is missing.
        let lhs = self.resolve_column(left)?;
        let rhs = self.resolve_column(right)?;
        self.eval_pred_binary_scalars(op, &lhs, &rhs, inverted)
    }

    fn eval_pred_opaque(
        &self,
        op: &OpaquePredicateOpRef,
        exprs: &[Expr],
        inverted: bool,
    ) -> Option<bool> {
        // The operator gets an expression-evaluation callback plus this evaluator itself.
        match op.eval_pred_scalar(&|expr| self.eval_expr(expr), self, exprs, inverted) {
            Ok(outcome) => outcome,
            Err(err) => {
                warn!("Unable to evaluate {:?}: {err:?}", op.as_ref());
                None
            }
        }
    }

    fn eval_pred_expr_opaque(
        &self,
        op: &OpaqueExpressionOpRef,
        exprs: &[Expr],
        inverted: bool,
    ) -> Option<bool> {
        let evaluated = op.eval_expr_scalar(&|expr| self.eval_expr(expr), exprs);
        match evaluated {
            Err(err) => {
                warn!("Unable to evaluate {:?}: {err:?}", op.as_ref());
                None
            }
            Ok(Scalar::Boolean(val)) => Some(val != inverted),
            // A boolean NULL is a legitimate "unknown" result, so it is not worth a warning.
            Ok(Scalar::Null(DataType::BOOLEAN)) => None,
            Ok(other) => {
                warn!(
                    "Expected {:?} to produce a boolean value, but got {:?}",
                    op.as_ref(),
                    other.data_type()
                );
                None
            }
        }
    }

    fn finish_eval_pred_junction(
        &self,
        op: JunctionPredicateOp,
        preds: &mut dyn Iterator<Item = Option<bool>>,
        inverted: bool,
    ) -> Option<bool> {
        KernelPredicateEvaluatorDefaults::finish_eval_pred_junction(op, preds, inverted)
    }
}
/// Evaluates predicates against per-column statistics (min, max, null count, row count)
/// instead of concrete column values.
///
/// Implementors supply access to the statistics plus a handful of primitive evaluations; the
/// provided methods express `<`, `>` and `=` comparisons between a column and a literal in
/// terms of the column's min/max statistics.
pub trait DataSkippingPredicateEvaluator {
    /// The type of a successfully evaluated predicate.
    type Output;
    /// The type used to represent a column statistic.
    type ColumnStat;

    /// Returns the min-value statistic of `col`, requested as `data_type`, if available.
    fn get_min_stat(&self, col: &ColumnName, data_type: &DataType) -> Option<Self::ColumnStat>;

    /// Returns the max-value statistic of `col`, requested as `data_type`, if available.
    fn get_max_stat(&self, col: &ColumnName, data_type: &DataType) -> Option<Self::ColumnStat>;

    /// Returns the null-count statistic of `col`, if available.
    fn get_nullcount_stat(&self, col: &ColumnName) -> Option<Self::ColumnStat>;

    /// Returns the row-count statistic, if available.
    fn get_rowcount_stat(&self) -> Option<Self::ColumnStat>;

    /// Evaluates a (possibly inverted) literal used as a predicate.
    fn eval_pred_scalar(&self, val: &Scalar, inverted: bool) -> Option<Self::Output>;

    /// Evaluates a (possibly inverted) `IS NULL` check on a literal.
    fn eval_pred_scalar_is_null(&self, val: &Scalar, inverted: bool) -> Option<Self::Output>;

    /// Evaluates a (possibly inverted) `IS NULL` check on a column.
    fn eval_pred_is_null(&self, col: &ColumnName, inverted: bool) -> Option<Self::Output>;

    /// Evaluates a (possibly inverted) binary predicate whose operands are both literals.
    fn eval_pred_binary_scalars(
        &self,
        op: BinaryPredicateOp,
        left: &Scalar,
        right: &Scalar,
        inverted: bool,
    ) -> Option<Self::Output>;

    /// Evaluates a (possibly inverted) opaque predicate over the argument expressions `exprs`.
    fn eval_pred_opaque(
        &self,
        op: &OpaquePredicateOpRef,
        exprs: &[Expr],
        inverted: bool,
    ) -> Option<Self::Output>;

    /// Combines the already-evaluated operands of a (possibly inverted) junction.
    fn finish_eval_pred_junction(
        &self,
        op: JunctionPredicateOp,
        preds: &mut dyn Iterator<Item = Option<Self::Output>>,
        inverted: bool,
    ) -> Option<Self::Output>;

    /// Compares the statistic `col` against the literal `val`.
    ///
    /// The provided methods rely on this reporting whether `col` compares to `val` as `ord`,
    /// negated when `inverted`.
    fn eval_partial_cmp(
        &self,
        ord: Ordering,
        col: Self::ColumnStat,
        val: &Scalar,
        inverted: bool,
    ) -> Option<Self::Output>;

    /// Compares the min statistic of `col` (requested with the data type of `val`) against
    /// `val`. Returns `None` if the statistic is unavailable.
    fn partial_cmp_min_stat(
        &self,
        col: &ColumnName,
        val: &Scalar,
        ord: Ordering,
        inverted: bool,
    ) -> Option<Self::Output> {
        let data_type = val.data_type();
        self.get_min_stat(col, &data_type)
            .and_then(|min| self.eval_partial_cmp(ord, min, val, inverted))
    }

    /// Compares the max statistic of `col` (requested with the data type of `val`) against
    /// `val`. Returns `None` if the statistic is unavailable.
    fn partial_cmp_max_stat(
        &self,
        col: &ColumnName,
        val: &Scalar,
        ord: Ordering,
        inverted: bool,
    ) -> Option<Self::Output> {
        let data_type = val.data_type();
        self.get_max_stat(col, &data_type)
            .and_then(|max| self.eval_partial_cmp(ord, max, val, inverted))
    }

    /// Evaluates a (possibly inverted) `col < val` comparison.
    ///
    /// `col < val` is answered with `min < val`; the inverted form (`col >= val`) is answered
    /// with `NOT(max < val)`.
    fn eval_pred_lt(&self, col: &ColumnName, val: &Scalar, inverted: bool) -> Option<Self::Output> {
        if !inverted {
            return self.partial_cmp_min_stat(col, val, Ordering::Less, false);
        }
        self.partial_cmp_max_stat(col, val, Ordering::Less, true)
    }

    /// Evaluates a (possibly inverted) `col > val` comparison.
    ///
    /// `col > val` is answered with `max > val`; the inverted form (`col <= val`) is answered
    /// with `NOT(min > val)`.
    fn eval_pred_gt(&self, col: &ColumnName, val: &Scalar, inverted: bool) -> Option<Self::Output> {
        if !inverted {
            return self.partial_cmp_max_stat(col, val, Ordering::Greater, false);
        }
        self.partial_cmp_min_stat(col, val, Ordering::Greater, true)
    }

    /// Evaluates a (possibly inverted) `col = val` comparison.
    ///
    /// `col = val` is answered with `NOT(min > val) AND NOT(max < val)`, i.e. the min/max range
    /// brackets the literal. The inverted form (`col != val`) is answered with
    /// `min != val OR max != val`.
    fn eval_pred_eq(&self, col: &ColumnName, val: &Scalar, inverted: bool) -> Option<Self::Output> {
        // Inversion is baked into the individual bound checks, so the junction itself is always
        // evaluated non-inverted. The min check is evaluated before the max check.
        if inverted {
            let min_differs = self.partial_cmp_min_stat(col, val, Ordering::Equal, true);
            let max_differs = self.partial_cmp_max_stat(col, val, Ordering::Equal, true);
            let mut bounds = [min_differs, max_differs].into_iter();
            self.finish_eval_pred_junction(JunctionPredicateOp::Or, &mut bounds, false)
        } else {
            let min_not_above = self.partial_cmp_min_stat(col, val, Ordering::Greater, true);
            let max_not_below = self.partial_cmp_max_stat(col, val, Ordering::Less, true);
            let mut bounds = [min_not_above, max_not_below].into_iter();
            self.finish_eval_pred_junction(JunctionPredicateOp::And, &mut bounds, false)
        }
    }
}
impl<T: DataSkippingPredicateEvaluator + ?Sized> KernelPredicateEvaluator for T {
type Output = T::Output;
fn eval_pred_scalar(&self, val: &Scalar, inverted: bool) -> Option<Self::Output> {
self.eval_pred_scalar(val, inverted)
}
fn eval_pred_scalar_is_null(&self, val: &Scalar, inverted: bool) -> Option<Self::Output> {
self.eval_pred_scalar_is_null(val, inverted)
}
fn eval_pred_is_null(&self, col: &ColumnName, inverted: bool) -> Option<Self::Output> {
self.eval_pred_is_null(col, inverted)
}
fn eval_pred_lt(&self, col: &ColumnName, val: &Scalar, inverted: bool) -> Option<Self::Output> {
self.eval_pred_lt(col, val, inverted)
}
fn eval_pred_gt(&self, col: &ColumnName, val: &Scalar, inverted: bool) -> Option<Self::Output> {
self.eval_pred_gt(col, val, inverted)
}
fn eval_pred_eq(&self, col: &ColumnName, val: &Scalar, inverted: bool) -> Option<Self::Output> {
self.eval_pred_eq(col, val, inverted)
}
fn eval_pred_binary_scalars(
&self,
op: BinaryPredicateOp,
left: &Scalar,
right: &Scalar,
inverted: bool,
) -> Option<Self::Output> {
self.eval_pred_binary_scalars(op, left, right, inverted)
}
fn eval_pred_binary_columns(
&self,
_op: BinaryPredicateOp,
_a: &ColumnName,
_b: &ColumnName,
_inverted: bool,
) -> Option<Self::Output> {
None
}
fn eval_pred_opaque(
&self,
op: &OpaquePredicateOpRef,
exprs: &[Expr],
inverted: bool,
) -> Option<Self::Output> {
self.eval_pred_opaque(op, exprs, inverted)
}
fn eval_pred_expr_opaque(
&self,
_op: &OpaqueExpressionOpRef,
_exprs: &[Expr],
_inverted: bool,
) -> Option<Self::Output> {
None }
fn finish_eval_pred_junction(
&self,
op: JunctionPredicateOp,
preds: &mut dyn Iterator<Item = Option<Self::Output>>,
inverted: bool,
) -> Option<Self::Output> {
self.finish_eval_pred_junction(op, preds, inverted)
}
}