use std::collections::HashSet;
use std::ops::Not;
use std::sync::Arc;
use tracing::debug;
use crate::actions::visitors::SelectionVectorVisitor;
use crate::actions::{get_log_schema, ADD_NAME};
use crate::error::DeltaResult;
use crate::expressions::{BinaryOperator, Expression as Expr, UnaryOperator, VariadicOperator};
use crate::schema::{DataType, PrimitiveType, SchemaRef, StructField, StructType};
use crate::{Engine, EngineData, ExpressionEvaluator, JsonHandler};
/// Builds the "tight bounds" half of the *could contain a null* check for `null_col`.
///
/// The resulting expression is
/// `distinct(tightBounds, false) AND <null_col> > 0`.
///
/// `distinct` (rather than plain equality with `true`) is used for the `tightBounds` test —
/// presumably so that an absent `tightBounds` value is treated like `true`; confirm against the
/// stats specification.
fn get_tight_null_expr(null_col: String) -> Expr {
    let bounds_not_wide = Expr::column("tightBounds").distinct(Expr::literal(false));
    let some_nulls_counted = Expr::column(null_col).gt(Expr::literal(0i64));
    Expr::and(bounds_not_wide, some_nulls_counted)
}
/// Builds the "wide bounds" half of the *could contain a null* check for `null_col`.
///
/// The resulting expression is
/// `tightBounds = false AND numRecords = <null_col>`, i.e. it only fires when the recorded null
/// count equals the recorded number of records.
fn get_wide_null_expr(null_col: String) -> Expr {
    let bounds_are_wide = Expr::eq(Expr::column("tightBounds"), Expr::literal(false));
    let null_count_is_total = Expr::eq(Expr::column("numRecords"), Expr::column(null_col));
    Expr::and(bounds_are_wide, null_count_is_total)
}
/// Builds the "tight bounds" half of the *could contain a non-null value* check for `null_col`.
///
/// The resulting expression is
/// `distinct(tightBounds, false) AND <null_col> < numRecords`.
///
/// `distinct` (rather than plain equality with `true`) is used for the `tightBounds` test —
/// presumably so that an absent `tightBounds` value is treated like `true`; confirm against the
/// stats specification.
fn get_tight_not_null_expr(null_col: String) -> Expr {
    let bounds_not_wide = Expr::column("tightBounds").distinct(Expr::literal(false));
    let fewer_nulls_than_rows = Expr::column(null_col).lt(Expr::column("numRecords"));
    Expr::and(bounds_not_wide, fewer_nulls_than_rows)
}
/// Builds the "wide bounds" half of the *could contain a non-null value* check for `null_col`.
///
/// The resulting expression is
/// `tightBounds = false AND numRecords != <null_col>`, i.e. the inverse of the "every record is
/// null" test used by the wide-bounds null check.
fn get_wide_not_null_expr(null_col: String) -> Expr {
    let bounds_are_wide = Expr::eq(Expr::column("tightBounds"), Expr::literal(false));
    let null_count_differs = Expr::ne(Expr::column("numRecords"), Expr::column(null_col));
    Expr::and(bounds_are_wide, null_count_differs)
}
fn as_inverted_data_skipping_predicate(expr: &Expr) -> Option<Expr> {
use Expr::*;
match expr {
UnaryOperation { op, expr } => match op {
UnaryOperator::Not => as_data_skipping_predicate(expr),
UnaryOperator::IsNull => {
if let Column(col) = expr.as_ref() {
let null_col = format!("nullCount.{col}");
Some(Expr::or(
get_tight_not_null_expr(null_col.clone()),
get_wide_not_null_expr(null_col),
))
} else {
None
}
}
},
BinaryOperation { op, left, right } => {
let expr = Expr::binary(op.invert()?, left.as_ref().clone(), right.as_ref().clone());
as_data_skipping_predicate(&expr)
}
VariadicOperation { op, exprs } => {
let expr = Expr::variadic(op.invert(), exprs.iter().cloned().map(Expr::not));
as_data_skipping_predicate(&expr)
}
_ => None,
}
}
/// Rewrites a predicate over table columns into a predicate over per-file statistics columns
/// (`minValues.*`, `maxValues.*`, `nullCount.*`, `numRecords`, `tightBounds`).
///
/// The rewritten predicate is meant to evaluate to `false` only when a file cannot contain a row
/// matching `expr`. Returns `None` when `expr` is not eligible for data skipping.
///
/// * Binary comparisons are normalized to `column <op> literal` (commuting the operator when the
///   literal is on the left) and rewritten against the column's min/max statistics, e.g. `1 < a`
///   becomes `maxValues.a > 1`.
/// * `col = val` is rewritten as `minValues.col <= val AND maxValues.col >= val`.
/// * `col != val` is rewritten as `minValues.col != val OR maxValues.col != val`.
/// * `NOT x` is handled by [`as_inverted_data_skipping_predicate`].
/// * `col IS NULL` becomes a check on `nullCount.col` (tight-bounds variant OR wide-bounds
///   variant).
/// * `AND` keeps only the operands that could be rewritten; `OR` is rewritten only when every
///   operand could be rewritten.
fn as_data_skipping_predicate(expr: &Expr) -> Option<Expr> {
    use BinaryOperator::*;
    use Expr::*;
    use UnaryOperator::*;
    match expr {
        BinaryOperation { op, left, right } => {
            let (op, col, val) = match (left.as_ref(), right.as_ref()) {
                (Column(col), Literal(val)) => (op.clone(), col, val),
                (Literal(val), Column(col)) => (op.commute()?, col, val),
                _ => return None, // unsupported combination of operands
            };
            let stats_col = match op {
                LessThan | LessThanOrEqual => "minValues",
                GreaterThan | GreaterThanOrEqual => "maxValues",
                Equal => {
                    // `col = val` can only match when `min <= val <= max`; recurse so that both
                    // halves go through the regular min/max rewrite.
                    return as_data_skipping_predicate(&Expr::and(
                        Expr::le(Column(col.clone()), Literal(val.clone())),
                        Expr::le(Literal(val.clone()), Column(col.clone())),
                    ));
                }
                NotEqual => {
                    // A file can only be skipped for `col != val` when every value equals `val`,
                    // i.e. `min == val AND max == val`. Keep the file in every other case:
                    // `min != val OR max != val`. (Checking `min > val OR max < val` instead
                    // would wrongly skip files where `val` lies strictly inside `[min, max]`.)
                    return Some(Expr::or(
                        Expr::ne(Column(format!("minValues.{}", col)), Literal(val.clone())),
                        Expr::ne(Column(format!("maxValues.{}", col)), Literal(val.clone())),
                    ));
                }
                _ => return None, // unsupported operation
            };
            let col = format!("{}.{}", stats_col, col);
            Some(Expr::binary(op, Column(col), Literal(val.clone())))
        }
        // Push the NOT down by inverting everything below it.
        UnaryOperation { op: Not, expr } => as_inverted_data_skipping_predicate(expr),
        UnaryOperation { op: IsNull, expr } => {
            if let Column(col) = expr.as_ref() {
                let null_col = format!("nullCount.{col}");
                Some(Expr::or(
                    get_tight_null_expr(null_col.clone()),
                    get_wide_null_expr(null_col),
                ))
            } else {
                // Null-ness can only be reasoned about for a plain column.
                None
            }
        }
        VariadicOperation { op, exprs } => {
            let exprs = exprs.iter().map(as_data_skipping_predicate);
            match op {
                // Dropping an operand of an AND only makes the filter less selective, so
                // ineligible operands are simply skipped.
                // NOTE(review): if no operand is eligible this produces an AND with zero
                // operands — confirm engines evaluate that as "keep the file".
                VariadicOperator::And => Some(Expr::and_from(exprs.flatten())),
                // An OR is only usable when every operand could be rewritten.
                VariadicOperator::Or => Some(Expr::or_from(exprs.collect::<Option<Vec<_>>>()?)),
            }
        }
        _ => None,
    }
}

