datafusion_physical_plan/
filter.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::any::Any;
19use std::pin::Pin;
20use std::sync::Arc;
21use std::task::{ready, Context, Poll};
22
23use itertools::Itertools;
24
25use super::{
26    ColumnStatistics, DisplayAs, ExecutionPlanProperties, PlanProperties,
27    RecordBatchStream, SendableRecordBatchStream, Statistics,
28};
29use crate::common::can_project;
30use crate::execution_plan::CardinalityEffect;
31use crate::filter_pushdown::{
32    ChildFilterDescription, ChildPushdownResult, FilterDescription, FilterPushdownPhase,
33    FilterPushdownPropagation, PushedDown, PushedDownPredicate,
34};
35use crate::projection::{
36    make_with_child, try_embed_projection, update_expr, EmbeddedProjection,
37    ProjectionExec, ProjectionExpr,
38};
39use crate::{
40    metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet},
41    DisplayFormatType, ExecutionPlan,
42};
43
44use arrow::compute::filter_record_batch;
45use arrow::datatypes::{DataType, SchemaRef};
46use arrow::record_batch::RecordBatch;
47use datafusion_common::cast::as_boolean_array;
48use datafusion_common::config::ConfigOptions;
49use datafusion_common::stats::Precision;
50use datafusion_common::{
51    internal_err, plan_err, project_schema, DataFusionError, Result, ScalarValue,
52};
53use datafusion_execution::TaskContext;
54use datafusion_expr::Operator;
55use datafusion_physical_expr::equivalence::ProjectionMapping;
56use datafusion_physical_expr::expressions::{lit, BinaryExpr, Column};
57use datafusion_physical_expr::intervals::utils::check_support;
58use datafusion_physical_expr::utils::collect_columns;
59use datafusion_physical_expr::{
60    analyze, conjunction, split_conjunction, AcrossPartitions, AnalysisContext,
61    ConstExpr, ExprBoundaries, PhysicalExpr,
62};
63
64use datafusion_physical_expr_common::physical_expr::fmt_sql;
65use futures::stream::{Stream, StreamExt};
66use log::trace;
67
68const FILTER_EXEC_DEFAULT_SELECTIVITY: u8 = 20;
69
70/// FilterExec evaluates a boolean predicate against all input batches to determine which rows to
71/// include in its output batches.
72#[derive(Debug, Clone)]
73pub struct FilterExec {
74    /// The expression to filter on. This expression must evaluate to a boolean value.
75    predicate: Arc<dyn PhysicalExpr>,
76    /// The input plan
77    input: Arc<dyn ExecutionPlan>,
78    /// Execution metrics
79    metrics: ExecutionPlanMetricsSet,
80    /// Selectivity for statistics. 0 = no rows, 100 = all rows
81    default_selectivity: u8,
82    /// Properties equivalence properties, partitioning, etc.
83    cache: PlanProperties,
84    /// The projection indices of the columns in the output schema of join
85    projection: Option<Vec<usize>>,
86}
87
88impl FilterExec {
89    /// Create a FilterExec on an input
90    pub fn try_new(
91        predicate: Arc<dyn PhysicalExpr>,
92        input: Arc<dyn ExecutionPlan>,
93    ) -> Result<Self> {
94        match predicate.data_type(input.schema().as_ref())? {
95            DataType::Boolean => {
96                let default_selectivity = FILTER_EXEC_DEFAULT_SELECTIVITY;
97                let cache = Self::compute_properties(
98                    &input,
99                    &predicate,
100                    default_selectivity,
101                    None,
102                )?;
103                Ok(Self {
104                    predicate,
105                    input: Arc::clone(&input),
106                    metrics: ExecutionPlanMetricsSet::new(),
107                    default_selectivity,
108                    cache,
109                    projection: None,
110                })
111            }
112            other => {
113                plan_err!("Filter predicate must return BOOLEAN values, got {other:?}")
114            }
115        }
116    }
117
118    pub fn with_default_selectivity(
119        mut self,
120        default_selectivity: u8,
121    ) -> Result<Self, DataFusionError> {
122        if default_selectivity > 100 {
123            return plan_err!(
124                "Default filter selectivity value needs to be less than or equal to 100"
125            );
126        }
127        self.default_selectivity = default_selectivity;
128        Ok(self)
129    }
130
131    /// Return new instance of [FilterExec] with the given projection.
132    pub fn with_projection(&self, projection: Option<Vec<usize>>) -> Result<Self> {
133        //  Check if the projection is valid
134        can_project(&self.schema(), projection.as_ref())?;
135
136        let projection = match projection {
137            Some(projection) => match &self.projection {
138                Some(p) => Some(projection.iter().map(|i| p[*i]).collect()),
139                None => Some(projection),
140            },
141            None => None,
142        };
143
144        let cache = Self::compute_properties(
145            &self.input,
146            &self.predicate,
147            self.default_selectivity,
148            projection.as_ref(),
149        )?;
150        Ok(Self {
151            predicate: Arc::clone(&self.predicate),
152            input: Arc::clone(&self.input),
153            metrics: self.metrics.clone(),
154            default_selectivity: self.default_selectivity,
155            cache,
156            projection,
157        })
158    }
159
160    /// The expression to filter on. This expression must evaluate to a boolean value.
161    pub fn predicate(&self) -> &Arc<dyn PhysicalExpr> {
162        &self.predicate
163    }
164
165    /// The input plan
166    pub fn input(&self) -> &Arc<dyn ExecutionPlan> {
167        &self.input
168    }
169
170    /// The default selectivity
171    pub fn default_selectivity(&self) -> u8 {
172        self.default_selectivity
173    }
174
175    /// Projection
176    pub fn projection(&self) -> Option<&Vec<usize>> {
177        self.projection.as_ref()
178    }
179
180    /// Calculates `Statistics` for `FilterExec`, by applying selectivity (either default, or estimated) to input statistics.
181    fn statistics_helper(
182        schema: SchemaRef,
183        input_stats: Statistics,
184        predicate: &Arc<dyn PhysicalExpr>,
185        default_selectivity: u8,
186    ) -> Result<Statistics> {
187        if !check_support(predicate, &schema) {
188            let selectivity = default_selectivity as f64 / 100.0;
189            let mut stats = input_stats.to_inexact();
190            stats.num_rows = stats.num_rows.with_estimated_selectivity(selectivity);
191            stats.total_byte_size = stats
192                .total_byte_size
193                .with_estimated_selectivity(selectivity);
194            return Ok(stats);
195        }
196
197        let num_rows = input_stats.num_rows;
198        let total_byte_size = input_stats.total_byte_size;
199        let input_analysis_ctx = AnalysisContext::try_from_statistics(
200            &schema,
201            &input_stats.column_statistics,
202        )?;
203
204        let analysis_ctx = analyze(predicate, input_analysis_ctx, &schema)?;
205
206        // Estimate (inexact) selectivity of predicate
207        let selectivity = analysis_ctx.selectivity.unwrap_or(1.0);
208        let num_rows = num_rows.with_estimated_selectivity(selectivity);
209        let total_byte_size = total_byte_size.with_estimated_selectivity(selectivity);
210
211        let column_statistics = collect_new_statistics(
212            &input_stats.column_statistics,
213            analysis_ctx.boundaries,
214        );
215        Ok(Statistics {
216            num_rows,
217            total_byte_size,
218            column_statistics,
219        })
220    }
221
222    fn extend_constants(
223        input: &Arc<dyn ExecutionPlan>,
224        predicate: &Arc<dyn PhysicalExpr>,
225    ) -> Vec<ConstExpr> {
226        let mut res_constants = Vec::new();
227        let input_eqs = input.equivalence_properties();
228
229        let conjunctions = split_conjunction(predicate);
230        for conjunction in conjunctions {
231            if let Some(binary) = conjunction.as_any().downcast_ref::<BinaryExpr>() {
232                if binary.op() == &Operator::Eq {
233                    // Filter evaluates to single value for all partitions
234                    if input_eqs.is_expr_constant(binary.left()).is_some() {
235                        let across = input_eqs
236                            .is_expr_constant(binary.right())
237                            .unwrap_or_default();
238                        res_constants
239                            .push(ConstExpr::new(Arc::clone(binary.right()), across));
240                    } else if input_eqs.is_expr_constant(binary.right()).is_some() {
241                        let across = input_eqs
242                            .is_expr_constant(binary.left())
243                            .unwrap_or_default();
244                        res_constants
245                            .push(ConstExpr::new(Arc::clone(binary.left()), across));
246                    }
247                }
248            }
249        }
250        res_constants
251    }
252    /// This function creates the cache object that stores the plan properties such as schema, equivalence properties, ordering, partitioning, etc.
253    fn compute_properties(
254        input: &Arc<dyn ExecutionPlan>,
255        predicate: &Arc<dyn PhysicalExpr>,
256        default_selectivity: u8,
257        projection: Option<&Vec<usize>>,
258    ) -> Result<PlanProperties> {
259        // Combine the equal predicates with the input equivalence properties
260        // to construct the equivalence properties:
261        let stats = Self::statistics_helper(
262            input.schema(),
263            input.partition_statistics(None)?,
264            predicate,
265            default_selectivity,
266        )?;
267        let mut eq_properties = input.equivalence_properties().clone();
268        let (equal_pairs, _) = collect_columns_from_predicate(predicate);
269        for (lhs, rhs) in equal_pairs {
270            eq_properties.add_equal_conditions(Arc::clone(lhs), Arc::clone(rhs))?
271        }
272        // Add the columns that have only one viable value (singleton) after
273        // filtering to constants.
274        let constants = collect_columns(predicate)
275            .into_iter()
276            .filter(|column| stats.column_statistics[column.index()].is_singleton())
277            .map(|column| {
278                let value = stats.column_statistics[column.index()]
279                    .min_value
280                    .get_value();
281                let expr = Arc::new(column) as _;
282                ConstExpr::new(expr, AcrossPartitions::Uniform(value.cloned()))
283            });
284        // This is for statistics
285        eq_properties.add_constants(constants)?;
286        // This is for logical constant (for example: a = '1', then a could be marked as a constant)
287        // to do: how to deal with multiple situation to represent = (for example c1 between 0 and 0)
288        eq_properties.add_constants(Self::extend_constants(input, predicate))?;
289
290        let mut output_partitioning = input.output_partitioning().clone();
291        // If contains projection, update the PlanProperties.
292        if let Some(projection) = projection {
293            let schema = eq_properties.schema();
294            let projection_mapping = ProjectionMapping::from_indices(projection, schema)?;
295            let out_schema = project_schema(schema, Some(projection))?;
296            output_partitioning =
297                output_partitioning.project(&projection_mapping, &eq_properties);
298            eq_properties = eq_properties.project(&projection_mapping, out_schema);
299        }
300
301        Ok(PlanProperties::new(
302            eq_properties,
303            output_partitioning,
304            input.pipeline_behavior(),
305            input.boundedness(),
306        ))
307    }
308}
309
310impl DisplayAs for FilterExec {
311    fn fmt_as(
312        &self,
313        t: DisplayFormatType,
314        f: &mut std::fmt::Formatter,
315    ) -> std::fmt::Result {
316        match t {
317            DisplayFormatType::Default | DisplayFormatType::Verbose => {
318                let display_projections = if let Some(projection) =
319                    self.projection.as_ref()
320                {
321                    format!(
322                        ", projection=[{}]",
323                        projection
324                            .iter()
325                            .map(|index| format!(
326                                "{}@{}",
327                                self.input.schema().fields().get(*index).unwrap().name(),
328                                index
329                            ))
330                            .collect::<Vec<_>>()
331                            .join(", ")
332                    )
333                } else {
334                    "".to_string()
335                };
336                write!(f, "FilterExec: {}{}", self.predicate, display_projections)
337            }
338            DisplayFormatType::TreeRender => {
339                write!(f, "predicate={}", fmt_sql(self.predicate.as_ref()))
340            }
341        }
342    }
343}
344
345impl ExecutionPlan for FilterExec {
346    fn name(&self) -> &'static str {
347        "FilterExec"
348    }
349
350    /// Return a reference to Any that can be used for downcasting
351    fn as_any(&self) -> &dyn Any {
352        self
353    }
354
355    fn properties(&self) -> &PlanProperties {
356        &self.cache
357    }
358
359    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
360        vec![&self.input]
361    }
362
363    fn maintains_input_order(&self) -> Vec<bool> {
364        // Tell optimizer this operator doesn't reorder its input
365        vec![true]
366    }
367
368    fn with_new_children(
369        self: Arc<Self>,
370        mut children: Vec<Arc<dyn ExecutionPlan>>,
371    ) -> Result<Arc<dyn ExecutionPlan>> {
372        FilterExec::try_new(Arc::clone(&self.predicate), children.swap_remove(0))
373            .and_then(|e| {
374                let selectivity = e.default_selectivity();
375                e.with_default_selectivity(selectivity)
376            })
377            .and_then(|e| e.with_projection(self.projection().cloned()))
378            .map(|e| Arc::new(e) as _)
379    }
380
381    fn execute(
382        &self,
383        partition: usize,
384        context: Arc<TaskContext>,
385    ) -> Result<SendableRecordBatchStream> {
386        trace!("Start FilterExec::execute for partition {} of context session_id {} and task_id {:?}", partition, context.session_id(), context.task_id());
387        let baseline_metrics = BaselineMetrics::new(&self.metrics, partition);
388        Ok(Box::pin(FilterExecStream {
389            schema: self.schema(),
390            predicate: Arc::clone(&self.predicate),
391            input: self.input.execute(partition, context)?,
392            baseline_metrics,
393            projection: self.projection.clone(),
394        }))
395    }
396
397    fn metrics(&self) -> Option<MetricsSet> {
398        Some(self.metrics.clone_inner())
399    }
400
401    /// The output statistics of a filtering operation can be estimated if the
402    /// predicate's selectivity value can be determined for the incoming data.
403    fn statistics(&self) -> Result<Statistics> {
404        self.partition_statistics(None)
405    }
406
407    fn partition_statistics(&self, partition: Option<usize>) -> Result<Statistics> {
408        let input_stats = self.input.partition_statistics(partition)?;
409        let stats = Self::statistics_helper(
410            self.schema(),
411            input_stats,
412            self.predicate(),
413            self.default_selectivity,
414        )?;
415        Ok(stats.project(self.projection.as_ref()))
416    }
417
418    fn cardinality_effect(&self) -> CardinalityEffect {
419        CardinalityEffect::LowerEqual
420    }
421
422    /// Tries to swap `projection` with its input (`filter`). If possible, performs
423    /// the swap and returns [`FilterExec`] as the top plan. Otherwise, returns `None`.
424    fn try_swapping_with_projection(
425        &self,
426        projection: &ProjectionExec,
427    ) -> Result<Option<Arc<dyn ExecutionPlan>>> {
428        // If the projection does not narrow the schema, we should not try to push it down:
429        if projection.expr().len() < projection.input().schema().fields().len() {
430            // Each column in the predicate expression must exist after the projection.
431            if let Some(new_predicate) =
432                update_expr(self.predicate(), projection.expr(), false)?
433            {
434                return FilterExec::try_new(
435                    new_predicate,
436                    make_with_child(projection, self.input())?,
437                )
438                .and_then(|e| {
439                    let selectivity = self.default_selectivity();
440                    e.with_default_selectivity(selectivity)
441                })
442                .map(|e| Some(Arc::new(e) as _));
443            }
444        }
445        try_embed_projection(projection, self)
446    }
447
448    fn gather_filters_for_pushdown(
449        &self,
450        phase: FilterPushdownPhase,
451        parent_filters: Vec<Arc<dyn PhysicalExpr>>,
452        _config: &ConfigOptions,
453    ) -> Result<FilterDescription> {
454        if !matches!(phase, FilterPushdownPhase::Pre) {
455            // For non-pre phase, filters pass through unchanged
456            let filter_supports = parent_filters
457                .into_iter()
458                .map(PushedDownPredicate::supported)
459                .collect();
460            return Ok(FilterDescription::new().with_child(ChildFilterDescription {
461                parent_filters: filter_supports,
462                self_filters: vec![],
463            }));
464        }
465
466        let child = ChildFilterDescription::from_child(&parent_filters, self.input())?
467            .with_self_filters(
468                split_conjunction(&self.predicate)
469                    .into_iter()
470                    .cloned()
471                    .collect(),
472            );
473
474        Ok(FilterDescription::new().with_child(child))
475    }
476
477    fn handle_child_pushdown_result(
478        &self,
479        phase: FilterPushdownPhase,
480        child_pushdown_result: ChildPushdownResult,
481        _config: &ConfigOptions,
482    ) -> Result<FilterPushdownPropagation<Arc<dyn ExecutionPlan>>> {
483        if !matches!(phase, FilterPushdownPhase::Pre) {
484            return Ok(FilterPushdownPropagation::if_all(child_pushdown_result));
485        }
486        // We absorb any parent filters that were not handled by our children
487        let unsupported_parent_filters =
488            child_pushdown_result.parent_filters.iter().filter_map(|f| {
489                matches!(f.all(), PushedDown::No).then_some(Arc::clone(&f.filter))
490            });
491        let unsupported_self_filters = child_pushdown_result
492            .self_filters
493            .first()
494            .expect("we have exactly one child")
495            .iter()
496            .filter_map(|f| match f.discriminant {
497                PushedDown::Yes => None,
498                PushedDown::No => Some(&f.predicate),
499            })
500            .cloned();
501
502        let unhandled_filters = unsupported_parent_filters
503            .into_iter()
504            .chain(unsupported_self_filters)
505            .collect_vec();
506
507        // If we have unhandled filters, we need to create a new FilterExec
508        let filter_input = Arc::clone(self.input());
509        let new_predicate = conjunction(unhandled_filters);
510        let updated_node = if new_predicate.eq(&lit(true)) {
511            // FilterExec is no longer needed, but we may need to leave a projection in place
512            match self.projection() {
513                Some(projection_indices) => {
514                    let filter_child_schema = filter_input.schema();
515                    let proj_exprs = projection_indices
516                        .iter()
517                        .map(|p| {
518                            let field = filter_child_schema.field(*p).clone();
519                            ProjectionExpr {
520                                expr: Arc::new(Column::new(field.name(), *p))
521                                    as Arc<dyn PhysicalExpr>,
522                                alias: field.name().to_string(),
523                            }
524                        })
525                        .collect::<Vec<_>>();
526                    Some(Arc::new(ProjectionExec::try_new(proj_exprs, filter_input)?)
527                        as Arc<dyn ExecutionPlan>)
528                }
529                None => {
530                    // No projection needed, just return the input
531                    Some(filter_input)
532                }
533            }
534        } else if new_predicate.eq(&self.predicate) {
535            // The new predicate is the same as our current predicate
536            None
537        } else {
538            // Create a new FilterExec with the new predicate
539            let new = FilterExec {
540                predicate: Arc::clone(&new_predicate),
541                input: Arc::clone(&filter_input),
542                metrics: self.metrics.clone(),
543                default_selectivity: self.default_selectivity,
544                cache: Self::compute_properties(
545                    &filter_input,
546                    &new_predicate,
547                    self.default_selectivity,
548                    self.projection.as_ref(),
549                )?,
550                projection: None,
551            };
552            Some(Arc::new(new) as _)
553        };
554
555        Ok(FilterPushdownPropagation {
556            filters: vec![PushedDown::Yes; child_pushdown_result.parent_filters.len()],
557            updated_node,
558        })
559    }
560}
561
562impl EmbeddedProjection for FilterExec {
563    fn with_projection(&self, projection: Option<Vec<usize>>) -> Result<Self> {
564        self.with_projection(projection)
565    }
566}
567
568/// This function ensures that all bounds in the `ExprBoundaries` vector are
569/// converted to closed bounds. If a lower/upper bound is initially open, it
570/// is adjusted by using the next/previous value for its data type to convert
571/// it into a closed bound.
572fn collect_new_statistics(
573    input_column_stats: &[ColumnStatistics],
574    analysis_boundaries: Vec<ExprBoundaries>,
575) -> Vec<ColumnStatistics> {
576    analysis_boundaries
577        .into_iter()
578        .enumerate()
579        .map(
580            |(
581                idx,
582                ExprBoundaries {
583                    interval,
584                    distinct_count,
585                    ..
586                },
587            )| {
588                let Some(interval) = interval else {
589                    // If the interval is `None`, we can say that there are no rows:
590                    return ColumnStatistics {
591                        null_count: Precision::Exact(0),
592                        max_value: Precision::Exact(ScalarValue::Null),
593                        min_value: Precision::Exact(ScalarValue::Null),
594                        sum_value: Precision::Exact(ScalarValue::Null),
595                        distinct_count: Precision::Exact(0),
596                    };
597                };
598                let (lower, upper) = interval.into_bounds();
599                let (min_value, max_value) = if lower.eq(&upper) {
600                    (Precision::Exact(lower), Precision::Exact(upper))
601                } else {
602                    (Precision::Inexact(lower), Precision::Inexact(upper))
603                };
604                ColumnStatistics {
605                    null_count: input_column_stats[idx].null_count.to_inexact(),
606                    max_value,
607                    min_value,
608                    sum_value: Precision::Absent,
609                    distinct_count: distinct_count.to_inexact(),
610                }
611            },
612        )
613        .collect()
614}
615
616/// The FilterExec streams wraps the input iterator and applies the predicate expression to
617/// determine which rows to include in its output batches
618struct FilterExecStream {
619    /// Output schema after the projection
620    schema: SchemaRef,
621    /// The expression to filter on. This expression must evaluate to a boolean value.
622    predicate: Arc<dyn PhysicalExpr>,
623    /// The input partition to filter.
624    input: SendableRecordBatchStream,
625    /// Runtime metrics recording
626    baseline_metrics: BaselineMetrics,
627    /// The projection indices of the columns in the input schema
628    projection: Option<Vec<usize>>,
629}
630
631pub fn batch_filter(
632    batch: &RecordBatch,
633    predicate: &Arc<dyn PhysicalExpr>,
634) -> Result<RecordBatch> {
635    filter_and_project(batch, predicate, None, &batch.schema())
636}
637
638fn filter_and_project(
639    batch: &RecordBatch,
640    predicate: &Arc<dyn PhysicalExpr>,
641    projection: Option<&Vec<usize>>,
642    output_schema: &SchemaRef,
643) -> Result<RecordBatch> {
644    predicate
645        .evaluate(batch)
646        .and_then(|v| v.into_array(batch.num_rows()))
647        .and_then(|array| {
648            Ok(match (as_boolean_array(&array), projection) {
649                // Apply filter array to record batch
650                (Ok(filter_array), None) => filter_record_batch(batch, filter_array)?,
651                (Ok(filter_array), Some(projection)) => {
652                    let projected_columns = projection
653                        .iter()
654                        .map(|i| Arc::clone(batch.column(*i)))
655                        .collect();
656                    let projected_batch = RecordBatch::try_new(
657                        Arc::clone(output_schema),
658                        projected_columns,
659                    )?;
660                    filter_record_batch(&projected_batch, filter_array)?
661                }
662                (Err(_), _) => {
663                    return internal_err!(
664                        "Cannot create filter_array from non-boolean predicates"
665                    );
666                }
667            })
668        })
669}
670
671impl Stream for FilterExecStream {
672    type Item = Result<RecordBatch>;
673
674    fn poll_next(
675        mut self: Pin<&mut Self>,
676        cx: &mut Context<'_>,
677    ) -> Poll<Option<Self::Item>> {
678        let poll;
679        loop {
680            match ready!(self.input.poll_next_unpin(cx)) {
681                Some(Ok(batch)) => {
682                    let timer = self.baseline_metrics.elapsed_compute().timer();
683                    let filtered_batch = filter_and_project(
684                        &batch,
685                        &self.predicate,
686                        self.projection.as_ref(),
687                        &self.schema,
688                    )?;
689                    timer.done();
690                    // Skip entirely filtered batches
691                    if filtered_batch.num_rows() == 0 {
692                        continue;
693                    }
694                    poll = Poll::Ready(Some(Ok(filtered_batch)));
695                    break;
696                }
697                value => {
698                    poll = Poll::Ready(value);
699                    break;
700                }
701            }
702        }
703        self.baseline_metrics.record_poll(poll)
704    }
705
706    fn size_hint(&self) -> (usize, Option<usize>) {
707        // Same number of record batches
708        self.input.size_hint()
709    }
710}
711
712impl RecordBatchStream for FilterExecStream {
713    fn schema(&self) -> SchemaRef {
714        Arc::clone(&self.schema)
715    }
716}
717
718/// Return the equals Column-Pairs and Non-equals Column-Pairs
719pub fn collect_columns_from_predicate(
720    predicate: &'_ Arc<dyn PhysicalExpr>,
721) -> EqualAndNonEqual<'_> {
722    let mut eq_predicate_columns = Vec::<PhysicalExprPairRef>::new();
723    let mut ne_predicate_columns = Vec::<PhysicalExprPairRef>::new();
724
725    let predicates = split_conjunction(predicate);
726    predicates.into_iter().for_each(|p| {
727        if let Some(binary) = p.as_any().downcast_ref::<BinaryExpr>() {
728            match binary.op() {
729                Operator::Eq => {
730                    eq_predicate_columns.push((binary.left(), binary.right()))
731                }
732                Operator::NotEq => {
733                    ne_predicate_columns.push((binary.left(), binary.right()))
734                }
735                _ => {}
736            }
737        }
738    });
739
740    (eq_predicate_columns, ne_predicate_columns)
741}
742
743/// Pair of `Arc<dyn PhysicalExpr>`s
744pub type PhysicalExprPairRef<'a> = (&'a Arc<dyn PhysicalExpr>, &'a Arc<dyn PhysicalExpr>);
745
746/// The equals Column-Pairs and Non-equals Column-Pairs in the Predicates
747pub type EqualAndNonEqual<'a> =
748    (Vec<PhysicalExprPairRef<'a>>, Vec<PhysicalExprPairRef<'a>>);
749
#[cfg(test)]
mod tests {
    use super::*;
    use crate::empty::EmptyExec;
    use crate::expressions::*;
    use crate::test;
    use crate::test::exec::StatisticsExec;
    use arrow::datatypes::{Field, Schema, UnionFields, UnionMode};
    use datafusion_common::ScalarValue;

