datafusion_physical_plan/
filter.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::any::Any;
19use std::pin::Pin;
20use std::sync::Arc;
21use std::task::{ready, Context, Poll};
22
23use itertools::Itertools;
24
25use super::{
26    ColumnStatistics, DisplayAs, ExecutionPlanProperties, PlanProperties,
27    RecordBatchStream, SendableRecordBatchStream, Statistics,
28};
29use crate::common::can_project;
30use crate::execution_plan::CardinalityEffect;
31use crate::filter_pushdown::{
32    ChildFilterDescription, ChildPushdownResult, FilterDescription, FilterPushdownPhase,
33    FilterPushdownPropagation, PushedDown, PushedDownPredicate,
34};
35use crate::metrics::{MetricBuilder, MetricType};
36use crate::projection::{
37    make_with_child, try_embed_projection, update_expr, EmbeddedProjection,
38    ProjectionExec, ProjectionExpr,
39};
40use crate::{
41    metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet, RatioMetrics},
42    DisplayFormatType, ExecutionPlan,
43};
44
45use arrow::compute::filter_record_batch;
46use arrow::datatypes::{DataType, SchemaRef};
47use arrow::record_batch::RecordBatch;
48use datafusion_common::cast::as_boolean_array;
49use datafusion_common::config::ConfigOptions;
50use datafusion_common::stats::Precision;
51use datafusion_common::{
52    internal_err, plan_err, project_schema, DataFusionError, Result, ScalarValue,
53};
54use datafusion_execution::TaskContext;
55use datafusion_expr::Operator;
56use datafusion_physical_expr::equivalence::ProjectionMapping;
57use datafusion_physical_expr::expressions::{lit, BinaryExpr, Column};
58use datafusion_physical_expr::intervals::utils::check_support;
59use datafusion_physical_expr::utils::collect_columns;
60use datafusion_physical_expr::{
61    analyze, conjunction, split_conjunction, AcrossPartitions, AnalysisContext,
62    ConstExpr, ExprBoundaries, PhysicalExpr,
63};
64
65use datafusion_physical_expr_common::physical_expr::fmt_sql;
66use futures::stream::{Stream, StreamExt};
67use log::trace;
68
/// Selectivity, in percent of input rows, assumed by [`FilterExec`] when its
/// predicate cannot be analyzed for statistics estimation.
const FILTER_EXEC_DEFAULT_SELECTIVITY: u8 = 20;
70
71/// FilterExec evaluates a boolean predicate against all input batches to determine which rows to
72/// include in its output batches.
73#[derive(Debug, Clone)]
74pub struct FilterExec {
75    /// The expression to filter on. This expression must evaluate to a boolean value.
76    predicate: Arc<dyn PhysicalExpr>,
77    /// The input plan
78    input: Arc<dyn ExecutionPlan>,
79    /// Execution metrics
80    metrics: ExecutionPlanMetricsSet,
81    /// Selectivity for statistics. 0 = no rows, 100 = all rows
82    default_selectivity: u8,
83    /// Properties equivalence properties, partitioning, etc.
84    cache: PlanProperties,
85    /// The projection indices of the columns in the output schema of join
86    projection: Option<Vec<usize>>,
87}
88
89impl FilterExec {
90    /// Create a FilterExec on an input
91    pub fn try_new(
92        predicate: Arc<dyn PhysicalExpr>,
93        input: Arc<dyn ExecutionPlan>,
94    ) -> Result<Self> {
95        match predicate.data_type(input.schema().as_ref())? {
96            DataType::Boolean => {
97                let default_selectivity = FILTER_EXEC_DEFAULT_SELECTIVITY;
98                let cache = Self::compute_properties(
99                    &input,
100                    &predicate,
101                    default_selectivity,
102                    None,
103                )?;
104                Ok(Self {
105                    predicate,
106                    input: Arc::clone(&input),
107                    metrics: ExecutionPlanMetricsSet::new(),
108                    default_selectivity,
109                    cache,
110                    projection: None,
111                })
112            }
113            other => {
114                plan_err!("Filter predicate must return BOOLEAN values, got {other:?}")
115            }
116        }
117    }
118
119    pub fn with_default_selectivity(
120        mut self,
121        default_selectivity: u8,
122    ) -> Result<Self, DataFusionError> {
123        if default_selectivity > 100 {
124            return plan_err!(
125                "Default filter selectivity value needs to be less than or equal to 100"
126            );
127        }
128        self.default_selectivity = default_selectivity;
129        Ok(self)
130    }
131
132    /// Return new instance of [FilterExec] with the given projection.
133    pub fn with_projection(&self, projection: Option<Vec<usize>>) -> Result<Self> {
134        //  Check if the projection is valid
135        can_project(&self.schema(), projection.as_ref())?;
136
137        let projection = match projection {
138            Some(projection) => match &self.projection {
139                Some(p) => Some(projection.iter().map(|i| p[*i]).collect()),
140                None => Some(projection),
141            },
142            None => None,
143        };
144
145        let cache = Self::compute_properties(
146            &self.input,
147            &self.predicate,
148            self.default_selectivity,
149            projection.as_ref(),
150        )?;
151        Ok(Self {
152            predicate: Arc::clone(&self.predicate),
153            input: Arc::clone(&self.input),
154            metrics: self.metrics.clone(),
155            default_selectivity: self.default_selectivity,
156            cache,
157            projection,
158        })
159    }
160
161    /// The expression to filter on. This expression must evaluate to a boolean value.
162    pub fn predicate(&self) -> &Arc<dyn PhysicalExpr> {
163        &self.predicate
164    }
165
166    /// The input plan
167    pub fn input(&self) -> &Arc<dyn ExecutionPlan> {
168        &self.input
169    }
170
171    /// The default selectivity
172    pub fn default_selectivity(&self) -> u8 {
173        self.default_selectivity
174    }
175
176    /// Projection
177    pub fn projection(&self) -> Option<&Vec<usize>> {
178        self.projection.as_ref()
179    }
180
181    /// Calculates `Statistics` for `FilterExec`, by applying selectivity (either default, or estimated) to input statistics.
182    fn statistics_helper(
183        schema: SchemaRef,
184        input_stats: Statistics,
185        predicate: &Arc<dyn PhysicalExpr>,
186        default_selectivity: u8,
187    ) -> Result<Statistics> {
188        if !check_support(predicate, &schema) {
189            let selectivity = default_selectivity as f64 / 100.0;
190            let mut stats = input_stats.to_inexact();
191            stats.num_rows = stats.num_rows.with_estimated_selectivity(selectivity);
192            stats.total_byte_size = stats
193                .total_byte_size
194                .with_estimated_selectivity(selectivity);
195            return Ok(stats);
196        }
197
198        let num_rows = input_stats.num_rows;
199        let total_byte_size = input_stats.total_byte_size;
200        let input_analysis_ctx = AnalysisContext::try_from_statistics(
201            &schema,
202            &input_stats.column_statistics,
203        )?;
204
205        let analysis_ctx = analyze(predicate, input_analysis_ctx, &schema)?;
206
207        // Estimate (inexact) selectivity of predicate
208        let selectivity = analysis_ctx.selectivity.unwrap_or(1.0);
209        let num_rows = num_rows.with_estimated_selectivity(selectivity);
210        let total_byte_size = total_byte_size.with_estimated_selectivity(selectivity);
211
212        let column_statistics = collect_new_statistics(
213            &input_stats.column_statistics,
214            analysis_ctx.boundaries,
215        );
216        Ok(Statistics {
217            num_rows,
218            total_byte_size,
219            column_statistics,
220        })
221    }
222
223    fn extend_constants(
224        input: &Arc<dyn ExecutionPlan>,
225        predicate: &Arc<dyn PhysicalExpr>,
226    ) -> Vec<ConstExpr> {
227        let mut res_constants = Vec::new();
228        let input_eqs = input.equivalence_properties();
229
230        let conjunctions = split_conjunction(predicate);
231        for conjunction in conjunctions {
232            if let Some(binary) = conjunction.as_any().downcast_ref::<BinaryExpr>() {
233                if binary.op() == &Operator::Eq {
234                    // Filter evaluates to single value for all partitions
235                    if input_eqs.is_expr_constant(binary.left()).is_some() {
236                        let across = input_eqs
237                            .is_expr_constant(binary.right())
238                            .unwrap_or_default();
239                        res_constants
240                            .push(ConstExpr::new(Arc::clone(binary.right()), across));
241                    } else if input_eqs.is_expr_constant(binary.right()).is_some() {
242                        let across = input_eqs
243                            .is_expr_constant(binary.left())
244                            .unwrap_or_default();
245                        res_constants
246                            .push(ConstExpr::new(Arc::clone(binary.left()), across));
247                    }
248                }
249            }
250        }
251        res_constants
252    }
253    /// This function creates the cache object that stores the plan properties such as schema, equivalence properties, ordering, partitioning, etc.
254    fn compute_properties(
255        input: &Arc<dyn ExecutionPlan>,
256        predicate: &Arc<dyn PhysicalExpr>,
257        default_selectivity: u8,
258        projection: Option<&Vec<usize>>,
259    ) -> Result<PlanProperties> {
260        // Combine the equal predicates with the input equivalence properties
261        // to construct the equivalence properties:
262        let stats = Self::statistics_helper(
263            input.schema(),
264            input.partition_statistics(None)?,
265            predicate,
266            default_selectivity,
267        )?;
268        let mut eq_properties = input.equivalence_properties().clone();
269        let (equal_pairs, _) = collect_columns_from_predicate_inner(predicate);
270        for (lhs, rhs) in equal_pairs {
271            eq_properties.add_equal_conditions(Arc::clone(lhs), Arc::clone(rhs))?
272        }
273        // Add the columns that have only one viable value (singleton) after
274        // filtering to constants.
275        let constants = collect_columns(predicate)
276            .into_iter()
277            .filter(|column| stats.column_statistics[column.index()].is_singleton())
278            .map(|column| {
279                let value = stats.column_statistics[column.index()]
280                    .min_value
281                    .get_value();
282                let expr = Arc::new(column) as _;
283                ConstExpr::new(expr, AcrossPartitions::Uniform(value.cloned()))
284            });
285        // This is for statistics
286        eq_properties.add_constants(constants)?;
287        // This is for logical constant (for example: a = '1', then a could be marked as a constant)
288        // to do: how to deal with multiple situation to represent = (for example c1 between 0 and 0)
289        eq_properties.add_constants(Self::extend_constants(input, predicate))?;
290
291        let mut output_partitioning = input.output_partitioning().clone();
292        // If contains projection, update the PlanProperties.
293        if let Some(projection) = projection {
294            let schema = eq_properties.schema();
295            let projection_mapping = ProjectionMapping::from_indices(projection, schema)?;
296            let out_schema = project_schema(schema, Some(projection))?;
297            output_partitioning =
298                output_partitioning.project(&projection_mapping, &eq_properties);
299            eq_properties = eq_properties.project(&projection_mapping, out_schema);
300        }
301
302        Ok(PlanProperties::new(
303            eq_properties,
304            output_partitioning,
305            input.pipeline_behavior(),
306            input.boundedness(),
307        ))
308    }
309}
310
311impl DisplayAs for FilterExec {
312    fn fmt_as(
313        &self,
314        t: DisplayFormatType,
315        f: &mut std::fmt::Formatter,
316    ) -> std::fmt::Result {
317        match t {
318            DisplayFormatType::Default | DisplayFormatType::Verbose => {
319                let display_projections = if let Some(projection) =
320                    self.projection.as_ref()
321                {
322                    format!(
323                        ", projection=[{}]",
324                        projection
325                            .iter()
326                            .map(|index| format!(
327                                "{}@{}",
328                                self.input.schema().fields().get(*index).unwrap().name(),
329                                index
330                            ))
331                            .collect::<Vec<_>>()
332                            .join(", ")
333                    )
334                } else {
335                    "".to_string()
336                };
337                write!(f, "FilterExec: {}{}", self.predicate, display_projections)
338            }
339            DisplayFormatType::TreeRender => {
340                write!(f, "predicate={}", fmt_sql(self.predicate.as_ref()))
341            }
342        }
343    }
344}
345
346impl ExecutionPlan for FilterExec {
347    fn name(&self) -> &'static str {
348        "FilterExec"
349    }
350
351    /// Return a reference to Any that can be used for downcasting
352    fn as_any(&self) -> &dyn Any {
353        self
354    }
355
356    fn properties(&self) -> &PlanProperties {
357        &self.cache
358    }
359
360    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
361        vec![&self.input]
362    }
363
364    fn maintains_input_order(&self) -> Vec<bool> {
365        // Tell optimizer this operator doesn't reorder its input
366        vec![true]
367    }
368
369    fn with_new_children(
370        self: Arc<Self>,
371        mut children: Vec<Arc<dyn ExecutionPlan>>,
372    ) -> Result<Arc<dyn ExecutionPlan>> {
373        FilterExec::try_new(Arc::clone(&self.predicate), children.swap_remove(0))
374            .and_then(|e| {
375                let selectivity = e.default_selectivity();
376                e.with_default_selectivity(selectivity)
377            })
378            .and_then(|e| e.with_projection(self.projection().cloned()))
379            .map(|e| Arc::new(e) as _)
380    }
381
382    fn execute(
383        &self,
384        partition: usize,
385        context: Arc<TaskContext>,
386    ) -> Result<SendableRecordBatchStream> {
387        trace!("Start FilterExec::execute for partition {} of context session_id {} and task_id {:?}", partition, context.session_id(), context.task_id());
388        let metrics = FilterExecMetrics::new(&self.metrics, partition);
389        Ok(Box::pin(FilterExecStream {
390            schema: self.schema(),
391            predicate: Arc::clone(&self.predicate),
392            input: self.input.execute(partition, context)?,
393            metrics,
394            projection: self.projection.clone(),
395        }))
396    }
397
398    fn metrics(&self) -> Option<MetricsSet> {
399        Some(self.metrics.clone_inner())
400    }
401
402    /// The output statistics of a filtering operation can be estimated if the
403    /// predicate's selectivity value can be determined for the incoming data.
404    fn statistics(&self) -> Result<Statistics> {
405        self.partition_statistics(None)
406    }
407
408    fn partition_statistics(&self, partition: Option<usize>) -> Result<Statistics> {
409        let input_stats = self.input.partition_statistics(partition)?;
410        let stats = Self::statistics_helper(
411            self.schema(),
412            input_stats,
413            self.predicate(),
414            self.default_selectivity,
415        )?;
416        Ok(stats.project(self.projection.as_ref()))
417    }
418
419    fn cardinality_effect(&self) -> CardinalityEffect {
420        CardinalityEffect::LowerEqual
421    }
422
423    /// Tries to swap `projection` with its input (`filter`). If possible, performs
424    /// the swap and returns [`FilterExec`] as the top plan. Otherwise, returns `None`.
425    fn try_swapping_with_projection(
426        &self,
427        projection: &ProjectionExec,
428    ) -> Result<Option<Arc<dyn ExecutionPlan>>> {
429        // If the projection does not narrow the schema, we should not try to push it down:
430        if projection.expr().len() < projection.input().schema().fields().len() {
431            // Each column in the predicate expression must exist after the projection.
432            if let Some(new_predicate) =
433                update_expr(self.predicate(), projection.expr(), false)?
434            {
435                return FilterExec::try_new(
436                    new_predicate,
437                    make_with_child(projection, self.input())?,
438                )
439                .and_then(|e| {
440                    let selectivity = self.default_selectivity();
441                    e.with_default_selectivity(selectivity)
442                })
443                .map(|e| Some(Arc::new(e) as _));
444            }
445        }
446        try_embed_projection(projection, self)
447    }
448
449    fn gather_filters_for_pushdown(
450        &self,
451        phase: FilterPushdownPhase,
452        parent_filters: Vec<Arc<dyn PhysicalExpr>>,
453        _config: &ConfigOptions,
454    ) -> Result<FilterDescription> {
455        if !matches!(phase, FilterPushdownPhase::Pre) {
456            // For non-pre phase, filters pass through unchanged
457            let filter_supports = parent_filters
458                .into_iter()
459                .map(PushedDownPredicate::supported)
460                .collect();
461            return Ok(FilterDescription::new().with_child(ChildFilterDescription {
462                parent_filters: filter_supports,
463                self_filters: vec![],
464            }));
465        }
466
467        let child = ChildFilterDescription::from_child(&parent_filters, self.input())?
468            .with_self_filters(
469                split_conjunction(&self.predicate)
470                    .into_iter()
471                    .cloned()
472                    .collect(),
473            );
474
475        Ok(FilterDescription::new().with_child(child))
476    }
477
478    fn handle_child_pushdown_result(
479        &self,
480        phase: FilterPushdownPhase,
481        child_pushdown_result: ChildPushdownResult,
482        _config: &ConfigOptions,
483    ) -> Result<FilterPushdownPropagation<Arc<dyn ExecutionPlan>>> {
484        if !matches!(phase, FilterPushdownPhase::Pre) {
485            return Ok(FilterPushdownPropagation::if_all(child_pushdown_result));
486        }
487        // We absorb any parent filters that were not handled by our children
488        let unsupported_parent_filters =
489            child_pushdown_result.parent_filters.iter().filter_map(|f| {
490                matches!(f.all(), PushedDown::No).then_some(Arc::clone(&f.filter))
491            });
492        let unsupported_self_filters = child_pushdown_result
493            .self_filters
494            .first()
495            .expect("we have exactly one child")
496            .iter()
497            .filter_map(|f| match f.discriminant {
498                PushedDown::Yes => None,
499                PushedDown::No => Some(&f.predicate),
500            })
501            .cloned();
502
503        let unhandled_filters = unsupported_parent_filters
504            .into_iter()
505            .chain(unsupported_self_filters)
506            .collect_vec();
507
508        // If we have unhandled filters, we need to create a new FilterExec
509        let filter_input = Arc::clone(self.input());
510        let new_predicate = conjunction(unhandled_filters);
511        let updated_node = if new_predicate.eq(&lit(true)) {
512            // FilterExec is no longer needed, but we may need to leave a projection in place
513            match self.projection() {
514                Some(projection_indices) => {
515                    let filter_child_schema = filter_input.schema();
516                    let proj_exprs = projection_indices
517                        .iter()
518                        .map(|p| {
519                            let field = filter_child_schema.field(*p).clone();
520                            ProjectionExpr {
521                                expr: Arc::new(Column::new(field.name(), *p))
522                                    as Arc<dyn PhysicalExpr>,
523                                alias: field.name().to_string(),
524                            }
525                        })
526                        .collect::<Vec<_>>();
527                    Some(Arc::new(ProjectionExec::try_new(proj_exprs, filter_input)?)
528                        as Arc<dyn ExecutionPlan>)
529                }
530                None => {
531                    // No projection needed, just return the input
532                    Some(filter_input)
533                }
534            }
535        } else if new_predicate.eq(&self.predicate) {
536            // The new predicate is the same as our current predicate
537            None
538        } else {
539            // Create a new FilterExec with the new predicate
540            let new = FilterExec {
541                predicate: Arc::clone(&new_predicate),
542                input: Arc::clone(&filter_input),
543                metrics: self.metrics.clone(),
544                default_selectivity: self.default_selectivity,
545                cache: Self::compute_properties(
546                    &filter_input,
547                    &new_predicate,
548                    self.default_selectivity,
549                    self.projection.as_ref(),
550                )?,
551                projection: None,
552            };
553            Some(Arc::new(new) as _)
554        };
555
556        Ok(FilterPushdownPropagation {
557            filters: vec![PushedDown::Yes; child_pushdown_result.parent_filters.len()],
558            updated_node,
559        })
560    }
561}
562
563impl EmbeddedProjection for FilterExec {
564    fn with_projection(&self, projection: Option<Vec<usize>>) -> Result<Self> {
565        self.with_projection(projection)
566    }
567}
568
569/// This function ensures that all bounds in the `ExprBoundaries` vector are
570/// converted to closed bounds. If a lower/upper bound is initially open, it
571/// is adjusted by using the next/previous value for its data type to convert
572/// it into a closed bound.
573fn collect_new_statistics(
574    input_column_stats: &[ColumnStatistics],
575    analysis_boundaries: Vec<ExprBoundaries>,
576) -> Vec<ColumnStatistics> {
577    analysis_boundaries
578        .into_iter()
579        .enumerate()
580        .map(
581            |(
582                idx,
583                ExprBoundaries {
584                    interval,
585                    distinct_count,
586                    ..
587                },
588            )| {
589                let Some(interval) = interval else {
590                    // If the interval is `None`, we can say that there are no rows:
591                    return ColumnStatistics {
592                        null_count: Precision::Exact(0),
593                        max_value: Precision::Exact(ScalarValue::Null),
594                        min_value: Precision::Exact(ScalarValue::Null),
595                        sum_value: Precision::Exact(ScalarValue::Null),
596                        distinct_count: Precision::Exact(0),
597                    };
598                };
599                let (lower, upper) = interval.into_bounds();
600                let (min_value, max_value) = if lower.eq(&upper) {
601                    (Precision::Exact(lower), Precision::Exact(upper))
602                } else {
603                    (Precision::Inexact(lower), Precision::Inexact(upper))
604                };
605                ColumnStatistics {
606                    null_count: input_column_stats[idx].null_count.to_inexact(),
607                    max_value,
608                    min_value,
609                    sum_value: Precision::Absent,
610                    distinct_count: distinct_count.to_inexact(),
611                }
612            },
613        )
614        .collect()
615}
616
617/// The FilterExec streams wraps the input iterator and applies the predicate expression to
618/// determine which rows to include in its output batches
619struct FilterExecStream {
620    /// Output schema after the projection
621    schema: SchemaRef,
622    /// The expression to filter on. This expression must evaluate to a boolean value.
623    predicate: Arc<dyn PhysicalExpr>,
624    /// The input partition to filter.
625    input: SendableRecordBatchStream,
626    /// Runtime metrics recording
627    metrics: FilterExecMetrics,
628    /// The projection indices of the columns in the input schema
629    projection: Option<Vec<usize>>,
630}
631
632/// The metrics for `FilterExec`
633struct FilterExecMetrics {
634    // Common metrics for most operators
635    baseline_metrics: BaselineMetrics,
636    // Selectivity of the filter, calculated as output_rows / input_rows
637    selectivity: RatioMetrics,
638}
639
640impl FilterExecMetrics {
641    pub fn new(metrics: &ExecutionPlanMetricsSet, partition: usize) -> Self {
642        Self {
643            baseline_metrics: BaselineMetrics::new(metrics, partition),
644            selectivity: MetricBuilder::new(metrics)
645                .with_type(MetricType::SUMMARY)
646                .ratio_metrics("selectivity", partition),
647        }
648    }
649}
650
651pub fn batch_filter(
652    batch: &RecordBatch,
653    predicate: &Arc<dyn PhysicalExpr>,
654) -> Result<RecordBatch> {
655    filter_and_project(batch, predicate, None, &batch.schema())
656}
657
658fn filter_and_project(
659    batch: &RecordBatch,
660    predicate: &Arc<dyn PhysicalExpr>,
661    projection: Option<&Vec<usize>>,
662    output_schema: &SchemaRef,
663) -> Result<RecordBatch> {
664    predicate
665        .evaluate(batch)
666        .and_then(|v| v.into_array(batch.num_rows()))
667        .and_then(|array| {
668            Ok(match (as_boolean_array(&array), projection) {
669                // Apply filter array to record batch
670                (Ok(filter_array), None) => filter_record_batch(batch, filter_array)?,
671                (Ok(filter_array), Some(projection)) => {
672                    let projected_columns = projection
673                        .iter()
674                        .map(|i| Arc::clone(batch.column(*i)))
675                        .collect();
676                    let projected_batch = RecordBatch::try_new(
677                        Arc::clone(output_schema),
678                        projected_columns,
679                    )?;
680                    filter_record_batch(&projected_batch, filter_array)?
681                }
682                (Err(_), _) => {
683                    return internal_err!(
684                        "Cannot create filter_array from non-boolean predicates"
685                    );
686                }
687            })
688        })
689}
690
691impl Stream for FilterExecStream {
692    type Item = Result<RecordBatch>;
693
694    fn poll_next(
695        mut self: Pin<&mut Self>,
696        cx: &mut Context<'_>,
697    ) -> Poll<Option<Self::Item>> {
698        let poll;
699        loop {
700            match ready!(self.input.poll_next_unpin(cx)) {
701                Some(Ok(batch)) => {
702                    let timer = self.metrics.baseline_metrics.elapsed_compute().timer();
703                    let filtered_batch = filter_and_project(
704                        &batch,
705                        &self.predicate,
706                        self.projection.as_ref(),
707                        &self.schema,
708                    )?;
709                    timer.done();
710
711                    self.metrics.selectivity.add_part(filtered_batch.num_rows());
712                    self.metrics.selectivity.add_total(batch.num_rows());
713
714                    // Skip entirely filtered batches
715                    if filtered_batch.num_rows() == 0 {
716                        continue;
717                    }
718                    poll = Poll::Ready(Some(Ok(filtered_batch)));
719                    break;
720                }
721                value => {
722                    poll = Poll::Ready(value);
723                    break;
724                }
725            }
726        }
727        self.metrics.baseline_metrics.record_poll(poll)
728    }
729
730    fn size_hint(&self) -> (usize, Option<usize>) {
731        // Same number of record batches
732        self.input.size_hint()
733    }
734}
735
736impl RecordBatchStream for FilterExecStream {
737    fn schema(&self) -> SchemaRef {
738        Arc::clone(&self.schema)
739    }
740}
741
742/// Return the equals Column-Pairs and Non-equals Column-Pairs
743#[deprecated(
744    since = "51.0.0",
745    note = "This function will be internal in the future"
746)]
747pub fn collect_columns_from_predicate(
748    predicate: &'_ Arc<dyn PhysicalExpr>,
749) -> EqualAndNonEqual<'_> {
750    collect_columns_from_predicate_inner(predicate)
751}
752
753fn collect_columns_from_predicate_inner(
754    predicate: &'_ Arc<dyn PhysicalExpr>,
755) -> EqualAndNonEqual<'_> {
756    let mut eq_predicate_columns = Vec::<PhysicalExprPairRef>::new();
757    let mut ne_predicate_columns = Vec::<PhysicalExprPairRef>::new();
758
759    let predicates = split_conjunction(predicate);
760    predicates.into_iter().for_each(|p| {
761        if let Some(binary) = p.as_any().downcast_ref::<BinaryExpr>() {
762            match binary.op() {
763                Operator::Eq => {
764                    eq_predicate_columns.push((binary.left(), binary.right()))
765                }
766                Operator::NotEq => {
767                    ne_predicate_columns.push((binary.left(), binary.right()))
768                }
769                _ => {}
770            }
771        }
772    });
773
774    (eq_predicate_columns, ne_predicate_columns)
775}
776
777/// Pair of `Arc<dyn PhysicalExpr>`s
778pub type PhysicalExprPairRef<'a> = (&'a Arc<dyn PhysicalExpr>, &'a Arc<dyn PhysicalExpr>);
779
780/// The equals Column-Pairs and Non-equals Column-Pairs in the Predicates
781pub type EqualAndNonEqual<'a> =
782    (Vec<PhysicalExprPairRef<'a>>, Vec<PhysicalExprPairRef<'a>>);
783
#[cfg(test)]
mod tests {
    use super::*;
    use crate::empty::EmptyExec;
    use crate::expressions::*;
    use crate::test;
    use crate::test::exec::StatisticsExec;
    use arrow::datatypes::{Field, Schema, UnionFields, UnionMode};
    use datafusion_common::ScalarValue;