/// Filters batches of log actions using the per-file statistics stored in `add.stats`.
pub(crate) struct DataSkippingFilter {
    /// Schema used to parse the JSON statistics string.
    stats_schema: SchemaRef,
    /// Extracts the `add.stats` column from a batch of actions.
    select_stats_evaluator: Arc<dyn ExpressionEvaluator>,
    /// Evaluates the rewritten (statistics-based) predicate on the parsed statistics.
    skipping_evaluator: Arc<dyn ExpressionEvaluator>,
    /// Turns the predicate result into the final keep/skip boolean.
    filter_evaluator: Arc<dyn ExpressionEvaluator>,
    /// Parses the JSON statistics strings.
    json_handler: Arc<dyn JsonHandler>,
}

impl DataSkippingFilter {
    /// Creates a data skipping filter for `predicate`.
    ///
    /// Returns `None` when data skipping is not possible:
    /// * no predicate was supplied,
    /// * the predicate references no top-level field of `table_schema`, or
    /// * the predicate cannot be rewritten in terms of file statistics.
    ///
    /// # Panics
    ///
    /// Panics if the `add` action cannot be projected out of the log schema.
    pub(crate) fn new(
        engine: &dyn Engine,
        table_schema: &SchemaRef,
        predicate: &Option<Expr>,
    ) -> Option<Self> {
        lazy_static::lazy_static!(
            static ref PREDICATE_SCHEMA: DataType = StructType::new(vec![
                StructField::new("predicate", DataType::BOOLEAN, true),
            ]).into();
            static ref STATS_EXPR: Expr = Expr::column("add.stats");
            static ref FILTER_EXPR: Expr = Expr::column("predicate").distinct(Expr::literal(false));
        );

        let predicate = predicate.as_ref()?;
        debug!("Creating a data skipping filter for {}", &predicate);

        // Only statistics for columns the predicate actually references are needed.
        let field_names: HashSet<_> = predicate.references();
        let data_fields: Vec<_> = table_schema
            .fields()
            .filter(|field| field_names.contains(&field.name.as_str()))
            .cloned()
            .collect();
        if data_fields.is_empty() {
            return None;
        }

        // Bail out before asking the engine for any evaluator if the predicate is not eligible.
        let skipping_predicate = as_data_skipping_predicate(predicate)?;

        let null_count_fields = data_fields
            .iter()
            .map(|data_field| {
                StructField::new(
                    &data_field.name,
                    DataType::Primitive(PrimitiveType::Long),
                    true,
                )
            })
            .collect();
        let stats_schema = Arc::new(StructType::new(vec![
            StructField::new("numRecords", DataType::LONG, true),
            StructField::new("tightBounds", DataType::BOOLEAN, true),
            StructField::new("nullCount", StructType::new(null_count_fields), true),
            StructField::new("minValues", StructType::new(data_fields.clone()), true),
            StructField::new("maxValues", StructType::new(data_fields), true),
        ]));

        let select_stats_evaluator = engine.get_expression_handler().get_evaluator(
            get_log_schema()
                .project(&[ADD_NAME])
                .expect("projecting the add action out of the log schema should not fail"),
            STATS_EXPR.clone(),
            DataType::STRING,
        );
        let skipping_evaluator = engine.get_expression_handler().get_evaluator(
            stats_schema.clone(),
            Expr::struct_expr([skipping_predicate]),
            PREDICATE_SCHEMA.clone(),
        );
        // NOTE(review): this evaluator is applied to the output of `skipping_evaluator` (a struct
        // with a single `predicate` field), yet it is given `stats_schema` as its input schema —
        // confirm whether any engine relies on the declared input schema.
        let filter_evaluator = engine.get_expression_handler().get_evaluator(
            stats_schema.clone(),
            FILTER_EXPR.clone(),
            DataType::BOOLEAN,
        );

        Some(Self {
            stats_schema,
            select_stats_evaluator,
            skipping_evaluator,
            filter_evaluator,
            json_handler: engine.get_json_handler(),
        })
    }