    /// Asserts that two optional statistics bounds hold the same value.
    ///
    /// `Float32` bounds are compared with a small *relative* tolerance: an
    /// absolute tolerance such as `1e-6` is below one ULP for `f32` values in
    /// the thousands (about `6.1e-5` near 1000), so it would not absorb any
    /// platform-dependent rounding difference. All other values must be equal.
    fn assert_bound_eq(actual: Option<&ScalarValue>, expected: Option<&ScalarValue>) {
        match (actual, expected) {
            (
                Some(ScalarValue::Float32(Some(got))),
                Some(ScalarValue::Float32(Some(want))),
            ) => {
                let tolerance = 1e-6 * want.abs().max(1.0);
                assert!(
                    (got - want).abs() <= tolerance,
                    "expected {want} (± {tolerance}), got {got}"
                );
            }
            _ => assert_eq!(actual, expected),
        }
    }

    #[tokio::test]
    async fn collect_columns_predicates() -> Result<()> {
        let schema = test::aggr_test_schema();
        let predicate: Arc<dyn PhysicalExpr> = binary(
            binary(
                binary(col("c2", &schema)?, Operator::GtEq, lit(1u32), &schema)?,
                Operator::And,
                binary(col("c2", &schema)?, Operator::Eq, lit(4u32), &schema)?,
                &schema,
            )?,
            Operator::And,
            binary(
                binary(
                    col("c2", &schema)?,
                    Operator::Eq,
                    col("c9", &schema)?,
                    &schema,
                )?,
                Operator::And,
                binary(
                    col("c1", &schema)?,
                    Operator::NotEq,
                    col("c13", &schema)?,
                    &schema,
                )?,
                &schema,
            )?,
            &schema,
        )?;

        let (equal_pairs, ne_pairs) = collect_columns_from_predicate(&predicate);
        assert_eq!(2, equal_pairs.len());
        assert!(equal_pairs[0].0.eq(&col("c2", &schema)?));
        assert!(equal_pairs[0].1.eq(&lit(4u32)));

        assert!(equal_pairs[1].0.eq(&col("c2", &schema)?));
        assert!(equal_pairs[1].1.eq(&col("c9", &schema)?));

        assert_eq!(1, ne_pairs.len());
        assert!(ne_pairs[0].0.eq(&col("c1", &schema)?));
        assert!(ne_pairs[0].1.eq(&col("c13", &schema)?));

        Ok(())
    }

