use std::collections::HashSet;
use std::sync::Arc;
use arrow::array::AsArray;
use arrow::{
array::{new_null_array, ArrayRef, BooleanArray},
datatypes::{DataType, Field, Schema, SchemaRef},
record_batch::{RecordBatch, RecordBatchOptions},
};
use log::trace;
use datafusion_common::error::{DataFusionError, Result};
use datafusion_common::tree_node::TransformedResult;
use datafusion_common::{
internal_err, plan_datafusion_err, plan_err,
tree_node::{Transformed, TreeNode},
ScalarValue,
};
use datafusion_common::{Column, DFSchema};
use datafusion_expr_common::operator::Operator;
use datafusion_physical_expr::utils::{collect_columns, Guarantee, LiteralGuarantee};
use datafusion_physical_expr::{expressions as phys_expr, PhysicalExprRef};
use datafusion_physical_plan::{ColumnarValue, PhysicalExpr};
/// Source of per-container statistics used to prune containers without
/// reading their data. Every array-returning method yields one entry per
/// container (length `num_containers()`), or `None` when the statistic is
/// unavailable for that column.
pub trait PruningStatistics {
    /// Minimum value of `column` in each container, or `None` if unknown.
    fn min_values(&self, column: &Column) -> Option<ArrayRef>;
    /// Maximum value of `column` in each container, or `None` if unknown.
    fn max_values(&self, column: &Column) -> Option<ArrayRef>;
    /// Number of containers described by this statistics source.
    fn num_containers(&self) -> usize;
    /// Number of null values of `column` in each container, or `None`.
    fn null_counts(&self, column: &Column) -> Option<ArrayRef>;
    /// Total row count of each container, or `None`.
    fn row_counts(&self, column: &Column) -> Option<ArrayRef>;
    /// For each container, whether `column` contains any of `values`;
    /// `None` (either the whole result or a single entry) means unknown.
    fn contained(
        &self,
        column: &Column,
        values: &HashSet<ScalarValue>,
    ) -> Option<BooleanArray>;
}
/// A predicate rewritten to operate on container statistics (min/max/
/// null-count/row-count columns) instead of the data itself, used to decide
/// which containers can be skipped entirely.
#[derive(Debug, Clone)]
pub struct PruningPredicate {
    // Schema of the data the original predicate was written against.
    schema: SchemaRef,
    // The rewritten predicate, evaluated against a statistics record batch.
    predicate_expr: Arc<dyn PhysicalExpr>,
    // Which (column, statistic) pairs `predicate_expr` references.
    required_columns: RequiredColumns,
    // The original, un-rewritten predicate.
    orig_expr: Arc<dyn PhysicalExpr>,
    // Literal guarantees extracted from `orig_expr` (e.g. `x IN (…)`),
    // checked via `PruningStatistics::contained` before full evaluation.
    literal_guarantees: Vec<LiteralGuarantee>,
}
/// Customization point invoked when a sub-expression cannot be rewritten
/// into a statistics comparison; returns the expression to use instead.
pub trait UnhandledPredicateHook {
    /// Called with the unhandled expression; the returned expression takes
    /// its place in the rewritten predicate.
    fn handle(&self, expr: &Arc<dyn PhysicalExpr>) -> Arc<dyn PhysicalExpr>;
}
/// Hook that replaces every unhandled expression with a fixed constant
/// expression (by default the literal `true`, i.e. "cannot prune").
#[derive(Debug, Clone)]
struct ConstantUnhandledPredicateHook {
    // The constant substituted for any unhandled expression.
    default: Arc<dyn PhysicalExpr>,
}
impl Default for ConstantUnhandledPredicateHook {
fn default() -> Self {
Self {
default: Arc::new(phys_expr::Literal::new(ScalarValue::from(true))),
}
}
}
impl UnhandledPredicateHook for ConstantUnhandledPredicateHook {
    /// Ignores the expression entirely and returns the configured constant.
    fn handle(&self, _expr: &Arc<dyn PhysicalExpr>) -> Arc<dyn PhysicalExpr> {
        self.default.clone()
    }
}
impl PruningPredicate {
    /// Build a pruning predicate from a boolean physical expression over
    /// `schema`.
    ///
    /// `expr` is rewritten into a predicate over statistics columns; any
    /// sub-expression that cannot be rewritten becomes the literal `true`
    /// (never prunes) via the default `ConstantUnhandledPredicateHook`.
    /// Literal guarantees are extracted from the original expression.
    pub fn try_new(expr: Arc<dyn PhysicalExpr>, schema: SchemaRef) -> Result<Self> {
        let unhandled_hook = Arc::new(ConstantUnhandledPredicateHook::default()) as _;
        let mut required_columns = RequiredColumns::new();
        let predicate_expr = build_predicate_expression(
            &expr,
            schema.as_ref(),
            &mut required_columns,
            &unhandled_hook,
        );
        let literal_guarantees = LiteralGuarantee::analyze(&expr);
        Ok(Self {
            schema,
            predicate_expr,
            required_columns,
            orig_expr: expr,
            literal_guarantees,
        })
    }
    /// Evaluate this predicate against `statistics`, returning one `bool`
    /// per container: `false` means the container provably holds no matching
    /// rows and may be skipped; `true` means it *might* contain matches.
    pub fn prune<S: PruningStatistics>(&self, statistics: &S) -> Result<Vec<bool>> {
        let mut builder = BoolVecBuilder::new(statistics.num_containers());
        // First apply literal guarantees via `contained`, which may rule
        // containers out without evaluating the full statistics predicate.
        for literal_guarantee in &self.literal_guarantees {
            let LiteralGuarantee {
                column,
                guarantee,
                literals,
            } = literal_guarantee;
            if let Some(results) = statistics.contained(column, literals) {
                match guarantee {
                    // Rows can match only if the container contains a literal.
                    Guarantee::In => builder.combine_array(&results),
                    // Rows can match only if some value is NOT in the set.
                    Guarantee::NotIn => {
                        builder.combine_array(&arrow::compute::not(&results)?)
                    }
                }
                // Early exit once every container is already pruned.
                if builder.check_all_pruned() {
                    return Ok(builder.build());
                }
            }
        }
        // Then evaluate the rewritten predicate over a record batch built
        // from the min/max/null-count/row-count statistics.
        let statistics_batch =
            build_statistics_record_batch(statistics, &self.required_columns)?;
        builder.combine_value(self.predicate_expr.evaluate(&statistics_batch)?);
        Ok(builder.build())
    }
    /// Schema of the data the original predicate was written against.
    pub fn schema(&self) -> &SchemaRef {
        &self.schema
    }
    /// The original, un-rewritten predicate expression.
    pub fn orig_expr(&self) -> &Arc<dyn PhysicalExpr> {
        &self.orig_expr
    }
    /// The rewritten predicate over statistics columns.
    pub fn predicate_expr(&self) -> &Arc<dyn PhysicalExpr> {
        &self.predicate_expr
    }
    /// Literal guarantees extracted from the original predicate.
    pub fn literal_guarantees(&self) -> &[LiteralGuarantee] {
        &self.literal_guarantees
    }
    /// True when this predicate can never prune anything (rewritten form is
    /// the literal `true` and there are no literal guarantees).
    pub fn always_true(&self) -> bool {
        is_always_true(&self.predicate_expr) && self.literal_guarantees.is_empty()
    }
    #[allow(dead_code)]
    pub fn required_columns(&self) -> &RequiredColumns {
        &self.required_columns
    }
    /// Names of columns referenced by literal guarantees, deduplicated while
    /// preserving first-seen order.
    pub fn literal_columns(&self) -> Vec<String> {
        let mut seen = HashSet::new();
        self.literal_guarantees
            .iter()
            .map(|e| &e.column.name)
            .filter(|name| seen.insert(*name))
            .map(|s| s.to_string())
            .collect()
    }
}
/// Accumulates per-container keep/prune decisions by AND-ing successive
/// boolean results; starts all-`true` and only ever flips entries to `false`.
#[derive(Debug)]
struct BoolVecBuilder {
    // `inner[i]` is `true` while container `i` might still contain matches.
    inner: Vec<bool>,
}
impl BoolVecBuilder {
    /// Start with every container marked `true` ("might contain matches").
    fn new(num_containers: usize) -> Self {
        Self {
            inner: vec![true; num_containers],
        }
    }
    /// AND `array` into the current state: a `false` entry prunes the
    /// corresponding container, while `true` and null entries leave it alone.
    fn combine_array(&mut self, array: &BooleanArray) {
        assert_eq!(array.len(), self.inner.len());
        for (keep, verdict) in self.inner.iter_mut().zip(array.iter()) {
            if matches!(verdict, Some(false)) {
                *keep = false;
            }
        }
    }
    /// Fold one evaluated predicate result into the state. A scalar `false`
    /// prunes every container; boolean arrays combine element-wise; any other
    /// value (scalar true/null, non-boolean scalar) changes nothing.
    fn combine_value(&mut self, value: ColumnarValue) {
        match value {
            ColumnarValue::Array(array) => self.combine_array(array.as_boolean()),
            ColumnarValue::Scalar(ScalarValue::Boolean(Some(false))) => {
                for keep in self.inner.iter_mut() {
                    *keep = false;
                }
            }
            _ => {}
        }
    }
    /// Consume the builder, yielding the final keep/prune vector.
    fn build(self) -> Vec<bool> {
        self.inner
    }
    /// True once every container has been pruned.
    fn check_all_pruned(&self) -> bool {
        !self.inner.contains(&true)
    }
}
/// True iff `expr` is exactly the literal boolean `true`.
fn is_always_true(expr: &Arc<dyn PhysicalExpr>) -> bool {
    match expr.as_any().downcast_ref::<phys_expr::Literal>() {
        Some(lit) => matches!(lit.value(), ScalarValue::Boolean(Some(true))),
        None => false,
    }
}
/// Records which (source column, statistic kind) pairs a rewritten predicate
/// references; the position of each entry doubles as the index of its
/// statistics column in the generated record batch.
#[derive(Debug, Default, Clone)]
pub struct RequiredColumns {
    // (source column, which statistic, field describing the stat column).
    columns: Vec<(phys_expr::Column, StatisticsType, Field)>,
}
impl RequiredColumns {
    fn new() -> Self {
        Self::default()
    }
    /// If every registered statistic refers to the same source column, return
    /// that column; otherwise `None`. (An empty set yields `None` because
    /// `first()` is `None` while `windows(2).all(..)` is vacuously true.)
    #[allow(dead_code)]
    pub fn single_column(&self) -> Option<&phys_expr::Column> {
        if self.columns.windows(2).all(|w| {
            let c1 = &w[0].0;
            let c2 = &w[1].0;
            c1 == c2
        }) {
            self.columns.first().map(|r| &r.0)
        } else {
            None
        }
    }
    /// Iterate over all registered (column, statistic, field) triples in
    /// registration order (== statistics-batch column order).
    pub(crate) fn iter(
        &self,
    ) -> impl Iterator<Item = &(phys_expr::Column, StatisticsType, Field)> {
        self.columns.iter()
    }
    /// Index of an already-registered (column, statistic) pair, if any.
    fn find_stat_column(
        &self,
        column: &phys_expr::Column,
        statistics_type: StatisticsType,
    ) -> Option<usize> {
        self.columns
            .iter()
            .enumerate()
            .find(|(_i, (c, t, _f))| c == column && t == &statistics_type)
            .map(|(i, (_c, _t, _f))| i)
    }
    /// Return `column_expr` rewritten to reference the statistics column for
    /// (`column`, `stat_type`), registering that statistics column on first
    /// use. The stat column is named `{column}_{suffix}` and its index is its
    /// position within `self.columns`.
    fn stat_column_expr(
        &mut self,
        column: &phys_expr::Column,
        column_expr: &Arc<dyn PhysicalExpr>,
        field: &Field,
        stat_type: StatisticsType,
    ) -> Result<Arc<dyn PhysicalExpr>> {
        // Reuse an existing registration so the same stat column is not
        // added (and fetched) twice.
        let (idx, need_to_insert) = match self.find_stat_column(column, stat_type) {
            Some(idx) => (idx, false),
            None => (self.columns.len(), true),
        };
        let suffix = match stat_type {
            StatisticsType::Min => "min",
            StatisticsType::Max => "max",
            StatisticsType::NullCount => "null_count",
            StatisticsType::RowCount => "row_count",
        };
        let stat_column =
            phys_expr::Column::new(&format!("{}_{}", column.name(), suffix), idx);
        if need_to_insert {
            // Statistics may be missing, so the stat column is nullable.
            let nullable = true;
            let stat_field =
                Field::new(stat_column.name(), field.data_type().clone(), nullable);
            self.columns.push((column.clone(), stat_type, stat_field));
        }
        rewrite_column_expr(Arc::clone(column_expr), column, &stat_column)
    }
    /// `column_expr` rewritten against the min-value statistics column.
    fn min_column_expr(
        &mut self,
        column: &phys_expr::Column,
        column_expr: &Arc<dyn PhysicalExpr>,
        field: &Field,
    ) -> Result<Arc<dyn PhysicalExpr>> {
        self.stat_column_expr(column, column_expr, field, StatisticsType::Min)
    }
    /// `column_expr` rewritten against the max-value statistics column.
    fn max_column_expr(
        &mut self,
        column: &phys_expr::Column,
        column_expr: &Arc<dyn PhysicalExpr>,
        field: &Field,
    ) -> Result<Arc<dyn PhysicalExpr>> {
        self.stat_column_expr(column, column_expr, field, StatisticsType::Max)
    }
    /// `column_expr` rewritten against the null-count statistics column.
    fn null_count_column_expr(
        &mut self,
        column: &phys_expr::Column,
        column_expr: &Arc<dyn PhysicalExpr>,
        field: &Field,
    ) -> Result<Arc<dyn PhysicalExpr>> {
        self.stat_column_expr(column, column_expr, field, StatisticsType::NullCount)
    }
    /// `column_expr` rewritten against the row-count statistics column.
    fn row_count_column_expr(
        &mut self,
        column: &phys_expr::Column,
        column_expr: &Arc<dyn PhysicalExpr>,
        field: &Field,
    ) -> Result<Arc<dyn PhysicalExpr>> {
        self.stat_column_expr(column, column_expr, field, StatisticsType::RowCount)
    }
}
/// Construct a `RequiredColumns` directly from pre-built triples (primarily
/// useful for tests and callers that already know the layout).
impl From<Vec<(phys_expr::Column, StatisticsType, Field)>> for RequiredColumns {
    fn from(columns: Vec<(phys_expr::Column, StatisticsType, Field)>) -> Self {
        Self { columns }
    }
}
/// Assemble the `RecordBatch` of statistics a rewritten predicate evaluates
/// against: one row per container, one column per registered
/// (source column, statistic) pair, in `required_columns` order.
///
/// Statistics a source does not provide are substituted with all-null
/// arrays, and every array is cast to the field type recorded in
/// `required_columns` so the batch matches the predicate's expectations.
fn build_statistics_record_batch<S: PruningStatistics>(
    statistics: &S,
    required_columns: &RequiredColumns,
) -> Result<RecordBatch> {
    let mut fields = Vec::<Field>::new();
    let mut arrays = Vec::<ArrayRef>::new();
    for (column, statistics_type, stat_field) in required_columns.iter() {
        let column = Column::from_name(column.name());
        let data_type = stat_field.data_type();
        let num_containers = statistics.num_containers();
        // Fetch the requested statistic; absent statistics become all-null
        // ("unknown") arrays of the expected type.
        let provided = match statistics_type {
            StatisticsType::Min => statistics.min_values(&column),
            StatisticsType::Max => statistics.max_values(&column),
            StatisticsType::NullCount => statistics.null_counts(&column),
            StatisticsType::RowCount => statistics.row_counts(&column),
        };
        let array =
            provided.unwrap_or_else(|| new_null_array(data_type, num_containers));
        if array.len() != num_containers {
            return internal_err!(
                "mismatched statistics length. Expected {}, got {}",
                num_containers,
                array.len()
            );
        }
        let array = arrow::compute::cast(&array, data_type)?;
        fields.push(stat_field.clone());
        arrays.push(array);
    }
    let schema = Arc::new(Schema::new(fields));
    // Set the row count explicitly rather than inferring it from the columns.
    let mut options = RecordBatchOptions::default();
    options.row_count = Some(statistics.num_containers());
    trace!(
        "Creating statistics batch for {:#?} with {:#?}",
        required_columns,
        arrays
    );
    RecordBatch::try_new_with_options(schema, arrays, &options).map_err(|err| {
        plan_datafusion_err!("Can not create statistics record batch: {err}")
    })
}
/// Decomposed form of a single `column <op> scalar` comparison, normalized so
/// the column side is known, plus the bookkeeping needed to emit min/max/
/// null-count/row-count statistics expressions for it.
struct PruningExpressionBuilder<'a> {
    // The single source column the comparison references.
    column: phys_expr::Column,
    // The (possibly cast/negated) expression wrapping the column.
    column_expr: Arc<dyn PhysicalExpr>,
    // Comparison operator, normalized to "column on the left".
    op: Operator,
    // The scalar side of the comparison.
    scalar_expr: Arc<dyn PhysicalExpr>,
    // Schema field of `column`.
    field: &'a Field,
    // Registry updated as statistics columns are referenced.
    required_columns: &'a mut RequiredColumns,
}
impl<'a> PruningExpressionBuilder<'a> {
    /// Normalize `left <op> right` into column-vs-scalar form. Exactly one
    /// side must reference exactly one column; if the column is on the right,
    /// the operator is reversed so the column is treated as the left side.
    /// Fails for multi-column comparisons or non-prunable column expressions.
    fn try_new(
        left: &'a Arc<dyn PhysicalExpr>,
        right: &'a Arc<dyn PhysicalExpr>,
        op: Operator,
        schema: &'a Schema,
        required_columns: &'a mut RequiredColumns,
    ) -> Result<Self> {
        let left_columns = collect_columns(left);
        let right_columns = collect_columns(right);
        let (column_expr, scalar_expr, columns, correct_operator) =
            match (left_columns.len(), right_columns.len()) {
                (1, 0) => (left, right, left_columns, op),
                (0, 1) => (right, left, right_columns, reverse_operator(op)?),
                _ => {
                    return plan_err!(
                        "Multi-column expressions are not currently supported"
                    );
                }
            };
        let df_schema = DFSchema::try_from(schema.clone())?;
        // Strip/normalize casts, negation and NOT so the comparison can be
        // expressed against raw statistics columns.
        let (column_expr, correct_operator, scalar_expr) = rewrite_expr_to_prunable(
            column_expr,
            correct_operator,
            scalar_expr,
            df_schema,
        )?;
        // Safe: the match above guarantees exactly one column is present.
        let column = columns.iter().next().unwrap().clone();
        let field = match schema.column_with_name(column.name()) {
            Some((_, f)) => f,
            _ => {
                return plan_err!("Field not found in schema");
            }
        };
        Ok(Self {
            column,
            column_expr,
            op: correct_operator,
            scalar_expr,
            field,
            required_columns,
        })
    }
    /// Normalized comparison operator (column on the left).
    fn op(&self) -> Operator {
        self.op
    }
    /// Scalar side of the comparison.
    fn scalar_expr(&self) -> &Arc<dyn PhysicalExpr> {
        &self.scalar_expr
    }
    /// Expression referencing the min statistics column for this comparison.
    fn min_column_expr(&mut self) -> Result<Arc<dyn PhysicalExpr>> {
        self.required_columns
            .min_column_expr(&self.column, &self.column_expr, self.field)
    }
    /// Expression referencing the max statistics column for this comparison.
    fn max_column_expr(&mut self) -> Result<Arc<dyn PhysicalExpr>> {
        self.required_columns
            .max_column_expr(&self.column, &self.column_expr, self.field)
    }
    /// Expression referencing the null-count statistics column (UInt64).
    fn null_count_column_expr(&mut self) -> Result<Arc<dyn PhysicalExpr>> {
        // Use the bare column (not `column_expr`) since counts are not
        // affected by casts or negation of the value.
        let column_expr = Arc::new(self.column.clone()) as _;
        let null_count_field = &Field::new(self.field.name(), DataType::UInt64, true);
        self.required_columns.null_count_column_expr(
            &self.column,
            &column_expr,
            null_count_field,
        )
    }
    /// Expression referencing the row-count statistics column (UInt64).
    fn row_count_column_expr(&mut self) -> Result<Arc<dyn PhysicalExpr>> {
        let column_expr = Arc::new(self.column.clone()) as _;
        let row_count_field = &Field::new(self.field.name(), DataType::UInt64, true);
        self.required_columns.row_count_column_expr(
            &self.column,
            &column_expr,
            row_count_field,
        )
    }
}
/// Recursively normalize `column_expr <op> scalar_expr` into a form whose
/// column side is prunable with min/max statistics, returning the rewritten
/// (column side, operator, scalar side).
///
/// Handled column-side shapes: a bare column; `CAST`/`TRY_CAST` of a
/// prunable expression (only for the integer/decimal types accepted by
/// `verify_support_type_for_prune`); unary negation (moves the negation to
/// the scalar side and reverses the operator); and `NOT col` (only for
/// Eq/NotEq, moving the NOT to the scalar side). Anything else is rejected.
fn rewrite_expr_to_prunable(
    column_expr: &PhysicalExprRef,
    op: Operator,
    scalar_expr: &PhysicalExprRef,
    schema: DFSchema,
) -> Result<(PhysicalExprRef, Operator, PhysicalExprRef)> {
    if !is_compare_op(op) {
        return plan_err!("rewrite_expr_to_prunable only support compare expression");
    }
    let column_expr_any = column_expr.as_any();
    if column_expr_any
        .downcast_ref::<phys_expr::Column>()
        .is_some()
    {
        // Base case: a bare column is already prunable.
        Ok((Arc::clone(column_expr), op, Arc::clone(scalar_expr)))
    } else if let Some(cast) = column_expr_any.downcast_ref::<phys_expr::CastExpr>() {
        // `cast(col) op scalar`: recurse on the inner expression, then
        // re-apply the cast to the rewritten column side.
        let arrow_schema: SchemaRef = schema.clone().into();
        let from_type = cast.expr().data_type(&arrow_schema)?;
        verify_support_type_for_prune(&from_type, cast.cast_type())?;
        let (left, op, right) =
            rewrite_expr_to_prunable(cast.expr(), op, scalar_expr, schema)?;
        let left = Arc::new(phys_expr::CastExpr::new(
            left,
            cast.cast_type().clone(),
            None,
        ));
        Ok((left, op, right))
    } else if let Some(try_cast) =
        column_expr_any.downcast_ref::<phys_expr::TryCastExpr>()
    {
        // Same as CAST, preserving TRY_CAST semantics on the rewritten side.
        let arrow_schema: SchemaRef = schema.clone().into();
        let from_type = try_cast.expr().data_type(&arrow_schema)?;
        verify_support_type_for_prune(&from_type, try_cast.cast_type())?;
        let (left, op, right) =
            rewrite_expr_to_prunable(try_cast.expr(), op, scalar_expr, schema)?;
        let left = Arc::new(phys_expr::TryCastExpr::new(
            left,
            try_cast.cast_type().clone(),
        ));
        Ok((left, op, right))
    } else if let Some(neg) = column_expr_any.downcast_ref::<phys_expr::NegativeExpr>() {
        // `-col op scalar`  =>  `col op' -scalar` with the operator reversed.
        let (left, op, right) =
            rewrite_expr_to_prunable(neg.arg(), op, scalar_expr, schema)?;
        let right = Arc::new(phys_expr::NegativeExpr::new(right));
        Ok((left, reverse_operator(op)?, right))
    } else if let Some(not) = column_expr_any.downcast_ref::<phys_expr::NotExpr>() {
        // `NOT col op scalar` is only sound to flip for Eq / NotEq.
        if op != Operator::Eq && op != Operator::NotEq {
            return plan_err!("Not with operator other than Eq / NotEq is not supported");
        }
        if not
            .arg()
            .as_any()
            .downcast_ref::<phys_expr::Column>()
            .is_some()
        {
            // Move the NOT onto the scalar side: `col op' NOT scalar`.
            let left = Arc::clone(not.arg());
            let right = Arc::new(phys_expr::NotExpr::new(Arc::clone(scalar_expr)));
            Ok((left, reverse_operator(op)?, right))
        } else {
            plan_err!("Not with complex expression {column_expr:?} is not supported")
        }
    } else {
        plan_err!("column expression {column_expr:?} is not supported")
    }
}
fn is_compare_op(op: Operator) -> bool {
matches!(
op,
Operator::Eq
| Operator::NotEq
| Operator::Lt
| Operator::LtEq
| Operator::Gt
| Operator::GtEq
| Operator::LikeMatch
| Operator::NotLikeMatch
)
}
/// Check that a CAST/TRY_CAST between `from_type` and `to_type` preserves
/// ordering well enough to prune with min/max statistics; only a small set
/// of integer/decimal combinations is accepted.
///
/// NOTE(review): `Int16` is accepted as a source but not as a target type —
/// presumably intentional, but worth confirming against upstream.
fn verify_support_type_for_prune(from_type: &DataType, to_type: &DataType) -> Result<()> {
    let from_supported = matches!(
        from_type,
        DataType::Int8
            | DataType::Int16
            | DataType::Int32
            | DataType::Int64
            | DataType::Decimal128(_, _)
    );
    let to_supported = matches!(
        to_type,
        DataType::Int8 | DataType::Int32 | DataType::Int64 | DataType::Decimal128(_, _)
    );
    if from_supported && to_supported {
        Ok(())
    } else {
        plan_err!(
            "Try Cast/Cast with from type {from_type} to type {to_type} is not supported"
        )
    }
}
/// Return `e` with every reference to `column_old` replaced by `column_new`;
/// all other nodes are left untouched.
fn rewrite_column_expr(
    e: Arc<dyn PhysicalExpr>,
    column_old: &phys_expr::Column,
    column_new: &phys_expr::Column,
) -> Result<Arc<dyn PhysicalExpr>> {
    e.transform(|expr| {
        let is_target = expr
            .as_any()
            .downcast_ref::<phys_expr::Column>()
            .map_or(false, |column| column == column_old);
        if is_target {
            Ok(Transformed::yes(Arc::new(column_new.clone())))
        } else {
            Ok(Transformed::no(expr))
        }
    })
    .data()
}
/// Swap a comparison operator's sides (e.g. `<` becomes `>`), erroring for
/// operators that have no swapped form.
fn reverse_operator(op: Operator) -> Result<Operator> {
    match op.swap() {
        Some(reversed) => Ok(reversed),
        None => Err(DataFusionError::Internal(format!(
            "Could not reverse operator {op} while building pruning predicate"
        ))),
    }
}
/// Build a statistics predicate for a bare boolean column used directly as a
/// predicate (`col` or, with `is_not`, `NOT col`).
///
/// For `col`: prune unless `min OR max` (some value could be true).
/// For `NOT col`: prune unless `NOT (min AND max)` (some value could be
/// false). Returns `None` for non-boolean columns or unknown fields.
fn build_single_column_expr(
    column: &phys_expr::Column,
    schema: &Schema,
    required_columns: &mut RequiredColumns,
    is_not: bool,
) -> Option<Arc<dyn PhysicalExpr>> {
    let field = schema.field_with_name(column.name()).ok()?;
    if field.data_type() != &DataType::Boolean {
        return None;
    }
    let col_ref = Arc::new(column.clone()) as _;
    let min = required_columns
        .min_column_expr(column, &col_ref, field)
        .ok()?;
    let max = required_columns
        .max_column_expr(column, &col_ref, field)
        .ok()?;
    let result: Arc<dyn PhysicalExpr> = if is_not {
        Arc::new(phys_expr::NotExpr::new(Arc::new(
            phys_expr::BinaryExpr::new(min, Operator::And, max),
        )))
    } else {
        Arc::new(phys_expr::BinaryExpr::new(min, Operator::Or, max))
    };
    Some(result)
}
/// Build a statistics predicate for `col IS NULL` (or, with `with_not`,
/// `col IS NOT NULL`) on a bare column.
///
/// `IS NULL` keeps containers where `null_count > 0`; `IS NOT NULL` keeps
/// containers where `null_count != row_count` (some non-null value exists).
/// Returns `None` when the expression is not a bare column, the column is
/// unknown, or the statistics expressions cannot be built.
fn build_is_null_column_expr(
    expr: &Arc<dyn PhysicalExpr>,
    schema: &Schema,
    required_columns: &mut RequiredColumns,
    with_not: bool,
) -> Option<Arc<dyn PhysicalExpr>> {
    let col = expr.as_any().downcast_ref::<phys_expr::Column>()?;
    let field = schema.field_with_name(col.name()).ok()?;
    // Counts are UInt64 regardless of the column's own type.
    let null_count_field = &Field::new(field.name(), DataType::UInt64, true);
    if with_not {
        // IS NOT NULL: keep unless every row is null.
        let row_count_expr = required_columns
            .row_count_column_expr(col, expr, null_count_field)
            .ok()?;
        let null_count_expr = required_columns
            .null_count_column_expr(col, expr, null_count_field)
            .ok()?;
        Some(Arc::new(phys_expr::BinaryExpr::new(
            null_count_expr,
            Operator::NotEq,
            row_count_expr,
        )) as _)
    } else {
        // IS NULL: keep only if at least one null is present.
        let null_count_expr = required_columns
            .null_count_column_expr(col, expr, null_count_field)
            .ok()?;
        Some(Arc::new(phys_expr::BinaryExpr::new(
            null_count_expr,
            Operator::Gt,
            Arc::new(phys_expr::Literal::new(ScalarValue::UInt64(Some(0)))),
        )) as _)
    }
}
/// Maximum number of values an `IN` list may have to be rewritten into a
/// chain of binary comparisons by `build_predicate_expression`; longer lists
/// are passed to the unhandled-expression hook instead.
const MAX_LIST_VALUE_SIZE_REWRITE: usize = 20;
/// Rewrites arbitrary boolean predicates into statistics predicates, with a
/// configurable hook for sub-expressions that cannot be rewritten.
pub struct PredicateRewriter {
    // Invoked for every sub-expression the rewrite cannot handle.
    unhandled_hook: Arc<dyn UnhandledPredicateHook>,
}
impl Default for PredicateRewriter {
fn default() -> Self {
Self {
unhandled_hook: Arc::new(ConstantUnhandledPredicateHook::default()),
}
}
}
impl PredicateRewriter {
    /// Create a rewriter with the default (constant `true`) unhandled hook.
    pub fn new() -> Self {
        Self::default()
    }
    /// Replace the hook used for expressions that cannot be rewritten into
    /// statistics comparisons.
    pub fn with_unhandled_hook(
        self,
        unhandled_hook: Arc<dyn UnhandledPredicateHook>,
    ) -> Self {
        Self { unhandled_hook }
    }
    /// Rewrite `expr` into an equivalent predicate over statistics columns.
    /// The `RequiredColumns` bookkeeping is computed but discarded here.
    pub fn rewrite_predicate_to_statistics_predicate(
        &self,
        expr: &Arc<dyn PhysicalExpr>,
        schema: &Schema,
    ) -> Arc<dyn PhysicalExpr> {
        let mut required_columns = RequiredColumns::new();
        build_predicate_expression(
            expr,
            schema,
            &mut required_columns,
            &self.unhandled_hook,
        )
    }
}
/// Recursively rewrite `expr` into a predicate over statistics columns,
/// registering every referenced statistic in `required_columns`. Any
/// sub-expression that cannot be handled is delegated to `unhandled_hook`.
fn build_predicate_expression(
    expr: &Arc<dyn PhysicalExpr>,
    schema: &Schema,
    required_columns: &mut RequiredColumns,
    unhandled_hook: &Arc<dyn UnhandledPredicateHook>,
) -> Arc<dyn PhysicalExpr> {
    let expr_any = expr.as_any();
    // `col IS NULL` -> null-count check.
    if let Some(is_null) = expr_any.downcast_ref::<phys_expr::IsNullExpr>() {
        return build_is_null_column_expr(is_null.arg(), schema, required_columns, false)
            .unwrap_or_else(|| unhandled_hook.handle(expr));
    }
    // `col IS NOT NULL` -> null-count vs row-count check.
    if let Some(is_not_null) = expr_any.downcast_ref::<phys_expr::IsNotNullExpr>() {
        return build_is_null_column_expr(
            is_not_null.arg(),
            schema,
            required_columns,
            true,
        )
        .unwrap_or_else(|| unhandled_hook.handle(expr));
    }
    // A bare boolean column used as a predicate.
    if let Some(col) = expr_any.downcast_ref::<phys_expr::Column>() {
        return build_single_column_expr(col, schema, required_columns, false)
            .unwrap_or_else(|| unhandled_hook.handle(expr));
    }
    // `NOT col` on a bare boolean column; any other NOT is unhandled.
    if let Some(not) = expr_any.downcast_ref::<phys_expr::NotExpr>() {
        if let Some(col) = not.arg().as_any().downcast_ref::<phys_expr::Column>() {
            return build_single_column_expr(col, schema, required_columns, true)
                .unwrap_or_else(|| unhandled_hook.handle(expr));
        } else {
            return unhandled_hook.handle(expr);
        }
    }
    // Small `IN` lists are expanded to a chain of (not-)equality comparisons
    // joined with OR (or AND when negated), then rewritten recursively.
    if let Some(in_list) = expr_any.downcast_ref::<phys_expr::InListExpr>() {
        if !in_list.list().is_empty()
            && in_list.list().len() <= MAX_LIST_VALUE_SIZE_REWRITE
        {
            let eq_op = if in_list.negated() {
                Operator::NotEq
            } else {
                Operator::Eq
            };
            let re_op = if in_list.negated() {
                Operator::And
            } else {
                Operator::Or
            };
            let change_expr = in_list
                .list()
                .iter()
                .map(|e| {
                    Arc::new(phys_expr::BinaryExpr::new(
                        Arc::clone(in_list.expr()),
                        eq_op,
                        Arc::clone(e),
                    )) as _
                })
                // Safe: the list was checked non-empty above.
                .reduce(|a, b| Arc::new(phys_expr::BinaryExpr::new(a, re_op, b)) as _)
                .unwrap();
            return build_predicate_expression(
                &change_expr,
                schema,
                required_columns,
                unhandled_hook,
            );
        } else {
            return unhandled_hook.handle(expr);
        }
    }
    // Normalize binary comparisons and (case-sensitive) LIKE into
    // (left, op, right) form for the comparison rewrite below.
    let (left, op, right) = {
        if let Some(bin_expr) = expr_any.downcast_ref::<phys_expr::BinaryExpr>() {
            (
                Arc::clone(bin_expr.left()),
                *bin_expr.op(),
                Arc::clone(bin_expr.right()),
            )
        } else if let Some(like_expr) = expr_any.downcast_ref::<phys_expr::LikeExpr>() {
            // Case-insensitive LIKE cannot be pruned with min/max bounds.
            if like_expr.case_insensitive() {
                return unhandled_hook.handle(expr);
            }
            let op = match (like_expr.negated(), like_expr.case_insensitive()) {
                (false, false) => Operator::LikeMatch,
                (true, false) => Operator::NotLikeMatch,
                // The `(_, true)` arms are unreachable here because the
                // case-insensitive branch returned above.
                (false, true) => Operator::ILikeMatch,
                (true, true) => Operator::NotILikeMatch,
            };
            (
                Arc::clone(like_expr.expr()),
                op,
                Arc::clone(like_expr.pattern()),
            )
        } else {
            return unhandled_hook.handle(expr);
        }
    };
    // AND/OR: rewrite each side, then simplify around always-true sides.
    // An always-true side is one the rewrite could not handle, so an OR with
    // such a side must itself become always-true to stay conservative.
    if op == Operator::And || op == Operator::Or {
        let left_expr =
            build_predicate_expression(&left, schema, required_columns, unhandled_hook);
        let right_expr =
            build_predicate_expression(&right, schema, required_columns, unhandled_hook);
        let expr = match (&left_expr, op, &right_expr) {
            (left, Operator::And, _) if is_always_true(left) => right_expr,
            (_, Operator::And, right) if is_always_true(right) => left_expr,
            (left, Operator::Or, right)
                if is_always_true(left) || is_always_true(right) =>
            {
                Arc::new(phys_expr::Literal::new(ScalarValue::Boolean(Some(true))))
            }
            _ => Arc::new(phys_expr::BinaryExpr::new(left_expr, op, right_expr)),
        };
        return expr;
    }
    // Leaf comparison: normalize to column-vs-scalar and emit the
    // min/max-based statistics expression.
    let expr_builder =
        PruningExpressionBuilder::try_new(&left, &right, op, schema, required_columns);
    let mut expr_builder = match expr_builder {
        Ok(builder) => builder,
        Err(_) => return unhandled_hook.handle(expr),
    };
    build_statistics_expr(&mut expr_builder)
        .unwrap_or_else(|_| unhandled_hook.handle(expr))
}
/// Emit the min/max statistics expression for a normalized `col <op> scalar`
/// comparison, wrapped in a null-count guard.
///
/// Rewrite rules (a container may contain matches iff the rule holds):
///   col != lit  =>  min != lit OR lit != max   (false only when min==max==lit)
///   col =  lit  =>  min <= lit AND lit <= max
///   col >  lit  =>  max >  lit
///   col >= lit  =>  max >= lit
///   col <  lit  =>  min <  lit
///   col <= lit  =>  min <= lit
///   col LIKE p  =>  prefix bounds via `build_like_match`
fn build_statistics_expr(
    expr_builder: &mut PruningExpressionBuilder,
) -> Result<Arc<dyn PhysicalExpr>> {
    let statistics_expr: Arc<dyn PhysicalExpr> = match expr_builder.op() {
        Operator::NotEq => {
            let min_column_expr = expr_builder.min_column_expr()?;
            let max_column_expr = expr_builder.max_column_expr()?;
            Arc::new(phys_expr::BinaryExpr::new(
                Arc::new(phys_expr::BinaryExpr::new(
                    min_column_expr,
                    Operator::NotEq,
                    Arc::clone(expr_builder.scalar_expr()),
                )),
                Operator::Or,
                Arc::new(phys_expr::BinaryExpr::new(
                    Arc::clone(expr_builder.scalar_expr()),
                    Operator::NotEq,
                    max_column_expr,
                )),
            ))
        }
        Operator::Eq => {
            let min_column_expr = expr_builder.min_column_expr()?;
            let max_column_expr = expr_builder.max_column_expr()?;
            Arc::new(phys_expr::BinaryExpr::new(
                Arc::new(phys_expr::BinaryExpr::new(
                    min_column_expr,
                    Operator::LtEq,
                    Arc::clone(expr_builder.scalar_expr()),
                )),
                Operator::And,
                Arc::new(phys_expr::BinaryExpr::new(
                    Arc::clone(expr_builder.scalar_expr()),
                    Operator::LtEq,
                    max_column_expr,
                )),
            ))
        }
        Operator::LikeMatch => build_like_match(expr_builder).ok_or_else(|| {
            plan_datafusion_err!(
                "LIKE expression with wildcard at the beginning is not supported"
            )
        })?,
        Operator::Gt => {
            Arc::new(phys_expr::BinaryExpr::new(
                expr_builder.max_column_expr()?,
                Operator::Gt,
                Arc::clone(expr_builder.scalar_expr()),
            ))
        }
        Operator::GtEq => {
            Arc::new(phys_expr::BinaryExpr::new(
                expr_builder.max_column_expr()?,
                Operator::GtEq,
                Arc::clone(expr_builder.scalar_expr()),
            ))
        }
        Operator::Lt => {
            Arc::new(phys_expr::BinaryExpr::new(
                expr_builder.min_column_expr()?,
                Operator::Lt,
                Arc::clone(expr_builder.scalar_expr()),
            ))
        }
        Operator::LtEq => {
            Arc::new(phys_expr::BinaryExpr::new(
                expr_builder.min_column_expr()?,
                Operator::LtEq,
                Arc::clone(expr_builder.scalar_expr()),
            ))
        }
        _ => {
            return plan_err!(
                "expressions other than (neq, eq, gt, gteq, lt, lteq) are not supported"
            );
        }
    };
    // Guard against all-null containers, whose min/max are meaningless.
    let statistics_expr = wrap_null_count_check_expr(statistics_expr, expr_builder)?;
    Ok(statistics_expr)
}
/// Build a min/max statistics expression for a case-sensitive `col LIKE
/// pattern` with a literal pattern.
///
/// Strings matching a pattern with literal prefix `p` lie in the range
/// `[p, increment_utf8(p))`, so the container may match only when
/// `p <= max AND min <= increment_utf8(p)`. A pattern with no wildcard is
/// treated as an exact value (both bounds equal the pattern). Returns `None`
/// when the pattern is not a literal string, starts with a wildcard, or its
/// prefix cannot be incremented.
fn build_like_match(
    expr_builder: &mut PruningExpressionBuilder,
) -> Option<Arc<dyn PhysicalExpr>> {
    fn unpack_string(s: &ScalarValue) -> Option<&str> {
        s.try_as_str().flatten()
    }
    fn extract_string_literal(expr: &Arc<dyn PhysicalExpr>) -> Option<&str> {
        if let Some(lit) = expr.as_any().downcast_ref::<phys_expr::Literal>() {
            let s = unpack_string(lit.value())?;
            return Some(s);
        }
        None
    }
    let min_column_expr = expr_builder.min_column_expr().ok()?;
    let max_column_expr = expr_builder.max_column_expr().ok()?;
    let scalar_expr = expr_builder.scalar_expr();
    let s = extract_string_literal(scalar_expr)?;
    let first_wildcard_index = s.find(['%', '_']);
    // A leading wildcard gives no usable prefix to bound with.
    if first_wildcard_index == Some(0) {
        return None;
    }
    let (lower_bound, upper_bound) = if let Some(wildcard_index) = first_wildcard_index {
        let prefix = &s[..wildcard_index];
        let lower_bound_lit = Arc::new(phys_expr::Literal::new(ScalarValue::Utf8(Some(
            prefix.to_string(),
        ))));
        let upper_bound_lit = Arc::new(phys_expr::Literal::new(ScalarValue::Utf8(Some(
            increment_utf8(prefix)?,
        ))));
        (lower_bound_lit, upper_bound_lit)
    } else {
        // No wildcard: the pattern matches exactly itself.
        let bound = Arc::new(phys_expr::Literal::new(ScalarValue::Utf8(Some(
            s.to_string(),
        ))));
        (Arc::clone(&bound), bound)
    };
    // lower_bound <= max: the range of matching strings reaches the container.
    let lower_bound_expr = Arc::new(phys_expr::BinaryExpr::new(
        lower_bound,
        Operator::LtEq,
        Arc::clone(&max_column_expr),
    ));
    // min <= upper_bound: the container reaches the range of matching strings.
    let upper_bound_expr = Arc::new(phys_expr::BinaryExpr::new(
        Arc::clone(&min_column_expr),
        Operator::LtEq,
        upper_bound,
    ));
    let combined = Arc::new(phys_expr::BinaryExpr::new(
        upper_bound_expr,
        Operator::And,
        lower_bound_expr,
    ));
    Some(combined)
}
/// Return the smallest string (by code-point order) strictly greater than
/// every string having `data` as a prefix, or `None` if no character of
/// `data` can be incremented.
///
/// Scans from the last character backwards, replacing the first character
/// whose successor is a valid scalar value and truncating everything after
/// it. Successors that are Unicode noncharacters are rejected (without
/// trying the character after them, matching the original behavior).
fn increment_utf8(data: &str) -> Option<String> {
    // Reject noncharacter code points (U+FFFE/U+FFFF, U+FDD0..=U+FDEF) and
    // anything beyond the Unicode range.
    fn is_valid_unicode(c: char) -> bool {
        let cp = c as u32;
        let non_character =
            [0xFFFE, 0xFFFF].contains(&cp) || (0xFDD0..=0xFDEF).contains(&cp);
        let out_of_range = cp >= 0x110000;
        !(non_character || out_of_range)
    }
    let mut chars: Vec<char> = data.chars().collect();
    let mut idx = chars.len();
    while idx > 0 {
        idx -= 1;
        if let Some(next) = char::from_u32(chars[idx] as u32 + 1) {
            if is_valid_unicode(next) {
                chars[idx] = next;
                chars.truncate(idx + 1);
                return Some(chars.into_iter().collect());
            }
        }
    }
    None
}
/// AND a null-count guard onto `statistics_expr`: min/max comparisons are
/// only meaningful when the container has at least one non-null value, i.e.
/// `null_count != row_count`.
fn wrap_null_count_check_expr(
    statistics_expr: Arc<dyn PhysicalExpr>,
    expr_builder: &mut PruningExpressionBuilder,
) -> Result<Arc<dyn PhysicalExpr>> {
    let null_count = expr_builder.null_count_column_expr()?;
    let row_count = expr_builder.row_count_column_expr()?;
    let has_non_null_value = Arc::new(phys_expr::BinaryExpr::new(
        null_count,
        Operator::NotEq,
        row_count,
    ));
    Ok(Arc::new(phys_expr::BinaryExpr::new(
        has_non_null_value,
        Operator::And,
        statistics_expr,
    )))
}
/// Which container statistic a generated statistics column carries.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub(crate) enum StatisticsType {
    /// Per-container minimum value of a column.
    Min,
    /// Per-container maximum value of a column.
    Max,
    /// Per-container count of null values in a column.
    NullCount,
    /// Per-container total row count.
    RowCount,
}
#[cfg(test)]
mod tests {
use std::collections::HashMap;
use std::ops::{Not, Rem};
use super::*;
use datafusion_common::assert_batches_eq;
use datafusion_expr::{col, lit};
use arrow::array::Decimal128Array;
use arrow::{
array::{BinaryArray, Int32Array, Int64Array, StringArray, UInt64Array},
datatypes::TimeUnit,
};
use datafusion_expr::expr::InList;
use datafusion_expr::{cast, is_null, try_cast, Expr};
use datafusion_functions_nested::expr_fn::{array_has, make_array};
use datafusion_physical_expr::expressions as phys_expr;
use datafusion_physical_expr::planner::logical2physical;
/// Test fixture: statistics for one column across a set of containers.
#[derive(Debug, Default)]
struct ContainerStats {
    // Per-container min values (one entry per container).
    min: Option<ArrayRef>,
    // Per-container max values.
    max: Option<ArrayRef>,
    // Per-container null counts.
    null_counts: Option<ArrayRef>,
    // Per-container row counts.
    row_counts: Option<ArrayRef>,
    // Canned answers for `contained`: (queried value set, per-container result).
    contained: Vec<(HashSet<ScalarValue>, BooleanArray)>,
}
impl ContainerStats {
    fn new() -> Self {
        Default::default()
    }
    /// Stats with Decimal128 min/max arrays at the given precision/scale.
    fn new_decimal128(
        min: impl IntoIterator<Item = Option<i128>>,
        max: impl IntoIterator<Item = Option<i128>>,
        precision: u8,
        scale: i8,
    ) -> Self {
        Self::new()
            .with_min(Arc::new(
                min.into_iter()
                    .collect::<Decimal128Array>()
                    .with_precision_and_scale(precision, scale)
                    .unwrap(),
            ))
            .with_max(Arc::new(
                max.into_iter()
                    .collect::<Decimal128Array>()
                    .with_precision_and_scale(precision, scale)
                    .unwrap(),
            ))
    }
    /// Stats with Int64 min/max arrays.
    fn new_i64(
        min: impl IntoIterator<Item = Option<i64>>,
        max: impl IntoIterator<Item = Option<i64>>,
    ) -> Self {
        Self::new()
            .with_min(Arc::new(min.into_iter().collect::<Int64Array>()))
            .with_max(Arc::new(max.into_iter().collect::<Int64Array>()))
    }
    /// Stats with Int32 min/max arrays.
    fn new_i32(
        min: impl IntoIterator<Item = Option<i32>>,
        max: impl IntoIterator<Item = Option<i32>>,
    ) -> Self {
        Self::new()
            .with_min(Arc::new(min.into_iter().collect::<Int32Array>()))
            .with_max(Arc::new(max.into_iter().collect::<Int32Array>()))
    }
    /// Stats with Utf8 min/max arrays.
    fn new_utf8<'a>(
        min: impl IntoIterator<Item = Option<&'a str>>,
        max: impl IntoIterator<Item = Option<&'a str>>,
    ) -> Self {
        Self::new()
            .with_min(Arc::new(min.into_iter().collect::<StringArray>()))
            .with_max(Arc::new(max.into_iter().collect::<StringArray>()))
    }
    /// Stats with Boolean min/max arrays.
    fn new_bool(
        min: impl IntoIterator<Item = Option<bool>>,
        max: impl IntoIterator<Item = Option<bool>>,
    ) -> Self {
        Self::new()
            .with_min(Arc::new(min.into_iter().collect::<BooleanArray>()))
            .with_max(Arc::new(max.into_iter().collect::<BooleanArray>()))
    }
    fn min(&self) -> Option<ArrayRef> {
        self.min.clone()
    }
    fn max(&self) -> Option<ArrayRef> {
        self.max.clone()
    }
    fn null_counts(&self) -> Option<ArrayRef> {
        self.null_counts.clone()
    }
    fn row_counts(&self) -> Option<ArrayRef> {
        self.row_counts.clone()
    }
    /// All populated statistic arrays (used for length invariant checks).
    fn arrays(&self) -> Vec<ArrayRef> {
        let contained_arrays = self
            .contained
            .iter()
            .map(|(_values, contained)| Arc::new(contained.clone()) as ArrayRef);
        [
            self.min.as_ref().cloned(),
            self.max.as_ref().cloned(),
            self.null_counts.as_ref().cloned(),
            self.row_counts.as_ref().cloned(),
        ]
        .into_iter()
        .flatten()
        .chain(contained_arrays)
        .collect()
    }
    /// Number of containers, taken from the first populated array (0 if none).
    fn len(&self) -> usize {
        self.arrays().iter().map(|a| a.len()).next().unwrap_or(0)
    }
    /// Assert that all populated arrays have the same length.
    fn assert_invariants(&self) {
        let mut prev_len = None;
        for len in self.arrays().iter().map(|a| a.len()) {
            match prev_len {
                None => {
                    prev_len = Some(len);
                }
                Some(prev_len) => {
                    assert_eq!(prev_len, len);
                }
            }
        }
    }
    fn with_min(mut self, min: ArrayRef) -> Self {
        self.min = Some(min);
        self
    }
    fn with_max(mut self, max: ArrayRef) -> Self {
        self.max = Some(max);
        self
    }
    fn with_null_counts(
        mut self,
        counts: impl IntoIterator<Item = Option<u64>>,
    ) -> Self {
        let null_counts: ArrayRef =
            Arc::new(counts.into_iter().collect::<UInt64Array>());
        self.assert_invariants();
        self.null_counts = Some(null_counts);
        self
    }
    fn with_row_counts(
        mut self,
        counts: impl IntoIterator<Item = Option<u64>>,
    ) -> Self {
        let row_counts: ArrayRef =
            Arc::new(counts.into_iter().collect::<UInt64Array>());
        self.assert_invariants();
        self.row_counts = Some(row_counts);
        self
    }
    /// Register a canned `contained` answer for the given value set.
    pub fn with_contained(
        mut self,
        values: impl IntoIterator<Item = ScalarValue>,
        contained: impl IntoIterator<Item = Option<bool>>,
    ) -> Self {
        let contained: BooleanArray = contained.into_iter().collect();
        let values: HashSet<_> = values.into_iter().collect();
        self.contained.push((values, contained));
        self.assert_invariants();
        self
    }
    /// Look up a canned `contained` answer by exact value-set match.
    fn contained(&self, find_values: &HashSet<ScalarValue>) -> Option<BooleanArray> {
        self.contained
            .iter()
            .find(|(values, _contained)| values == find_values)
            .map(|(_values, contained)| contained.clone())
    }
}
/// Test fixture: per-column `ContainerStats`, keyed by column.
#[derive(Debug, Default)]
struct TestStatistics {
    stats: HashMap<Column, ContainerStats>,
}
impl TestStatistics {
    fn new() -> Self {
        Self::default()
    }
    /// Set the stats for column `name` (replacing any previous entry).
    fn with(
        mut self,
        name: impl Into<String>,
        container_stats: ContainerStats,
    ) -> Self {
        let col = Column::from_name(name.into());
        self.stats.insert(col, container_stats);
        self
    }
    /// Add null counts to column `name`'s stats, creating them if absent.
    fn with_null_counts(
        mut self,
        name: impl Into<String>,
        counts: impl IntoIterator<Item = Option<u64>>,
    ) -> Self {
        let col = Column::from_name(name.into());
        let container_stats = self
            .stats
            .remove(&col)
            .unwrap_or_default()
            .with_null_counts(counts);
        self.stats.insert(col, container_stats);
        self
    }
    /// Add row counts to column `name`'s stats, creating them if absent.
    fn with_row_counts(
        mut self,
        name: impl Into<String>,
        counts: impl IntoIterator<Item = Option<u64>>,
    ) -> Self {
        let col = Column::from_name(name.into());
        let container_stats = self
            .stats
            .remove(&col)
            .unwrap_or_default()
            .with_row_counts(counts);
        self.stats.insert(col, container_stats);
        self
    }
    /// Add a canned `contained` answer to column `name`'s stats.
    fn with_contained(
        mut self,
        name: impl Into<String>,
        values: impl IntoIterator<Item = ScalarValue>,
        contained: impl IntoIterator<Item = Option<bool>>,
    ) -> Self {
        let col = Column::from_name(name.into());
        let container_stats = self
            .stats
            .remove(&col)
            .unwrap_or_default()
            .with_contained(values, contained);
        self.stats.insert(col, container_stats);
        self
    }
}
/// Delegates every `PruningStatistics` call to the per-column fixture,
/// returning `None` for columns without registered stats.
impl PruningStatistics for TestStatistics {
    fn min_values(&self, column: &Column) -> Option<ArrayRef> {
        self.stats
            .get(column)
            .map(|container_stats| container_stats.min())
            .unwrap_or(None)
    }
    fn max_values(&self, column: &Column) -> Option<ArrayRef> {
        self.stats
            .get(column)
            .map(|container_stats| container_stats.max())
            .unwrap_or(None)
    }
    // Container count is taken from an arbitrary column's stats (HashMap
    // iteration order); the fixtures keep all columns the same length.
    fn num_containers(&self) -> usize {
        self.stats
            .values()
            .next()
            .map(|container_stats| container_stats.len())
            .unwrap_or(0)
    }
    fn null_counts(&self, column: &Column) -> Option<ArrayRef> {
        self.stats
            .get(column)
            .map(|container_stats| container_stats.null_counts())
            .unwrap_or(None)
    }
    fn row_counts(&self, column: &Column) -> Option<ArrayRef> {
        self.stats
            .get(column)
            .map(|container_stats| container_stats.row_counts())
            .unwrap_or(None)
    }
    fn contained(
        &self,
        column: &Column,
        values: &HashSet<ScalarValue>,
    ) -> Option<BooleanArray> {
        self.stats
            .get(column)
            .and_then(|container_stats| container_stats.contained(values))
    }
}
/// Minimal fixture: the same min/max arrays for every column, a fixed
/// container count, and no null/row-count or `contained` data.
struct OneContainerStats {
    min_values: Option<ArrayRef>,
    max_values: Option<ArrayRef>,
    num_containers: usize,
}
impl PruningStatistics for OneContainerStats {
    /// Returns the same min array regardless of which column is requested.
    fn min_values(&self, _column: &Column) -> Option<ArrayRef> {
        self.min_values.as_ref().cloned()
    }
    /// Returns the same max array regardless of which column is requested.
    fn max_values(&self, _column: &Column) -> Option<ArrayRef> {
        self.max_values.as_ref().cloned()
    }
    fn num_containers(&self) -> usize {
        self.num_containers
    }
    /// Null counts are not tracked by this stub.
    fn null_counts(&self, _column: &Column) -> Option<ArrayRef> {
        None
    }
    /// Row counts are not tracked by this stub.
    fn row_counts(&self, _column: &Column) -> Option<ArrayRef> {
        None
    }
    /// Set-membership information is not tracked by this stub.
    fn contained(
        &self,
        _column: &Column,
        _values: &HashSet<ScalarValue>,
    ) -> Option<BooleanArray> {
        None
    }
}
#[test]
// A container whose null_count equals row_count holds only NULLs and can be
// pruned for `i = 0`; unknown (None) null/row counts must be treated
// conservatively (container kept for `=`, but `i > 0` can still prune on
// min/max alone).
fn prune_all_rows_null_counts() {
// Case 1: builder-style stats, null_count == row_count == 1 -> prune
let schema = Arc::new(Schema::new(vec![Field::new("i", DataType::Int32, true)]));
let statistics = TestStatistics::new().with(
"i",
ContainerStats::new_i32(
vec![Some(0)], vec![Some(0)], )
.with_null_counts(vec![Some(1)])
.with_row_counts(vec![Some(1)]),
);
let expected_ret = &[false];
prune_with_expr(col("i").eq(lit(0)), &schema, &statistics, expected_ret);
// Case 2: min/max are themselves NULL but counts still prove all-null -> prune
let schema = Arc::new(Schema::new(vec![Field::new("i", DataType::Int32, true)]));
let container_stats = ContainerStats {
min: Some(Arc::new(Int32Array::from(vec![None]))),
max: Some(Arc::new(Int32Array::from(vec![None]))),
null_counts: Some(Arc::new(UInt64Array::from(vec![Some(1)]))),
row_counts: Some(Arc::new(UInt64Array::from(vec![Some(1)]))),
..ContainerStats::default()
};
let statistics = TestStatistics::new().with("i", container_stats);
let expected_ret = &[false];
prune_with_expr(col("i").eq(lit(0)), &schema, &statistics, expected_ret);
// Case 3: null_count unknown -> cannot prove all-null, keep for `=`;
// `i > 0` still prunes because max == 0
let schema = Arc::new(Schema::new(vec![Field::new("i", DataType::Int32, true)]));
let container_stats = ContainerStats {
min: Some(Arc::new(Int32Array::from(vec![Some(0)]))),
max: Some(Arc::new(Int32Array::from(vec![Some(0)]))),
null_counts: Some(Arc::new(UInt64Array::from(vec![None]))),
row_counts: Some(Arc::new(UInt64Array::from(vec![Some(1)]))),
..ContainerStats::default()
};
let statistics = TestStatistics::new().with("i", container_stats);
let expected_ret = &[true];
prune_with_expr(col("i").eq(lit(0)), &schema, &statistics, expected_ret);
let expected_ret = &[false];
prune_with_expr(col("i").gt(lit(0)), &schema, &statistics, expected_ret);
// Case 4: row_count unknown -> same conservative behavior as case 3
let schema = Arc::new(Schema::new(vec![Field::new("i", DataType::Int32, true)]));
let container_stats = ContainerStats {
min: Some(Arc::new(Int32Array::from(vec![Some(0)]))),
max: Some(Arc::new(Int32Array::from(vec![Some(0)]))),
null_counts: Some(Arc::new(UInt64Array::from(vec![Some(1)]))),
row_counts: Some(Arc::new(UInt64Array::from(vec![None]))),
..ContainerStats::default()
};
let statistics = TestStatistics::new().with("i", container_stats);
let expected_ret = &[true];
prune_with_expr(col("i").eq(lit(0)), &schema, &statistics, expected_ret);
let expected_ret = &[false];
prune_with_expr(col("i").gt(lit(0)), &schema, &statistics, expected_ret);
}
#[test]
// Containers with a NULL min or max statistic must be kept (unknown range),
// while the known side of the range can still be used to prune.
fn prune_missing_statistics() {
let schema = Arc::new(Schema::new(vec![Field::new("i", DataType::Int32, true)]));
// container 0: min unknown, max = 0; container 1: min = 0, max unknown
let container_stats = ContainerStats {
min: Some(Arc::new(Int32Array::from(vec![None, Some(0)]))),
max: Some(Arc::new(Int32Array::from(vec![Some(0), None]))),
null_counts: Some(Arc::new(UInt64Array::from(vec![Some(0), Some(0)]))),
row_counts: Some(Arc::new(UInt64Array::from(vec![Some(1), Some(1)]))),
..ContainerStats::default()
};
let statistics = TestStatistics::new().with("i", container_stats);
// `i = 0`: both containers might contain 0
let expected_ret = &[true, true];
prune_with_expr(col("i").eq(lit(0)), &schema, &statistics, expected_ret);
// `i > 0`: container 0 has max = 0, so it cannot match
let expected_ret = &[false, true];
prune_with_expr(col("i").gt(lit(0)), &schema, &statistics, expected_ret);
// `i < 0`: container 1 has min = 0, so it cannot match
let expected_ret = &[true, false];
prune_with_expr(col("i").lt(lit(0)), &schema, &statistics, expected_ret);
}
#[test]
// An all-null container (null_count == row_count) can be pruned for `i = 0`
// even though its min/max stats claim the range [0, 0].
fn prune_null_stats() {
let schema = Arc::new(Schema::new(vec![Field::new("i", DataType::Int32, true)]));
let statistics = TestStatistics::new().with(
"i",
ContainerStats::new_i32(
vec![Some(0)], vec![Some(0)], )
.with_null_counts(vec![Some(1)])
.with_row_counts(vec![Some(1)]),
);
let expected_ret = &[false];
prune_with_expr(col("i").eq(lit(0)), &schema, &statistics, expected_ret);
}
#[test]
// Builds the statistics RecordBatch for a mix of required min/max columns,
// including two statistics (min and max) for the same source column (`s3`),
// and checks the resulting column order and values.
fn test_build_statistics_record_batch() {
let required_columns = RequiredColumns::from(vec![
// s1: only the min statistic is required
(
phys_expr::Column::new("s1", 1),
StatisticsType::Min,
Field::new("s1_min", DataType::Int32, true),
),
// s2: only the max statistic is required
(
phys_expr::Column::new("s2", 2),
StatisticsType::Max,
Field::new("s2_max", DataType::Int32, true),
),
// s3: both max and min are required (note: max listed first)
(
phys_expr::Column::new("s3", 3),
StatisticsType::Max,
Field::new("s3_max", DataType::Utf8, true),
),
(
phys_expr::Column::new("s3", 3),
StatisticsType::Min,
Field::new("s3_min", DataType::Utf8, true),
),
]);
let statistics = TestStatistics::new()
.with(
"s1",
ContainerStats::new_i32(
vec![None, None, Some(9), None], vec![Some(10), None, None, None], ),
)
.with(
"s2",
ContainerStats::new_i32(
vec![Some(2), None, None, None], vec![Some(20), None, None, None], ),
)
.with(
"s3",
ContainerStats::new_utf8(
vec![Some("a"), None, None, None], vec![Some("q"), None, Some("r"), None], ),
);
let batch =
build_statistics_record_batch(&statistics, &required_columns).unwrap();
// one row per container; columns appear in required_columns order
let expected = [
"+--------+--------+--------+--------+",
"| s1_min | s2_max | s3_max | s3_min |",
"+--------+--------+--------+--------+",
"| | 20 | q | a |",
"| | | | |",
"| 9 | | r | |",
"| | | | |",
"+--------+--------+--------+--------+",
];
assert_batches_eq!(expected, &[batch]);
}
#[test]
// Statistics arrays whose type differs from the required field type must be
// cast: here Int64 stats are cast to the required nanosecond timestamp.
fn test_build_statistics_casting() {
let required_columns = RequiredColumns::from(vec![(
phys_expr::Column::new("s3", 3),
StatisticsType::Min,
Field::new(
"s1_min",
DataType::Timestamp(TimeUnit::Nanosecond, None),
true,
),
)]);
let statistics = OneContainerStats {
min_values: Some(Arc::new(Int64Array::from(vec![Some(10)]))),
max_values: Some(Arc::new(Int64Array::from(vec![Some(20)]))),
num_containers: 1,
};
let batch =
build_statistics_record_batch(&statistics, &required_columns).unwrap();
// 10 nanoseconds after the epoch
let expected = [
"+-------------------------------+",
"| s1_min |",
"+-------------------------------+",
"| 1970-01-01T00:00:00.000000010 |",
"+-------------------------------+",
];
assert_batches_eq!(expected, &[batch]);
}
#[test]
// With no required statistics columns, the batch still reports one row per
// container (row count comes from num_containers, not from any column).
fn test_build_statistics_no_required_stats() {
let required_columns = RequiredColumns::new();
let statistics = OneContainerStats {
min_values: Some(Arc::new(Int64Array::from(vec![Some(10)]))),
max_values: Some(Arc::new(Int64Array::from(vec![Some(20)]))),
num_containers: 1,
};
let batch =
build_statistics_record_batch(&statistics, &required_columns).unwrap();
assert_eq!(batch.num_rows(), 1); }
#[test]
// If the statistics array cannot be cast to the required type (Binary with
// non-UTF8 bytes -> Utf8), the value degrades to NULL rather than erroring.
fn test_build_statistics_inconsistent_types() {
let required_columns = RequiredColumns::from(vec![(
phys_expr::Column::new("s3", 3),
StatisticsType::Min,
Field::new("s1_min", DataType::Utf8, true),
)]);
// 0xFF is not valid UTF-8
let statistics = OneContainerStats {
min_values: Some(Arc::new(BinaryArray::from(vec![&[255u8] as &[u8]]))),
max_values: None,
num_containers: 1,
};
let batch =
build_statistics_record_batch(&statistics, &required_columns).unwrap();
let expected = [
"+--------+",
"| s1_min |",
"+--------+",
"| |",
"+--------+",
];
assert_batches_eq!(expected, &[batch]);
}
#[test]
// A statistics array whose length differs from num_containers is an error,
// and the error message reports both lengths.
fn test_build_statistics_inconsistent_length() {
let required_columns = RequiredColumns::from(vec![(
phys_expr::Column::new("s1", 3),
StatisticsType::Min,
Field::new("s1_min", DataType::Int64, true),
)]);
// 1-element stats array vs 3 claimed containers
let statistics = OneContainerStats {
min_values: Some(Arc::new(Int64Array::from(vec![Some(10)]))),
max_values: Some(Arc::new(Int64Array::from(vec![Some(20)]))),
num_containers: 3,
};
let result =
build_statistics_record_batch(&statistics, &required_columns).unwrap_err();
assert!(
result
.to_string()
.contains("mismatched statistics length. Expected 3, got 1"),
"{}",
result
);
}
#[test]
fn row_group_predicate_eq() -> Result<()> {
    let schema = Schema::new(vec![Field::new("c1", DataType::Int32, false)]);
    let expected_expr =
        "c1_null_count@2 != c1_row_count@3 AND c1_min@0 <= 1 AND 1 <= c1_max@1";
    // `c1 = 1` and `1 = c1` must normalize to the same pruning predicate.
    for expr in [col("c1").eq(lit(1)), lit(1).eq(col("c1"))] {
        let predicate_expr =
            test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
        assert_eq!(predicate_expr.to_string(), expected_expr);
    }
    Ok(())
}
#[test]
fn row_group_predicate_not_eq() -> Result<()> {
    let schema = Schema::new(vec![Field::new("c1", DataType::Int32, false)]);
    let expected_expr =
        "c1_null_count@2 != c1_row_count@3 AND (c1_min@0 != 1 OR 1 != c1_max@1)";
    // `c1 != 1` and `1 != c1` must normalize to the same pruning predicate.
    for expr in [col("c1").not_eq(lit(1)), lit(1).not_eq(col("c1"))] {
        let predicate_expr =
            test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
        assert_eq!(predicate_expr.to_string(), expected_expr);
    }
    Ok(())
}
#[test]
fn row_group_predicate_gt() -> Result<()> {
    let schema = Schema::new(vec![Field::new("c1", DataType::Int32, false)]);
    let expected_expr = "c1_null_count@1 != c1_row_count@2 AND c1_max@0 > 1";
    // `c1 > 1` and the flipped `1 < c1` must build the same predicate.
    for expr in [col("c1").gt(lit(1)), lit(1).lt(col("c1"))] {
        let predicate_expr =
            test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
        assert_eq!(predicate_expr.to_string(), expected_expr);
    }
    Ok(())
}
#[test]
fn row_group_predicate_gt_eq() -> Result<()> {
    let schema = Schema::new(vec![Field::new("c1", DataType::Int32, false)]);
    let expected_expr = "c1_null_count@1 != c1_row_count@2 AND c1_max@0 >= 1";
    // `c1 >= 1` and the flipped `1 <= c1` must build the same predicate.
    for expr in [col("c1").gt_eq(lit(1)), lit(1).lt_eq(col("c1"))] {
        let predicate_expr =
            test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
        assert_eq!(predicate_expr.to_string(), expected_expr);
    }
    Ok(())
}
#[test]
fn row_group_predicate_lt() -> Result<()> {
    let schema = Schema::new(vec![Field::new("c1", DataType::Int32, false)]);
    let expected_expr = "c1_null_count@1 != c1_row_count@2 AND c1_min@0 < 1";
    // `c1 < 1` and the flipped `1 > c1` must build the same predicate.
    for expr in [col("c1").lt(lit(1)), lit(1).gt(col("c1"))] {
        let predicate_expr =
            test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
        assert_eq!(predicate_expr.to_string(), expected_expr);
    }
    Ok(())
}
#[test]
fn row_group_predicate_lt_eq() -> Result<()> {
    let schema = Schema::new(vec![Field::new("c1", DataType::Int32, false)]);
    let expected_expr = "c1_null_count@1 != c1_row_count@2 AND c1_min@0 <= 1";
    // `c1 <= 1` and the flipped `1 >= c1` must build the same predicate.
    for expr in [col("c1").lt_eq(lit(1)), lit(1).gt_eq(col("c1"))] {
        let predicate_expr =
            test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
        assert_eq!(predicate_expr.to_string(), expected_expr);
    }
    Ok(())
}
#[test]
fn row_group_predicate_and() -> Result<()> {
    let schema = Schema::new(vec![
        Field::new("c1", DataType::Int32, false),
        Field::new("c2", DataType::Int32, false),
        Field::new("c3", DataType::Int32, false),
    ]);
    // `c2 < c3` compares two columns and cannot be turned into a stats
    // predicate, so only the `c1 < 1` conjunct survives.
    let expr = col("c1").lt(lit(1)).and(col("c2").lt(col("c3")));
    let expected = "c1_null_count@1 != c1_row_count@2 AND c1_min@0 < 1";
    let built =
        test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
    assert_eq!(built.to_string(), expected);
    Ok(())
}
#[test]
fn row_group_predicate_or() -> Result<()> {
    let schema = Schema::new(vec![
        Field::new("c1", DataType::Int32, false),
        Field::new("c2", DataType::Int32, false),
    ]);
    // One side of the OR (`c2 % 2 = 0`) is unprunable, so the whole
    // disjunction degrades to the always-keep predicate `true`.
    let expr = col("c1").lt(lit(1)).or(col("c2").rem(lit(2)).eq(lit(0)));
    let built =
        test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
    assert_eq!(built.to_string(), "true");
    Ok(())
}
#[test]
fn row_group_predicate_not() -> Result<()> {
    // NOT of a non-boolean column cannot be pruned on: degrades to `true`.
    let schema = Schema::new(vec![Field::new("c1", DataType::Int32, false)]);
    let expr = col("c1").not();
    let built =
        test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
    assert_eq!(built.to_string(), "true");
    Ok(())
}
#[test]
fn row_group_predicate_not_bool() -> Result<()> {
    // NOT of a boolean column is rewritten against the column's boolean
    // min/max statistics.
    let schema = Schema::new(vec![Field::new("c1", DataType::Boolean, false)]);
    let expr = col("c1").not();
    let built =
        test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
    assert_eq!(built.to_string(), "NOT c1_min@0 AND c1_max@1");
    Ok(())
}
#[test]
fn row_group_predicate_bool() -> Result<()> {
    // A bare boolean column is rewritten against its min/max statistics.
    let schema = Schema::new(vec![Field::new("c1", DataType::Boolean, false)]);
    let expr = col("c1");
    let built =
        test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
    assert_eq!(built.to_string(), "c1_min@0 OR c1_max@1");
    Ok(())
}
#[test]
fn row_group_predicate_lt_bool() -> Result<()> {
    // `c1 < true` on a boolean column uses the boolean min statistic.
    let schema = Schema::new(vec![Field::new("c1", DataType::Boolean, false)]);
    let expr = col("c1").lt(lit(true));
    let built =
        test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
    assert_eq!(
        built.to_string(),
        "c1_null_count@1 != c1_row_count@2 AND c1_min@0 < true"
    );
    Ok(())
}
#[test]
// Verifies both the generated predicate string and the exact contents and
// ordering of RequiredColumns accumulated while building it: each source
// column contributes min/max/null_count/row_count entries on first use and
// the indexes in the predicate refer to these positions.
fn row_group_predicate_required_columns() -> Result<()> {
let schema = Schema::new(vec![
Field::new("c1", DataType::Int32, false),
Field::new("c2", DataType::Int32, false),
]);
let mut required_columns = RequiredColumns::new();
// c1 < 1 AND (c2 = 2 OR c2 = 3)
let expr = col("c1")
.lt(lit(1))
.and(col("c2").eq(lit(2)).or(col("c2").eq(lit(3))));
let expected_expr = "c1_null_count@1 != c1_row_count@2 \
AND c1_min@0 < 1 AND (\
c2_null_count@5 != c2_row_count@6 \
AND c2_min@3 <= 2 AND 2 <= c2_max@4 OR \
c2_null_count@5 != c2_row_count@6 AND c2_min@3 <= 3 AND 3 <= c2_max@4\
)";
let predicate_expr =
test_build_predicate_expression(&expr, &schema, &mut required_columns);
assert_eq!(predicate_expr.to_string(), expected_expr);
// index 0: c1 min (statistics fields are always nullable)
let c1_min_field = Field::new("c1_min", DataType::Int32, false);
assert_eq!(
required_columns.columns[0],
(
phys_expr::Column::new("c1", 0),
StatisticsType::Min,
c1_min_field.with_nullable(true) )
);
// index 1: c1 null count
let c1_null_count_field = Field::new("c1_null_count", DataType::UInt64, false);
assert_eq!(
required_columns.columns[1],
(
phys_expr::Column::new("c1", 0),
StatisticsType::NullCount,
c1_null_count_field.with_nullable(true) )
);
// index 2: c1 row count
let c1_row_count_field = Field::new("c1_row_count", DataType::UInt64, false);
assert_eq!(
required_columns.columns[2],
(
phys_expr::Column::new("c1", 0),
StatisticsType::RowCount,
c1_row_count_field.with_nullable(true) )
);
// index 3: c2 min
let c2_min_field = Field::new("c2_min", DataType::Int32, false);
assert_eq!(
required_columns.columns[3],
(
phys_expr::Column::new("c2", 1),
StatisticsType::Min,
c2_min_field.with_nullable(true) )
);
// index 4: c2 max
let c2_max_field = Field::new("c2_max", DataType::Int32, false);
assert_eq!(
required_columns.columns[4],
(
phys_expr::Column::new("c2", 1),
StatisticsType::Max,
c2_max_field.with_nullable(true) )
);
// index 5: c2 null count
let c2_null_count_field = Field::new("c2_null_count", DataType::UInt64, false);
assert_eq!(
required_columns.columns[5],
(
phys_expr::Column::new("c2", 1),
StatisticsType::NullCount,
c2_null_count_field.with_nullable(true) )
);
// index 6: c2 row count
let c2_row_count_field = Field::new("c2_row_count", DataType::UInt64, false);
assert_eq!(
required_columns.columns[6],
(
phys_expr::Column::new("c2", 1),
StatisticsType::RowCount,
c2_row_count_field.with_nullable(true) )
);
// no duplicate entries: shared stats are reused, not re-added
assert_eq!(required_columns.columns.len(), 7);
Ok(())
}
#[test]
// `c1 IN (1, 2, 3)` expands into an OR of per-value range checks.
fn row_group_predicate_in_list() -> Result<()> {
let schema = Schema::new(vec![
Field::new("c1", DataType::Int32, false),
Field::new("c2", DataType::Int32, false),
]);
let expr = Expr::InList(InList::new(
Box::new(col("c1")),
vec![lit(1), lit(2), lit(3)],
false,
));
let expected_expr = "c1_null_count@2 != c1_row_count@3 AND c1_min@0 <= 1 AND 1 <= c1_max@1 OR c1_null_count@2 != c1_row_count@3 AND c1_min@0 <= 2 AND 2 <= c1_max@1 OR c1_null_count@2 != c1_row_count@3 AND c1_min@0 <= 3 AND 3 <= c1_max@1";
let predicate_expr =
test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
assert_eq!(predicate_expr.to_string(), expected_expr);
Ok(())
}
#[test]
fn row_group_predicate_in_list_empty() -> Result<()> {
    let schema = Schema::new(vec![
        Field::new("c1", DataType::Int32, false),
        Field::new("c2", DataType::Int32, false),
    ]);
    // An empty IN list provides no constraints: degrades to `true`.
    let expr = Expr::InList(InList::new(Box::new(col("c1")), vec![], false));
    let built =
        test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
    assert_eq!(built.to_string(), "true");
    Ok(())
}
#[test]
// `c1 NOT IN (1, 2, 3)` expands into an AND of per-value exclusion checks.
fn row_group_predicate_in_list_negated() -> Result<()> {
let schema = Schema::new(vec![
Field::new("c1", DataType::Int32, false),
Field::new("c2", DataType::Int32, false),
]);
let expr = Expr::InList(InList::new(
Box::new(col("c1")),
vec![lit(1), lit(2), lit(3)],
true,
));
let expected_expr = "c1_null_count@2 != c1_row_count@3 AND (c1_min@0 != 1 OR 1 != c1_max@1) AND c1_null_count@2 != c1_row_count@3 AND (c1_min@0 != 2 OR 2 != c1_max@1) AND c1_null_count@2 != c1_row_count@3 AND (c1_min@0 != 3 OR 3 != c1_max@1)";
let predicate_expr =
test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
assert_eq!(predicate_expr.to_string(), expected_expr);
Ok(())
}
#[test]
fn row_group_predicate_between() -> Result<()> {
    let schema = Schema::new(vec![
        Field::new("c1", DataType::Int32, false),
        Field::new("c2", DataType::Int32, false),
    ]);
    // BETWEEN must build exactly the same pruning predicate as the
    // equivalent pair of range comparisons.
    let from_between = test_build_predicate_expression(
        &col("c1").between(lit(1), lit(5)),
        &schema,
        &mut RequiredColumns::new(),
    );
    let from_ranges = test_build_predicate_expression(
        &col("c1").gt_eq(lit(1)).and(col("c1").lt_eq(lit(5))),
        &schema,
        &mut RequiredColumns::new(),
    );
    assert_eq!(from_between.to_string(), from_ranges.to_string());
    Ok(())
}
#[test]
// Combines an IN list on c1 with a BETWEEN on c2; checks the conjunction of
// both rewritten forms (note c2's BETWEEN-derived min appears at index 7).
fn row_group_predicate_between_with_in_list() -> Result<()> {
let schema = Schema::new(vec![
Field::new("c1", DataType::Int32, false),
Field::new("c2", DataType::Int32, false),
]);
let expr1 = col("c1").in_list(vec![lit(1), lit(2)], false);
let expr2 = col("c2").between(lit(4), lit(5));
let expr3 = expr1.and(expr2);
let expected_expr = "(c1_null_count@2 != c1_row_count@3 AND c1_min@0 <= 1 AND 1 <= c1_max@1 OR c1_null_count@2 != c1_row_count@3 AND c1_min@0 <= 2 AND 2 <= c1_max@1) AND c2_null_count@5 != c2_row_count@6 AND c2_max@4 >= 4 AND c2_null_count@5 != c2_row_count@6 AND c2_min@7 <= 5";
let predicate_expr =
test_build_predicate_expression(&expr3, &schema, &mut RequiredColumns::new());
assert_eq!(predicate_expr.to_string(), expected_expr);
Ok(())
}
#[test]
fn row_group_predicate_in_list_to_many_values() -> Result<()> {
    let schema = Schema::new(vec![Field::new("c1", DataType::Int32, false)]);
    // An IN list with more than 20 values is not expanded: the predicate
    // degrades to the always-keep `true`.
    let expr = col("c1").in_list((1..=21).map(lit).collect(), false);
    let built =
        test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
    assert_eq!(built.to_string(), "true");
    Ok(())
}
#[test]
// CAST/TRY_CAST over a column are pushed onto the min/max statistics
// columns; both operand orderings must build the same predicate.
fn row_group_predicate_cast() -> Result<()> {
let schema = Schema::new(vec![Field::new("c1", DataType::Int32, false)]);
let expected_expr = "c1_null_count@2 != c1_row_count@3 AND CAST(c1_min@0 AS Int64) <= 1 AND 1 <= CAST(c1_max@1 AS Int64)";
// CAST(c1 AS Int64) = 1
let expr = cast(col("c1"), DataType::Int64).eq(lit(ScalarValue::Int64(Some(1))));
let predicate_expr =
test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
assert_eq!(predicate_expr.to_string(), expected_expr);
// 1 = CAST(c1 AS Int64) -- flipped ordering, same result
let expr = lit(ScalarValue::Int64(Some(1))).eq(cast(col("c1"), DataType::Int64));
let predicate_expr =
test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
assert_eq!(predicate_expr.to_string(), expected_expr);
// TRY_CAST(c1 AS Int64) > 1, both orderings
let expected_expr =
"c1_null_count@1 != c1_row_count@2 AND TRY_CAST(c1_max@0 AS Int64) > 1";
let expr =
try_cast(col("c1"), DataType::Int64).gt(lit(ScalarValue::Int64(Some(1))));
let predicate_expr =
test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
assert_eq!(predicate_expr.to_string(), expected_expr);
let expr =
lit(ScalarValue::Int64(Some(1))).lt(try_cast(col("c1"), DataType::Int64));
let predicate_expr =
test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
assert_eq!(predicate_expr.to_string(), expected_expr);
Ok(())
}
#[test]
// IN / NOT IN over a CAST column: the cast is applied to the min/max
// statistics inside each expanded per-value check.
fn row_group_predicate_cast_list() -> Result<()> {
let schema = Schema::new(vec![Field::new("c1", DataType::Int32, false)]);
// CAST(c1 AS Int64) IN (1, 2, 3)
let expr = Expr::InList(InList::new(
Box::new(cast(col("c1"), DataType::Int64)),
vec![
lit(ScalarValue::Int64(Some(1))),
lit(ScalarValue::Int64(Some(2))),
lit(ScalarValue::Int64(Some(3))),
],
false,
));
let expected_expr = "c1_null_count@2 != c1_row_count@3 AND CAST(c1_min@0 AS Int64) <= 1 AND 1 <= CAST(c1_max@1 AS Int64) OR c1_null_count@2 != c1_row_count@3 AND CAST(c1_min@0 AS Int64) <= 2 AND 2 <= CAST(c1_max@1 AS Int64) OR c1_null_count@2 != c1_row_count@3 AND CAST(c1_min@0 AS Int64) <= 3 AND 3 <= CAST(c1_max@1 AS Int64)";
let predicate_expr =
test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
assert_eq!(predicate_expr.to_string(), expected_expr);
// CAST(c1 AS Int64) NOT IN (1, 2, 3)
let expr = Expr::InList(InList::new(
Box::new(cast(col("c1"), DataType::Int64)),
vec![
lit(ScalarValue::Int64(Some(1))),
lit(ScalarValue::Int64(Some(2))),
lit(ScalarValue::Int64(Some(3))),
],
true,
));
let expected_expr = "c1_null_count@2 != c1_row_count@3 AND (CAST(c1_min@0 AS Int64) != 1 OR 1 != CAST(c1_max@1 AS Int64)) AND c1_null_count@2 != c1_row_count@3 AND (CAST(c1_min@0 AS Int64) != 2 OR 2 != CAST(c1_max@1 AS Int64)) AND c1_null_count@2 != c1_row_count@3 AND (CAST(c1_min@0 AS Int64) != 3 OR 3 != CAST(c1_max@1 AS Int64))";
let predicate_expr =
test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
assert_eq!(predicate_expr.to_string(), expected_expr);
Ok(())
}
#[test]
// Pruning on Decimal128 columns at several precisions, with statistics
// provided in the narrower physical types (i32, i64, decimal128) and
// predicates that optionally CAST/TRY_CAST the column.
fn prune_decimal_data() {
// precision 9: stats provided as i32
let schema = Arc::new(Schema::new(vec![Field::new(
"s1",
DataType::Decimal128(9, 2),
true,
)]));
// s1 > 5.00
prune_with_expr(
col("s1").gt(lit(ScalarValue::Decimal128(Some(500), 9, 2))),
&schema,
&TestStatistics::new().with(
"s1",
ContainerStats::new_i32(
vec![Some(0), Some(4), None, Some(3)], vec![Some(5), Some(6), Some(4), None], ),
),
&[false, true, false, true],
);
// CAST(s1 AS Decimal(14,3)) > 5.000
prune_with_expr(
cast(col("s1"), DataType::Decimal128(14, 3))
.gt(lit(ScalarValue::Decimal128(Some(5000), 14, 3))),
&schema,
&TestStatistics::new().with(
"s1",
ContainerStats::new_i32(
vec![Some(0), Some(4), None, Some(3)], vec![Some(5), Some(6), Some(4), None], ),
),
&[false, true, false, true],
);
// TRY_CAST variant of the same predicate
prune_with_expr(
try_cast(col("s1"), DataType::Decimal128(14, 3))
.gt(lit(ScalarValue::Decimal128(Some(5000), 14, 3))),
&schema,
&TestStatistics::new().with(
"s1",
ContainerStats::new_i32(
vec![Some(0), Some(4), None, Some(3)], vec![Some(5), Some(6), Some(4), None], ),
),
&[false, true, false, true],
);
// precision 18: stats provided as i64
let schema = Arc::new(Schema::new(vec![Field::new(
"s1",
DataType::Decimal128(18, 2),
true,
)]));
prune_with_expr(
col("s1").gt(lit(ScalarValue::Decimal128(Some(500), 18, 2))),
&schema,
&TestStatistics::new().with(
"s1",
ContainerStats::new_i64(
vec![Some(0), Some(4), None, Some(3)], vec![Some(5), Some(6), Some(4), None], ),
),
&[false, true, false, true],
);
// precision 23: stats provided as Decimal128 directly
let schema = Arc::new(Schema::new(vec![Field::new(
"s1",
DataType::Decimal128(23, 2),
true,
)]));
prune_with_expr(
col("s1").gt(lit(ScalarValue::Decimal128(Some(500), 23, 2))),
&schema,
&TestStatistics::new().with(
"s1",
ContainerStats::new_decimal128(
vec![Some(0), Some(400), None, Some(300)], vec![Some(500), Some(600), Some(400), None], 23,
2,
),
),
&[false, true, false, true],
);
}
#[test]
// End-to-end pruning over a two-column schema where only one column (s2)
// has registered statistics; containers with unknown max must be kept.
fn prune_api() {
let schema = Arc::new(Schema::new(vec![
Field::new("s1", DataType::Utf8, true),
Field::new("s2", DataType::Int32, true),
]));
let statistics = TestStatistics::new().with(
"s2",
ContainerStats::new_i32(
vec![Some(0), Some(4), None, Some(3)], vec![Some(5), Some(6), None, None], ),
);
// s2 > 5: only the first container (max = 5) can be ruled out
prune_with_expr(
col("s2").gt(lit(5)),
&schema,
&statistics,
&[false, true, true, true],
);
// same predicate with the column cast to Int64
prune_with_expr(
cast(col("s2"), DataType::Int64).gt(lit(ScalarValue::Int64(Some(5)))),
&schema,
&statistics,
&[false, true, true, true],
);
}
#[test]
// `s1 != 'M'` can only prune a container whose min and max are both
// exactly 'M' (i.e. every value equals 'M').
fn prune_not_eq_data() {
let schema = Arc::new(Schema::new(vec![Field::new("s1", DataType::Utf8, true)]));
prune_with_expr(
col("s1").not_eq(lit("M")),
&schema,
&TestStatistics::new().with(
"s1",
ContainerStats::new_utf8(
vec![Some("A"), Some("A"), Some("N"), Some("M"), None, Some("A")], vec![Some("Z"), Some("L"), Some("Z"), Some("M"), None, None], ),
),
&[true, true, true, false, true, true],
);
}
// Shared fixture for the boolean pruning tests: a single boolean column
// `b1` over five containers, plus the expected keep-vectors for predicates
// equivalent to `b1` (expected_true) and `NOT b1` (expected_false).
fn bool_setup() -> (SchemaRef, TestStatistics, Vec<bool>, Vec<bool>) {
let schema =
Arc::new(Schema::new(vec![Field::new("b1", DataType::Boolean, true)]));
let statistics = TestStatistics::new().with(
"b1",
ContainerStats::new_bool(
vec![Some(false), Some(false), Some(true), None, Some(false)], vec![Some(false), Some(true), Some(true), None, None], ),
);
// b1 = true: container 0 (all-false) is prunable
let expected_true = vec![false, true, true, true, true];
// b1 = false: container 2 (all-true) is prunable
let expected_false = vec![true, true, false, true, true];
(schema, statistics, expected_true, expected_false)
}
#[test]
fn prune_bool_const_expr() {
    let (schema, statistics, _, _) = bool_setup();
    // Constant predicates carry no per-container information, so every
    // container is kept for both `true` and `false`.
    let keep_all = &[true, true, true, true, true];
    prune_with_expr(lit(true), &schema, &statistics, keep_all);
    prune_with_expr(lit(false), &schema, &statistics, keep_all);
}
#[test]
fn prune_bool_column() {
    // A bare boolean column used directly as the predicate.
    let (schema, statistics, expected_true, _) = bool_setup();
    prune_with_expr(col("b1"), &schema, &statistics, &expected_true);
}
#[test]
fn prune_bool_not_column() {
    // Negated boolean column as the predicate.
    let (schema, statistics, _, expected_false) = bool_setup();
    prune_with_expr(col("b1").not(), &schema, &statistics, &expected_false);
}
#[test]
fn prune_bool_column_eq_true() {
    // `b1 = true` prunes exactly like the bare column.
    let (schema, statistics, expected_true, _) = bool_setup();
    prune_with_expr(col("b1").eq(lit(true)), &schema, &statistics, &expected_true);
}
#[test]
fn prune_bool_not_column_eq_true() {
    // `NOT b1 = true` prunes exactly like `NOT b1`.
    let (schema, statistics, _, expected_false) = bool_setup();
    prune_with_expr(
        col("b1").not().eq(lit(true)),
        &schema,
        &statistics,
        &expected_false,
    );
}
// Shared fixture for the Int32 pruning tests: column `i` over five
// containers with ranges [-5,5], [1,11], [-11,-1], [unknown,unknown]
// and [1,unknown].
fn int32_setup() -> (SchemaRef, TestStatistics) {
let schema = Arc::new(Schema::new(vec![Field::new("i", DataType::Int32, true)]));
let statistics = TestStatistics::new().with(
"i",
ContainerStats::new_i32(
vec![Some(-5), Some(1), Some(-11), None, Some(1)], vec![Some(5), Some(11), Some(-1), None, None], ),
);
(schema, statistics)
}
#[test]
fn prune_int32_col_gt_zero() {
    let (schema, statistics) = int32_setup();
    // Only the container with max = -1 can be ruled out; the negated form
    // `-i < 0` must prune identically.
    let expected = &[true, true, false, true, true];
    prune_with_expr(col("i").gt(lit(0)), &schema, &statistics, expected);
    prune_with_expr(
        Expr::Negative(Box::new(col("i"))).lt(lit(0)),
        &schema,
        &statistics,
        expected,
    );
}
#[test]
fn prune_int32_col_lte_zero() {
    let (schema, statistics) = int32_setup();
    // Containers with min >= 1 are prunable; the negated form `-i >= 0`
    // must prune identically.
    let expected = &[true, false, true, true, false];
    prune_with_expr(col("i").lt_eq(lit(0)), &schema, &statistics, expected);
    prune_with_expr(
        Expr::Negative(Box::new(col("i"))).gt_eq(lit(0)),
        &schema,
        &statistics,
        expected,
    );
}
#[test]
// Casting int statistics to strings changes the ordering, so string
// comparisons on the cast column cannot prune anything: all kept.
fn prune_int32_col_lte_zero_cast() {
let (schema, statistics) = int32_setup();
let expected_ret = &[true, true, true, true, true];
// CAST(i AS Utf8) <= '0'
prune_with_expr(
cast(col("i"), DataType::Utf8).lt_eq(lit("0")),
&schema,
&statistics,
expected_ret,
);
// TRY_CAST variant
prune_with_expr(
try_cast(col("i"), DataType::Utf8).lt_eq(lit("0")),
&schema,
&statistics,
expected_ret,
);
// negated column inside the cast
prune_with_expr(
cast(Expr::Negative(Box::new(col("i"))), DataType::Utf8).gt_eq(lit("0")),
&schema,
&statistics,
expected_ret,
);
prune_with_expr(
try_cast(Expr::Negative(Box::new(col("i"))), DataType::Utf8).gt_eq(lit("0")),
&schema,
&statistics,
expected_ret,
);
}
#[test]
fn prune_int32_col_eq_zero() {
    let (schema, statistics) = int32_setup();
    // Keep only containers whose [min, max] range could contain 0.
    prune_with_expr(
        col("i").eq(lit(0)),
        &schema,
        &statistics,
        &[true, false, false, true, false],
    );
}
#[test]
fn prune_int32_col_eq_zero_cast() {
    let (schema, statistics) = int32_setup();
    // Widening casts preserve ordering, so pruning matches the uncast case.
    let expected = &[true, false, false, true, false];
    prune_with_expr(
        cast(col("i"), DataType::Int64).eq(lit(0i64)),
        &schema,
        &statistics,
        expected,
    );
    prune_with_expr(
        try_cast(col("i"), DataType::Int64).eq(lit(0i64)),
        &schema,
        &statistics,
        expected,
    );
}
#[test]
fn prune_int32_col_eq_zero_cast_as_str() {
    let (schema, statistics) = int32_setup();
    // Casting to Utf8 breaks numeric ordering: nothing can be pruned.
    prune_with_expr(
        cast(col("i"), DataType::Utf8).eq(lit("0")),
        &schema,
        &statistics,
        &[true, true, true, true, true],
    );
}
#[test]
fn prune_int32_col_lt_neg_one() {
    let (schema, statistics) = int32_setup();
    // `i > -1` and the negated form `-i < 1` must prune identically.
    let expected = &[true, true, false, true, true];
    prune_with_expr(col("i").gt(lit(-1)), &schema, &statistics, expected);
    prune_with_expr(
        Expr::Negative(Box::new(col("i"))).lt(lit(1)),
        &schema,
        &statistics,
        expected,
    );
}
#[test]
// `i IS NULL`: without null counts nothing can be pruned; with null counts,
// containers known to have zero nulls are prunable while unknown (None)
// counts must be kept.
fn prune_int32_is_null() {
let (schema, statistics) = int32_setup();
// no null-count statistics registered: keep everything
let expected_ret = &[true, true, true, true, true];
prune_with_expr(
col("i").is_null(),
&schema,
&statistics,
expected_ret,
);
let statistics = statistics.with_null_counts(
"i",
vec![
Some(0), Some(1), None, None, Some(0), ],
);
// containers 0 and 4 are known null-free -> prunable
let expected_ret = &[false, true, true, true, false];
prune_with_expr(
col("i").is_null(),
&schema,
&statistics,
expected_ret,
);
}
#[test]
// A container whose null_count equals row_count (container 3 here: 4 == 4)
// is known to be all-null and can be pruned even when min/max are unknown.
fn prune_int32_column_is_known_all_null() {
let (schema, statistics) = int32_setup();
// baseline: pruning on min/max only
let expected_ret = &[true, false, true, true, false];
prune_with_expr(
col("i").lt(lit(0)),
&schema,
&statistics,
expected_ret,
);
// adding row counts alone does not change the outcome
let statistics = statistics.with_row_counts(
"i",
vec![
Some(10), Some(9), None, Some(4),
Some(10),
],
);
prune_with_expr(
col("i").lt(lit(0)),
&schema,
&statistics,
expected_ret,
);
// null_count == row_count for container 3 -> additionally prunable
let statistics = statistics.with_null_counts(
"i",
vec![
Some(0), Some(1), None, Some(4), Some(0), ],
);
let expected_ret = &[true, false, true, false, false];
prune_with_expr(
col("i").lt(lit(0)),
&schema,
&statistics,
expected_ret,
);
}
#[test]
// Casts on either side of the comparison (on the scalar, on the column,
// TRY_CAST, and a negated cast column) must all prune like plain `i > 0`.
fn prune_cast_column_scalar() {
let (schema, statistics) = int32_setup();
let expected_ret = &[true, true, false, true, true];
// i > CAST(0 AS Int32): cast applied to the literal
prune_with_expr(
col("i").gt(cast(lit(ScalarValue::Int64(Some(0))), DataType::Int32)),
&schema,
&statistics,
expected_ret,
);
// CAST(i AS Int64) > 0
prune_with_expr(
cast(col("i"), DataType::Int64).gt(lit(ScalarValue::Int64(Some(0)))),
&schema,
&statistics,
expected_ret,
);
// TRY_CAST(i AS Int64) > 0
prune_with_expr(
try_cast(col("i"), DataType::Int64).gt(lit(ScalarValue::Int64(Some(0)))),
&schema,
&statistics,
expected_ret,
);
// -CAST(i AS Int64) < 0 -- equivalent after negation
prune_with_expr(
Expr::Negative(Box::new(cast(col("i"), DataType::Int64)))
.lt(lit(ScalarValue::Int64(Some(0)))),
&schema,
&statistics,
expected_ret,
);
}
#[test]
fn test_increment_utf8() {
    // Inputs that have a valid successor string, paired with that successor.
    // (Duplicated cases from the original test are preserved.)
    let incremented = [
        ("abc", "abd"),
        ("abz", "ab{"),
        ("~", "\u{7f}"),
        ("\u{7f}", "\u{80}"),
        ("ß", "à"),
        ("℣", "ℤ"),
        ("\u{7FF}", "\u{800}"),
        ("\u{FFFF}", "\u{10000}"),
        // trailing characters with no successor are dropped and the prefix bumped
        ("a\u{10FFFF}", "b"),
        ("a\u{D7FF}", "b"),
        ("a\u{FDCF}", "b"),
        ("a\u{10FFFF}", "b"),
    ];
    for (input, expected) in incremented {
        assert_eq!(increment_utf8(input).unwrap(), expected);
    }
    // Inputs with no valid successor at all.
    for input in ["", "\u{10FFFF}", "\u{D7FF}", "\u{FDCF}", "\u{10FFFF}"] {
        assert!(increment_utf8(input).is_none());
    }
}
// Shared fixture for the Utf8 pruning tests: column `s1` over ten
// containers, including empty-string bounds, unknown (None) bounds, and
// max-codepoint (U+10FFFF) edge cases.
fn utf8_setup() -> (SchemaRef, TestStatistics) {
let schema = Arc::new(Schema::new(vec![Field::new("s1", DataType::Utf8, true)]));
let statistics = TestStatistics::new().with(
"s1",
ContainerStats::new_utf8(
// min values per container
vec![
Some("A"),
Some("A"),
Some("N"),
Some("M"),
None,
Some("A"),
Some(""),
Some(""),
Some("AB"),
Some("A\u{10ffff}\u{10ffff}"),
], // max values per container
vec![
Some("Z"),
Some("L"),
Some("Z"),
Some("M"),
None,
None,
Some("A"),
Some(""),
Some("A\u{10ffff}\u{10ffff}\u{10ffff}"),
Some("A\u{10ffff}\u{10ffff}"),
], ),
);
(schema, statistics)
}
#[test]
fn prune_utf8_eq() {
    let (schema, statistics) = utf8_setup();

    // `s1 = 'A'`: keep only containers whose [min, max] range could hold
    // "A"; containers with unknown (NULL) stats must also be kept.
    let expected =
        &[true, true, false, false, true, true, true, false, false, false];
    prune_with_expr(col("s1").eq(lit("A")), &schema, &statistics, expected);

    // `s1 = ''`: the empty string sorts before everything else, so only
    // containers whose min is "" (or whose stats are unknown) survive.
    let expected =
        &[false, false, false, false, true, false, true, true, false, false];
    prune_with_expr(col("s1").eq(lit("")), &schema, &statistics, expected);
}
#[test]
fn prune_utf8_not_eq() {
    let (schema, statistics) = utf8_setup();

    // `s1 != 'A'`: a container can only be ruled out when it is constant
    // 'A' (min == max == 'A'); none of the fixtures are, so all are kept.
    let all_kept = &[true; 10];
    prune_with_expr(col("s1").not_eq(lit("A")), &schema, &statistics, all_kept);

    // `s1 != ''`: container 7 has min == max == "", so it is pruned.
    let expected =
        &[true, true, true, true, true, true, true, false, true, true];
    prune_with_expr(col("s1").not_eq(lit("")), &schema, &statistics, expected);
}
#[test]
// `LIKE` with the single-character wildcard `_`: pruning can only exploit
// the literal prefix before the first wildcard, so patterns with no prefix
// keep every container.
fn prune_utf8_like_one() {
let (schema, statistics) = utf8_setup();
// Prefix "A": prune containers whose [min, max] range excludes all
// strings starting with "A".
let expr = col("s1").like(lit("A_"));
#[rustfmt::skip]
let expected_ret = &[
true,
true,
false,
false,
true,
true,
true,
false,
true,
true,
];
prune_with_expr(expr, &schema, &statistics, expected_ret);
// Leading wildcard: no usable prefix, nothing can be pruned.
let expr = col("s1").like(lit("_A_"));
#[rustfmt::skip]
let expected_ret = &[
true,
true,
true,
true,
true,
true,
true,
true,
true,
true,
];
prune_with_expr(expr, &schema, &statistics, expected_ret);
// Bare "_": matches any single character, again no prefix to prune on.
let expr = col("s1").like(lit("_"));
#[rustfmt::skip]
let expected_ret = &[
true,
true,
true,
true,
true,
true,
true,
true,
true,
true,
];
prune_with_expr(expr, &schema, &statistics, expected_ret);
// Empty pattern matches only "": ranges that exclude "" are pruned,
// matching the `s1 = ''` case.
let expr = col("s1").like(lit(""));
#[rustfmt::skip]
let expected_ret = &[
false,
false,
false,
false,
true,
false,
true,
true,
false,
false,
];
prune_with_expr(expr, &schema, &statistics, expected_ret);
}
#[test]
// `LIKE` with the multi-character wildcard `%`: as with `_`, only the
// literal prefix before the first wildcard can drive pruning.
fn prune_utf8_like_many() {
let (schema, statistics) = utf8_setup();
// Prefix "A": same containers pruned as for the `A_` pattern.
let expr = col("s1").like(lit("A%"));
#[rustfmt::skip]
let expected_ret = &[
true,
true,
false,
false,
true,
true,
true,
false,
true,
true,
];
prune_with_expr(expr, &schema, &statistics, expected_ret);
// Leading wildcard: no usable prefix, nothing can be pruned.
let expr = col("s1").like(lit("%A%"));
#[rustfmt::skip]
let expected_ret = &[
true,
true,
true,
true,
true,
true,
true,
true,
true,
true,
];
prune_with_expr(expr, &schema, &statistics, expected_ret);
// Bare "%": matches everything, nothing can be pruned.
let expr = col("s1").like(lit("%"));
#[rustfmt::skip]
let expected_ret = &[
true,
true,
true,
true,
true,
true,
true,
true,
true,
true,
];
prune_with_expr(expr, &schema, &statistics, expected_ret);
// Empty pattern matches only "": ranges that exclude "" are pruned.
let expr = col("s1").like(lit(""));
#[rustfmt::skip]
let expected_ret = &[
false,
false,
false,
false,
true,
false,
true,
true,
false,
false,
];
prune_with_expr(expr, &schema, &statistics, expected_ret);
}
#[test]
fn test_rewrite_expr_to_prunable() {
    let schema = Schema::new(vec![Field::new("a", DataType::Int32, true)]);
    let df_schema = DFSchema::try_from(schema.clone()).unwrap();

    // Lowers `left op right` to physical form, rewrites it, and asserts
    // both sides come back unchanged (the expression was already
    // prunable as written).
    let assert_unchanged = |left: &Expr, op: Operator, right: &Expr| {
        let phys_left = logical2physical(left, &schema);
        let phys_right = logical2physical(right, &schema);
        let (out_left, _, out_right) =
            rewrite_expr_to_prunable(&phys_left, op, &phys_right, df_schema.clone())
                .unwrap();
        assert_eq!(out_left.to_string(), phys_left.to_string());
        assert_eq!(out_right.to_string(), phys_right.to_string());
    };

    // Plain column/literal comparison: a = 12.
    assert_unchanged(&col("a"), Operator::Eq, &lit(ScalarValue::Int32(Some(12))));
    // CAST to decimal on the column side vs. a decimal literal.
    assert_unchanged(
        &cast(col("a"), DataType::Decimal128(20, 3)),
        Operator::Gt,
        &lit(ScalarValue::Decimal128(Some(12), 20, 3)),
    );
    // TRY_CAST to a wider integer type vs. an Int64 literal.
    assert_unchanged(
        &try_cast(col("a"), DataType::Int64),
        Operator::Gt,
        &lit(ScalarValue::Int64(Some(12))),
    );
}
#[test]
// Verifies that a user-supplied `UnhandledPredicateHook` controls what
// unsupported sub-expressions are rewritten to: here the custom hook
// substitutes the literal 42 instead of the default literal `true`.
fn test_rewrite_expr_to_prunable_custom_unhandled_hook() {
// Hook that replaces any unhandled expression with the literal 42.
struct CustomUnhandledHook;
impl UnhandledPredicateHook for CustomUnhandledHook {
fn handle(&self, _expr: &Arc<dyn PhysicalExpr>) -> Arc<dyn PhysicalExpr> {
Arc::new(phys_expr::Literal::new(ScalarValue::Int32(Some(42))))
}
}
// The pruning schema (`schema`) lacks column `b`, so predicates on `b`
// are unhandled; `schema_with_b` is only used to build input expressions.
let schema = Schema::new(vec![Field::new("a", DataType::Int32, true)]);
let schema_with_b = Schema::new(vec![
Field::new("a", DataType::Int32, true),
Field::new("b", DataType::Int32, true),
]);
let rewriter = PredicateRewriter::new()
.with_unhandled_hook(Arc::new(CustomUnhandledHook {}));
let transform_expr = |expr| {
let expr = logical2physical(&expr, &schema_with_b);
rewriter.rewrite_predicate_to_statistics_predicate(&expr, &schema)
};
// A predicate the rewriter does handle; its transformed form is reused
// as the expected "known" half of the AND cases below.
let known_expression = col("a").eq(lit(12));
let known_expression_transformed = PredicateRewriter::new()
.rewrite_predicate_to_statistics_predicate(
&logical2physical(&known_expression, &schema),
&schema,
);
// Case 1: predicate on the missing column `b` alone -> replaced by 42.
let input = col("b").eq(lit(12));
let expected = logical2physical(&lit(42), &schema);
let transformed = transform_expr(input.clone());
assert_eq!(transformed.to_string(), expected.to_string());
// Case 2: known AND unhandled -> known half rewritten normally, the
// unhandled half becomes 42.
let input = known_expression.clone().and(input.clone());
let expected = phys_expr::BinaryExpr::new(
Arc::<dyn PhysicalExpr>::clone(&known_expression_transformed),
Operator::And,
logical2physical(&lit(42), &schema),
);
let transformed = transform_expr(input.clone());
assert_eq!(transformed.to_string(), expected.to_string());
// Case 3: an expression shape the rewriter does not understand
// (array_has) -> replaced by 42.
let input = array_has(make_array(vec![lit(1)]), col("a"));
let expected = logical2physical(&lit(42), &schema);
let transformed = transform_expr(input.clone());
assert_eq!(transformed.to_string(), expected.to_string());
// Case 4: known AND unhandled array_has -> same structure as case 2.
let input = known_expression.and(input);
let expected = phys_expr::BinaryExpr::new(
Arc::<dyn PhysicalExpr>::clone(&known_expression_transformed),
Operator::And,
logical2physical(&lit(42), &schema),
);
let transformed = transform_expr(input.clone());
assert_eq!(transformed.to_string(), expected.to_string());
}
#[test]
fn test_rewrite_expr_to_prunable_error() {
    // Expressions that cannot be turned into a prunable form must return
    // an error rather than a bogus rewrite.
    let schema = Schema::new(vec![Field::new("a", DataType::Utf8, true)]);
    let df_schema = DFSchema::try_from(schema.clone()).unwrap();

    // CAST(utf8 column AS Int64) > Int64 literal — presumably rejected
    // because the string-to-integer cast cannot be carried over to the
    // column's min/max statistics.
    let left = logical2physical(&cast(col("a"), DataType::Int64), &schema);
    let right = logical2physical(&lit(ScalarValue::Int64(Some(12))), &schema);
    assert!(
        rewrite_expr_to_prunable(&left, Operator::Gt, &right, df_schema.clone())
            .is_err()
    );

    // `a IS NULL > 12` — a boolean predicate compared to a number is not
    // a prunable comparison.
    let left = logical2physical(&is_null(col("a")), &schema);
    let right = logical2physical(&lit(ScalarValue::Int64(Some(12))), &schema);
    assert!(rewrite_expr_to_prunable(&left, Operator::Gt, &right, df_schema).is_err());
}
#[test]
// Exercises pruning driven by `PruningStatistics::contained` on a single
// column. The TestStatistics below register precomputed `contained`
// answers for the value sets {"foo"}, {"bar"}, and {"foo","bar"} across 9
// containers; per container, Some(true)/Some(false) is a definite answer
// and None means "unknown" (the container must then be kept).
fn prune_with_contained_one_column() {
let schema = Arc::new(Schema::new(vec![Field::new("s1", DataType::Utf8, true)]));
let statistics = TestStatistics::new()
// contained(s1, {"foo"}) per container.
.with_contained(
"s1",
[ScalarValue::from("foo")],
[
Some(true),
Some(false),
None,
Some(true),
Some(false),
None,
Some(true),
Some(false),
None,
],
)
// contained(s1, {"bar"}) per container.
.with_contained(
"s1",
[ScalarValue::from("bar")],
[
Some(true),
Some(true),
Some(true),
Some(false),
Some(false),
Some(false),
None,
None,
None,
],
)
// contained(s1, {"foo", "bar"}) per container.
.with_contained(
"s1",
[ScalarValue::from("foo"), ScalarValue::from("bar")],
[
None,
None,
None,
Some(true),
Some(true),
Some(true),
Some(false),
Some(false),
Some(false),
],
);
// s1 = 'foo': pruned exactly where contained(s1, {"foo"}) == Some(false).
prune_with_expr(
col("s1").eq(lit("foo")),
&schema,
&statistics,
&[true, false, true, true, false, true, true, false, true],
);
// s1 = 'bar': pruned where contained(s1, {"bar"}) == Some(false).
prune_with_expr(
col("s1").eq(lit("bar")),
&schema,
&statistics,
&[true, true, true, false, false, false, true, true, true],
);
// s1 = 'baz': no contained() answer registered for "baz", so nothing
// can be pruned.
prune_with_expr(
col("s1").eq(lit("baz")),
&schema,
&statistics,
&[true, true, true, true, true, true, true, true, true],
);
// s1 = 'foo' AND s1 = 'bar': contradictory, but no container is pruned.
prune_with_expr(
col("s1").eq(lit("foo")).and(col("s1").eq(lit("bar"))),
&schema,
&statistics,
&[true, true, true, true, true, true, true, true, true],
);
// s1 = 'foo' OR s1 = 'bar': uses the combined {"foo","bar"} answers;
// pruned where that set is definitely not contained.
prune_with_expr(
col("s1").eq(lit("foo")).or(col("s1").eq(lit("bar"))),
&schema,
&statistics,
&[true, true, true, true, true, true, false, false, false],
);
// OR with an unknown value ("baz") disables pruning entirely.
prune_with_expr(
col("s1").eq(lit("foo")).or(col("s1").eq(lit("baz"))),
&schema,
&statistics,
&[true, true, true, true, true, true, true, true, true],
);
prune_with_expr(
col("s1")
.eq(lit("foo"))
.or(col("s1").eq(lit("bar")))
.or(col("s1").eq(lit("baz"))),
&schema,
&statistics,
&[true, true, true, true, true, true, true, true, true],
);
// s1 != 'foo': pruned where contained(s1, {"foo"}) == Some(true),
// i.e. where every value is known to be "foo".
prune_with_expr(
col("s1").not_eq(lit("foo")),
&schema,
&statistics,
&[false, true, true, false, true, true, false, true, true],
);
prune_with_expr(
col("s1").not_eq(lit("bar")),
&schema,
&statistics,
&[false, false, false, true, true, true, true, true, true],
);
// s1 != 'foo' AND s1 != 'bar': uses the combined {"foo","bar"} answers;
// pruned where that set is definitely contained.
prune_with_expr(
col("s1")
.not_eq(lit("foo"))
.and(col("s1").not_eq(lit("bar"))),
&schema,
&statistics,
&[true, true, true, false, false, false, true, true, true],
);
// Adding the unknown value "baz" to the conjunction disables pruning.
prune_with_expr(
col("s1")
.not_eq(lit("foo"))
.and(col("s1").not_eq(lit("bar")))
.and(col("s1").not_eq(lit("baz"))),
&schema,
&statistics,
&[true, true, true, true, true, true, true, true, true],
);
// Disjunctions of != predicates cannot prune anything.
prune_with_expr(
col("s1")
.not_eq(lit("foo"))
.or(col("s1").not_eq(lit("bar"))),
&schema,
&statistics,
&[true, true, true, true, true, true, true, true, true],
);
prune_with_expr(
col("s1")
.not_eq(lit("foo"))
.or(col("s1").not_eq(lit("bar")))
.or(col("s1").not_eq(lit("baz"))),
&schema,
&statistics,
&[true, true, true, true, true, true, true, true, true],
);
}
#[test]
// Exercises `contained`-based pruning across two columns, including
// combinations with LIKE predicates (which cannot use contained() and so
// never prune on their own).
fn prune_with_contained_two_columns() {
let schema = Arc::new(Schema::new(vec![
Field::new("s1", DataType::Utf8, true),
Field::new("s2", DataType::Utf8, true),
]));
let statistics = TestStatistics::new()
// contained(s1, {"foo"}) per container; None = unknown.
.with_contained(
"s1",
[ScalarValue::from("foo")],
[
Some(true),
Some(false),
None,
Some(true),
Some(false),
None,
Some(true),
Some(false),
None,
],
)
// contained(s2, {"bar"}) per container.
.with_contained(
"s2", [ScalarValue::from("bar")],
[
Some(true),
Some(true),
Some(true),
Some(false),
Some(false),
Some(false),
None,
None,
None,
],
);
// s1 = 'foo': pruned where contained(s1, {"foo"}) == Some(false).
prune_with_expr(
col("s1").eq(lit("foo")),
&schema,
&statistics,
&[true, false, true, true, false, true, true, false, true],
);
// OR across different columns: no combined answer exists, so nothing
// is pruned.
let expr = col("s1").eq(lit("foo")).or(col("s2").eq(lit("bar")));
prune_with_expr(
expr,
&schema,
&statistics,
&[true, true, true, true, true, true, true, true, true],
);
// AND across columns: a container is pruned if either conjunct alone
// rules it out.
prune_with_expr(
col("s1").eq(lit("foo")).and(col("s2").not_eq(lit("bar"))),
&schema,
&statistics,
&[false, false, false, true, false, true, true, false, true],
);
prune_with_expr(
col("s1")
.not_eq(lit("foo"))
.and(col("s2").not_eq(lit("bar"))),
&schema,
&statistics,
&[false, false, false, false, true, true, false, true, true],
);
// Nested OR on s2 combined with != on s1.
prune_with_expr(
col("s1")
.not_eq(lit("foo"))
.and(col("s2").eq(lit("bar")).or(col("s2").eq(lit("baz")))),
&schema,
&statistics,
&[false, true, true, false, true, true, false, true, true],
);
// LIKE alone cannot use contained(): nothing is pruned.
prune_with_expr(
col("s1").like(lit("foo%bar%")),
&schema,
&statistics,
&[true, true, true, true, true, true, true, true, true],
);
// LIKE AND equality: the equality half still prunes via contained(s2).
prune_with_expr(
col("s1")
.like(lit("foo%bar%"))
.and(col("s2").eq(lit("bar"))),
&schema,
&statistics,
&[true, true, true, false, false, false, true, true, true],
);
// LIKE OR equality: the un-prunable LIKE side keeps every container.
prune_with_expr(
col("s1").like(lit("foo%bar%")).or(col("s2").eq(lit("bar"))),
&schema,
&statistics,
&[true, true, true, true, true, true, true, true, true],
);
}
#[test]
// Combines min/max range statistics (column `i`) with `contained`
// statistics (column `s`) in a single predicate.
fn prune_with_range_and_contained() {
let schema = Arc::new(Schema::new(vec![
Field::new("i", DataType::Int32, true),
Field::new("s", DataType::Utf8, true),
]));
let statistics = TestStatistics::new()
// `i` repeats three range patterns: [-5, 5] (contains 0),
// [10, 20] (excludes 0), and unknown (None).
.with(
"i",
ContainerStats::new_i32(
vec![
Some(-5),
Some(10),
None,
Some(-5),
Some(10),
None,
Some(-5),
Some(10),
None,
], vec![
Some(5),
Some(20),
None,
Some(5),
Some(20),
None,
Some(5),
Some(20),
None,
], ),
)
// contained(s, {"foo"}): Some(true) for containers 0-2, Some(false)
// for 3-5, unknown for 6-8.
.with_contained(
"s",
[ScalarValue::from("foo")],
[
Some(true),
Some(true),
Some(true),
Some(false),
Some(false),
Some(false),
None,
None,
None,
],
);
// i = 0 AND s = 'foo': pruned if either the range excludes 0 or
// contained(s, {"foo"}) == Some(false).
prune_with_expr(
col("i").eq(lit(0)).and(col("s").eq(lit("foo"))),
&schema,
&statistics,
&[true, false, true, false, false, false, true, false, true],
);
// i = 0 AND s != 'foo': the s-half prunes where contained == Some(true).
prune_with_expr(
col("i").eq(lit(0)).and(col("s").not_eq(lit("foo"))),
&schema,
&statistics,
&[false, false, false, true, false, true, true, false, true],
);
// i = 0 OR s = 'foo': a container survives if either side might match,
// so nothing is pruned.
prune_with_expr(
col("i").eq(lit(0)).or(col("s").eq(lit("foo"))),
&schema,
&statistics,
&[true, true, true, true, true, true, true, true, true],
);
}
/// Builds a `PruningPredicate` from the logical `expr` over `schema`,
/// applies it to `statistics`, and asserts the per-container result
/// equals `expected` (true = container kept, false = pruned).
fn prune_with_expr(
    expr: Expr,
    schema: &SchemaRef,
    statistics: &TestStatistics,
    expected: &[bool],
) {
    println!("Pruning with expr: {}", expr);
    let physical = logical2physical(&expr, schema);
    let predicate =
        PruningPredicate::try_new(physical, Arc::<Schema>::clone(schema)).unwrap();
    assert_eq!(predicate.prune(statistics).unwrap(), expected);
}
/// Lowers the logical `expr` to a physical expression and feeds it through
/// `build_predicate_expression`, using the default constant-true hook for
/// any sub-expression the rewriter cannot handle. `required_columns` is
/// populated as a side effect.
fn test_build_predicate_expression(
    expr: &Expr,
    schema: &Schema,
    required_columns: &mut RequiredColumns,
) -> Arc<dyn PhysicalExpr> {
    let physical_expr = logical2physical(expr, schema);
    let hook = Arc::new(ConstantUnhandledPredicateHook::default()) as _;
    build_predicate_expression(&physical_expr, schema, required_columns, &hook)
}
}