    #[tokio::test]
    async fn collect_columns_predicates() -> Result<()> {
        let schema = test::aggr_test_schema();
        let predicate: Arc<dyn PhysicalExpr> = binary(
            binary(
                binary(col("c2", &schema)?, Operator::GtEq, lit(1u32), &schema)?,
                Operator::And,
                binary(col("c2", &schema)?, Operator::Eq, lit(4u32), &schema)?,
                &schema,
            )?,
            Operator::And,
            binary(
                binary(
                    col("c2", &schema)?,
                    Operator::Eq,
                    col("c9", &schema)?,
                    &schema,
                )?,
                Operator::And,
                binary(
                    col("c1", &schema)?,
                    Operator::NotEq,
                    col("c13", &schema)?,
                    &schema,
                )?,
                &schema,
            )?,
            &schema,
        )?;

        let (equal_pairs, ne_pairs) = collect_columns_from_predicate_inner(&predicate);
        assert_eq!(2, equal_pairs.len());
        assert!(equal_pairs[0].0.eq(&col("c2", &schema)?));
        assert!(equal_pairs[0].1.eq(&lit(4u32)));

        assert!(equal_pairs[1].0.eq(&col("c2", &schema)?));
        assert!(equal_pairs[1].1.eq(&col("c9", &schema)?));

        assert_eq!(1, ne_pairs.len());
        assert!(ne_pairs[0].0.eq(&col("c1", &schema)?));
        assert!(ne_pairs[0].1.eq(&col("c13", &schema)?));

        Ok(())
    }

