use std::borrow::Cow;
use std::cmp::Ordering;
use std::sync::{Arc, LazyLock};
use tracing::debug;
use crate::actions::get_log_add_schema;
use crate::actions::visitors::SelectionVectorVisitor;
use crate::error::DeltaResult;
use crate::expressions::{
column_expr, joined_column_expr, BinaryOperator, ColumnName, Expression as Expr, ExpressionRef,
Scalar, VariadicOperator,
};
use crate::predicates::{
DataSkippingPredicateEvaluator, PredicateEvaluator, PredicateEvaluatorDefaults,
};
use crate::schema::{DataType, PrimitiveType, SchemaRef, SchemaTransform, StructField, StructType};
use crate::{Engine, EngineData, ExpressionEvaluator, JsonHandler, RowVisitor as _};
#[cfg(test)]
mod tests;
/// Rewrites `expr` into a predicate over per-file statistics.
///
/// The rewrite is delegated to [`DataSkippingPredicateCreator`], whose stat accessors refer to
/// the `minValues`, `maxValues`, `nullCount` and `numRecords` stats columns instead of the data
/// columns themselves.
///
/// * `expr` - the predicate to rewrite.
/// * `inverted` - forwarded unchanged to the evaluator; when `true` the evaluator emits the
///   negated form of each operation (e.g. `<` becomes `>=`).
///
/// Returns `None` when the evaluator cannot produce a rewritten expression for `expr`.
fn as_data_skipping_predicate(expr: &Expr, inverted: bool) -> Option<Expr> {
    let rewriter = DataSkippingPredicateCreator;
    rewriter.eval_expr(expr, inverted)
}
/// Evaluates a stats-based predicate against a batch of log actions and produces a boolean
/// selection vector (see `apply`). Instances are built by `new`, which returns `None` when no
/// usable data skipping predicate exists.
pub(crate) struct DataSkippingFilter {
/// Schema the stats JSON is parsed with: `numRecords`, `nullCount`, `minValues`, `maxValues`.
stats_schema: SchemaRef,
/// Extracts the `add.stats` column (as a string) from a batch of actions.
select_stats_evaluator: Arc<dyn ExpressionEvaluator>,
/// Evaluates the rewritten predicate over parsed stats, yielding a one-field `predicate` struct.
skipping_evaluator: Arc<dyn ExpressionEvaluator>,
/// Evaluates `predicate.distinct(false)` to turn the predicate column into a plain boolean.
filter_evaluator: Arc<dyn ExpressionEvaluator>,
/// Parses the extracted stats strings into `stats_schema`-shaped data.
json_handler: Arc<dyn JsonHandler>,
}
impl DataSkippingFilter {
    /// Builds a data skipping filter for the given physical predicate.
    ///
    /// `physical_predicate` pairs the predicate expression with the schema of the columns it
    /// references. That schema is used directly as the shape of the `minValues` / `maxValues`
    /// stats, and (with every primitive leaf replaced by LONG) as the shape of `nullCount`.
    ///
    /// Returns `None` when:
    /// * `physical_predicate` is `None`,
    /// * the null-count schema cannot be derived from the referenced schema, or
    /// * the predicate cannot be rewritten into a stats-based predicate.
    pub(crate) fn new(
        engine: &dyn Engine,
        physical_predicate: Option<(ExpressionRef, SchemaRef)>,
    ) -> Option<Self> {
        /// Schema transform that replaces every primitive leaf type with LONG, turning a
        /// min/max stats schema into the matching null-count schema.
        struct NullCountStatsTransform;
        impl<'a> SchemaTransform<'a> for NullCountStatsTransform {
            fn transform_primitive(
                &mut self,
                _ptype: &'a PrimitiveType,
            ) -> Option<Cow<'a, PrimitiveType>> {
                Some(Cow::Owned(PrimitiveType::Long))
            }
        }

        // Expression selecting the raw stats string out of an action batch.
        static STATS_EXPR: LazyLock<Expr> = LazyLock::new(|| column_expr!("add.stats"));
        // Output type of the skipping evaluator: a struct with one nullable boolean field.
        static PREDICATE_SCHEMA: LazyLock<DataType> = LazyLock::new(|| {
            DataType::struct_type([StructField::new("predicate", DataType::BOOLEAN, true)])
        });
        // Collapses the `predicate` column to a plain boolean via `distinct(false)`;
        // presumably this maps a null predicate to "keep" — confirm against `distinct`.
        static FILTER_EXPR: LazyLock<Expr> =
            LazyLock::new(|| column_expr!("predicate").distinct(false));