    /// Applies the filter to a batch of actions and returns the resulting selection vector.
    ///
    /// The statistics string is selected from each action, parsed as JSON using the stats
    /// schema, and the rewritten predicate is evaluated on it. The final boolean is
    /// `distinct(predicate, false)`, so a predicate that evaluates to null does not cause the
    /// corresponding entry to be `false`.
    ///
    /// # Errors
    ///
    /// Propagates any error from expression evaluation, JSON parsing or data extraction.
    pub(crate) fn apply(&self, actions: &dyn EngineData) -> DeltaResult<Vec<bool>> {
        let stats = self.select_stats_evaluator.evaluate(actions)?;
        let parsed_stats = self
            .json_handler
            .parse_json(stats, self.stats_schema.clone())?;
        let skipping_predicate = self.skipping_evaluator.evaluate(&*parsed_stats)?;
        let selection_vector = self
            .filter_evaluator
            .evaluate(skipping_predicate.as_ref())?;

        // Pull the boolean column out of the engine data through a visitor.
        let mut visitor = SelectionVectorVisitor::default();
        let schema = StructType::new(vec![StructField::new("output", DataType::BOOLEAN, false)]);
        selection_vector
            .as_ref()
            .extract(Arc::new(schema), &mut visitor)?;
        Ok(visitor.selection_vector)
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_rewrite_basic_comparison() {
        let column = Expr::column("a");
        let lit_int = Expr::literal(1_i32);
        let min_col = Expr::column("minValues.a");
        let max_col = Expr::column("maxValues.a");

        let cases = [
            (
                column.clone().lt(lit_int.clone()),
                Expr::lt(min_col.clone(), lit_int.clone()),
            ),
            (
                lit_int.clone().lt(column.clone()),
                Expr::gt(max_col.clone(), lit_int.clone()),
            ),
            (
                column.clone().gt(lit_int.clone()),
                Expr::gt(max_col.clone(), lit_int.clone()),
            ),
            (
                lit_int.clone().gt(column.clone()),
                Expr::lt(min_col.clone(), lit_int.clone()),
            ),
            (
                column.clone().lt_eq(lit_int.clone()),
                Expr::le(min_col.clone(), lit_int.clone()),
            ),
            (
                lit_int.clone().lt_eq(column.clone()),
                Expr::ge(max_col.clone(), lit_int.clone()),
            ),
            (
                column.clone().gt_eq(lit_int.clone()),
                Expr::ge(max_col.clone(), lit_int.clone()),
            ),
            (
                lit_int.clone().gt_eq(column.clone()),
                Expr::le(min_col.clone(), lit_int.clone()),
            ),
            (
                column.clone().eq(lit_int.clone()),
                Expr::and_from([
                    Expr::le(min_col.clone(), lit_int.clone()),
                    Expr::ge(max_col.clone(), lit_int.clone()),
                ]),
            ),
            (
                lit_int.clone().eq(column.clone()),
                Expr::and_from([
                    Expr::le(min_col.clone(), lit_int.clone()),
                    Expr::ge(max_col.clone(), lit_int.clone()),
                ]),
            ),
            // `a != 1` may only skip a file whose min and max both equal 1.
            (
                column.clone().ne(lit_int.clone()),
                Expr::or_from([
                    Expr::ne(min_col.clone(), lit_int.clone()),
                    Expr::ne(max_col.clone(), lit_int.clone()),
                ]),
            ),
            (
                lit_int.clone().ne(column.clone()),
                Expr::or_from([
                    Expr::ne(min_col.clone(), lit_int.clone()),
                    Expr::ne(max_col.clone(), lit_int.clone()),
                ]),
            ),
        ];

        for (input, expected) in cases {
            let rewritten = as_data_skipping_predicate(&input).unwrap();
            assert_eq!(rewritten, expected)
        }
    }
}