    #[tokio::test]
    async fn test_filter_statistics_basic_expr() -> Result<()> {
        // Table:
        //      a: min=1, max=100
        let bytes_per_row = 4;
        let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]);
        let input = Arc::new(StatisticsExec::new(
            Statistics {
                num_rows: Precision::Inexact(100),
                total_byte_size: Precision::Inexact(100 * bytes_per_row),
                column_statistics: vec![ColumnStatistics {
                    min_value: Precision::Inexact(ScalarValue::Int32(Some(1))),
                    max_value: Precision::Inexact(ScalarValue::Int32(Some(100))),
                    ..Default::default()
                }],
            },
            schema.clone(),
        ));

        // a <= 25
        let predicate: Arc<dyn PhysicalExpr> =
            binary(col("a", &schema)?, Operator::LtEq, lit(25i32), &schema)?;

        // WHERE a <= 25
        let filter: Arc<dyn ExecutionPlan> =
            Arc::new(FilterExec::try_new(predicate, input)?);

        let statistics = filter.partition_statistics(None)?;
        assert_eq!(statistics.num_rows, Precision::Inexact(25));
        assert_eq!(
            statistics.total_byte_size,
            Precision::Inexact(25 * bytes_per_row)
        );
        assert_eq!(
            statistics.column_statistics,
            vec![ColumnStatistics {
                min_value: Precision::Inexact(ScalarValue::Int32(Some(1))),
                max_value: Precision::Inexact(ScalarValue::Int32(Some(25))),
                ..Default::default()
            }]
        );

        Ok(())
    }

    #[tokio::test]
    async fn test_filter_statistics_column_level_nested() -> Result<()> {
        // Table:
        //      a: min=1, max=100
        let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]);
        let input = Arc::new(StatisticsExec::new(
            Statistics {
                num_rows: Precision::Inexact(100),
                column_statistics: vec![ColumnStatistics {
                    min_value: Precision::Inexact(ScalarValue::Int32(Some(1))),
                    max_value: Precision::Inexact(ScalarValue::Int32(Some(100))),
                    ..Default::default()
                }],
                total_byte_size: Precision::Absent,
            },
            schema.clone(),
        ));

        // WHERE a <= 25
        let sub_filter: Arc<dyn ExecutionPlan> = Arc::new(FilterExec::try_new(
            binary(col("a", &schema)?, Operator::LtEq, lit(25i32), &schema)?,
            input,
        )?);

        // Nested filters (two separate physical plans, instead of AND chain in the expr)
        // WHERE a >= 10
        // WHERE a <= 25
        let filter: Arc<dyn ExecutionPlan> = Arc::new(FilterExec::try_new(
            binary(col("a", &schema)?, Operator::GtEq, lit(10i32), &schema)?,
            sub_filter,
        )?);

        let statistics = filter.partition_statistics(None)?;
        assert_eq!(statistics.num_rows, Precision::Inexact(16));
        assert_eq!(
            statistics.column_statistics,
            vec![ColumnStatistics {
                min_value: Precision::Inexact(ScalarValue::Int32(Some(10))),
                max_value: Precision::Inexact(ScalarValue::Int32(Some(25))),
                ..Default::default()
            }]
        );

        Ok(())
    }

    #[tokio::test]
    async fn test_filter_statistics_column_level_nested_multiple() -> Result<()> {
        // Table:
        //      a: min=1, max=100
        //      b: min=1, max=50
        let schema = Schema::new(vec![
            Field::new("a", DataType::Int32, false),
            Field::new("b", DataType::Int32, false),
        ]);
        let input = Arc::new(StatisticsExec::new(
            Statistics {
                num_rows: Precision::Inexact(100),
                column_statistics: vec![
                    ColumnStatistics {
                        min_value: Precision::Inexact(ScalarValue::Int32(Some(1))),
                        max_value: Precision::Inexact(ScalarValue::Int32(Some(100))),
                        ..Default::default()
                    },
                    ColumnStatistics {
                        min_value: Precision::Inexact(ScalarValue::Int32(Some(1))),
                        max_value: Precision::Inexact(ScalarValue::Int32(Some(50))),
                        ..Default::default()
                    },
                ],
                total_byte_size: Precision::Absent,
            },
            schema.clone(),
        ));

        // WHERE a <= 25
        let a_lte_25: Arc<dyn ExecutionPlan> = Arc::new(FilterExec::try_new(
            binary(col("a", &schema)?, Operator::LtEq, lit(25i32), &schema)?,
            input,
        )?);

        // WHERE b > 45
        let b_gt_45: Arc<dyn ExecutionPlan> = Arc::new(FilterExec::try_new(
            binary(col("b", &schema)?, Operator::Gt, lit(45i32), &schema)?,
            a_lte_25,
        )?);

        // WHERE a >= 10
        let filter: Arc<dyn ExecutionPlan> = Arc::new(FilterExec::try_new(
            binary(col("a", &schema)?, Operator::GtEq, lit(10i32), &schema)?,
            b_gt_45,
        )?);
        let statistics = filter.partition_statistics(None)?;
        // On a uniform distribution, only sixteen rows will satisfy the
        // filter that 'a' proposed (a >= 10 AND a <= 25) (16/100) and only
        // 5 rows will satisfy the filter that 'b' proposed (b > 45) (5/50).
        //
        // Which would result in a selectivity of '16/100 * 5/50' or 0.016
        // and that means about 1.6% of all rows (rounded up to 2 rows).
        assert_eq!(statistics.num_rows, Precision::Inexact(2));
        assert_eq!(
            statistics.column_statistics,
            vec![
                ColumnStatistics {
                    min_value: Precision::Inexact(ScalarValue::Int32(Some(10))),
                    max_value: Precision::Inexact(ScalarValue::Int32(Some(25))),
                    ..Default::default()
                },
                ColumnStatistics {
                    min_value: Precision::Inexact(ScalarValue::Int32(Some(46))),
                    max_value: Precision::Inexact(ScalarValue::Int32(Some(50))),
                    ..Default::default()
                }
            ]
        );

        Ok(())
    }

    #[tokio::test]
    async fn test_filter_statistics_when_input_stats_missing() -> Result<()> {
        // Table:
        //      a: min=???, max=??? (missing)
        let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]);
        let input = Arc::new(StatisticsExec::new(
            Statistics::new_unknown(&schema),
            schema.clone(),
        ));

        // a <= 25
        let predicate: Arc<dyn PhysicalExpr> =
            binary(col("a", &schema)?, Operator::LtEq, lit(25i32), &schema)?;

        // WHERE a <= 25
        let filter: Arc<dyn ExecutionPlan> =
            Arc::new(FilterExec::try_new(predicate, input)?);

        let statistics = filter.partition_statistics(None)?;
        assert_eq!(statistics.num_rows, Precision::Absent);

        Ok(())
    }

    #[tokio::test]
    async fn test_filter_statistics_multiple_columns() -> Result<()> {
        // Table:
        //      a: min=1, max=100
        //      b: min=1, max=3
        //      c: min=1000.0  max=1100.0
        let schema = Schema::new(vec![
            Field::new("a", DataType::Int32, false),
            Field::new("b", DataType::Int32, false),
            Field::new("c", DataType::Float32, false),
        ]);
        let input = Arc::new(StatisticsExec::new(
            Statistics {
                num_rows: Precision::Inexact(1000),
                total_byte_size: Precision::Inexact(4000),
                column_statistics: vec![
                    ColumnStatistics {
                        min_value: Precision::Inexact(ScalarValue::Int32(Some(1))),
                        max_value: Precision::Inexact(ScalarValue::Int32(Some(100))),
                        ..Default::default()
                    },
                    ColumnStatistics {
                        min_value: Precision::Inexact(ScalarValue::Int32(Some(1))),
                        max_value: Precision::Inexact(ScalarValue::Int32(Some(3))),
                        ..Default::default()
                    },
                    ColumnStatistics {
                        min_value: Precision::Inexact(ScalarValue::Float32(Some(1000.0))),
                        max_value: Precision::Inexact(ScalarValue::Float32(Some(1100.0))),
                        ..Default::default()
                    },
                ],
            },
            schema,
        ));
        // WHERE a<=53 AND (b=3 AND (c<=1075.0 AND a>b))
        let predicate = Arc::new(BinaryExpr::new(
            Arc::new(BinaryExpr::new(
                Arc::new(Column::new("a", 0)),
                Operator::LtEq,
                Arc::new(Literal::new(ScalarValue::Int32(Some(53)))),
            )),
            Operator::And,
            Arc::new(BinaryExpr::new(
                Arc::new(BinaryExpr::new(
                    Arc::new(Column::new("b", 1)),
                    Operator::Eq,
                    Arc::new(Literal::new(ScalarValue::Int32(Some(3)))),
                )),
                Operator::And,
                Arc::new(BinaryExpr::new(
                    Arc::new(BinaryExpr::new(
                        Arc::new(Column::new("c", 2)),
                        Operator::LtEq,
                        Arc::new(Literal::new(ScalarValue::Float32(Some(1075.0)))),
                    )),
                    Operator::And,
                    Arc::new(BinaryExpr::new(
                        Arc::new(Column::new("a", 0)),
                        Operator::Gt,
                        Arc::new(Column::new("b", 1)),
                    )),
                )),
            )),
        ));
        let filter: Arc<dyn ExecutionPlan> =
            Arc::new(FilterExec::try_new(predicate, input)?);
        let statistics = filter.partition_statistics(None)?;
        // 0.5 (from a) * 0.333333... (from b) * 0.798387... (from c) ≈ 0.1330...
        // num_rows after ceil => 133.0... => 134
        // total_byte_size after ceil => 532.0... => 533
        assert_eq!(statistics.num_rows, Precision::Inexact(134));
        assert_eq!(statistics.total_byte_size, Precision::Inexact(533));
        let exp_col_stats = vec![
            ColumnStatistics {
                min_value: Precision::Inexact(ScalarValue::Int32(Some(4))),
                max_value: Precision::Inexact(ScalarValue::Int32(Some(53))),
                ..Default::default()
            },
            ColumnStatistics {
                min_value: Precision::Inexact(ScalarValue::Int32(Some(3))),
                max_value: Precision::Inexact(ScalarValue::Int32(Some(3))),
                ..Default::default()
            },
            ColumnStatistics {
                min_value: Precision::Inexact(ScalarValue::Float32(Some(1000.0))),
                max_value: Precision::Inexact(ScalarValue::Float32(Some(1075.0))),
                ..Default::default()
            },
        ];
        // Compare the bounds by value, using an explicit loop: a lazy
        // `.map(..)` whose result is discarded never runs its assertions.
        // Column `b` collapses to the single value 3, so its bounds may be
        // reported as `Exact` rather than `Inexact`; the float bounds of `c`
        // may differ in the last bits across platforms (Windows rounds some
        // floating point arithmetic differently), hence the tolerance.
        let actual_col_stats = &statistics.column_statistics;
        assert_eq!(actual_col_stats.len(), exp_col_stats.len());
        for (expected, actual) in exp_col_stats.iter().zip(actual_col_stats) {
            assert_bound_eq(actual.min_value.get_value(), expected.min_value.get_value());
            assert_bound_eq(actual.max_value.get_value(), expected.max_value.get_value());
        }

        Ok(())
    }

    #[tokio::test]
    async fn test_filter_statistics_full_selective() -> Result<()> {
        // Table:
        //      a: min=1, max=100
        //      b: min=1, max=3
        let schema = Schema::new(vec![
            Field::new("a", DataType::Int32, false),
            Field::new("b", DataType::Int32, false),
        ]);
        let input = Arc::new(StatisticsExec::new(
            Statistics {
                num_rows: Precision::Inexact(1000),
                total_byte_size: Precision::Inexact(4000),
                column_statistics: vec![
                    ColumnStatistics {
                        min_value: Precision::Inexact(ScalarValue::Int32(Some(1))),
                        max_value: Precision::Inexact(ScalarValue::Int32(Some(100))),
                        ..Default::default()
                    },
                    ColumnStatistics {
                        min_value: Precision::Inexact(ScalarValue::Int32(Some(1))),
                        max_value: Precision::Inexact(ScalarValue::Int32(Some(3))),
                        ..Default::default()
                    },
                ],
            },
            schema,
        ));
        // WHERE a<200 AND 1<=b
        let predicate = Arc::new(BinaryExpr::new(
            Arc::new(BinaryExpr::new(
                Arc::new(Column::new("a", 0)),
                Operator::Lt,
                Arc::new(Literal::new(ScalarValue::Int32(Some(200)))),
            )),
            Operator::And,
            Arc::new(BinaryExpr::new(
                Arc::new(Literal::new(ScalarValue::Int32(Some(1)))),
                Operator::LtEq,
                Arc::new(Column::new("b", 1)),
            )),
        ));
        // Since filter predicate passes all entries, statistics after filter shouldn't change.
        let expected = input.partition_statistics(None)?.column_statistics;
        let filter: Arc<dyn ExecutionPlan> =
            Arc::new(FilterExec::try_new(predicate, input)?);
        let statistics = filter.partition_statistics(None)?;

        assert_eq!(statistics.num_rows, Precision::Inexact(1000));
        assert_eq!(statistics.total_byte_size, Precision::Inexact(4000));
        assert_eq!(statistics.column_statistics, expected);

        Ok(())
    }

    #[tokio::test]
    async fn test_filter_statistics_zero_selective() -> Result<()> {
        // Table:
        //      a: min=1, max=100
        //      b: min=1, max=3
        let schema = Schema::new(vec![
            Field::new("a", DataType::Int32, false),
            Field::new("b", DataType::Int32, false),
        ]);
        let input = Arc::new(StatisticsExec::new(
            Statistics {
                num_rows: Precision::Inexact(1000),
                total_byte_size: Precision::Inexact(4000),
                column_statistics: vec![
                    ColumnStatistics {
                        min_value: Precision::Inexact(ScalarValue::Int32(Some(1))),
                        max_value: Precision::Inexact(ScalarValue::Int32(Some(100))),
                        ..Default::default()
                    },
                    ColumnStatistics {
                        min_value: Precision::Inexact(ScalarValue::Int32(Some(1))),
                        max_value: Precision::Inexact(ScalarValue::Int32(Some(3))),
                        ..Default::default()
                    },
                ],
            },
            schema,
        ));
        // WHERE a>200 AND 1<=b
        let predicate = Arc::new(BinaryExpr::new(
            Arc::new(BinaryExpr::new(
                Arc::new(Column::new("a", 0)),
                Operator::Gt,
                Arc::new(Literal::new(ScalarValue::Int32(Some(200)))),
            )),
            Operator::And,
            Arc::new(BinaryExpr::new(
                Arc::new(Literal::new(ScalarValue::Int32(Some(1)))),
                Operator::LtEq,
                Arc::new(Column::new("b", 1)),
            )),
        ));
        let filter: Arc<dyn ExecutionPlan> =
            Arc::new(FilterExec::try_new(predicate, input)?);
        let statistics = filter.partition_statistics(None)?;

        assert_eq!(statistics.num_rows, Precision::Inexact(0));
        assert_eq!(statistics.total_byte_size, Precision::Inexact(0));
        assert_eq!(
            statistics.column_statistics,
            vec![
                ColumnStatistics {
                    min_value: Precision::Exact(ScalarValue::Null),
                    max_value: Precision::Exact(ScalarValue::Null),
                    sum_value: Precision::Exact(ScalarValue::Null),
                    distinct_count: Precision::Exact(0),
                    null_count: Precision::Exact(0),
                },
                ColumnStatistics {
                    min_value: Precision::Exact(ScalarValue::Null),
                    max_value: Precision::Exact(ScalarValue::Null),
                    sum_value: Precision::Exact(ScalarValue::Null),
                    distinct_count: Precision::Exact(0),
                    null_count: Precision::Exact(0),
                },
            ]
        );

        Ok(())
    }

    #[tokio::test]
    async fn test_filter_statistics_more_inputs() -> Result<()> {
        let schema = Schema::new(vec![
            Field::new("a", DataType::Int32, false),
            Field::new("b", DataType::Int32, false),
        ]);
        let input = Arc::new(StatisticsExec::new(
            Statistics {
                num_rows: Precision::Inexact(1000),
                total_byte_size: Precision::Inexact(4000),
                column_statistics: vec![
                    ColumnStatistics {
                        min_value: Precision::Inexact(ScalarValue::Int32(Some(1))),
                        max_value: Precision::Inexact(ScalarValue::Int32(Some(100))),
                        ..Default::default()
                    },
                    ColumnStatistics {
                        min_value: Precision::Inexact(ScalarValue::Int32(Some(1))),
                        max_value: Precision::Inexact(ScalarValue::Int32(Some(100))),
                        ..Default::default()
                    },
                ],
            },
            schema,
        ));
        // WHERE a<50
        let predicate = Arc::new(BinaryExpr::new(
            Arc::new(Column::new("a", 0)),
            Operator::Lt,
            Arc::new(Literal::new(ScalarValue::Int32(Some(50)))),
        ));
        let filter: Arc<dyn ExecutionPlan> =
            Arc::new(FilterExec::try_new(predicate, input)?);
        let statistics = filter.partition_statistics(None)?;

        assert_eq!(statistics.num_rows, Precision::Inexact(490));
        assert_eq!(statistics.total_byte_size, Precision::Inexact(1960));
        assert_eq!(
            statistics.column_statistics,
            vec![
                ColumnStatistics {
                    min_value: Precision::Inexact(ScalarValue::Int32(Some(1))),
                    max_value: Precision::Inexact(ScalarValue::Int32(Some(49))),
                    ..Default::default()
                },
                ColumnStatistics {
                    min_value: Precision::Inexact(ScalarValue::Int32(Some(1))),
                    max_value: Precision::Inexact(ScalarValue::Int32(Some(100))),
                    ..Default::default()
                },
            ]
        );

        Ok(())
    }

    #[tokio::test]
    async fn test_empty_input_statistics() -> Result<()> {
        let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]);
        let input = Arc::new(StatisticsExec::new(
            Statistics::new_unknown(&schema),
            schema,
        ));
        // WHERE a <= 10 AND 0 <= a - 5
        let predicate = Arc::new(BinaryExpr::new(
            Arc::new(BinaryExpr::new(
                Arc::new(Column::new("a", 0)),
                Operator::LtEq,
                Arc::new(Literal::new(ScalarValue::Int32(Some(10)))),
            )),
            Operator::And,
            Arc::new(BinaryExpr::new(
                Arc::new(Literal::new(ScalarValue::Int32(Some(0)))),
                Operator::LtEq,
                Arc::new(BinaryExpr::new(
                    Arc::new(Column::new("a", 0)),
                    Operator::Minus,
                    Arc::new(Literal::new(ScalarValue::Int32(Some(5)))),
                )),
            )),
        ));
        let filter: Arc<dyn ExecutionPlan> =
            Arc::new(FilterExec::try_new(predicate, input)?);
        let filter_statistics = filter.partition_statistics(None)?;

        let expected_filter_statistics = Statistics {
            num_rows: Precision::Absent,
            total_byte_size: Precision::Absent,
            column_statistics: vec![ColumnStatistics {
                null_count: Precision::Absent,
                min_value: Precision::Inexact(ScalarValue::Int32(Some(5))),
                max_value: Precision::Inexact(ScalarValue::Int32(Some(10))),
                sum_value: Precision::Absent,
                distinct_count: Precision::Absent,
            }],
        };

        assert_eq!(filter_statistics, expected_filter_statistics);

        Ok(())
    }

    #[tokio::test]
    async fn test_statistics_with_constant_column() -> Result<()> {
        let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]);
        let input = Arc::new(StatisticsExec::new(
            Statistics::new_unknown(&schema),
            schema,
        ));
        // WHERE a = 10
        let predicate = Arc::new(BinaryExpr::new(
            Arc::new(Column::new("a", 0)),
            Operator::Eq,
            Arc::new(Literal::new(ScalarValue::Int32(Some(10)))),
        ));
        let filter: Arc<dyn ExecutionPlan> =
            Arc::new(FilterExec::try_new(predicate, input)?);
        let filter_statistics = filter.partition_statistics(None)?;
        // First column is "a", and it is a column with only one value after the filter.
        assert!(filter_statistics.column_statistics[0].is_singleton());

        Ok(())
    }

    #[tokio::test]
    async fn test_validation_filter_selectivity() -> Result<()> {
        let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]);
        let input = Arc::new(StatisticsExec::new(
            Statistics::new_unknown(&schema),
            schema,
        ));
        // WHERE a = 10
        let predicate = Arc::new(BinaryExpr::new(
            Arc::new(Column::new("a", 0)),
            Operator::Eq,
            Arc::new(Literal::new(ScalarValue::Int32(Some(10)))),
        ));
        let filter = FilterExec::try_new(predicate, input)?;
        assert!(filter.with_default_selectivity(120).is_err());
        Ok(())
    }

    #[tokio::test]
    async fn test_custom_filter_selectivity() -> Result<()> {
        // Need a decimal to trigger inexact selectivity
        let schema =
            Schema::new(vec![Field::new("a", DataType::Decimal128(2, 3), false)]);
        let input = Arc::new(StatisticsExec::new(
            Statistics {
                num_rows: Precision::Inexact(1000),
                total_byte_size: Precision::Inexact(4000),
                column_statistics: vec![ColumnStatistics {
                    ..Default::default()
                }],
            },
            schema,
        ));
        // WHERE a = 10
        let predicate = Arc::new(BinaryExpr::new(
            Arc::new(Column::new("a", 0)),
            Operator::Eq,
            Arc::new(Literal::new(ScalarValue::Decimal128(Some(10), 10, 10))),
        ));
        let filter = FilterExec::try_new(predicate, input)?;
        let statistics = filter.partition_statistics(None)?;
        assert_eq!(statistics.num_rows, Precision::Inexact(200));
        assert_eq!(statistics.total_byte_size, Precision::Inexact(800));
        let filter = filter.with_default_selectivity(40)?;
        let statistics = filter.partition_statistics(None)?;
        assert_eq!(statistics.num_rows, Precision::Inexact(400));
        assert_eq!(statistics.total_byte_size, Precision::Inexact(1600));
        Ok(())
    }

    #[test]
    fn test_equivalence_properties_union_type() -> Result<()> {
        let union_type = DataType::Union(
            UnionFields::new(
                vec![0, 1],
                vec![
                    Field::new("f1", DataType::Int32, true),
                    Field::new("f2", DataType::Utf8, true),
                ],
            ),
            UnionMode::Sparse,
        );

        let schema = Arc::new(Schema::new(vec![
            Field::new("c1", DataType::Int32, true),
            Field::new("c2", union_type, true),
        ]));

        let exec = FilterExec::try_new(
            binary(
                binary(col("c1", &schema)?, Operator::GtEq, lit(1i32), &schema)?,
                Operator::And,
                binary(col("c1", &schema)?, Operator::LtEq, lit(4i32), &schema)?,
                &schema,
            )?,
            Arc::new(EmptyExec::new(Arc::clone(&schema))),
        )?;

        // Only checks that computing statistics succeeds when the schema
        // contains a union-typed column.
        exec.partition_statistics(None)?;

        Ok(())
    }
}