    #[tokio::test]
    async fn test_filter_statistics_basic_expr() -> Result<()> {
        // Table:
        //      a: min=1, max=100
        let bytes_per_row = 4;
        let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]);
        let input = Arc::new(StatisticsExec::new(
            Statistics {
                num_rows: Precision::Inexact(100),
                total_byte_size: Precision::Inexact(100 * bytes_per_row),
                column_statistics: vec![ColumnStatistics {
                    min_value: Precision::Inexact(ScalarValue::Int32(Some(1))),
                    max_value: Precision::Inexact(ScalarValue::Int32(Some(100))),
                    ..Default::default()
                }],
            },
            schema.clone(),
        ));

        // a <= 25
        let predicate: Arc<dyn PhysicalExpr> =
            binary(col("a", &schema)?, Operator::LtEq, lit(25i32), &schema)?;

        // WHERE a <= 25
        let filter: Arc<dyn ExecutionPlan> =
            Arc::new(FilterExec::try_new(predicate, input)?);

        let statistics = filter.partition_statistics(None)?;
        assert_eq!(statistics.num_rows, Precision::Inexact(25));
        assert_eq!(
            statistics.total_byte_size,
            Precision::Inexact(25 * bytes_per_row)
        );
        assert_eq!(
            statistics.column_statistics,
            vec![ColumnStatistics {
                min_value: Precision::Inexact(ScalarValue::Int32(Some(1))),
                max_value: Precision::Inexact(ScalarValue::Int32(Some(25))),
                ..Default::default()
            }]
        );

        Ok(())
    }

    #[tokio::test]
    async fn test_filter_statistics_column_level_nested() -> Result<()> {
        // Table:
        //      a: min=1, max=100
        let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]);
        let input = Arc::new(StatisticsExec::new(
            Statistics {
                num_rows: Precision::Inexact(100),
                column_statistics: vec![ColumnStatistics {
                    min_value: Precision::Inexact(ScalarValue::Int32(Some(1))),
                    max_value: Precision::Inexact(ScalarValue::Int32(Some(100))),
                    ..Default::default()
                }],
                total_byte_size: Precision::Absent,
            },
            schema.clone(),
        ));

        // WHERE a <= 25
        let sub_filter: Arc<dyn ExecutionPlan> = Arc::new(FilterExec::try_new(
            binary(col("a", &schema)?, Operator::LtEq, lit(25i32), &schema)?,
            input,
        )?);

        // Nested filters (two separate physical plans, instead of AND chain in the expr)
        // WHERE a >= 10
        // WHERE a <= 25
        let filter: Arc<dyn ExecutionPlan> = Arc::new(FilterExec::try_new(
            binary(col("a", &schema)?, Operator::GtEq, lit(10i32), &schema)?,
            sub_filter,
        )?);

        let statistics = filter.partition_statistics(None)?;
        assert_eq!(statistics.num_rows, Precision::Inexact(16));
        assert_eq!(
            statistics.column_statistics,
            vec![ColumnStatistics {
                min_value: Precision::Inexact(ScalarValue::Int32(Some(10))),
                max_value: Precision::Inexact(ScalarValue::Int32(Some(25))),
                ..Default::default()
            }]
        );

        Ok(())
    }

    #[tokio::test]
    async fn test_filter_statistics_column_level_nested_multiple() -> Result<()> {
        // Table:
        //      a: min=1, max=100
        //      b: min=1, max=50
        let schema = Schema::new(vec![
            Field::new("a", DataType::Int32, false),
            Field::new("b", DataType::Int32, false),
        ]);
        let input = Arc::new(StatisticsExec::new(
            Statistics {
                num_rows: Precision::Inexact(100),
                column_statistics: vec![
                    ColumnStatistics {
                        min_value: Precision::Inexact(ScalarValue::Int32(Some(1))),
                        max_value: Precision::Inexact(ScalarValue::Int32(Some(100))),
                        ..Default::default()
                    },
                    ColumnStatistics {
                        min_value: Precision::Inexact(ScalarValue::Int32(Some(1))),
                        max_value: Precision::Inexact(ScalarValue::Int32(Some(50))),
                        ..Default::default()
                    },
                ],
                total_byte_size: Precision::Absent,
            },
            schema.clone(),
        ));

        // WHERE a <= 25
        let a_lte_25: Arc<dyn ExecutionPlan> = Arc::new(FilterExec::try_new(
            binary(col("a", &schema)?, Operator::LtEq, lit(25i32), &schema)?,
            input,
        )?);

        // WHERE b > 45
        let b_gt_45: Arc<dyn ExecutionPlan> = Arc::new(FilterExec::try_new(
            binary(col("b", &schema)?, Operator::Gt, lit(45i32), &schema)?,
            a_lte_25,
        )?);

        // WHERE a >= 10
        let filter: Arc<dyn ExecutionPlan> = Arc::new(FilterExec::try_new(
            binary(col("a", &schema)?, Operator::GtEq, lit(10i32), &schema)?,
            b_gt_45,
        )?);
        let statistics = filter.partition_statistics(None)?;
        // On a uniform distribution, only sixteen rows will satisfy the
        // filter that 'a' proposed (a >= 10 AND a <= 25) (16/100) and only
        // 5 rows will satisfy the filter that 'b' proposed (b > 45) (5/50).
        //
        // This results in a selectivity of '16/100 * 5/50' (0.016), i.e.
        // about 1.6% of all rows, which is rounded up to 2 rows.
        assert_eq!(statistics.num_rows, Precision::Inexact(2));
        assert_eq!(
            statistics.column_statistics,
            vec![
                ColumnStatistics {
                    min_value: Precision::Inexact(ScalarValue::Int32(Some(10))),
                    max_value: Precision::Inexact(ScalarValue::Int32(Some(25))),
                    ..Default::default()
                },
                ColumnStatistics {
                    min_value: Precision::Inexact(ScalarValue::Int32(Some(46))),
                    max_value: Precision::Inexact(ScalarValue::Int32(Some(50))),
                    ..Default::default()
                }
            ]
        );

        Ok(())
    }

    #[tokio::test]
    async fn test_filter_statistics_when_input_stats_missing() -> Result<()> {
        // Table:
        //      a: min=???, max=??? (missing)
        let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]);
        let input = Arc::new(StatisticsExec::new(
            Statistics::new_unknown(&schema),
            schema.clone(),
        ));

        // a <= 25
        let predicate: Arc<dyn PhysicalExpr> =
            binary(col("a", &schema)?, Operator::LtEq, lit(25i32), &schema)?;

        // WHERE a <= 25
        let filter: Arc<dyn ExecutionPlan> =
            Arc::new(FilterExec::try_new(predicate, input)?);

        let statistics = filter.partition_statistics(None)?;
        assert_eq!(statistics.num_rows, Precision::Absent);

        Ok(())
    }

    #[tokio::test]
    async fn test_filter_statistics_multiple_columns() -> Result<()> {
        // Table:
        //      a: min=1, max=100
        //      b: min=1, max=3
        //      c: min=1000.0  max=1100.0
        let schema = Schema::new(vec![
            Field::new("a", DataType::Int32, false),
            Field::new("b", DataType::Int32, false),
            Field::new("c", DataType::Float32, false),
        ]);
        let input = Arc::new(StatisticsExec::new(
            Statistics {
                num_rows: Precision::Inexact(1000),
                total_byte_size: Precision::Inexact(4000),
                column_statistics: vec![
                    ColumnStatistics {
                        min_value: Precision::Inexact(ScalarValue::Int32(Some(1))),
                        max_value: Precision::Inexact(ScalarValue::Int32(Some(100))),
                        ..Default::default()
                    },
                    ColumnStatistics {
                        min_value: Precision::Inexact(ScalarValue::Int32(Some(1))),
                        max_value: Precision::Inexact(ScalarValue::Int32(Some(3))),
                        ..Default::default()
                    },
                    ColumnStatistics {
                        min_value: Precision::Inexact(ScalarValue::Float32(Some(1000.0))),
                        max_value: Precision::Inexact(ScalarValue::Float32(Some(1100.0))),
                        ..Default::default()
                    },
                ],
            },
            schema,
        ));
        // WHERE a<=53 AND (b=3 AND (c<=1075.0 AND a>b))
        let predicate = Arc::new(BinaryExpr::new(
            Arc::new(BinaryExpr::new(
                Arc::new(Column::new("a", 0)),
                Operator::LtEq,
                Arc::new(Literal::new(ScalarValue::Int32(Some(53)))),
            )),
            Operator::And,
            Arc::new(BinaryExpr::new(
                Arc::new(BinaryExpr::new(
                    Arc::new(Column::new("b", 1)),
                    Operator::Eq,
                    Arc::new(Literal::new(ScalarValue::Int32(Some(3)))),
                )),
                Operator::And,
                Arc::new(BinaryExpr::new(
                    Arc::new(BinaryExpr::new(
                        Arc::new(Column::new("c", 2)),
                        Operator::LtEq,
                        Arc::new(Literal::new(ScalarValue::Float32(Some(1075.0)))),
                    )),
                    Operator::And,
                    Arc::new(BinaryExpr::new(
                        Arc::new(Column::new("a", 0)),
                        Operator::Gt,
                        Arc::new(Column::new("b", 1)),
                    )),
                )),
            )),
        ));
        let filter: Arc<dyn ExecutionPlan> =
            Arc::new(FilterExec::try_new(predicate, input)?);
        let statistics = filter.partition_statistics(None)?;
        // 0.5 (from a) * 0.333333... (from b) * 0.798387... (from c) ≈ 0.1330...
        // num_rows after ceil => 133.0... => 134
        // total_byte_size after ceil => 532.0... => 533
        assert_eq!(statistics.num_rows, Precision::Inexact(134));
        assert_eq!(statistics.total_byte_size, Precision::Inexact(533));
        let exp_col_stats = vec![
            ColumnStatistics {
                min_value: Precision::Inexact(ScalarValue::Int32(Some(4))),
                max_value: Precision::Inexact(ScalarValue::Int32(Some(53))),
                ..Default::default()
            },
            // `b = 3` narrows `b` to a single value; a singleton interval is
            // reported with exact bounds.
            ColumnStatistics {
                min_value: Precision::Exact(ScalarValue::Int32(Some(3))),
                max_value: Precision::Exact(ScalarValue::Int32(Some(3))),
                ..Default::default()
            },
            ColumnStatistics {
                min_value: Precision::Inexact(ScalarValue::Float32(Some(1000.0))),
                max_value: Precision::Inexact(ScalarValue::Float32(Some(1075.0))),
                ..Default::default()
            },
        ];
        // `zip` silently stops at the shorter side, so check the lengths first.
        assert_eq!(statistics.column_statistics.len(), exp_col_stats.len());

        // Windows rounds arithmetic operation results differently for floating point numbers.
        // Therefore, we check that the actual values are within an epsilon of the expected
        // values, bounding the difference in both directions.
        let eps = ScalarValue::Float32(Some(1e-6));
        let assert_close = |actual: &ScalarValue, expected: &ScalarValue| {
            assert!(actual.sub(expected).unwrap() < eps);
            assert!(expected.sub(actual).unwrap() < eps);
        };

        // Iterate eagerly: a lazy `.map(...)` whose result is discarded never
        // runs its closure, so none of these assertions would execute.
        for (expected, actual) in exp_col_stats
            .into_iter()
            .zip(statistics.column_statistics)
        {
            let is_floating = actual
                .min_value
                .get_value()
                .is_some_and(|val| val.data_type().is_floating());
            if is_floating {
                assert_close(
                    actual.min_value.get_value().unwrap(),
                    expected.min_value.get_value().unwrap(),
                );
                assert_close(
                    actual.max_value.get_value().unwrap(),
                    expected.max_value.get_value().unwrap(),
                );
            } else {
                assert_eq!(actual, expected);
            }
        }

        Ok(())
    }

    #[tokio::test]
    async fn test_filter_statistics_full_selective() -> Result<()> {
        // Table:
        //      a: min=1, max=100
        //      b: min=1, max=3
        let schema = Schema::new(vec![
            Field::new("a", DataType::Int32, false),
            Field::new("b", DataType::Int32, false),
        ]);
        let input = Arc::new(StatisticsExec::new(
            Statistics {
                num_rows: Precision::Inexact(1000),
                total_byte_size: Precision::Inexact(4000),
                column_statistics: vec![
                    ColumnStatistics {
                        min_value: Precision::Inexact(ScalarValue::Int32(Some(1))),
                        max_value: Precision::Inexact(ScalarValue::Int32(Some(100))),
                        ..Default::default()
                    },
                    ColumnStatistics {
                        min_value: Precision::Inexact(ScalarValue::Int32(Some(1))),
                        max_value: Precision::Inexact(ScalarValue::Int32(Some(3))),
                        ..Default::default()
                    },
                ],
            },
            schema,
        ));
        // WHERE a<200 AND 1<=b
        let predicate = Arc::new(BinaryExpr::new(
            Arc::new(BinaryExpr::new(
                Arc::new(Column::new("a", 0)),
                Operator::Lt,
                Arc::new(Literal::new(ScalarValue::Int32(Some(200)))),
            )),
            Operator::And,
            Arc::new(BinaryExpr::new(
                Arc::new(Literal::new(ScalarValue::Int32(Some(1)))),
                Operator::LtEq,
                Arc::new(Column::new("b", 1)),
            )),
        ));
        // Since filter predicate passes all entries, statistics after filter shouldn't change.
        let expected = input.partition_statistics(None)?.column_statistics;
        let filter: Arc<dyn ExecutionPlan> =
            Arc::new(FilterExec::try_new(predicate, input)?);
        let statistics = filter.partition_statistics(None)?;

        assert_eq!(statistics.num_rows, Precision::Inexact(1000));
        assert_eq!(statistics.total_byte_size, Precision::Inexact(4000));
        assert_eq!(statistics.column_statistics, expected);

        Ok(())
    }

    #[tokio::test]
    async fn test_filter_statistics_zero_selective() -> Result<()> {
        // Table:
        //      a: min=1, max=100
        //      b: min=1, max=3
        let schema = Schema::new(vec![
            Field::new("a", DataType::Int32, false),
            Field::new("b", DataType::Int32, false),
        ]);
        let input = Arc::new(StatisticsExec::new(
            Statistics {
                num_rows: Precision::Inexact(1000),
                total_byte_size: Precision::Inexact(4000),
                column_statistics: vec![
                    ColumnStatistics {
                        min_value: Precision::Inexact(ScalarValue::Int32(Some(1))),
                        max_value: Precision::Inexact(ScalarValue::Int32(Some(100))),
                        ..Default::default()
                    },
                    ColumnStatistics {
                        min_value: Precision::Inexact(ScalarValue::Int32(Some(1))),
                        max_value: Precision::Inexact(ScalarValue::Int32(Some(3))),
                        ..Default::default()
                    },
                ],
            },
            schema,
        ));
        // WHERE a>200 AND 1<=b
        let predicate = Arc::new(BinaryExpr::new(
            Arc::new(BinaryExpr::new(
                Arc::new(Column::new("a", 0)),
                Operator::Gt,
                Arc::new(Literal::new(ScalarValue::Int32(Some(200)))),
            )),
            Operator::And,
            Arc::new(BinaryExpr::new(
                Arc::new(Literal::new(ScalarValue::Int32(Some(1)))),
                Operator::LtEq,
                Arc::new(Column::new("b", 1)),
            )),
        ));
        let filter: Arc<dyn ExecutionPlan> =
            Arc::new(FilterExec::try_new(predicate, input)?);
        let statistics = filter.partition_statistics(None)?;

        assert_eq!(statistics.num_rows, Precision::Inexact(0));
        assert_eq!(statistics.total_byte_size, Precision::Inexact(0));
        assert_eq!(
            statistics.column_statistics,
            vec![
                ColumnStatistics {
                    min_value: Precision::Exact(ScalarValue::Null),
                    max_value: Precision::Exact(ScalarValue::Null),
                    sum_value: Precision::Exact(ScalarValue::Null),
                    distinct_count: Precision::Exact(0),
                    null_count: Precision::Exact(0),
                },
                ColumnStatistics {
                    min_value: Precision::Exact(ScalarValue::Null),
                    max_value: Precision::Exact(ScalarValue::Null),
                    sum_value: Precision::Exact(ScalarValue::Null),
                    distinct_count: Precision::Exact(0),
                    null_count: Precision::Exact(0),
                },
            ]
        );

        Ok(())
    }

    #[tokio::test]
    async fn test_filter_statistics_more_inputs() -> Result<()> {
        let schema = Schema::new(vec![
            Field::new("a", DataType::Int32, false),
            Field::new("b", DataType::Int32, false),
        ]);
        let input = Arc::new(StatisticsExec::new(
            Statistics {
                num_rows: Precision::Inexact(1000),
                total_byte_size: Precision::Inexact(4000),
                column_statistics: vec![
                    ColumnStatistics {
                        min_value: Precision::Inexact(ScalarValue::Int32(Some(1))),
                        max_value: Precision::Inexact(ScalarValue::Int32(Some(100))),
                        ..Default::default()
                    },
                    ColumnStatistics {
                        min_value: Precision::Inexact(ScalarValue::Int32(Some(1))),
                        max_value: Precision::Inexact(ScalarValue::Int32(Some(100))),
                        ..Default::default()
                    },
                ],
            },
            schema,
        ));
        // WHERE a<50
        let predicate = Arc::new(BinaryExpr::new(
            Arc::new(Column::new("a", 0)),
            Operator::Lt,
            Arc::new(Literal::new(ScalarValue::Int32(Some(50)))),
        ));
        let filter: Arc<dyn ExecutionPlan> =
            Arc::new(FilterExec::try_new(predicate, input)?);
        let statistics = filter.partition_statistics(None)?;

        assert_eq!(statistics.num_rows, Precision::Inexact(490));
        assert_eq!(statistics.total_byte_size, Precision::Inexact(1960));
        assert_eq!(
            statistics.column_statistics,
            vec![
                ColumnStatistics {
                    min_value: Precision::Inexact(ScalarValue::Int32(Some(1))),
                    max_value: Precision::Inexact(ScalarValue::Int32(Some(49))),
                    ..Default::default()
                },
                ColumnStatistics {
                    min_value: Precision::Inexact(ScalarValue::Int32(Some(1))),
                    max_value: Precision::Inexact(ScalarValue::Int32(Some(100))),
                    ..Default::default()
                },
            ]
        );

        Ok(())
    }

    #[tokio::test]
    async fn test_empty_input_statistics() -> Result<()> {
        let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]);
        let input = Arc::new(StatisticsExec::new(
            Statistics::new_unknown(&schema),
            schema,
        ));
        // WHERE a <= 10 AND 0 <= a - 5
        let predicate = Arc::new(BinaryExpr::new(
            Arc::new(BinaryExpr::new(
                Arc::new(Column::new("a", 0)),
                Operator::LtEq,
                Arc::new(Literal::new(ScalarValue::Int32(Some(10)))),
            )),
            Operator::And,
            Arc::new(BinaryExpr::new(
                Arc::new(Literal::new(ScalarValue::Int32(Some(0)))),
                Operator::LtEq,
                Arc::new(BinaryExpr::new(
                    Arc::new(Column::new("a", 0)),
                    Operator::Minus,
                    Arc::new(Literal::new(ScalarValue::Int32(Some(5)))),
                )),
            )),
        ));
        let filter: Arc<dyn ExecutionPlan> =
            Arc::new(FilterExec::try_new(predicate, input)?);
        let filter_statistics = filter.partition_statistics(None)?;

        let expected_filter_statistics = Statistics {
            num_rows: Precision::Absent,
            total_byte_size: Precision::Absent,
            column_statistics: vec![ColumnStatistics {
                null_count: Precision::Absent,
                min_value: Precision::Inexact(ScalarValue::Int32(Some(5))),
                max_value: Precision::Inexact(ScalarValue::Int32(Some(10))),
                sum_value: Precision::Absent,
                distinct_count: Precision::Absent,
            }],
        };

        assert_eq!(filter_statistics, expected_filter_statistics);

        Ok(())
    }

    #[tokio::test]
    async fn test_statistics_with_constant_column() -> Result<()> {
        let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]);
        let input = Arc::new(StatisticsExec::new(
            Statistics::new_unknown(&schema),
            schema,
        ));
        // WHERE a = 10
        let predicate = Arc::new(BinaryExpr::new(
            Arc::new(Column::new("a", 0)),
            Operator::Eq,
            Arc::new(Literal::new(ScalarValue::Int32(Some(10)))),
        ));
        let filter: Arc<dyn ExecutionPlan> =
            Arc::new(FilterExec::try_new(predicate, input)?);
        let filter_statistics = filter.partition_statistics(None)?;
        // First column is "a", and it is a column with only one value after the filter.
        assert!(filter_statistics.column_statistics[0].is_singleton());

        Ok(())
    }

    #[tokio::test]
    async fn test_validation_filter_selectivity() -> Result<()> {
        let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]);
        let input = Arc::new(StatisticsExec::new(
            Statistics::new_unknown(&schema),
            schema,
        ));
        // WHERE a = 10
        let predicate = Arc::new(BinaryExpr::new(
            Arc::new(Column::new("a", 0)),
            Operator::Eq,
            Arc::new(Literal::new(ScalarValue::Int32(Some(10)))),
        ));
        let filter = FilterExec::try_new(predicate, input)?;
        assert!(filter.with_default_selectivity(120).is_err());
        Ok(())
    }

    #[tokio::test]
    async fn test_custom_filter_selectivity() -> Result<()> {
        // Need a decimal to trigger inexact selectivity
        let schema =
            Schema::new(vec![Field::new("a", DataType::Decimal128(2, 3), false)]);
        let input = Arc::new(StatisticsExec::new(
            Statistics {
                num_rows: Precision::Inexact(1000),
                total_byte_size: Precision::Inexact(4000),
                column_statistics: vec![ColumnStatistics {
                    ..Default::default()
                }],
            },
            schema,
        ));
        // WHERE a = 10
        let predicate = Arc::new(BinaryExpr::new(
            Arc::new(Column::new("a", 0)),
            Operator::Eq,
            Arc::new(Literal::new(ScalarValue::Decimal128(Some(10), 10, 10))),
        ));
        let filter = FilterExec::try_new(predicate, input)?;
        let statistics = filter.partition_statistics(None)?;
        assert_eq!(statistics.num_rows, Precision::Inexact(200));
        assert_eq!(statistics.total_byte_size, Precision::Inexact(800));
        let filter = filter.with_default_selectivity(40)?;
        let statistics = filter.partition_statistics(None)?;
        assert_eq!(statistics.num_rows, Precision::Inexact(400));
        assert_eq!(statistics.total_byte_size, Precision::Inexact(1600));
        Ok(())
    }

    #[test]
    fn test_equivalence_properties_union_type() -> Result<()> {
        let union_type = DataType::Union(
            UnionFields::new(
                vec![0, 1],
                vec![
                    Field::new("f1", DataType::Int32, true),
                    Field::new("f2", DataType::Utf8, true),
                ],
            ),
            UnionMode::Sparse,
        );

        let schema = Arc::new(Schema::new(vec![
            Field::new("c1", DataType::Int32, true),
            Field::new("c2", union_type, true),
        ]));

        let exec = FilterExec::try_new(
            binary(
                binary(col("c1", &schema)?, Operator::GtEq, lit(1i32), &schema)?,
                Operator::And,
                binary(col("c1", &schema)?, Operator::LtEq, lit(4i32), &schema)?,
                &schema,
            )?,
            Arc::new(EmptyExec::new(Arc::clone(&schema))),
        )?;

        // Computing statistics over a schema containing a union column must not fail.
        exec.partition_statistics(None)?;

        Ok(())
    }
}