        let (predicate_expr, minmax_schema) = physical_predicate?;
        debug!("Creating a data skipping filter for {:#?}", predicate_expr);

        // Null counts mirror the min/max layout, but every leaf is a LONG counter.
        let mut to_null_counts = NullCountStatsTransform;
        let null_counts = to_null_counts.transform_struct(&minmax_schema)?;
        let nullcount_schema = null_counts.into_owned();

        let stats_fields = [
            StructField::new("numRecords", DataType::LONG, true),
            StructField::new("nullCount", nullcount_schema, true),
            StructField::new("minValues", minmax_schema.clone(), true),
            StructField::new("maxValues", minmax_schema, true),
        ];
        let stats_schema = Arc::new(StructType::new(stats_fields));

        // Step 1: pull the stats string out of each action.
        let select_stats_evaluator = engine.get_expression_handler().get_evaluator(
            get_log_add_schema().clone(),
            STATS_EXPR.clone(),
            DataType::STRING,
        );

        // Step 2: evaluate the rewritten predicate over the parsed stats. The handler is
        // fetched before attempting the rewrite so the sequence of engine calls stays as is.
        let skipping_handler = engine.get_expression_handler();
        let rewritten = as_data_skipping_predicate(&predicate_expr, false)?;
        let skipping_evaluator = skipping_handler.get_evaluator(
            stats_schema.clone(),
            Expr::struct_from([rewritten]),
            PREDICATE_SCHEMA.clone(),
        );

        // Step 3: turn the (nullable) predicate column into a boolean selection column.
        // NOTE(review): this evaluator is created with `stats_schema` as its input schema, yet
        // `apply` feeds it the output of the skipping evaluator (the `predicate` struct) —
        // confirm the declared input schema is intentionally irrelevant here.
        let filter_evaluator = engine.get_expression_handler().get_evaluator(
            stats_schema.clone(),
            FILTER_EXPR.clone(),
            DataType::BOOLEAN,
        );

        let json_handler = engine.get_json_handler();
        Some(Self {
            stats_schema,
            select_stats_evaluator,
            skipping_evaluator,
            filter_evaluator,
            json_handler,
        })
    }

    /// Applies the filter to a batch of `actions` and returns the resulting selection vector.
    ///
    /// The stats string of each action is extracted, parsed as JSON into the stats schema,
    /// run through the skipping predicate, and finally converted into booleans collected by a
    /// [`SelectionVectorVisitor`].
    ///
    /// # Errors
    /// Propagates any error from the evaluators, the JSON handler, or the visitor.
    ///
    /// # Panics
    /// Panics if any intermediate batch does not have exactly `actions.len()` rows.
    pub(crate) fn apply(&self, actions: &dyn EngineData) -> DeltaResult<Vec<bool>> {
        // Extract the raw stats strings, one per action.
        let raw_stats = self.select_stats_evaluator.evaluate(actions)?;
        assert_eq!(raw_stats.len(), actions.len());

        // Parse them into structured stats.
        let schema = self.stats_schema.clone();
        let stats_batch = self.json_handler.parse_json(raw_stats, schema)?;
        assert_eq!(stats_batch.len(), actions.len());

        // Evaluate the stats-based predicate.
        let predicate_batch = self.skipping_evaluator.evaluate(stats_batch.as_ref())?;
        assert_eq!(predicate_batch.len(), actions.len());

        // Reduce the predicate column to a boolean selection column.
        let selected = self.filter_evaluator.evaluate(&*predicate_batch)?;
        assert_eq!(selected.len(), actions.len());

        // Materialize the engine data as a `Vec<bool>`.
        let mut collector = SelectionVectorVisitor::default();
        collector.visit_rows_of(selected.as_ref())?;
        Ok(collector.selection_vector)
    }
}
/// Stateless [`DataSkippingPredicateEvaluator`] that builds expressions over the stats columns
/// (`minValues`, `maxValues`, `nullCount`, `numRecords`) instead of computing concrete values.
struct DataSkippingPredicateCreator;
impl DataSkippingPredicateEvaluator for DataSkippingPredicateCreator {
    /// The evaluation result is itself an expression, to be evaluated later against stats.
    type Output = Expr;
    /// Min/max stats are represented as column expressions.
    type TypedStat = Expr;
    /// Count stats (null count, row count) are represented as column expressions.
    type IntStat = Expr;

