use std::any::Any;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, ready};
use datafusion_physical_expr::projection::{ProjectionRef, combine_projections};
use itertools::Itertools;
use super::{
ColumnStatistics, DisplayAs, ExecutionPlanProperties, PlanProperties,
RecordBatchStream, SendableRecordBatchStream, Statistics,
};
use crate::check_if_same_properties;
use crate::coalesce::{LimitedBatchCoalescer, PushBatchStatus};
use crate::common::can_project;
use crate::execution_plan::CardinalityEffect;
use crate::filter_pushdown::{
ChildFilterDescription, ChildPushdownResult, FilterDescription, FilterPushdownPhase,
FilterPushdownPropagation, PushedDown,
};
use crate::metrics::{MetricBuilder, MetricType};
use crate::projection::{
EmbeddedProjection, ProjectionExec, ProjectionExpr, make_with_child,
try_embed_projection, update_expr,
};
use crate::{
DisplayFormatType, ExecutionPlan,
metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet, RatioMetrics},
};
use arrow::compute::filter_record_batch;
use arrow::datatypes::{DataType, SchemaRef};
use arrow::record_batch::RecordBatch;
use datafusion_common::cast::as_boolean_array;
use datafusion_common::config::ConfigOptions;
use datafusion_common::stats::Precision;
use datafusion_common::{
DataFusionError, Result, ScalarValue, internal_err, plan_err, project_schema,
};
use datafusion_execution::TaskContext;
use datafusion_expr::Operator;
use datafusion_physical_expr::equivalence::ProjectionMapping;
use datafusion_physical_expr::expressions::{BinaryExpr, Column, Literal, lit};
use datafusion_physical_expr::intervals::utils::check_support;
use datafusion_physical_expr::utils::{collect_columns, reassign_expr_columns};
use datafusion_physical_expr::{
AcrossPartitions, AnalysisContext, ConstExpr, EquivalenceProperties, ExprBoundaries,
PhysicalExpr, analyze, conjunction, split_conjunction,
};
use datafusion_physical_expr_common::physical_expr::fmt_sql;
use futures::stream::{Stream, StreamExt};
use log::trace;
/// Default selectivity (a percentage in `0..=100`) assumed for statistics
/// estimation when the predicate is not supported by interval analysis.
const FILTER_EXEC_DEFAULT_SELECTIVITY: u8 = 20;
/// Default batch size handed to the output `LimitedBatchCoalescer`.
const FILTER_EXEC_DEFAULT_BATCH_SIZE: usize = 8192;
/// Execution plan that evaluates a boolean `predicate` against every input
/// batch and emits only the rows for which it evaluates to `true`,
/// optionally applying a column projection and a row-count limit.
#[derive(Debug, Clone)]
pub struct FilterExec {
    /// Boolean expression evaluated per batch to build the filter mask.
    predicate: Arc<dyn PhysicalExpr>,
    /// Child plan that produces the input batches.
    input: Arc<dyn ExecutionPlan>,
    /// Execution metrics (baseline plus a selectivity ratio metric).
    metrics: ExecutionPlanMetricsSet,
    /// Selectivity percentage (`0..=100`) used for statistics when the
    /// predicate cannot be analyzed.
    default_selectivity: u8,
    /// Cached plan properties (equivalences, partitioning, boundedness).
    cache: Arc<PlanProperties>,
    /// Optional column indices projected out of the filtered batches.
    projection: Option<ProjectionRef>,
    /// Target output batch size for the coalescer.
    batch_size: usize,
    /// Optional cap on the total number of rows produced.
    fetch: Option<usize>,
}
/// Builder for [`FilterExec`]; accumulates configuration that is validated
/// when [`FilterExecBuilder::build`] is called.
pub struct FilterExecBuilder {
    // Fields mirror `FilterExec` minus the derived `metrics` and `cache`.
    predicate: Arc<dyn PhysicalExpr>,
    input: Arc<dyn ExecutionPlan>,
    projection: Option<ProjectionRef>,
    default_selectivity: u8,
    batch_size: usize,
    fetch: Option<usize>,
}
impl FilterExecBuilder {
    /// Creates a builder with no projection, no fetch limit, and the default
    /// selectivity and batch size.
    pub fn new(predicate: Arc<dyn PhysicalExpr>, input: Arc<dyn ExecutionPlan>) -> Self {
        Self {
            predicate,
            input,
            projection: None,
            default_selectivity: FILTER_EXEC_DEFAULT_SELECTIVITY,
            batch_size: FILTER_EXEC_DEFAULT_BATCH_SIZE,
            fetch: None,
        }
    }
    /// Replaces the input plan.
    pub fn with_input(mut self, input: Arc<dyn ExecutionPlan>) -> Self {
        self.input = input;
        self
    }
    /// Replaces the filter predicate.
    pub fn with_predicate(mut self, predicate: Arc<dyn PhysicalExpr>) -> Self {
        self.predicate = predicate;
        self
    }
    /// Applies a projection given as owned column indices; see
    /// [`Self::apply_projection_by_ref`].
    pub fn apply_projection(self, projection: Option<Vec<usize>>) -> Result<Self> {
        let projection = projection.map(Into::into);
        self.apply_projection_by_ref(projection.as_ref())
    }
    /// Validates `projection` against the input schema and composes it with
    /// any projection already configured on the builder.
    pub fn apply_projection_by_ref(
        mut self,
        projection: Option<&ProjectionRef>,
    ) -> Result<Self> {
        can_project(&self.input.schema(), projection.map(AsRef::as_ref))?;
        // Combine with the existing projection so repeated calls compose
        // rather than overwrite.
        self.projection = combine_projections(projection, self.projection.as_ref())?;
        Ok(self)
    }
    /// Sets the selectivity percentage used for statistics estimation when
    /// the predicate cannot be analyzed; range-checked in [`Self::build`].
    pub fn with_default_selectivity(mut self, default_selectivity: u8) -> Self {
        self.default_selectivity = default_selectivity;
        self
    }
    /// Sets the target output batch size.
    pub fn with_batch_size(mut self, batch_size: usize) -> Self {
        self.batch_size = batch_size;
        self
    }
    /// Sets an optional limit on the number of rows produced.
    pub fn with_fetch(mut self, fetch: Option<usize>) -> Self {
        self.fetch = fetch;
        self
    }
    /// Validates the configuration and constructs the [`FilterExec`].
    ///
    /// # Errors
    /// Returns a plan error if the predicate does not evaluate to `Boolean`,
    /// if `default_selectivity > 100`, or if the projection is invalid for
    /// the input schema.
    pub fn build(self) -> Result<FilterExec> {
        match self.predicate.data_type(self.input.schema().as_ref())? {
            DataType::Boolean => {}
            other => {
                return plan_err!(
                    "Filter predicate must return BOOLEAN values, got {other:?}"
                );
            }
        }
        if self.default_selectivity > 100 {
            return plan_err!(
                "Default filter selectivity value needs to be less than or equal to 100"
            );
        }
        can_project(&self.input.schema(), self.projection.as_deref())?;
        // Plan properties are computed once here and cached on the node.
        let cache = FilterExec::compute_properties(
            &self.input,
            &self.predicate,
            self.default_selectivity,
            self.projection.as_deref(),
        )?;
        Ok(FilterExec {
            predicate: self.predicate,
            input: self.input,
            metrics: ExecutionPlanMetricsSet::new(),
            default_selectivity: self.default_selectivity,
            cache: Arc::new(cache),
            projection: self.projection,
            batch_size: self.batch_size,
            fetch: self.fetch,
        })
    }
}
/// Seeds a builder from an existing node so it can be reconfigured and
/// rebuilt (metrics and cached properties are intentionally not carried over;
/// they are recreated by `build`).
impl From<&FilterExec> for FilterExecBuilder {
    fn from(exec: &FilterExec) -> Self {
        Self {
            predicate: Arc::clone(&exec.predicate),
            input: Arc::clone(&exec.input),
            projection: exec.projection.clone(),
            default_selectivity: exec.default_selectivity,
            batch_size: exec.batch_size,
            fetch: exec.fetch,
        }
    }
}
impl FilterExec {
    /// Creates a `FilterExec` with default settings; use
    /// [`FilterExecBuilder`] for additional configuration.
    pub fn try_new(
        predicate: Arc<dyn PhysicalExpr>,
        input: Arc<dyn ExecutionPlan>,
    ) -> Result<Self> {
        FilterExecBuilder::new(predicate, input).build()
    }
    /// Target output batch size.
    pub fn batch_size(&self) -> usize {
        self.batch_size
    }
    /// Overrides the default selectivity; must be in `0..=100`.
    ///
    /// # Errors
    /// Returns a plan error when the value exceeds 100.
    pub fn with_default_selectivity(
        mut self,
        default_selectivity: u8,
    ) -> Result<Self, DataFusionError> {
        if default_selectivity > 100 {
            return plan_err!(
                "Default filter selectivity value needs to be less than or equal to 100"
            );
        }
        self.default_selectivity = default_selectivity;
        Ok(self)
    }
    /// Returns a copy of this node with `projection` applied.
    #[deprecated(
        since = "52.0.0",
        note = "Use FilterExecBuilder::apply_projection instead"
    )]
    pub fn with_projection(&self, projection: Option<Vec<usize>>) -> Result<Self> {
        let builder = FilterExecBuilder::from(self);
        builder.apply_projection(projection)?.build()
    }
    /// Returns a copy of this node with a different batch size; the cached
    /// properties and metrics are shared since the plan shape is unchanged.
    pub fn with_batch_size(&self, batch_size: usize) -> Result<Self> {
        Ok(Self {
            predicate: Arc::clone(&self.predicate),
            input: Arc::clone(&self.input),
            metrics: self.metrics.clone(),
            default_selectivity: self.default_selectivity,
            cache: Arc::clone(&self.cache),
            projection: self.projection.clone(),
            batch_size,
            fetch: self.fetch,
        })
    }
    /// The boolean filter expression.
    pub fn predicate(&self) -> &Arc<dyn PhysicalExpr> {
        &self.predicate
    }
    /// The child input plan.
    pub fn input(&self) -> &Arc<dyn ExecutionPlan> {
        &self.input
    }
    /// The default selectivity percentage.
    pub fn default_selectivity(&self) -> u8 {
        self.default_selectivity
    }
    /// The optional embedded projection (column indices into the input).
    pub fn projection(&self) -> &Option<ProjectionRef> {
        &self.projection
    }
    /// Estimates post-filter statistics.
    ///
    /// If the predicate is not supported by interval analysis
    /// (`check_support`), row count and byte size are simply scaled by
    /// `default_selectivity` and marked inexact. Otherwise `analyze` derives
    /// a selectivity estimate and refined per-column bounds.
    fn statistics_helper(
        schema: &SchemaRef,
        input_stats: Statistics,
        predicate: &Arc<dyn PhysicalExpr>,
        default_selectivity: u8,
    ) -> Result<Statistics> {
        if !check_support(predicate, schema) {
            let selectivity = default_selectivity as f64 / 100.0;
            let mut stats = input_stats.to_inexact();
            stats.num_rows = stats.num_rows.with_estimated_selectivity(selectivity);
            stats.total_byte_size = stats
                .total_byte_size
                .with_estimated_selectivity(selectivity);
            return Ok(stats);
        }
        let num_rows = input_stats.num_rows;
        let total_byte_size = input_stats.total_byte_size;
        let input_analysis_ctx =
            AnalysisContext::try_from_statistics(schema, &input_stats.column_statistics)?;
        let analysis_ctx = analyze(predicate, input_analysis_ctx, schema)?;
        // Missing selectivity means "unknown": conservatively keep all rows.
        let selectivity = analysis_ctx.selectivity.unwrap_or(1.0);
        let num_rows = num_rows.with_estimated_selectivity(selectivity);
        let total_byte_size = total_byte_size.with_estimated_selectivity(selectivity);
        let column_statistics = collect_new_statistics(
            schema,
            &input_stats.column_statistics,
            analysis_ctx.boundaries,
        );
        Ok(Statistics {
            num_rows,
            total_byte_size,
            column_statistics,
        })
    }
    /// Returns the constant value of `expr` if the input equivalences prove
    /// it constant, or if it is a literal.
    fn expr_constant_or_literal(
        expr: &Arc<dyn PhysicalExpr>,
        input_eqs: &EquivalenceProperties,
    ) -> Option<AcrossPartitions> {
        input_eqs.is_expr_constant(expr).or_else(|| {
            expr.as_any()
                .downcast_ref::<Literal>()
                .map(|l| AcrossPartitions::Uniform(Some(l.value().clone())))
        })
    }
    /// For each `a = b` conjunct of `predicate` where one side is known
    /// constant (or a literal), records the other side as a constant
    /// expression for equivalence tracking.
    fn extend_constants(
        input: &Arc<dyn ExecutionPlan>,
        predicate: &Arc<dyn PhysicalExpr>,
    ) -> Vec<ConstExpr> {
        let mut res_constants = Vec::new();
        let input_eqs = input.equivalence_properties();
        let conjunctions = split_conjunction(predicate);
        for conjunction in conjunctions {
            if let Some(binary) = conjunction.as_any().downcast_ref::<BinaryExpr>()
                && binary.op() == &Operator::Eq
            {
                let left_const = Self::expr_constant_or_literal(binary.left(), input_eqs);
                let right_const =
                    Self::expr_constant_or_literal(binary.right(), input_eqs);
                if let Some(left_across) = left_const {
                    // Prefer the right side's value if both are constant.
                    let across = right_const.unwrap_or(left_across);
                    res_constants
                        .push(ConstExpr::new(Arc::clone(binary.right()), across));
                } else if let Some(right_across) = right_const {
                    res_constants
                        .push(ConstExpr::new(Arc::clone(binary.left()), right_across));
                }
            }
        }
        res_constants
    }
    /// Computes the cached [`PlanProperties`]: the input's equivalence
    /// properties extended with equalities and constants implied by the
    /// predicate, then projected when a projection is configured.
    fn compute_properties(
        input: &Arc<dyn ExecutionPlan>,
        predicate: &Arc<dyn PhysicalExpr>,
        default_selectivity: u8,
        projection: Option<&[usize]>,
    ) -> Result<PlanProperties> {
        let schema = input.schema();
        let stats = Self::statistics_helper(
            &schema,
            input.partition_statistics(None)?,
            predicate,
            default_selectivity,
        )?;
        let mut eq_properties = input.equivalence_properties().clone();
        // `a = b` conjuncts make both sides equivalent downstream.
        let (equal_pairs, _) = collect_columns_from_predicate_inner(predicate);
        for (lhs, rhs) in equal_pairs {
            eq_properties.add_equal_conditions(Arc::clone(lhs), Arc::clone(rhs))?
        }
        // Columns whose post-filter range collapses to a single value become
        // constants.
        let constants = collect_columns(predicate)
            .into_iter()
            .filter(|column| stats.column_statistics[column.index()].is_singleton())
            .map(|column| {
                let value = stats.column_statistics[column.index()]
                    .min_value
                    .get_value();
                let expr = Arc::new(column) as _;
                ConstExpr::new(expr, AcrossPartitions::Uniform(value.cloned()))
            });
        eq_properties.add_constants(constants)?;
        eq_properties.add_constants(Self::extend_constants(input, predicate))?;
        let mut output_partitioning = input.output_partitioning().clone();
        if let Some(projection) = projection {
            let schema = eq_properties.schema();
            let projection_mapping = ProjectionMapping::from_indices(projection, schema)?;
            let out_schema = project_schema(schema, Some(&projection))?;
            output_partitioning =
                output_partitioning.project(&projection_mapping, &eq_properties);
            eq_properties = eq_properties.project(&projection_mapping, out_schema);
        }
        Ok(PlanProperties::new(
            eq_properties,
            output_partitioning,
            input.pipeline_behavior(),
            input.boundedness(),
        ))
    }
    /// Swaps in a new first child while reusing this node's cached
    /// properties; metrics are reset.
    // NOTE(review): callers must guarantee the new child has the same
    // properties (see `check_if_same_properties!` in `with_new_children`),
    // otherwise the reused cache would be stale.
    fn with_new_children_and_same_properties(
        &self,
        mut children: Vec<Arc<dyn ExecutionPlan>>,
    ) -> Self {
        Self {
            input: children.swap_remove(0),
            metrics: ExecutionPlanMetricsSet::new(),
            ..Self::clone(self)
        }
    }
}
impl DisplayAs for FilterExec {
    /// Renders the node for `EXPLAIN` output.
    ///
    /// Default/Verbose: `FilterExec: <predicate>[, projection=[name@idx, ..]][, fetch=N]`.
    /// TreeRender: only the predicate, formatted as SQL.
    fn fmt_as(
        &self,
        t: DisplayFormatType,
        f: &mut std::fmt::Formatter,
    ) -> std::fmt::Result {
        match t {
            DisplayFormatType::Default | DisplayFormatType::Verbose => {
                let display_projections = if let Some(projection) =
                    self.projection.as_ref()
                {
                    format!(
                        ", projection=[{}]",
                        projection
                            .iter()
                            // Indices are validated at build time
                            // (`can_project`), so the field lookup succeeds.
                            .map(|index| format!(
                                "{}@{}",
                                self.input.schema().fields().get(*index).unwrap().name(),
                                index
                            ))
                            .collect::<Vec<_>>()
                            .join(", ")
                    )
                } else {
                    "".to_string()
                };
                let fetch = self
                    .fetch
                    .map_or_else(|| "".to_string(), |f| format!(", fetch={f}"));
                write!(
                    f,
                    "FilterExec: {}{}{}",
                    self.predicate, display_projections, fetch
                )
            }
            DisplayFormatType::TreeRender => {
                write!(f, "predicate={}", fmt_sql(self.predicate.as_ref()))
            }
        }
    }
}
impl ExecutionPlan for FilterExec {
    fn name(&self) -> &'static str {
        "FilterExec"
    }
    fn as_any(&self) -> &dyn Any {
        self
    }
    fn properties(&self) -> &Arc<PlanProperties> {
        &self.cache
    }
    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
        vec![&self.input]
    }
    /// Filtering only removes rows, so the input order is preserved.
    fn maintains_input_order(&self) -> Vec<bool> {
        vec![true]
    }
    fn with_new_children(
        self: Arc<Self>,
        mut children: Vec<Arc<dyn ExecutionPlan>>,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        // Fast path: if the new child has identical properties, reuse the
        // cached properties instead of rebuilding.
        check_if_same_properties!(self, children);
        let new_input = children.swap_remove(0);
        FilterExecBuilder::from(&*self)
            .with_input(new_input)
            .build()
            .map(|e| Arc::new(e) as _)
    }
    /// Starts execution of one partition, returning a stream of filtered
    /// (and optionally projected, limited) batches.
    fn execute(
        &self,
        partition: usize,
        context: Arc<TaskContext>,
    ) -> Result<SendableRecordBatchStream> {
        trace!(
            "Start FilterExec::execute for partition {} of context session_id {} and task_id {:?}",
            partition,
            context.session_id(),
            context.task_id()
        );
        let metrics = FilterExecMetrics::new(&self.metrics, partition);
        Ok(Box::pin(FilterExecStream {
            schema: self.schema(),
            predicate: Arc::clone(&self.predicate),
            input: self.input.execute(partition, context)?,
            metrics,
            projection: self.projection.clone(),
            // The coalescer merges small filtered batches and enforces
            // the optional `fetch` limit.
            batch_coalescer: LimitedBatchCoalescer::new(
                self.schema(),
                self.batch_size,
                self.fetch,
            ),
        }))
    }
    fn metrics(&self) -> Option<MetricsSet> {
        Some(self.metrics.clone_inner())
    }
    /// Estimates statistics for one partition (or all, when `None`) by
    /// scaling the input statistics through the predicate, then applying
    /// the embedded projection.
    fn partition_statistics(&self, partition: Option<usize>) -> Result<Statistics> {
        let input_stats = self.input.partition_statistics(partition)?;
        let stats = Self::statistics_helper(
            &self.input.schema(),
            input_stats,
            self.predicate(),
            self.default_selectivity,
        )?;
        Ok(stats.project(self.projection.as_ref()))
    }
    /// A filter never produces more rows than its input.
    fn cardinality_effect(&self) -> CardinalityEffect {
        CardinalityEffect::LowerEqual
    }
    /// Attempts to push a parent `ProjectionExec` below this filter; falls
    /// back to embedding the projection into the filter itself.
    fn try_swapping_with_projection(
        &self,
        projection: &ProjectionExec,
    ) -> Result<Option<Arc<dyn ExecutionPlan>>> {
        // Only beneficial when the projection narrows the schema.
        if projection.expr().len() < projection.input().schema().fields().len() {
            if let Some(new_predicate) =
                update_expr(self.predicate(), projection.expr(), false)?
            {
                return FilterExecBuilder::from(self)
                    .with_input(make_with_child(projection, self.input())?)
                    .with_predicate(new_predicate)
                    // The projection moved below the filter, so drop any
                    // embedded projection configuration.
                    .apply_projection(None)?
                    .build()
                    .map(|e| Some(Arc::new(e) as _));
            }
        }
        try_embed_projection(projection, self)
    }
    /// Offers parent filters (plus, in the `Pre` phase, this node's own
    /// conjuncts) to the child for pushdown.
    fn gather_filters_for_pushdown(
        &self,
        phase: FilterPushdownPhase,
        parent_filters: Vec<Arc<dyn PhysicalExpr>>,
        _config: &ConfigOptions,
    ) -> Result<FilterDescription> {
        if phase != FilterPushdownPhase::Pre {
            let child =
                ChildFilterDescription::from_child(&parent_filters, self.input())?;
            return Ok(FilterDescription::new().with_child(child));
        }
        let child = ChildFilterDescription::from_child(&parent_filters, self.input())?
            .with_self_filters(
                split_conjunction(&self.predicate)
                    .into_iter()
                    .cloned()
                    .collect(),
            );
        Ok(FilterDescription::new().with_child(child))
    }
    /// Rebuilds this node after the child reported which filters it absorbed.
    fn handle_child_pushdown_result(
        &self,
        phase: FilterPushdownPhase,
        child_pushdown_result: ChildPushdownResult,
        _config: &ConfigOptions,
    ) -> Result<FilterPushdownPropagation<Arc<dyn ExecutionPlan>>> {
        if phase != FilterPushdownPhase::Pre {
            return Ok(FilterPushdownPropagation::if_all(child_pushdown_result));
        }
        // Parent filters the child rejected must be evaluated here instead.
        let mut unsupported_parent_filters: Vec<Arc<dyn PhysicalExpr>> =
            child_pushdown_result
                .parent_filters
                .iter()
                .filter_map(|f| {
                    matches!(f.all(), PushedDown::No).then_some(Arc::clone(&f.filter))
                })
                .collect();
        if self.projection.is_some() {
            // Parent filters reference this node's projected output schema;
            // rewrite their column indices against the input schema.
            let input_schema = self.input().schema();
            unsupported_parent_filters = unsupported_parent_filters
                .into_iter()
                .map(|expr| reassign_expr_columns(expr, &input_schema))
                .collect::<Result<Vec<_>>>()?;
        }
        // Our own conjuncts that the child did not absorb stay in the
        // rebuilt predicate.
        let unsupported_self_filters = child_pushdown_result
            .self_filters
            .first()
            .expect("we have exactly one child")
            .iter()
            .filter_map(|f| match f.discriminant {
                PushedDown::Yes => None,
                PushedDown::No => Some(&f.predicate),
            })
            .cloned();
        let unhandled_filters = unsupported_parent_filters
            .into_iter()
            .chain(unsupported_self_filters)
            .collect_vec();
        let filter_input = Arc::clone(self.input());
        let new_predicate = conjunction(unhandled_filters);
        let updated_node = if new_predicate.eq(&lit(true)) {
            // Everything was pushed down: the filter reduces to its
            // projection, or vanishes entirely when there is none.
            match self.projection().as_ref() {
                Some(projection_indices) => {
                    let filter_child_schema = filter_input.schema();
                    let proj_exprs = projection_indices
                        .iter()
                        .map(|p| {
                            let field = filter_child_schema.field(*p).clone();
                            ProjectionExpr {
                                expr: Arc::new(Column::new(field.name(), *p))
                                    as Arc<dyn PhysicalExpr>,
                                alias: field.name().to_string(),
                            }
                        })
                        .collect::<Vec<_>>();
                    Some(Arc::new(ProjectionExec::try_new(proj_exprs, filter_input)?)
                        as Arc<dyn ExecutionPlan>)
                }
                None => {
                    Some(filter_input)
                }
            }
        } else if new_predicate.eq(&self.predicate) {
            // Nothing changed; keep the existing node.
            None
        } else {
            // Rebuild with only the remaining conjuncts. Constructed
            // directly (not via the builder) to preserve the existing
            // metrics set.
            let new = FilterExec {
                predicate: Arc::clone(&new_predicate),
                input: Arc::clone(&filter_input),
                metrics: self.metrics.clone(),
                default_selectivity: self.default_selectivity,
                cache: Arc::new(Self::compute_properties(
                    &filter_input,
                    &new_predicate,
                    self.default_selectivity,
                    self.projection.as_deref(),
                )?),
                projection: self.projection.clone(),
                batch_size: self.batch_size,
                fetch: self.fetch,
            };
            Some(Arc::new(new) as _)
        };
        Ok(FilterPushdownPropagation {
            // All parent filters are reported as handled: either the child
            // absorbed them or this (rebuilt) filter evaluates them.
            filters: vec![PushedDown::Yes; child_pushdown_result.parent_filters.len()],
            updated_node,
        })
    }
    fn fetch(&self) -> Option<usize> {
        self.fetch
    }
    /// Returns a copy with a different fetch limit; cached properties are
    /// reused since the limit does not affect them.
    fn with_fetch(&self, fetch: Option<usize>) -> Option<Arc<dyn ExecutionPlan>> {
        Some(Arc::new(Self {
            predicate: Arc::clone(&self.predicate),
            input: Arc::clone(&self.input),
            metrics: self.metrics.clone(),
            default_selectivity: self.default_selectivity,
            cache: Arc::clone(&self.cache),
            projection: self.projection.clone(),
            batch_size: self.batch_size,
            fetch,
        }))
    }
    /// Delegates order preservation to the input; returns `None` when the
    /// input cannot preserve order.
    fn with_preserve_order(
        &self,
        preserve_order: bool,
    ) -> Option<Arc<dyn ExecutionPlan>> {
        self.input
            .with_preserve_order(preserve_order)
            .and_then(|new_input| {
                Arc::new(self.clone())
                    .with_new_children(vec![new_input])
                    .ok()
            })
    }
}
/// Allows `try_embed_projection` to fold a parent projection into this node.
impl EmbeddedProjection for FilterExec {
    fn with_projection(&self, projection: Option<Vec<usize>>) -> Result<Self> {
        FilterExecBuilder::from(self)
            .apply_projection(projection)?
            .build()
    }
}
/// Converts one interval endpoint into a [`Precision`] value.
///
/// A null endpoint carries no information and maps to `Precision::Absent`;
/// otherwise the bound is wrapped as exact or inexact according to
/// `is_exact`.
fn interval_bound_to_precision(
    bound: ScalarValue,
    is_exact: bool,
) -> Precision<ScalarValue> {
    match (bound.is_null(), is_exact) {
        (true, _) => Precision::Absent,
        (false, true) => Precision::Exact(bound),
        (false, false) => Precision::Inexact(bound),
    }
}
/// Converts the interval boundaries produced by predicate analysis back into
/// per-column [`ColumnStatistics`], reusing byte-size information from the
/// input statistics.
fn collect_new_statistics(
    schema: &SchemaRef,
    input_column_stats: &[ColumnStatistics],
    analysis_boundaries: Vec<ExprBoundaries>,
) -> Vec<ColumnStatistics> {
    analysis_boundaries
        .into_iter()
        .enumerate()
        .map(
            |(
                idx,
                ExprBoundaries {
                    interval,
                    distinct_count,
                    ..
                },
            )| {
                let Some(interval) = interval else {
                    // No interval means the analysis proved the column empty
                    // after filtering: report exact zero-row statistics with
                    // a typed null as min/max/sum.
                    let typed_null = ScalarValue::try_from(schema.field(idx).data_type())
                        .unwrap_or(ScalarValue::Null);
                    return ColumnStatistics {
                        null_count: Precision::Exact(0),
                        max_value: Precision::Exact(typed_null.clone()),
                        min_value: Precision::Exact(typed_null.clone()),
                        sum_value: Precision::Exact(typed_null),
                        distinct_count: Precision::Exact(0),
                        byte_size: input_column_stats[idx].byte_size,
                    };
                };
                let (lower, upper) = interval.into_bounds();
                // A collapsed non-null interval pins the column to a single
                // value, so the bounds are exact.
                let is_exact = !lower.is_null() && !upper.is_null() && lower == upper;
                let min_value = interval_bound_to_precision(lower, is_exact);
                let max_value = interval_bound_to_precision(upper, is_exact);
                ColumnStatistics {
                    null_count: input_column_stats[idx].null_count.to_inexact(),
                    max_value,
                    min_value,
                    // Filtering invalidates any previous sum estimate.
                    sum_value: Precision::Absent,
                    distinct_count: distinct_count.to_inexact(),
                    byte_size: input_column_stats[idx].byte_size,
                }
            },
        )
        .collect()
}
/// Per-partition stream that evaluates the predicate, applies the optional
/// projection, and coalesces the filtered output into target-sized batches.
struct FilterExecStream {
    /// Output schema (input schema after the optional projection).
    schema: SchemaRef,
    /// Boolean expression evaluated against each input batch.
    predicate: Arc<dyn PhysicalExpr>,
    /// Stream of input batches.
    input: SendableRecordBatchStream,
    /// Baseline and selectivity metrics for this partition.
    metrics: FilterExecMetrics,
    /// Optional column indices applied before filtering.
    projection: Option<ProjectionRef>,
    /// Merges small filtered batches and enforces the `fetch` limit.
    batch_coalescer: LimitedBatchCoalescer,
}
/// Metrics recorded by [`FilterExecStream`].
struct FilterExecMetrics {
    /// Standard operator metrics (rows, elapsed compute, etc.).
    baseline_metrics: BaselineMetrics,
    /// Ratio of rows passing the filter to rows examined.
    selectivity: RatioMetrics,
}
impl FilterExecMetrics {
    /// Registers this partition's metrics in the shared metrics set.
    pub fn new(metrics: &ExecutionPlanMetricsSet, partition: usize) -> Self {
        Self {
            baseline_metrics: BaselineMetrics::new(metrics, partition),
            selectivity: MetricBuilder::new(metrics)
                .with_type(MetricType::SUMMARY)
                .ratio_metrics("selectivity", partition),
        }
    }
}
/// Filters `batch` by evaluating `predicate` (no projection applied).
///
/// # Errors
/// Returns an error if evaluation fails or the predicate does not produce a
/// boolean array.
pub fn batch_filter(
    batch: &RecordBatch,
    predicate: &Arc<dyn PhysicalExpr>,
) -> Result<RecordBatch> {
    filter_and_project(batch, predicate, None)
}
/// Evaluates `predicate` against `batch` and keeps the rows where it is
/// `true`, optionally projecting the batch to `projection` first.
///
/// # Errors
/// Propagates evaluation/projection errors and returns an internal error if
/// the predicate does not produce a boolean array.
fn filter_and_project(
    batch: &RecordBatch,
    predicate: &Arc<dyn PhysicalExpr>,
    projection: Option<&Vec<usize>>,
) -> Result<RecordBatch> {
    let evaluated = predicate.evaluate(batch)?;
    let mask = evaluated.into_array(batch.num_rows())?;
    let Ok(filter_array) = as_boolean_array(&mask) else {
        return internal_err!("Cannot create filter_array from non-boolean predicates");
    };
    match projection {
        None => Ok(filter_record_batch(batch, filter_array)?),
        Some(indices) => {
            let projected = batch.project(indices)?;
            Ok(filter_record_batch(&projected, filter_array)?)
        }
    }
}
impl Stream for FilterExecStream {
    type Item = Result<RecordBatch>;
    /// Drives the filter: drains completed batches from the coalescer first,
    /// then pulls and filters input batches until the input ends or the
    /// fetch limit is hit.
    fn poll_next(
        mut self: Pin<&mut Self>,
        cx: &mut Context<'_>,
    ) -> Poll<Option<Self::Item>> {
        let elapsed_compute = self.metrics.baseline_metrics.elapsed_compute().clone();
        loop {
            // Emit any batch the coalescer already completed before doing
            // more work.
            if let Some(batch) = self.batch_coalescer.next_completed_batch() {
                // Rows that survived the filter: selectivity numerator.
                self.metrics.selectivity.add_part(batch.num_rows());
                let poll = Poll::Ready(Some(Ok(batch)));
                return self.metrics.baseline_metrics.record_poll(poll);
            }
            if self.batch_coalescer.is_finished() {
                return Poll::Ready(None);
            }
            match ready!(self.input.poll_next_unpin(cx)) {
                None => {
                    // Input exhausted: flush whatever the coalescer buffered.
                    self.batch_coalescer.finish()?;
                }
                Some(Ok(batch)) => {
                    let timer = elapsed_compute.timer();
                    // Evaluate the predicate, apply the optional projection,
                    // then filter and push the result into the coalescer.
                    let status = self.predicate.as_ref()
                        .evaluate(&batch)
                        .and_then(|v| v.into_array(batch.num_rows()))
                        .and_then(|array| {
                            Ok(match self.projection.as_ref() {
                                Some(projection) => {
                                    let projected_batch = batch.project(projection)?;
                                    (array, projected_batch)
                                },
                                None => (array, batch)
                            })
                        }).and_then(|(array, batch)| {
                            match as_boolean_array(&array) {
                                Ok(filter_array) => {
                                    // Rows examined: selectivity denominator.
                                    self.metrics.selectivity.add_total(batch.num_rows());
                                    let batch = filter_record_batch(&batch, filter_array)?;
                                    let state = self.batch_coalescer.push_batch(batch)?;
                                    Ok(state)
                                }
                                Err(_) => {
                                    internal_err!(
                                        "Cannot create filter_array from non-boolean predicates"
                                    )
                                }
                            }
                        })?;
                    timer.done();
                    match status {
                        PushBatchStatus::Continue => {
                        }
                        PushBatchStatus::LimitReached => {
                            // Fetch limit hit: stop consuming input and flush.
                            self.batch_coalescer.finish()?;
                        }
                    }
                }
                // Propagate input errors unchanged.
                other => return Poll::Ready(other),
            }
        }
    }
    fn size_hint(&self) -> (usize, Option<usize>) {
        self.input.size_hint()
    }
}
impl RecordBatchStream for FilterExecStream {
    /// Output schema of the filtered (and optionally projected) batches.
    fn schema(&self) -> SchemaRef {
        Arc::clone(&self.schema)
    }
}
/// Splits `predicate` into equality and non-equality column comparison
/// pairs; thin wrapper kept for backward compatibility.
#[deprecated(
    since = "51.0.0",
    note = "This function will be internal in the future"
)]
pub fn collect_columns_from_predicate(
    predicate: &'_ Arc<dyn PhysicalExpr>,
) -> EqualAndNonEqual<'_> {
    collect_columns_from_predicate_inner(predicate)
}
/// Splits `predicate` on `AND` and partitions the binary comparisons that
/// have at least one bare column operand into equality (`=`) and
/// non-equality (`!=`) pairs of `(left, right)` expression references.
fn collect_columns_from_predicate_inner(
    predicate: &'_ Arc<dyn PhysicalExpr>,
) -> EqualAndNonEqual<'_> {
    let mut eq_pairs = Vec::<PhysicalExprPairRef>::new();
    let mut ne_pairs = Vec::<PhysicalExprPairRef>::new();
    for conjunct in split_conjunction(predicate) {
        let Some(binary) = conjunct.as_any().downcast_ref::<BinaryExpr>() else {
            continue;
        };
        // Only keep comparisons where one side is a plain column reference.
        let references_column = binary.left().as_any().is::<Column>()
            || binary.right().as_any().is::<Column>();
        if !references_column {
            continue;
        }
        match binary.op() {
            Operator::Eq => eq_pairs.push((binary.left(), binary.right())),
            Operator::NotEq => ne_pairs.push((binary.left(), binary.right())),
            _ => {}
        }
    }
    (eq_pairs, ne_pairs)
}
/// A borrowed `(left, right)` pair of physical expressions from a binary
/// comparison.
pub type PhysicalExprPairRef<'a> = (&'a Arc<dyn PhysicalExpr>, &'a Arc<dyn PhysicalExpr>);
/// `(equality pairs, non-equality pairs)` extracted from a predicate.
pub type EqualAndNonEqual<'a> =
    (Vec<PhysicalExprPairRef<'a>>, Vec<PhysicalExprPairRef<'a>>);
