use std::cmp::Ordering;
use std::collections::HashSet;
use std::sync::{Arc, LazyLock};
use std::time::Instant;
use tracing::{debug, error};
use crate::actions::visitors::SelectionVectorVisitor;
use crate::error::DeltaResult;
use crate::expressions::{
column_expr, column_name, joined_column_expr, BinaryPredicateOp, ColumnName,
Expression as Expr, ExpressionRef, JunctionPredicateOp, OpaquePredicateOpRef,
Predicate as Pred, PredicateRef, Scalar,
};
use crate::kernel_predicates::{
DataSkippingPredicateEvaluator, KernelPredicateEvaluator, KernelPredicateEvaluatorDefaults,
};
use crate::scan::metrics::ScanMetrics;
use crate::schema::{DataType, SchemaRef, StructField, StructType};
use crate::utils::require;
use crate::{Engine, EngineData, Error, ExpressionEvaluator, PredicateEvaluator, RowVisitor as _};
use delta_kernel_derive::internal_api;

pub(crate) mod stats_schema;

#[cfg(test)]
mod tests;
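
/// Test-only helper: rewrites a (logical) predicate into the equivalent
/// data-skipping predicate over parsed file statistics, treating no column as
/// a partition column. For example, `x < 10` becomes (conceptually)
/// `stats_parsed.minValues.x < 10`: if even the smallest `x` in a file is
/// >= 10, no row in that file can match.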
#[cfg(test)]
pub(crate) fn as_data_skipping_predicate(pred: &Pred) -> Option<Pred> {
DataSkippingPredicateCreator::new(&Default::default()).eval(pred)
}
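
/// Test-only variant of [`as_data_skipping_predicate`] that treats the given
/// columns as partition columns, which are skipped using parsed partition
/// values rather than min/max statistics.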
#[cfg(test)]
pub(crate) fn as_data_skipping_predicate_with_partitions(
pred: &Pred,
partition_columns: &HashSet<String>,
) -> Option<Pred> {
DataSkippingPredicateCreator::new(partition_columns).eval(pred)
}
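
/// Rewrites a predicate into a data-skipping predicate under SQL WHERE
/// semantics, where rows whose predicate evaluates to NULL are filtered out;
/// this permits more aggressive skipping than the plain rewrite.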
fn as_sql_data_skipping_predicate(
pred: &Pred,
partition_columns: &HashSet<String>,
) -> Option<Pred> {
DataSkippingPredicateCreator::new(partition_columns).eval_sql_where(pred)
}
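
/// A filter that uses per-file statistics and/or partition values from the
/// Delta log to prune files that cannot contain rows matching a query
/// predicate. Construction returns `None` whenever data skipping cannot be
/// performed, in which case all files must be kept.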
#[internal_api]
pub(crate) struct DataSkippingFilter {
stats_evaluator: Arc<dyn ExpressionEvaluator>,
skipping_evaluator: Arc<dyn PredicateEvaluator>,
filter_evaluator: Arc<dyn PredicateEvaluator>,
metrics: Option<Arc<ScanMetrics>>,
}
impl DataSkippingFilter {
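    /// Creates a new data skipping filter. Returns `None` if there is no
    /// predicate to evaluate, if neither a stats schema nor a partition schema
    /// is available, or if the predicate does not support data skipping.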
#[allow(clippy::too_many_arguments)]
pub(crate) fn new(
engine: &dyn Engine,
predicate: Option<PredicateRef>,
stats_schema: Option<&SchemaRef>,
stats_expr: ExpressionRef,
partition_schema: Option<&SchemaRef>,
partition_expr: ExpressionRef,
input_schema: SchemaRef,
metrics: Option<Arc<ScanMetrics>>,
) -> Option<Self> {
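        // Keep a file unless the skipping predicate evaluated to literal FALSE:
        // `DISTINCT(output, false)` is true when `output` is TRUE or NULL.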
static FILTER_PRED: LazyLock<PredicateRef> =
LazyLock::new(|| Arc::new(column_expr!("output").distinct(Expr::literal(false))));
static FILTER_SCHEMA: LazyLock<SchemaRef> = LazyLock::new(|| {
Arc::new(StructType::new_unchecked([StructField::nullable(
"output",
DataType::BOOLEAN,
)]))
});
let predicate = predicate?;
debug!("Creating a data skipping filter for {:#?}", predicate);
let (unified_schema, unified_expr, partition_columns) =
Self::build_unified_schema_and_expr(
stats_schema,
stats_expr,
partition_schema,
partition_expr,
)?;
let stats_evaluator = engine
.evaluation_handler()
.new_expression_evaluator(
input_schema,
unified_expr,
unified_schema.as_ref().clone().into(),
)
.inspect_err(|e| error!("Failed to create stats evaluator: {e}"))
.ok()?;
let skipping_evaluator = engine
.evaluation_handler()
.new_predicate_evaluator(
unified_schema.clone(),
Arc::new(as_sql_data_skipping_predicate(
&predicate,
&partition_columns,
)?),
)
.inspect_err(|e| error!("Failed to create skipping evaluator: {e}"))
.ok()?;
let filter_evaluator = engine
.evaluation_handler()
.new_predicate_evaluator(FILTER_SCHEMA.clone(), FILTER_PRED.clone())
.inspect_err(|e| error!("Failed to create filter evaluator: {e}"))
.ok()?;
Some(Self {
stats_evaluator,
skipping_evaluator,
filter_evaluator,
metrics,
})
}
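    /// Combines the optional stats schema and optional partition schema into a
    /// single output schema and matching expression, and returns the set of
    /// partition column names. Returns `None` if neither schema is present.
    /// The `is_add` flag is included only when partition values are in play,
    /// since only partition-value predicates need to be guarded against
    /// matching non-add rows.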
fn build_unified_schema_and_expr(
physical_stats_schema: Option<&SchemaRef>,
stats_expr: ExpressionRef,
physical_partition_schema: Option<&SchemaRef>,
partition_expr: ExpressionRef,
) -> Option<(SchemaRef, ExpressionRef, HashSet<String>)> {
let partition_columns: HashSet<String> = physical_partition_schema
.map(|s| s.fields().map(|f| f.name().to_string()).collect())
.unwrap_or_default();
let stats_field = |stats: &SchemaRef| {
StructField::nullable(
"stats_parsed",
DataType::Struct(Box::new(stats.as_ref().clone())),
)
};
let partition_field = |ps: &SchemaRef| {
StructField::nullable(
"partitionValues_parsed",
DataType::Struct(Box::new(ps.as_ref().clone())),
)
};
let is_add_field = StructField::not_null("is_add", DataType::BOOLEAN);
let unified_schema = match (physical_stats_schema, physical_partition_schema) {
(Some(stats), Some(ps)) => Arc::new(StructType::new_unchecked([
stats_field(stats),
partition_field(ps),
is_add_field,
])),
(Some(stats), None) => Arc::new(StructType::new_unchecked([stats_field(stats)])),
(None, Some(ps)) => Arc::new(StructType::new_unchecked([
partition_field(ps),
is_add_field,
])),
(None, None) => return None,
};
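        // A row is treated as an add action iff its `path` is non-null.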
let is_add_expr: ExpressionRef = Arc::new(Pred::is_not_null(column_expr!("path")).into());
let unified_expr = match (
physical_stats_schema.is_some(),
physical_partition_schema.is_some(),
) {
(true, true) => Arc::new(Expr::struct_from([stats_expr, partition_expr, is_add_expr])),
(true, false) => Arc::new(Expr::struct_from([stats_expr])),
(false, true) => Arc::new(Expr::struct_from([partition_expr, is_add_expr])),
(false, false) => return None,
};
Some((unified_schema, unified_expr, partition_columns))
}
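    /// Applies the filter to a batch of log actions, returning a selection
    /// vector where `true` means "keep this file". The batch is evaluated in
    /// three stages: extract stats/partition values, evaluate the skipping
    /// predicate against them, then keep every row whose result is not FALSE.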
pub(crate) fn apply(&self, batch: &dyn EngineData) -> DeltaResult<Vec<bool>> {
let start_time = Instant::now();
let batch_len = batch.len();
let file_stats = self.stats_evaluator.evaluate(batch)?;
require!(
file_stats.len() == batch_len,
Error::internal_error(format!(
"stats evaluator output length {} != batch length {}",
file_stats.len(),
batch_len
))
);
let skipping_predicate = self.skipping_evaluator.evaluate(&*file_stats)?;
require!(
skipping_predicate.len() == batch_len,
Error::internal_error(format!(
"skipping evaluator output length {} != batch length {}",
skipping_predicate.len(),
batch_len
))
);
let selection_vector = self
.filter_evaluator
.evaluate(skipping_predicate.as_ref())?;
debug_assert_eq!(selection_vector.len(), batch_len);
require!(
selection_vector.len() == batch_len,
Error::internal_error(format!(
"filter evaluator output length {} != batch length {}",
selection_vector.len(),
batch_len
))
);
let mut visitor = SelectionVectorVisitor::default();
visitor.visit_rows_of(selection_vector.as_ref())?;
if visitor.num_filtered > 0 {
debug!(
"data skipping filtered {}/{batch_len} rows from batch",
visitor.num_filtered
);
}
if let Some(metrics) = self.metrics.as_ref() {
metrics.add_predicate_filtered(visitor.num_filtered);
            metrics.add_predicate_eval_time_ns(start_time.elapsed().as_nanos() as u64);
}
Ok(visitor.selection_vector)
}
}
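
/// Rewrites a predicate into a data-skipping predicate over top-level stats
/// columns (`minValues`, `maxValues`, `nullCount`, `numRecords`), guarding
/// every stat reference against NULL so that files with missing stats are
/// always kept rather than incorrectly skipped.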
pub(crate) fn as_checkpoint_skipping_predicate(
pred: &Pred,
partition_columns: &[String],
) -> Option<Pred> {
let partition_columns: HashSet<&str> = partition_columns.iter().map(String::as_str).collect();
NullGuardedDataSkippingPredicateCreator { partition_columns }.eval(pred)
}
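
/// Builds the comparison predicate `col <op> val`, where `<op>` is derived
/// from the requested ordering, optionally inverted (e.g. `Less` inverted
/// becomes `>=`).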
fn comparison_predicate(ord: Ordering, col: Expr, val: &Scalar, inverted: bool) -> Pred {
let pred_fn = match (ord, inverted) {
(Ordering::Less, false) => Pred::lt,
(Ordering::Less, true) => Pred::ge,
(Ordering::Equal, false) => Pred::eq,
(Ordering::Equal, true) => Pred::ne,
(Ordering::Greater, false) => Pred::gt,
(Ordering::Greater, true) => Pred::le,
};
pred_fn(col, val.clone())
}
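
/// Combines the legs of an AND/OR junction into a single predicate, inverting
/// the junction operator (per De Morgan) when the junction itself is inverted.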
fn collect_junction_preds(
mut op: JunctionPredicateOp,
preds: &mut dyn Iterator<Item = Option<Pred>>,
inverted: bool,
) -> Pred {
if inverted {
op = op.invert();
}
    // An unsupported leg (None) is treated as NULL. At most one NULL literal
    // is materialized, because additional unsupported legs add no further
    // information to the junction.
    let mut keep_null = true;
    let preds: Vec<_> = preds
        .flat_map(|p| match p {
            Some(pred) => Some(pred),
            None => keep_null.then(|| {
                keep_null = false;
                Pred::null_literal()
            }),
        })
        .collect();
Pred::junction(op, preds)
}
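
/// Widens a comparison value to account for possible max-stat truncation:
/// timestamp max statistics may be recorded at millisecond precision, so the
/// true file maximum can exceed the recorded stat by up to 999 microseconds.
/// Subtracting 999us from the comparison value errs on the side of keeping
/// files, which keeps skipping sound.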
fn adjust_scalar_for_max_stat_truncation(val: &Scalar) -> Scalar {
match val {
Scalar::Timestamp(micros) => Scalar::Timestamp(micros.saturating_sub(999)),
Scalar::TimestampNtz(micros) => Scalar::TimestampNtz(micros.saturating_sub(999)),
other => other.clone(),
}
}
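
/// Rewrites predicates over data columns into predicates over the parsed
/// per-file statistics (`stats_parsed.*`), and predicates over partition
/// columns into predicates over parsed partition values
/// (`partitionValues_parsed.*`).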
struct DataSkippingPredicateCreator<'a> {
partition_columns: &'a HashSet<String>,
}
impl<'a> DataSkippingPredicateCreator<'a> {
fn new(partition_columns: &'a HashSet<String>) -> Self {
Self { partition_columns }
}
fn is_partition_column(&self, col: &ColumnName) -> bool {
let path = col.path();
path.len() == 1 && self.partition_columns.contains(path[0].as_str())
}
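    /// Wraps a partition-value predicate so it can never skip non-add rows:
    /// `NOT is_add OR pred` always keeps remove actions, which carry no
    /// meaningful parsed partition values.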
fn guard_for_removes(&self, pred: Pred) -> Pred {
Pred::or(Pred::not(Pred::from(column_name!("is_add"))), pred)
}
}
impl DataSkippingPredicateEvaluator for DataSkippingPredicateCreator<'_> {
type Output = Pred;
type ColumnStat = Expr;
fn get_min_stat(&self, col: &ColumnName, _data_type: &DataType) -> Option<Expr> {
if self.is_partition_column(col) {
Some(joined_column_expr!("partitionValues_parsed", col))
} else {
Some(Expr::from(column_name!("stats_parsed.minValues").join(col)))
}
}
fn get_max_stat(&self, col: &ColumnName, _data_type: &DataType) -> Option<Expr> {
if self.is_partition_column(col) {
Some(joined_column_expr!("partitionValues_parsed", col))
} else {
Some(Expr::from(column_name!("stats_parsed.maxValues").join(col)))
}
}
fn partial_cmp_max_stat(
&self,
col: &ColumnName,
val: &Scalar,
ord: Ordering,
inverted: bool,
) -> Option<Pred> {
let max = self.get_max_stat(col, &val.data_type())?;
if self.is_partition_column(col) {
return self.eval_partial_cmp(ord, max, val, inverted);
}
let adjusted = adjust_scalar_for_max_stat_truncation(val);
self.eval_partial_cmp(ord, max, &adjusted, inverted)
}
fn get_nullcount_stat(&self, col: &ColumnName) -> Option<Expr> {
if self.is_partition_column(col) {
None
} else {
Some(Expr::from(column_name!("stats_parsed.nullCount").join(col)))
}
}
fn get_rowcount_stat(&self) -> Option<Expr> {
Some(column_expr!("stats_parsed.numRecords"))
}
fn eval_partial_cmp(
&self,
ord: Ordering,
col: Expr,
val: &Scalar,
inverted: bool,
) -> Option<Pred> {
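        // Partition-value comparisons are recognized by their column prefix so
        // they can be guarded against filtering out remove actions.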
let is_partition = matches!(&col, Expr::Column(name)
if name.path().first().is_some_and(|f| f == "partitionValues_parsed"));
let cmp = comparison_predicate(ord, col, val, inverted);
Some(if is_partition {
self.guard_for_removes(cmp)
} else {
cmp
})
}
fn eval_pred_scalar(&self, val: &Scalar, inverted: bool) -> Option<Pred> {
KernelPredicateEvaluatorDefaults::eval_pred_scalar(val, inverted).map(Pred::literal)
}
fn eval_pred_scalar_is_null(&self, val: &Scalar, inverted: bool) -> Option<Pred> {
KernelPredicateEvaluatorDefaults::eval_pred_scalar_is_null(val, inverted).map(Pred::literal)
}
fn eval_pred_is_null(&self, col: &ColumnName, inverted: bool) -> Option<Pred> {
if self.is_partition_column(col) {
let pv_expr = joined_column_expr!("partitionValues_parsed", col);
let pred = if inverted {
Pred::is_not_null(pv_expr)
} else {
Pred::is_null(pv_expr)
};
Some(self.guard_for_removes(pred))
} else {
            let safe_to_skip = match inverted {
                // IS NOT NULL: the file can be skipped only if *all* rows are null
                true => self.get_rowcount_stat()?,
                // IS NULL: the file can be skipped only if *no* rows are null
                false => Expr::literal(0i64),
            };
Some(Pred::ne(self.get_nullcount_stat(col)?, safe_to_skip))
}
}
fn eval_pred_binary_scalars(
&self,
op: BinaryPredicateOp,
left: &Scalar,
right: &Scalar,
inverted: bool,
) -> Option<Pred> {
KernelPredicateEvaluatorDefaults::eval_pred_binary_scalars(op, left, right, inverted)
.map(Pred::literal)
}
fn eval_pred_opaque(
&self,
op: &OpaquePredicateOpRef,
exprs: &[Expr],
inverted: bool,
) -> Option<Pred> {
op.as_data_skipping_predicate(self, exprs, inverted)
}
fn finish_eval_pred_junction(
&self,
op: JunctionPredicateOp,
preds: &mut dyn Iterator<Item = Option<Pred>>,
inverted: bool,
) -> Option<Pred> {
Some(collect_junction_preds(op, preds, inverted))
}
}
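
/// A variant of [`DataSkippingPredicateCreator`] for checkpoint-based
/// skipping: every stats reference is guarded with an `IS NULL` check so that
/// files whose stats are missing are kept instead of skipped. Predicates over
/// partition columns are not rewritten (their stat getters return `None`), so
/// they never cause skipping.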
struct NullGuardedDataSkippingPredicateCreator<'a> {
partition_columns: HashSet<&'a str>,
}
impl NullGuardedDataSkippingPredicateCreator<'_> {
fn is_partition_column(&self, col: &ColumnName) -> bool {
let path = col.path();
path.len() == 1 && self.partition_columns.contains(path[0].as_str())
}
}
impl DataSkippingPredicateEvaluator for NullGuardedDataSkippingPredicateCreator<'_> {
type Output = Pred;
type ColumnStat = Expr;
fn get_min_stat(&self, col: &ColumnName, _data_type: &DataType) -> Option<Expr> {
if self.is_partition_column(col) {
return None;
}
Some(joined_column_expr!("minValues", col))
}
fn get_max_stat(&self, col: &ColumnName, _data_type: &DataType) -> Option<Expr> {
if self.is_partition_column(col) {
return None;
}
Some(joined_column_expr!("maxValues", col))
}
fn get_nullcount_stat(&self, col: &ColumnName) -> Option<Expr> {
if self.is_partition_column(col) {
return None;
}
Some(joined_column_expr!("nullCount", col))
}
fn get_rowcount_stat(&self) -> Option<Expr> {
Some(column_expr!("numRecords"))
}
fn partial_cmp_max_stat(
&self,
col: &ColumnName,
val: &Scalar,
ord: Ordering,
inverted: bool,
) -> Option<Pred> {
let max = self.get_max_stat(col, &val.data_type())?;
let adjusted = adjust_scalar_for_max_stat_truncation(val);
self.eval_partial_cmp(ord, max, &adjusted, inverted)
}
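    /// Null guard: if the stat itself is missing (NULL), the disjunction is
    /// true and the file is kept; otherwise the comparison decides.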
fn eval_partial_cmp(
&self,
ord: Ordering,
col: Expr,
val: &Scalar,
inverted: bool,
) -> Option<Pred> {
let comparison = comparison_predicate(ord, col.clone(), val, inverted);
Some(Pred::or(Pred::is_null(col), comparison))
}
fn eval_pred_scalar(&self, val: &Scalar, inverted: bool) -> Option<Pred> {
KernelPredicateEvaluatorDefaults::eval_pred_scalar(val, inverted).map(Pred::literal)
}
fn eval_pred_scalar_is_null(&self, val: &Scalar, inverted: bool) -> Option<Pred> {
KernelPredicateEvaluatorDefaults::eval_pred_scalar_is_null(val, inverted).map(Pred::literal)
}
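    /// `col IS NULL` can skip a file only when its null count is known to be
    /// zero; a missing null count keeps the file. The inverted form is not
    /// supported.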
fn eval_pred_is_null(&self, col: &ColumnName, inverted: bool) -> Option<Pred> {
        if inverted {
            // No sound rewrite for IS NOT NULL in this context; give up rather
            // than risk skipping files that could contain matching rows.
            return None;
        }
let nullcount = self.get_nullcount_stat(col)?;
let comparison = Pred::ne(nullcount.clone(), Expr::literal(0i64));
Some(Pred::or(Pred::is_null(nullcount), comparison))
}
fn eval_pred_binary_scalars(
&self,
op: BinaryPredicateOp,
left: &Scalar,
right: &Scalar,
inverted: bool,
) -> Option<Pred> {
KernelPredicateEvaluatorDefaults::eval_pred_binary_scalars(op, left, right, inverted)
.map(Pred::literal)
}
fn eval_pred_opaque(
&self,
_op: &OpaquePredicateOpRef,
_exprs: &[Expr],
_inverted: bool,
) -> Option<Pred> {
None
}
fn finish_eval_pred_junction(
&self,
op: JunctionPredicateOp,
preds: &mut dyn Iterator<Item = Option<Pred>>,
inverted: bool,
) -> Option<Pred> {
Some(collect_junction_preds(op, preds, inverted))
}
}