    /// Column expression for the minimum-value stat of `col`, built from the `minValues`
    /// prefix. The requested data type is ignored and the result is always `Some`.
    fn get_min_stat(&self, col: &ColumnName, _data_type: &DataType) -> Option<Expr> {
        let min_col = joined_column_expr!("minValues", col);
        Some(min_col)
    }

    /// Column expression for the maximum-value stat of `col`, built from the `maxValues`
    /// prefix. The requested data type is ignored and the result is always `Some`.
    fn get_max_stat(&self, col: &ColumnName, _data_type: &DataType) -> Option<Expr> {
        let max_col = joined_column_expr!("maxValues", col);
        Some(max_col)
    }

    /// Column expression for the null-count stat of `col`, built from the `nullCount` prefix.
    /// Always `Some`.
    fn get_nullcount_stat(&self, col: &ColumnName) -> Option<Expr> {
        let nullcount_col = joined_column_expr!("nullCount", col);
        Some(nullcount_col)
    }

    /// Column expression for the `numRecords` stat. Always `Some`.
    fn get_rowcount_stat(&self) -> Option<Expr> {
        let rowcount_col = column_expr!("numRecords");
        Some(rowcount_col)
    }

    /// Builds the binary comparison `col <op> val`, where `<op>` is the operator matching
    /// `ord`, or its logical negation when `inverted` is set. Always `Some`.
    fn eval_partial_cmp(
        &self,
        ord: Ordering,
        col: Expr,
        val: &Scalar,
        inverted: bool,
    ) -> Option<Expr> {
        // Guarded arms handle the inverted case; the unguarded arm below each is the plain one.
        let op = match ord {
            Ordering::Less if inverted => BinaryOperator::GreaterThanOrEqual,
            Ordering::Less => BinaryOperator::LessThan,
            Ordering::Equal if inverted => BinaryOperator::NotEqual,
            Ordering::Equal => BinaryOperator::Equal,
            Ordering::Greater if inverted => BinaryOperator::LessThanOrEqual,
            Ordering::Greater => BinaryOperator::GreaterThan,
        };
        Some(Expr::binary(op, col, val.clone()))
    }

    /// Evaluates a scalar with the default scalar evaluation and wraps the result in a literal
    /// expression. `None` when the default evaluation yields `None`.
    fn eval_scalar(&self, val: &Scalar, inverted: bool) -> Option<Expr> {
        let folded = PredicateEvaluatorDefaults::eval_scalar(val, inverted)?;
        Some(Expr::literal(folded))
    }

    /// Rewrites an `IS [NOT] NULL` check on `col` in terms of its null count.
    ///
    /// The result is `nullCount != threshold`, where the threshold is the value at which the
    /// file can be skipped: `0` for `IS NULL` (no nulls present), or the row count for the
    /// inverted check (every row is null).
    fn eval_is_null(&self, col: &ColumnName, inverted: bool) -> Option<Expr> {
        let skip_threshold = if inverted {
            self.get_rowcount_stat()?
        } else {
            Expr::literal(0i64)
        };
        let nullcount = self.get_nullcount_stat(col)?;
        Some(Expr::ne(nullcount, skip_threshold))
    }

    /// Evaluates a scalar-vs-scalar binary operation with the default evaluation and wraps
    /// the result in a literal expression. `None` when the default evaluation yields `None`.
    fn eval_binary_scalars(
        &self,
        op: BinaryOperator,
        left: &Scalar,
        right: &Scalar,
        inverted: bool,
    ) -> Option<Expr> {
        let folded = PredicateEvaluatorDefaults::eval_binary_scalars(op, left, right, inverted)?;
        Some(Expr::literal(folded))
    }

    /// Combines already-rewritten operands into a single variadic expression.
    ///
    /// The operator is inverted first when `inverted` is set. Operands that could not be
    /// rewritten (`None`) are represented by a boolean NULL literal, but only the first such
    /// operand is kept; any further `None` operands are dropped. Always `Some`.
    fn finish_eval_variadic(
        &self,
        op: VariadicOperator,
        exprs: impl IntoIterator<Item = Option<Expr>>,
        inverted: bool,
    ) -> Option<Expr> {
        let op = if inverted { op.invert() } else { op };

        let mut operands = Vec::new();
        let mut null_emitted = false;
        for operand in exprs {
            match operand {
                Some(expr) => operands.push(expr),
                // Only the first missing operand contributes a NULL placeholder.
                None if !null_emitted => {
                    null_emitted = true;
                    operands.push(Expr::null_literal(DataType::BOOLEAN));
                }
                None => {}
            }
        }
        Some(Expr::variadic(op, operands))
    }
}