#[cfg(test)]
mod tests {
use super::*;
use crate::empty::EmptyExec;
use crate::expressions::*;
use crate::test;
use crate::test::exec::StatisticsExec;
use arrow::datatypes::{Field, Schema, UnionFields, UnionMode};
use datafusion_common::ScalarValue;
// Verifies that `collect_columns_from_predicate_inner` partitions a
// conjunction into its `=` and `!=` column comparison pairs, skipping
// non-equality operators such as `>=`.
#[tokio::test]
async fn collect_columns_predicates() -> Result<()> {
    let schema = test::aggr_test_schema();
    let predicate: Arc<dyn PhysicalExpr> = binary(
        binary(
            binary(col("c2", &schema)?, Operator::GtEq, lit(1u32), &schema)?,
            Operator::And,
            binary(col("c2", &schema)?, Operator::Eq, lit(4u32), &schema)?,
            &schema,
        )?,
        Operator::And,
        binary(
            binary(
                col("c2", &schema)?,
                Operator::Eq,
                col("c9", &schema)?,
                &schema,
            )?,
            Operator::And,
            binary(
                col("c1", &schema)?,
                Operator::NotEq,
                col("c13", &schema)?,
                &schema,
            )?,
            &schema,
        )?,
        &schema,
    )?;
    let (equal_pairs, ne_pairs) = collect_columns_from_predicate_inner(&predicate);
    assert_eq!(2, equal_pairs.len());
    assert!(equal_pairs[0].0.eq(&col("c2", &schema)?));
    assert!(equal_pairs[0].1.eq(&lit(4u32)));
    assert!(equal_pairs[1].0.eq(&col("c2", &schema)?));
    assert!(equal_pairs[1].1.eq(&col("c9", &schema)?));
    assert_eq!(1, ne_pairs.len());
    assert!(ne_pairs[0].0.eq(&col("c1", &schema)?));
    assert!(ne_pairs[0].1.eq(&col("c13", &schema)?));
    Ok(())
}
// A single range predicate (`a <= 25` over `a` in [1, 100]) should scale row
// count and byte size to ~25% and tighten `a`'s max bound to 25.
#[tokio::test]
async fn test_filter_statistics_basic_expr() -> Result<()> {
    let bytes_per_row = 4;
    let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]);
    let input = Arc::new(StatisticsExec::new(
        Statistics {
            num_rows: Precision::Inexact(100),
            total_byte_size: Precision::Inexact(100 * bytes_per_row),
            column_statistics: vec![ColumnStatistics {
                min_value: Precision::Inexact(ScalarValue::Int32(Some(1))),
                max_value: Precision::Inexact(ScalarValue::Int32(Some(100))),
                ..Default::default()
            }],
        },
        schema.clone(),
    ));
    let predicate: Arc<dyn PhysicalExpr> =
        binary(col("a", &schema)?, Operator::LtEq, lit(25i32), &schema)?;
    let filter: Arc<dyn ExecutionPlan> =
        Arc::new(FilterExec::try_new(predicate, input)?);
    let statistics = filter.partition_statistics(None)?;
    assert_eq!(statistics.num_rows, Precision::Inexact(25));
    assert_eq!(
        statistics.total_byte_size,
        Precision::Inexact(25 * bytes_per_row)
    );
    assert_eq!(
        statistics.column_statistics,
        vec![ColumnStatistics {
            min_value: Precision::Inexact(ScalarValue::Int32(Some(1))),
            max_value: Precision::Inexact(ScalarValue::Int32(Some(25))),
            ..Default::default()
        }]
    );
    Ok(())
}
// Two stacked filters (`a <= 25` then `a >= 10`) should compose: bounds
// narrow to [10, 25] and the row estimate shrinks multiplicatively.
#[tokio::test]
async fn test_filter_statistics_column_level_nested() -> Result<()> {
    let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]);
    let input = Arc::new(StatisticsExec::new(
        Statistics {
            num_rows: Precision::Inexact(100),
            column_statistics: vec![ColumnStatistics {
                min_value: Precision::Inexact(ScalarValue::Int32(Some(1))),
                max_value: Precision::Inexact(ScalarValue::Int32(Some(100))),
                ..Default::default()
            }],
            total_byte_size: Precision::Absent,
        },
        schema.clone(),
    ));
    let sub_filter: Arc<dyn ExecutionPlan> = Arc::new(FilterExec::try_new(
        binary(col("a", &schema)?, Operator::LtEq, lit(25i32), &schema)?,
        input,
    )?);
    let filter: Arc<dyn ExecutionPlan> = Arc::new(FilterExec::try_new(
        binary(col("a", &schema)?, Operator::GtEq, lit(10i32), &schema)?,
        sub_filter,
    )?);
    let statistics = filter.partition_statistics(None)?;
    assert_eq!(statistics.num_rows, Precision::Inexact(16));
    assert_eq!(
        statistics.column_statistics,
        vec![ColumnStatistics {
            min_value: Precision::Inexact(ScalarValue::Int32(Some(10))),
            max_value: Precision::Inexact(ScalarValue::Int32(Some(25))),
            ..Default::default()
        }]
    );
    Ok(())
}
// Three stacked filters over two columns: statistics should compose across
// the chain, narrowing both columns' bounds.
#[tokio::test]
async fn test_filter_statistics_column_level_nested_multiple() -> Result<()> {
    let schema = Schema::new(vec![
        Field::new("a", DataType::Int32, false),
        Field::new("b", DataType::Int32, false),
    ]);
    let input = Arc::new(StatisticsExec::new(
        Statistics {
            num_rows: Precision::Inexact(100),
            column_statistics: vec![
                ColumnStatistics {
                    min_value: Precision::Inexact(ScalarValue::Int32(Some(1))),
                    max_value: Precision::Inexact(ScalarValue::Int32(Some(100))),
                    ..Default::default()
                },
                ColumnStatistics {
                    min_value: Precision::Inexact(ScalarValue::Int32(Some(1))),
                    max_value: Precision::Inexact(ScalarValue::Int32(Some(50))),
                    ..Default::default()
                },
            ],
            total_byte_size: Precision::Absent,
        },
        schema.clone(),
    ));
    let a_lte_25: Arc<dyn ExecutionPlan> = Arc::new(FilterExec::try_new(
        binary(col("a", &schema)?, Operator::LtEq, lit(25i32), &schema)?,
        input,
    )?);
    // NOTE(review): name is stale — the predicate is `b > 45`, not `b > 5`.
    let b_gt_5: Arc<dyn ExecutionPlan> = Arc::new(FilterExec::try_new(
        binary(col("b", &schema)?, Operator::Gt, lit(45i32), &schema)?,
        a_lte_25,
    )?);
    let filter: Arc<dyn ExecutionPlan> = Arc::new(FilterExec::try_new(
        binary(col("a", &schema)?, Operator::GtEq, lit(10i32), &schema)?,
        b_gt_5,
    )?);
    let statistics = filter.partition_statistics(None)?;
    assert_eq!(statistics.num_rows, Precision::Inexact(2));
    assert_eq!(
        statistics.column_statistics,
        vec![
            ColumnStatistics {
                min_value: Precision::Inexact(ScalarValue::Int32(Some(10))),
                max_value: Precision::Inexact(ScalarValue::Int32(Some(25))),
                ..Default::default()
            },
            ColumnStatistics {
                min_value: Precision::Inexact(ScalarValue::Int32(Some(46))),
                max_value: Precision::Inexact(ScalarValue::Int32(Some(50))),
                ..Default::default()
            }
        ]
    );
    Ok(())
}
// With completely unknown input statistics, the filter should report an
// absent row count rather than inventing an estimate.
#[tokio::test]
async fn test_filter_statistics_when_input_stats_missing() -> Result<()> {
    let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]);
    let input = Arc::new(StatisticsExec::new(
        Statistics::new_unknown(&schema),
        schema.clone(),
    ));
    let predicate: Arc<dyn PhysicalExpr> =
        binary(col("a", &schema)?, Operator::LtEq, lit(25i32), &schema)?;
    let filter: Arc<dyn ExecutionPlan> =
        Arc::new(FilterExec::try_new(predicate, input)?);
    let statistics = filter.partition_statistics(None)?;
    assert_eq!(statistics.num_rows, Precision::Absent);
    Ok(())
}
#[tokio::test]
async fn test_filter_statistics_multiple_columns() -> Result<()> {
let schema = Schema::new(vec![
Field::new("a", DataType::Int32, false),
Field::new("b", DataType::Int32, false),
Field::new("c", DataType::Float32, false),
]);
let input = Arc::new(StatisticsExec::new(
Statistics {
num_rows: Precision::Inexact(1000),
total_byte_size: Precision::Inexact(4000),
column_statistics: vec![
ColumnStatistics {
min_value: Precision::Inexact(ScalarValue::Int32(Some(1))),
max_value: Precision::Inexact(ScalarValue::Int32(Some(100))),
..Default::default()
},
ColumnStatistics {
min_value: Precision::Inexact(ScalarValue::Int32(Some(1))),
max_value: Precision::Inexact(ScalarValue::Int32(Some(3))),
..Default::default()
},
ColumnStatistics {
min_value: Precision::Inexact(ScalarValue::Float32(Some(1000.0))),
max_value: Precision::Inexact(ScalarValue::Float32(Some(1100.0))),
..Default::default()
},
],
},
schema,
));
let predicate = Arc::new(BinaryExpr::new(
Arc::new(BinaryExpr::new(
Arc::new(Column::new("a", 0)),
Operator::LtEq,
Arc::new(Literal::new(ScalarValue::Int32(Some(53)))),
)),
Operator::And,
Arc::new(BinaryExpr::new(
Arc::new(BinaryExpr::new(
Arc::new(Column::new("b", 1)),
Operator::Eq,
Arc::new(Literal::new(ScalarValue::Int32(Some(3)))),
)),
Operator::And,
Arc::new(BinaryExpr::new(
Arc::new(BinaryExpr::new(
Arc::new(Column::new("c", 2)),
Operator::LtEq,
Arc::new(Literal::new(ScalarValue::Float32(Some(1075.0)))),
)),
Operator::And,
Arc::new(BinaryExpr::new(
Arc::new(Column::new("a", 0)),
Operator::Gt,
Arc::new(Column::new("b", 1)),
)),
)),
)),
));
let filter: Arc<dyn ExecutionPlan> =
Arc::new(FilterExec::try_new(predicate, input)?);
let statistics = filter.partition_statistics(None)?;
assert_eq!(statistics.num_rows, Precision::Inexact(134));
assert_eq!(statistics.total_byte_size, Precision::Inexact(533));
let exp_col_stats = vec![
ColumnStatistics {
min_value: Precision::Inexact(ScalarValue::Int32(Some(4))),
max_value: Precision::Inexact(ScalarValue::Int32(Some(53))),
..Default::default()
},
ColumnStatistics {
min_value: Precision::Inexact(ScalarValue::Int32(Some(3))),
max_value: Precision::Inexact(ScalarValue::Int32(Some(3))),
..Default::default()
},
ColumnStatistics {
min_value: Precision::Inexact(ScalarValue::Float32(Some(1000.0))),
max_value: Precision::Inexact(ScalarValue::Float32(Some(1075.0))),
..Default::default()
},
];
let _ = exp_col_stats
.into_iter()
.zip(statistics.column_statistics)
.map(|(expected, actual)| {
if let Some(val) = actual.min_value.get_value() {
if val.data_type().is_floating() {
let actual_min = actual.min_value.get_value().unwrap();
let actual_max = actual.max_value.get_value().unwrap();
let expected_min = expected.min_value.get_value().unwrap();
let expected_max = expected.max_value.get_value().unwrap();
let eps = ScalarValue::Float32(Some(1e-6));
assert!(actual_min.sub(expected_min).unwrap() < eps);
assert!(actual_min.sub(expected_min).unwrap() < eps);
assert!(actual_max.sub(expected_max).unwrap() < eps);
assert!(actual_max.sub(expected_max).unwrap() < eps);
} else {
assert_eq!(actual, expected);
}
} else {
assert_eq!(actual, expected);
}
});
Ok(())
}
// A predicate that cannot eliminate any row given the input bounds
// (`a < 200 AND 1 <= b`) should leave the statistics unchanged.
#[tokio::test]
async fn test_filter_statistics_full_selective() -> Result<()> {
    let schema = Schema::new(vec![
        Field::new("a", DataType::Int32, false),
        Field::new("b", DataType::Int32, false),
    ]);
    let input = Arc::new(StatisticsExec::new(
        Statistics {
            num_rows: Precision::Inexact(1000),
            total_byte_size: Precision::Inexact(4000),
            column_statistics: vec![
                ColumnStatistics {
                    min_value: Precision::Inexact(ScalarValue::Int32(Some(1))),
                    max_value: Precision::Inexact(ScalarValue::Int32(Some(100))),
                    ..Default::default()
                },
                ColumnStatistics {
                    min_value: Precision::Inexact(ScalarValue::Int32(Some(1))),
                    max_value: Precision::Inexact(ScalarValue::Int32(Some(3))),
                    ..Default::default()
                },
            ],
        },
        schema,
    ));
    // a < 200 AND 1 <= b — always true for the given column bounds.
    let predicate = Arc::new(BinaryExpr::new(
        Arc::new(BinaryExpr::new(
            Arc::new(Column::new("a", 0)),
            Operator::Lt,
            Arc::new(Literal::new(ScalarValue::Int32(Some(200)))),
        )),
        Operator::And,
        Arc::new(BinaryExpr::new(
            Arc::new(Literal::new(ScalarValue::Int32(Some(1)))),
            Operator::LtEq,
            Arc::new(Column::new("b", 1)),
        )),
    ));
    let expected = input.partition_statistics(None)?.column_statistics;
    let filter: Arc<dyn ExecutionPlan> =
        Arc::new(FilterExec::try_new(predicate, input)?);
    let statistics = filter.partition_statistics(None)?;
    assert_eq!(statistics.num_rows, Precision::Inexact(1000));
    assert_eq!(statistics.total_byte_size, Precision::Inexact(4000));
    assert_eq!(statistics.column_statistics, expected);
    Ok(())
}
#[tokio::test]
async fn test_filter_statistics_zero_selective() -> Result<()> {
let schema = Schema::new(vec![
Field::new("a", DataType::Int32, false),
Field::new("b", DataType::Int32, false),
]);
let input = Arc::new(StatisticsExec::new(
Statistics {
num_rows: Precision::Inexact(1000),
total_byte_size: Precision::Inexact(4000),
column_statistics: vec![
ColumnStatistics {
min_value: Precision::Inexact(ScalarValue::Int32(Some(1))),
max_value: Precision::Inexact(ScalarValue::Int32(Some(100))),
..Default::default()
},
ColumnStatistics {
min_value: Precision::Inexact(ScalarValue::Int32(Some(1))),
max_value: Precision::Inexact(ScalarValue::Int32(Some(3))),
..Default::default()
},
],
},
schema,
));
let predicate = Arc::new(BinaryExpr::new(
Arc::new(BinaryExpr::new(
Arc::new(Column::new("a", 0)),
Operator::Gt,
Arc::new(Literal::new(ScalarValue::Int32(Some(200)))),
)),
Operator::And,
Arc::new(BinaryExpr::new(
Arc::new(Literal::new(ScalarValue::Int32(Some(1)))),
Operator::LtEq,
Arc::new(Column::new("b", 1)),
)),
));
let filter: Arc<dyn ExecutionPlan> =
Arc::new(FilterExec::try_new(predicate, input)?);
let statistics = filter.partition_statistics(None)?;
assert_eq!(statistics.num_rows, Precision::Inexact(0));
assert_eq!(statistics.total_byte_size, Precision::Inexact(0));
assert_eq!(
statistics.column_statistics,
vec![
ColumnStatistics {
min_value: Precision::Exact(ScalarValue::Int32(None)),
max_value: Precision::Exact(ScalarValue::Int32(None)),
sum_value: Precision::Exact(ScalarValue::Int32(None)),
distinct_count: Precision::Exact(0),
null_count: Precision::Exact(0),
byte_size: Precision::Absent,
},
ColumnStatistics {
min_value: Precision::Exact(ScalarValue::Int32(None)),
max_value: Precision::Exact(ScalarValue::Int32(None)),
sum_value: Precision::Exact(ScalarValue::Int32(None)),
distinct_count: Precision::Exact(0),
null_count: Precision::Exact(0),
byte_size: Precision::Absent,
},
]
);
Ok(())
}
#[tokio::test]
async fn test_nested_filter_with_zero_selectivity_inner() -> Result<()> {
    // When an inner filter provably selects no rows (`a > 200` with
    // a in [1, 100]), stacking another filter on top must keep the
    // row-count estimate at zero.
    let schema = Schema::new(vec![
        Field::new("a", DataType::Int32, false),
        Field::new("b", DataType::Int32, false),
    ]);
    // Shorthand for an inexact Int32 [lo, hi] column range.
    let int32_range = |lo: i32, hi: i32| ColumnStatistics {
        min_value: Precision::Inexact(ScalarValue::Int32(Some(lo))),
        max_value: Precision::Inexact(ScalarValue::Int32(Some(hi))),
        ..Default::default()
    };
    let input = Arc::new(StatisticsExec::new(
        Statistics {
            num_rows: Precision::Inexact(1000),
            total_byte_size: Precision::Inexact(4000),
            column_statistics: vec![int32_range(1, 100), int32_range(1, 3)],
        },
        schema,
    ));
    // Inner filter: a > 200 — impossible given the input bounds.
    let inner_predicate: Arc<dyn PhysicalExpr> = Arc::new(BinaryExpr::new(
        Arc::new(Column::new("a", 0)),
        Operator::Gt,
        Arc::new(Literal::new(ScalarValue::Int32(Some(200)))),
    ));
    let inner_filter: Arc<dyn ExecutionPlan> =
        Arc::new(FilterExec::try_new(inner_predicate, input)?);
    // Outer filter: a = 50.
    let outer_predicate: Arc<dyn PhysicalExpr> = Arc::new(BinaryExpr::new(
        Arc::new(Column::new("a", 0)),
        Operator::Eq,
        Arc::new(Literal::new(ScalarValue::Int32(Some(50)))),
    ));
    let outer_filter: Arc<dyn ExecutionPlan> =
        Arc::new(FilterExec::try_new(outer_predicate, inner_filter)?);
    let statistics = outer_filter.partition_statistics(None)?;
    assert_eq!(statistics.num_rows, Precision::Inexact(0));
    Ok(())
}
#[tokio::test]
async fn test_filter_statistics_more_inputs() -> Result<()> {
    // Filtering on `a < 50` should tighten column a's max bound while
    // leaving the unfiltered column b's range unchanged.
    let schema = Schema::new(vec![
        Field::new("a", DataType::Int32, false),
        Field::new("b", DataType::Int32, false),
    ]);
    // Both input columns start with the same [1, 100] range.
    let wide_range = ColumnStatistics {
        min_value: Precision::Inexact(ScalarValue::Int32(Some(1))),
        max_value: Precision::Inexact(ScalarValue::Int32(Some(100))),
        ..Default::default()
    };
    let input = Arc::new(StatisticsExec::new(
        Statistics {
            num_rows: Precision::Inexact(1000),
            total_byte_size: Precision::Inexact(4000),
            column_statistics: vec![wide_range.clone(), wide_range],
        },
        schema,
    ));
    // a < 50
    let predicate = Arc::new(BinaryExpr::new(
        Arc::new(Column::new("a", 0)),
        Operator::Lt,
        Arc::new(Literal::new(ScalarValue::Int32(Some(50)))),
    ));
    let filter: Arc<dyn ExecutionPlan> =
        Arc::new(FilterExec::try_new(predicate, input)?);
    let statistics = filter.partition_statistics(None)?;
    assert_eq!(statistics.num_rows, Precision::Inexact(490));
    assert_eq!(statistics.total_byte_size, Precision::Inexact(1960));
    assert_eq!(
        statistics.column_statistics,
        vec![
            // a: max clamped below the literal bound.
            ColumnStatistics {
                min_value: Precision::Inexact(ScalarValue::Int32(Some(1))),
                max_value: Precision::Inexact(ScalarValue::Int32(Some(49))),
                ..Default::default()
            },
            // b: untouched by the predicate.
            ColumnStatistics {
                min_value: Precision::Inexact(ScalarValue::Int32(Some(1))),
                max_value: Precision::Inexact(ScalarValue::Int32(Some(100))),
                ..Default::default()
            },
        ]
    );
    Ok(())
}
#[tokio::test]
async fn test_empty_input_statistics() -> Result<()> {
    // With completely unknown input statistics, analysis of
    // `a <= 10 AND 0 <= a - 5` should still derive bounds a in [5, 10]
    // while row and byte counts remain absent.
    let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]);
    let input = Arc::new(StatisticsExec::new(
        Statistics::new_unknown(&schema),
        schema,
    ));
    // Left conjunct: a <= 10.
    let upper_bound = Arc::new(BinaryExpr::new(
        Arc::new(Column::new("a", 0)),
        Operator::LtEq,
        Arc::new(Literal::new(ScalarValue::Int32(Some(10)))),
    ));
    // Right conjunct: 0 <= a - 5  (i.e. a >= 5, phrased arithmetically).
    let lower_bound = Arc::new(BinaryExpr::new(
        Arc::new(Literal::new(ScalarValue::Int32(Some(0)))),
        Operator::LtEq,
        Arc::new(BinaryExpr::new(
            Arc::new(Column::new("a", 0)),
            Operator::Minus,
            Arc::new(Literal::new(ScalarValue::Int32(Some(5)))),
        )),
    ));
    let predicate = Arc::new(BinaryExpr::new(upper_bound, Operator::And, lower_bound));
    let filter: Arc<dyn ExecutionPlan> =
        Arc::new(FilterExec::try_new(predicate, input)?);
    let filter_statistics = filter.partition_statistics(None)?;
    let expected_filter_statistics = Statistics {
        num_rows: Precision::Absent,
        total_byte_size: Precision::Absent,
        column_statistics: vec![ColumnStatistics {
            null_count: Precision::Absent,
            min_value: Precision::Inexact(ScalarValue::Int32(Some(5))),
            max_value: Precision::Inexact(ScalarValue::Int32(Some(10))),
            sum_value: Precision::Absent,
            distinct_count: Precision::Absent,
            byte_size: Precision::Absent,
        }],
    };
    assert_eq!(filter_statistics, expected_filter_statistics);
    Ok(())
}
#[tokio::test]
async fn test_statistics_with_constant_column() -> Result<()> {
    // An equality predicate (`a = 10`) pins the column to a single value,
    // which the output column statistics should report as a singleton.
    let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]);
    let input = Arc::new(StatisticsExec::new(
        Statistics::new_unknown(&schema),
        schema,
    ));
    let predicate = Arc::new(BinaryExpr::new(
        Arc::new(Column::new("a", 0)),
        Operator::Eq,
        Arc::new(Literal::new(ScalarValue::Int32(Some(10)))),
    ));
    let filter: Arc<dyn ExecutionPlan> =
        Arc::new(FilterExec::try_new(predicate, input)?);
    let stats = filter.partition_statistics(None)?;
    assert!(stats.column_statistics[0].is_singleton());
    Ok(())
}
#[tokio::test]
async fn test_validation_filter_selectivity() -> Result<()> {
    // Default selectivity is expressed as a percentage; a value above 100
    // must be rejected by `with_default_selectivity`.
    let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]);
    let input = Arc::new(StatisticsExec::new(
        Statistics::new_unknown(&schema),
        schema,
    ));
    // a = 10 (the predicate itself is irrelevant to the validation).
    let predicate = Arc::new(BinaryExpr::new(
        Arc::new(Column::new("a", 0)),
        Operator::Eq,
        Arc::new(Literal::new(ScalarValue::Int32(Some(10)))),
    ));
    let filter = FilterExec::try_new(predicate, input)?;
    assert!(filter.with_default_selectivity(120).is_err());
    Ok(())
}
#[tokio::test]
async fn test_custom_filter_selectivity() -> Result<()> {
let schema =
Schema::new(vec![Field::new("a", DataType::Decimal128(2, 3), false)]);
let input = Arc::new(StatisticsExec::new(
Statistics {
num_rows: Precision::Inexact(1000),
total_byte_size: Precision::Inexact(4000),
column_statistics: vec![ColumnStatistics {
..Default::default()
}],
},
schema,
));
let predicate = Arc::new(BinaryExpr::new(
Arc::new(Column::new("a", 0)),
Operator::Eq,
Arc::new(Literal::new(ScalarValue::Decimal128(Some(10), 10, 10))),
));
let filter = FilterExec::try_new(predicate, input)?;
let statistics = filter.partition_statistics(None)?;
assert_eq!(statistics.num_rows, Precision::Inexact(200));
assert_eq!(statistics.total_byte_size, Precision::Inexact(800));
let filter = filter.with_default_selectivity(40)?;
let statistics = filter.partition_statistics(None)?;
assert_eq!(statistics.num_rows, Precision::Inexact(400));
assert_eq!(statistics.total_byte_size, Precision::Inexact(1600));
Ok(())
}
#[test]
fn test_equivalence_properties_union_type() -> Result<()> {
    // Regression test: computing partition statistics for a filter over a
    // schema containing a Union-typed column must succeed without error.
    let union_type = DataType::Union(
        // `?` instead of `unwrap()`: a construction failure surfaces as a
        // test error with context rather than a panic.
        UnionFields::try_new(
            vec![0, 1],
            vec![
                Field::new("f1", DataType::Int32, true),
                Field::new("f2", DataType::Utf8, true),
            ],
        )?,
        UnionMode::Sparse,
    );
    let schema = Arc::new(Schema::new(vec![
        Field::new("c1", DataType::Int32, true),
        Field::new("c2", union_type, true),
    ]));
    // c1 >= 1 AND c1 <= 4
    let exec = FilterExec::try_new(
        binary(
            binary(col("c1", &schema)?, Operator::GtEq, lit(1i32), &schema)?,
            Operator::And,
            binary(col("c1", &schema)?, Operator::LtEq, lit(4i32), &schema)?,
            &schema,
        )?,
        Arc::new(EmptyExec::new(Arc::clone(&schema))),
    )?;
    // The call itself is the assertion: it must not return an error.
    exec.partition_statistics(None)?;
    Ok(())
}
#[tokio::test]
async fn test_builder_with_projection() -> Result<()> {
    // Applying a projection through the builder should record the selected
    // indices and narrow the output schema to those columns.
    let schema = Arc::new(Schema::new(vec![
        Field::new("a", DataType::Int32, false),
        Field::new("b", DataType::Int32, false),
        Field::new("c", DataType::Int32, false),
    ]));
    let input = Arc::new(EmptyExec::new(Arc::clone(&schema)));
    // a > 10
    let predicate = Arc::new(BinaryExpr::new(
        Arc::new(Column::new("a", 0)),
        Operator::Gt,
        Arc::new(Literal::new(ScalarValue::Int32(Some(10)))),
    ));
    // `?` instead of `unwrap()` so failures propagate as test errors; the
    // intermediate `projection` variable and its clone were dead weight.
    let filter = FilterExecBuilder::new(predicate, input)
        .apply_projection(Some(vec![0, 2]))?
        .build()?;
    assert_eq!(filter.projection(), &Some([0, 2].into()));
    let output_schema = filter.schema();
    assert_eq!(output_schema.fields().len(), 2);
    assert_eq!(output_schema.field(0).name(), "a");
    assert_eq!(output_schema.field(1).name(), "c");
    Ok(())
}
#[tokio::test]
async fn test_builder_without_projection() -> Result<()> {
    // Building without applying any projection keeps the full input schema.
    let schema = Arc::new(Schema::new(vec![
        Field::new("a", DataType::Int32, false),
        Field::new("b", DataType::Int32, false),
    ]));
    let input = Arc::new(EmptyExec::new(Arc::clone(&schema)));
    // a > 5
    let predicate = Arc::new(BinaryExpr::new(
        Arc::new(Column::new("a", 0)),
        Operator::Gt,
        Arc::new(Literal::new(ScalarValue::Int32(Some(5)))),
    ));
    let filter = FilterExecBuilder::new(predicate, input).build()?;
    assert!(filter.projection().is_none());
    assert_eq!(filter.schema().fields().len(), 2);
    Ok(())
}
#[tokio::test]
async fn test_builder_invalid_projection() -> Result<()> {
    // Index 5 is out of bounds for a two-column schema, so applying the
    // projection must fail rather than produce a broken builder.
    let schema = Arc::new(Schema::new(vec![
        Field::new("a", DataType::Int32, false),
        Field::new("b", DataType::Int32, false),
    ]));
    let input = Arc::new(EmptyExec::new(Arc::clone(&schema)));
    // a > 5
    let predicate = Arc::new(BinaryExpr::new(
        Arc::new(Column::new("a", 0)),
        Operator::Gt,
        Arc::new(Literal::new(ScalarValue::Int32(Some(5)))),
    ));
    let builder = FilterExecBuilder::new(predicate, input);
    assert!(builder.apply_projection(Some(vec![0, 5])).is_err());
    Ok(())
}
#[tokio::test]
async fn test_builder_vs_with_projection() -> Result<()> {
let schema = Schema::new(vec![
Field::new("a", DataType::Int32, false),
Field::new("b", DataType::Int32, false),
Field::new("c", DataType::Int32, false),
Field::new("d", DataType::Int32, false),
]);
let input = Arc::new(StatisticsExec::new(
Statistics {
num_rows: Precision::Inexact(1000),
total_byte_size: Precision::Inexact(4000),
column_statistics: vec![
ColumnStatistics {
min_value: Precision::Inexact(ScalarValue::Int32(Some(1))),
max_value: Precision::Inexact(ScalarValue::Int32(Some(100))),
..Default::default()
},
ColumnStatistics {
..Default::default()
},
ColumnStatistics {
..Default::default()
},
ColumnStatistics {
..Default::default()
},
],
},
schema,
));
let input: Arc<dyn ExecutionPlan> = input;
let predicate: Arc<dyn PhysicalExpr> = Arc::new(BinaryExpr::new(
Arc::new(Column::new("a", 0)),
Operator::Lt,
Arc::new(Literal::new(ScalarValue::Int32(Some(50)))),
));
let projection = Some(vec![0, 2]);
let filter1 = FilterExecBuilder::new(Arc::clone(&predicate), Arc::clone(&input))
.apply_projection(projection.clone())
.unwrap()
.build()?;
let filter2 = FilterExecBuilder::new(predicate, input)
.apply_projection(projection)
.unwrap()
.build()?;
assert_eq!(filter1.schema(), filter2.schema());
assert_eq!(filter1.projection(), filter2.projection());
let stats1 = filter1.partition_statistics(None)?;
let stats2 = filter2.partition_statistics(None)?;
assert_eq!(stats1.num_rows, stats2.num_rows);
assert_eq!(stats1.total_byte_size, stats2.total_byte_size);
Ok(())
}
#[tokio::test]
async fn test_builder_statistics_with_projection() -> Result<()> {
    // Statistics must remain computable when the filter carries a
    // projection, and the schema must reflect only the projected columns.
    let schema = Schema::new(vec![
        Field::new("a", DataType::Int32, false),
        Field::new("b", DataType::Int32, false),
        Field::new("c", DataType::Int32, false),
    ]);
    let input = Arc::new(StatisticsExec::new(
        Statistics {
            num_rows: Precision::Inexact(1000),
            total_byte_size: Precision::Inexact(12000),
            column_statistics: vec![
                ColumnStatistics {
                    min_value: Precision::Inexact(ScalarValue::Int32(Some(1))),
                    max_value: Precision::Inexact(ScalarValue::Int32(Some(100))),
                    ..Default::default()
                },
                ColumnStatistics {
                    min_value: Precision::Inexact(ScalarValue::Int32(Some(10))),
                    max_value: Precision::Inexact(ScalarValue::Int32(Some(200))),
                    ..Default::default()
                },
                ColumnStatistics {
                    min_value: Precision::Inexact(ScalarValue::Int32(Some(5))),
                    max_value: Precision::Inexact(ScalarValue::Int32(Some(50))),
                    ..Default::default()
                },
            ],
        },
        schema,
    ));
    // a < 50
    let predicate = Arc::new(BinaryExpr::new(
        Arc::new(Column::new("a", 0)),
        Operator::Lt,
        Arc::new(Literal::new(ScalarValue::Int32(Some(50)))),
    ));
    // `?` instead of `unwrap()` so projection failures propagate as errors.
    let filter = FilterExecBuilder::new(predicate, input)
        .apply_projection(Some(vec![0, 2]))?
        .build()?;
    let statistics = filter.partition_statistics(None)?;
    assert!(matches!(statistics.num_rows, Precision::Inexact(_)));
    assert_eq!(filter.schema().fields().len(), 2);
    Ok(())
}
#[test]
fn test_builder_predicate_validation() -> Result<()> {
    // A bare Int32 column is not a valid filter predicate, so `build()`
    // must fail even though `apply_projection` itself succeeds.
    let schema = Arc::new(Schema::new(vec![
        Field::new("a", DataType::Int32, false),
        Field::new("b", DataType::Int32, false),
    ]));
    let input = Arc::new(EmptyExec::new(Arc::clone(&schema)));
    let invalid_predicate = Arc::new(Column::new("a", 0));
    // `?` on the projection step (which must succeed); only `build()` is
    // expected to fail.
    let result = FilterExecBuilder::new(invalid_predicate, input)
        .apply_projection(Some(vec![0]))?
        .build();
    assert!(result.is_err());
    Ok(())
}
#[tokio::test]
async fn test_builder_projection_composition() -> Result<()> {
    // Applying a second projection composes with the first: [0, 2, 3]
    // followed by [0, 2] selects original column indices [0, 3] ("a", "d").
    let schema = Arc::new(Schema::new(vec![
        Field::new("a", DataType::Int32, false),
        Field::new("b", DataType::Int32, false),
        Field::new("c", DataType::Int32, false),
        Field::new("d", DataType::Int32, false),
    ]));
    let input = Arc::new(EmptyExec::new(Arc::clone(&schema)));
    // a > 10
    let predicate = Arc::new(BinaryExpr::new(
        Arc::new(Column::new("a", 0)),
        Operator::Gt,
        Arc::new(Literal::new(ScalarValue::Int32(Some(10)))),
    ));
    let filter = FilterExecBuilder::new(predicate, input)
        .apply_projection(Some(vec![0, 2, 3]))?
        .apply_projection(Some(vec![0, 2]))?
        .build()?;
    assert_eq!(filter.projection(), &Some([0, 3].into()));
    let projected_schema = filter.schema();
    assert_eq!(projected_schema.fields().len(), 2);
    assert_eq!(projected_schema.field(0).name(), "a");
    assert_eq!(projected_schema.field(1).name(), "d");
    Ok(())
}
#[tokio::test]
async fn test_builder_projection_composition_none_clears() -> Result<()> {
    // Applying `None` after an earlier projection resets the builder back
    // to the full input schema.
    let schema = Arc::new(Schema::new(vec![
        Field::new("a", DataType::Int32, false),
        Field::new("b", DataType::Int32, false),
    ]));
    let input = Arc::new(EmptyExec::new(Arc::clone(&schema)));
    // a > 10
    let predicate = Arc::new(BinaryExpr::new(
        Arc::new(Column::new("a", 0)),
        Operator::Gt,
        Arc::new(Literal::new(ScalarValue::Int32(Some(10)))),
    ));
    let filter = FilterExecBuilder::new(predicate, input)
        .apply_projection(Some(vec![0]))?
        .apply_projection(None)?
        .build()?;
    assert_eq!(filter.projection(), &None);
    assert_eq!(filter.schema().fields().len(), 2);
    Ok(())
}
#[test]
fn test_filter_with_projection_remaps_post_phase_parent_filters() -> Result<()> {
    // A FilterExec with a projection exposes a narrowed output schema.
    // Parent filters gathered in the Post phase reference that output
    // schema, so their column indices must be remapped to the filter's
    // input schema before pushdown.
    let input_schema = Arc::new(Schema::new(vec![
        Field::new("a", DataType::Int32, false),
        Field::new("b", DataType::Utf8, false),
        Field::new("c", DataType::Float64, false),
    ]));
    let input = Arc::new(EmptyExec::new(Arc::clone(&input_schema)));
    // a > 0
    let predicate = Arc::new(BinaryExpr::new(
        Arc::new(Column::new("a", 0)),
        Operator::Gt,
        Arc::new(Literal::new(ScalarValue::Int32(Some(0)))),
    ));
    // Project only column "c" (input index 2).
    let filter = FilterExecBuilder::new(predicate, input)
        .apply_projection(Some(vec![2]))?
        .build()?;
    let output_schema = filter.schema();
    assert_eq!(output_schema.fields().len(), 1);
    assert_eq!(output_schema.field(0).name(), "c");
    // In the filter's OUTPUT schema, "c" sits at index 0.
    let parent_filter: Arc<dyn PhysicalExpr> = Arc::new(Column::new("c", 0));
    let config = ConfigOptions::new();
    let desc = filter.gather_filters_for_pushdown(
        FilterPushdownPhase::Post,
        vec![parent_filter],
        &config,
    )?;
    // Formatting fix: the original crammed three statements onto one line.
    let parent_filters = desc.parent_filters();
    assert_eq!(parent_filters.len(), 1);
    assert_eq!(parent_filters[0].len(), 1);
    let remapped = &parent_filters[0][0].predicate;
    let display = format!("{remapped}");
    assert_eq!(
        display, "c@2",
        "Post-phase parent filter column index must be remapped \
        from output schema (c@0) to input schema (c@2)"
    );
    Ok(())
}
#[test]
fn test_collect_columns_skips_non_column_pairs() -> Result<()> {
    // Equality pairs are only collected when at least one side of the `=`
    // is a plain Column reference.
    let schema = test::aggr_test_schema();
    // Case 1: `(c2 IS DISTINCT FROM 0) = 0` — neither side of the outer
    // `=` is a Column, so nothing should be collected.
    let complex_expr: Arc<dyn PhysicalExpr> = binary(
        col("c2", &schema)?,
        Operator::IsDistinctFrom,
        lit(0u32),
        &schema,
    )?;
    let no_column_predicate: Arc<dyn PhysicalExpr> =
        binary(complex_expr, Operator::Eq, lit(0u32), &schema)?;
    let (pairs, _) = collect_columns_from_predicate_inner(&no_column_predicate);
    assert_eq!(
        0,
        pairs.len(),
        "Should not extract equality pairs where neither side is a Column"
    );
    // Case 2: `c2 = 0` — the left side is a Column, so one pair results.
    let column_predicate: Arc<dyn PhysicalExpr> =
        binary(col("c2", &schema)?, Operator::Eq, lit(0u32), &schema)?;
    let (pairs, _) = collect_columns_from_predicate_inner(&column_predicate);
    assert_eq!(
        1,
        pairs.len(),
        "Should extract equality pairs where one side is a Column"
    );
    Ok(())
}
#[tokio::test]
async fn test_filter_statistics_absent_columns_stay_absent() -> Result<()> {
    // A filter on column `a` must not fabricate statistics for column `b`:
    // min/max that were Absent on input stay Absent on output.
    let schema = Schema::new(vec![
        Field::new("a", DataType::Int32, false),
        Field::new("b", DataType::Int32, false),
    ]);
    let input = Arc::new(StatisticsExec::new(
        Statistics {
            num_rows: Precision::Inexact(1000),
            total_byte_size: Precision::Absent,
            // Both columns start with entirely Absent statistics.
            column_statistics: vec![ColumnStatistics::default(); 2],
        },
        schema.clone(),
    ));
    // a = 42
    let predicate = Arc::new(BinaryExpr::new(
        Arc::new(Column::new("a", 0)),
        Operator::Eq,
        Arc::new(Literal::new(ScalarValue::Int32(Some(42)))),
    ));
    let filter: Arc<dyn ExecutionPlan> =
        Arc::new(FilterExec::try_new(predicate, input)?);
    let statistics = filter.partition_statistics(None)?;
    let b_stats = &statistics.column_statistics[1];
    assert_eq!(b_stats.min_value, Precision::Absent);
    assert_eq!(b_stats.max_value, Precision::Absent);
    Ok(())
}
#[test]
fn test_filter_with_projection_swap_does_not_panic() -> Result<()> {
    use crate::projection::ProjectionExpr;
    use datafusion_physical_expr::expressions::col;
    // Regression test: swapping a ProjectionExec through a FilterExec that
    // already carries an (identity) projection must succeed without
    // panicking and yield the narrowed two-column schema.
    let schema = Arc::new(Schema::new(vec![
        Field::new("ts", DataType::Int64, false),
        Field::new("tokens", DataType::Int64, false),
        Field::new("svc", DataType::Utf8, false),
    ]));
    let input = Arc::new(EmptyExec::new(Arc::clone(&schema)));
    // ts > 0
    let predicate = Arc::new(BinaryExpr::new(
        Arc::new(Column::new("ts", 0)),
        Operator::Gt,
        Arc::new(Literal::new(ScalarValue::Int64(Some(0)))),
    ));
    // Identity projection over all three columns.
    let filter = Arc::new(
        FilterExecBuilder::new(predicate, input)
            .apply_projection(Some(vec![0, 1, 2]))?
            .build()?,
    );
    // Projection selecting only "ts" and "tokens" from the filter output.
    let filter_schema = filter.schema();
    let proj_exprs = vec![
        ProjectionExpr {
            expr: col("ts", &filter_schema)?,
            alias: "ts".to_string(),
        },
        ProjectionExpr {
            expr: col("tokens", &filter_schema)?,
            alias: "tokens".to_string(),
        },
    ];
    let projection = Arc::new(ProjectionExec::try_new(
        proj_exprs,
        Arc::clone(&filter) as _,
    )?);
    let swapped = filter.try_swapping_with_projection(&projection)?;
    assert!(swapped.is_some(), "swap should succeed");
    let out_schema = swapped.unwrap().schema();
    assert_eq!(out_schema.fields().len(), 2);
    assert_eq!(out_schema.field(0).name(), "ts");
    assert_eq!(out_schema.field(1).name(), "tokens");
    Ok(())
}
}