// datafusion_physical_plan/filter.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::any::Any;
19use std::pin::Pin;
20use std::sync::Arc;
21use std::task::{Context, Poll, ready};
22
23use itertools::Itertools;
24
25use super::{
26    ColumnStatistics, DisplayAs, ExecutionPlanProperties, PlanProperties,
27    RecordBatchStream, SendableRecordBatchStream, Statistics,
28};
29use crate::coalesce::LimitedBatchCoalescer;
30use crate::coalesce::PushBatchStatus::LimitReached;
31use crate::common::can_project;
32use crate::execution_plan::CardinalityEffect;
33use crate::filter_pushdown::{
34    ChildFilterDescription, ChildPushdownResult, FilterDescription, FilterPushdownPhase,
35    FilterPushdownPropagation, PushedDown, PushedDownPredicate,
36};
37use crate::metrics::{MetricBuilder, MetricType};
38use crate::projection::{
39    EmbeddedProjection, ProjectionExec, ProjectionExpr, make_with_child,
40    try_embed_projection, update_expr,
41};
42use crate::{
43    DisplayFormatType, ExecutionPlan,
44    metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet, RatioMetrics},
45};
46
47use arrow::compute::filter_record_batch;
48use arrow::datatypes::{DataType, SchemaRef};
49use arrow::record_batch::RecordBatch;
50use datafusion_common::cast::as_boolean_array;
51use datafusion_common::config::ConfigOptions;
52use datafusion_common::stats::Precision;
53use datafusion_common::{
54    DataFusionError, Result, ScalarValue, internal_err, plan_err, project_schema,
55};
56use datafusion_execution::TaskContext;
57use datafusion_expr::Operator;
58use datafusion_physical_expr::equivalence::ProjectionMapping;
59use datafusion_physical_expr::expressions::{BinaryExpr, Column, lit};
60use datafusion_physical_expr::intervals::utils::check_support;
61use datafusion_physical_expr::utils::collect_columns;
62use datafusion_physical_expr::{
63    AcrossPartitions, AnalysisContext, ConstExpr, ExprBoundaries, PhysicalExpr, analyze,
64    conjunction, split_conjunction,
65};
66
67use datafusion_physical_expr_common::physical_expr::fmt_sql;
68use futures::stream::{Stream, StreamExt};
69use log::trace;
70
/// Default selectivity assumed when the predicate cannot be analyzed,
/// expressed as a percentage of input rows kept (0 = no rows, 100 = all rows).
const FILTER_EXEC_DEFAULT_SELECTIVITY: u8 = 20;
/// Default target number of rows per coalesced output batch.
const FILTER_EXEC_DEFAULT_BATCH_SIZE: usize = 8192;
73
/// FilterExec evaluates a boolean predicate against all input batches to determine which rows to
/// include in its output batches.
#[derive(Debug, Clone)]
pub struct FilterExec {
    /// The expression to filter on. This expression must evaluate to a boolean value.
    predicate: Arc<dyn PhysicalExpr>,
    /// The input plan
    input: Arc<dyn ExecutionPlan>,
    /// Execution metrics
    metrics: ExecutionPlanMetricsSet,
    /// Selectivity for statistics. 0 = no rows, 100 = all rows
    default_selectivity: u8,
    /// Properties equivalence properties, partitioning, etc.
    cache: PlanProperties,
    /// Optional projection: indices into the input schema selecting (and
    /// ordering) the columns of the output batches
    projection: Option<Vec<usize>>,
    /// Target batch size for output batches
    batch_size: usize,
    /// Number of rows to fetch (limit applied after filtering), if any
    fetch: Option<usize>,
}
95
impl FilterExec {
    /// Create a FilterExec on an input
    ///
    /// # Errors
    ///
    /// Returns a plan error if `predicate` does not resolve to a `Boolean`
    /// data type against the input schema.
    #[expect(clippy::needless_pass_by_value)]
    pub fn try_new(
        predicate: Arc<dyn PhysicalExpr>,
        input: Arc<dyn ExecutionPlan>,
    ) -> Result<Self> {
        // Only boolean predicates are meaningful for row filtering.
        match predicate.data_type(input.schema().as_ref())? {
            DataType::Boolean => {
                let default_selectivity = FILTER_EXEC_DEFAULT_SELECTIVITY;
                let cache = Self::compute_properties(
                    &input,
                    &predicate,
                    default_selectivity,
                    None,
                )?;
                Ok(Self {
                    predicate,
                    input: Arc::clone(&input),
                    metrics: ExecutionPlanMetricsSet::new(),
                    default_selectivity,
                    cache,
                    projection: None,
                    batch_size: FILTER_EXEC_DEFAULT_BATCH_SIZE,
                    fetch: None,
                })
            }
            other => {
                plan_err!("Filter predicate must return BOOLEAN values, got {other:?}")
            }
        }
    }

    /// Override the default selectivity (percentage, 0-100) used for
    /// statistics estimation when the predicate cannot be analyzed.
    ///
    /// # Errors
    ///
    /// Returns a plan error if `default_selectivity` exceeds 100.
    pub fn with_default_selectivity(
        mut self,
        default_selectivity: u8,
    ) -> Result<Self, DataFusionError> {
        if default_selectivity > 100 {
            return plan_err!(
                "Default filter selectivity value needs to be less than or equal to 100"
            );
        }
        self.default_selectivity = default_selectivity;
        Ok(self)
    }

    /// Return new instance of [FilterExec] with the given projection.
    pub fn with_projection(&self, projection: Option<Vec<usize>>) -> Result<Self> {
        //  Check if the projection is valid
        can_project(&self.schema(), projection.as_ref())?;

        let projection = match projection {
            Some(projection) => match &self.projection {
                // Compose with any existing projection: the incoming indices
                // refer to the already-projected schema, so map them through
                // the current projection `p` back to input-schema indices.
                Some(p) => Some(projection.iter().map(|i| p[*i]).collect()),
                None => Some(projection),
            },
            None => None,
        };

        // Plan properties must be recomputed for the projected schema.
        let cache = Self::compute_properties(
            &self.input,
            &self.predicate,
            self.default_selectivity,
            projection.as_ref(),
        )?;
        Ok(Self {
            predicate: Arc::clone(&self.predicate),
            input: Arc::clone(&self.input),
            metrics: self.metrics.clone(),
            default_selectivity: self.default_selectivity,
            cache,
            projection,
            batch_size: self.batch_size,
            fetch: self.fetch,
        })
    }

    /// Return a copy of this [FilterExec] with the given target output
    /// batch size (plan properties are unaffected by batch size).
    pub fn with_batch_size(&self, batch_size: usize) -> Result<Self> {
        Ok(Self {
            predicate: Arc::clone(&self.predicate),
            input: Arc::clone(&self.input),
            metrics: self.metrics.clone(),
            default_selectivity: self.default_selectivity,
            cache: self.cache.clone(),
            projection: self.projection.clone(),
            batch_size,
            fetch: self.fetch,
        })
    }

    /// The expression to filter on. This expression must evaluate to a boolean value.
    pub fn predicate(&self) -> &Arc<dyn PhysicalExpr> {
        &self.predicate
    }

    /// The input plan
    pub fn input(&self) -> &Arc<dyn ExecutionPlan> {
        &self.input
    }

    /// The default selectivity
    pub fn default_selectivity(&self) -> u8 {
        self.default_selectivity
    }

    /// Projection
    pub fn projection(&self) -> Option<&Vec<usize>> {
        self.projection.as_ref()
    }

    /// Calculates `Statistics` for `FilterExec`, by applying selectivity (either default, or estimated) to input statistics.
    fn statistics_helper(
        schema: &SchemaRef,
        input_stats: Statistics,
        predicate: &Arc<dyn PhysicalExpr>,
        default_selectivity: u8,
    ) -> Result<Statistics> {
        // If interval analysis cannot handle the predicate, fall back to
        // scaling the (inexact) input statistics by the default selectivity.
        if !check_support(predicate, schema) {
            let selectivity = default_selectivity as f64 / 100.0;
            let mut stats = input_stats.to_inexact();
            stats.num_rows = stats.num_rows.with_estimated_selectivity(selectivity);
            stats.total_byte_size = stats
                .total_byte_size
                .with_estimated_selectivity(selectivity);
            return Ok(stats);
        }

        let num_rows = input_stats.num_rows;
        let total_byte_size = input_stats.total_byte_size;
        let input_analysis_ctx =
            AnalysisContext::try_from_statistics(schema, &input_stats.column_statistics)?;

        // Run interval analysis of the predicate over the column boundaries.
        let analysis_ctx = analyze(predicate, input_analysis_ctx, schema)?;

        // Estimate (inexact) selectivity of predicate
        let selectivity = analysis_ctx.selectivity.unwrap_or(1.0);
        let num_rows = num_rows.with_estimated_selectivity(selectivity);
        let total_byte_size = total_byte_size.with_estimated_selectivity(selectivity);

        // Tighten per-column statistics from the analyzed boundaries.
        let column_statistics = collect_new_statistics(
            &input_stats.column_statistics,
            analysis_ctx.boundaries,
        );
        Ok(Statistics {
            num_rows,
            total_byte_size,
            column_statistics,
        })
    }

    /// Scans the conjuncts of `predicate` for equalities in which one side is
    /// already known constant; the other side then also becomes constant
    /// after filtering and is returned as a [`ConstExpr`].
    fn extend_constants(
        input: &Arc<dyn ExecutionPlan>,
        predicate: &Arc<dyn PhysicalExpr>,
    ) -> Vec<ConstExpr> {
        let mut res_constants = Vec::new();
        let input_eqs = input.equivalence_properties();

        let conjunctions = split_conjunction(predicate);
        for conjunction in conjunctions {
            if let Some(binary) = conjunction.as_any().downcast_ref::<BinaryExpr>()
                && binary.op() == &Operator::Eq
            {
                // Filter evaluates to single value for all partitions
                if input_eqs.is_expr_constant(binary.left()).is_some() {
                    let across = input_eqs
                        .is_expr_constant(binary.right())
                        .unwrap_or_default();
                    res_constants
                        .push(ConstExpr::new(Arc::clone(binary.right()), across));
                } else if input_eqs.is_expr_constant(binary.right()).is_some() {
                    let across = input_eqs
                        .is_expr_constant(binary.left())
                        .unwrap_or_default();
                    res_constants.push(ConstExpr::new(Arc::clone(binary.left()), across));
                }
            }
        }
        res_constants
    }
    /// This function creates the cache object that stores the plan properties such as schema, equivalence properties, ordering, partitioning, etc.
    fn compute_properties(
        input: &Arc<dyn ExecutionPlan>,
        predicate: &Arc<dyn PhysicalExpr>,
        default_selectivity: u8,
        projection: Option<&Vec<usize>>,
    ) -> Result<PlanProperties> {
        // Combine the equal predicates with the input equivalence properties
        // to construct the equivalence properties:
        let schema = input.schema();
        let stats = Self::statistics_helper(
            &schema,
            input.partition_statistics(None)?,
            predicate,
            default_selectivity,
        )?;
        let mut eq_properties = input.equivalence_properties().clone();
        let (equal_pairs, _) = collect_columns_from_predicate_inner(predicate);
        for (lhs, rhs) in equal_pairs {
            eq_properties.add_equal_conditions(Arc::clone(lhs), Arc::clone(rhs))?
        }
        // Add the columns that have only one viable value (singleton) after
        // filtering to constants.
        let constants = collect_columns(predicate)
            .into_iter()
            .filter(|column| stats.column_statistics[column.index()].is_singleton())
            .map(|column| {
                let value = stats.column_statistics[column.index()]
                    .min_value
                    .get_value();
                let expr = Arc::new(column) as _;
                ConstExpr::new(expr, AcrossPartitions::Uniform(value.cloned()))
            });
        // This is for statistics
        eq_properties.add_constants(constants)?;
        // This is for logical constant (for example: a = '1', then a could be marked as a constant)
        // to do: how to deal with multiple situation to represent = (for example c1 between 0 and 0)
        eq_properties.add_constants(Self::extend_constants(input, predicate))?;

        let mut output_partitioning = input.output_partitioning().clone();
        // If contains projection, update the PlanProperties.
        if let Some(projection) = projection {
            let schema = eq_properties.schema();
            let projection_mapping = ProjectionMapping::from_indices(projection, schema)?;
            let out_schema = project_schema(schema, Some(projection))?;
            output_partitioning =
                output_partitioning.project(&projection_mapping, &eq_properties);
            eq_properties = eq_properties.project(&projection_mapping, out_schema);
        }

        Ok(PlanProperties::new(
            eq_properties,
            output_partitioning,
            input.pipeline_behavior(),
            input.boundedness(),
        ))
    }
}
333
334impl DisplayAs for FilterExec {
335    fn fmt_as(
336        &self,
337        t: DisplayFormatType,
338        f: &mut std::fmt::Formatter,
339    ) -> std::fmt::Result {
340        match t {
341            DisplayFormatType::Default | DisplayFormatType::Verbose => {
342                let display_projections = if let Some(projection) =
343                    self.projection.as_ref()
344                {
345                    format!(
346                        ", projection=[{}]",
347                        projection
348                            .iter()
349                            .map(|index| format!(
350                                "{}@{}",
351                                self.input.schema().fields().get(*index).unwrap().name(),
352                                index
353                            ))
354                            .collect::<Vec<_>>()
355                            .join(", ")
356                    )
357                } else {
358                    "".to_string()
359                };
360                let fetch = self
361                    .fetch
362                    .map_or_else(|| "".to_string(), |f| format!(", fetch={f}"));
363                write!(
364                    f,
365                    "FilterExec: {}{}{}",
366                    self.predicate, display_projections, fetch
367                )
368            }
369            DisplayFormatType::TreeRender => {
370                write!(f, "predicate={}", fmt_sql(self.predicate.as_ref()))
371            }
372        }
373    }
374}
375
376impl ExecutionPlan for FilterExec {
377    fn name(&self) -> &'static str {
378        "FilterExec"
379    }
380
381    /// Return a reference to Any that can be used for downcasting
382    fn as_any(&self) -> &dyn Any {
383        self
384    }
385
386    fn properties(&self) -> &PlanProperties {
387        &self.cache
388    }
389
390    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
391        vec![&self.input]
392    }
393
394    fn maintains_input_order(&self) -> Vec<bool> {
395        // Tell optimizer this operator doesn't reorder its input
396        vec![true]
397    }
398
399    fn with_new_children(
400        self: Arc<Self>,
401        mut children: Vec<Arc<dyn ExecutionPlan>>,
402    ) -> Result<Arc<dyn ExecutionPlan>> {
403        FilterExec::try_new(Arc::clone(&self.predicate), children.swap_remove(0))
404            .and_then(|e| {
405                let selectivity = e.default_selectivity();
406                e.with_default_selectivity(selectivity)
407            })
408            .and_then(|e| e.with_projection(self.projection().cloned()))
409            .map(|e| e.with_fetch(self.fetch).unwrap())
410    }
411
412    fn execute(
413        &self,
414        partition: usize,
415        context: Arc<TaskContext>,
416    ) -> Result<SendableRecordBatchStream> {
417        trace!(
418            "Start FilterExec::execute for partition {} of context session_id {} and task_id {:?}",
419            partition,
420            context.session_id(),
421            context.task_id()
422        );
423        let metrics = FilterExecMetrics::new(&self.metrics, partition);
424        Ok(Box::pin(FilterExecStream {
425            schema: self.schema(),
426            predicate: Arc::clone(&self.predicate),
427            input: self.input.execute(partition, context)?,
428            metrics,
429            projection: self.projection.clone(),
430            batch_coalescer: LimitedBatchCoalescer::new(
431                self.schema(),
432                self.batch_size,
433                self.fetch,
434            ),
435        }))
436    }
437
438    fn metrics(&self) -> Option<MetricsSet> {
439        Some(self.metrics.clone_inner())
440    }
441
442    /// The output statistics of a filtering operation can be estimated if the
443    /// predicate's selectivity value can be determined for the incoming data.
444    fn statistics(&self) -> Result<Statistics> {
445        self.partition_statistics(None)
446    }
447
448    fn partition_statistics(&self, partition: Option<usize>) -> Result<Statistics> {
449        let input_stats = self.input.partition_statistics(partition)?;
450        let schema = self.schema();
451        let stats = Self::statistics_helper(
452            &schema,
453            input_stats,
454            self.predicate(),
455            self.default_selectivity,
456        )?;
457        Ok(stats.project(self.projection.as_ref()))
458    }
459
460    fn cardinality_effect(&self) -> CardinalityEffect {
461        CardinalityEffect::LowerEqual
462    }
463
464    /// Tries to swap `projection` with its input (`filter`). If possible, performs
465    /// the swap and returns [`FilterExec`] as the top plan. Otherwise, returns `None`.
466    fn try_swapping_with_projection(
467        &self,
468        projection: &ProjectionExec,
469    ) -> Result<Option<Arc<dyn ExecutionPlan>>> {
470        // If the projection does not narrow the schema, we should not try to push it down:
471        if projection.expr().len() < projection.input().schema().fields().len() {
472            // Each column in the predicate expression must exist after the projection.
473            if let Some(new_predicate) =
474                update_expr(self.predicate(), projection.expr(), false)?
475            {
476                return FilterExec::try_new(
477                    new_predicate,
478                    make_with_child(projection, self.input())?,
479                )
480                .and_then(|e| {
481                    let selectivity = self.default_selectivity();
482                    e.with_default_selectivity(selectivity)
483                })
484                .map(|e| Some(Arc::new(e) as _));
485            }
486        }
487        try_embed_projection(projection, self)
488    }
489
490    fn gather_filters_for_pushdown(
491        &self,
492        phase: FilterPushdownPhase,
493        parent_filters: Vec<Arc<dyn PhysicalExpr>>,
494        _config: &ConfigOptions,
495    ) -> Result<FilterDescription> {
496        if !matches!(phase, FilterPushdownPhase::Pre) {
497            // For non-pre phase, filters pass through unchanged
498            let filter_supports = parent_filters
499                .into_iter()
500                .map(PushedDownPredicate::supported)
501                .collect();
502
503            return Ok(FilterDescription::new().with_child(ChildFilterDescription {
504                parent_filters: filter_supports,
505                self_filters: vec![],
506            }));
507        }
508
509        let child = ChildFilterDescription::from_child(&parent_filters, self.input())?
510            .with_self_filters(
511                split_conjunction(&self.predicate)
512                    .into_iter()
513                    .cloned()
514                    .collect(),
515            );
516
517        Ok(FilterDescription::new().with_child(child))
518    }
519
520    fn handle_child_pushdown_result(
521        &self,
522        phase: FilterPushdownPhase,
523        child_pushdown_result: ChildPushdownResult,
524        _config: &ConfigOptions,
525    ) -> Result<FilterPushdownPropagation<Arc<dyn ExecutionPlan>>> {
526        if !matches!(phase, FilterPushdownPhase::Pre) {
527            return Ok(FilterPushdownPropagation::if_all(child_pushdown_result));
528        }
529        // We absorb any parent filters that were not handled by our children
530        let unsupported_parent_filters =
531            child_pushdown_result.parent_filters.iter().filter_map(|f| {
532                matches!(f.all(), PushedDown::No).then_some(Arc::clone(&f.filter))
533            });
534        let unsupported_self_filters = child_pushdown_result
535            .self_filters
536            .first()
537            .expect("we have exactly one child")
538            .iter()
539            .filter_map(|f| match f.discriminant {
540                PushedDown::Yes => None,
541                PushedDown::No => Some(&f.predicate),
542            })
543            .cloned();
544
545        let unhandled_filters = unsupported_parent_filters
546            .into_iter()
547            .chain(unsupported_self_filters)
548            .collect_vec();
549
550        // If we have unhandled filters, we need to create a new FilterExec
551        let filter_input = Arc::clone(self.input());
552        let new_predicate = conjunction(unhandled_filters);
553        let updated_node = if new_predicate.eq(&lit(true)) {
554            // FilterExec is no longer needed, but we may need to leave a projection in place
555            match self.projection() {
556                Some(projection_indices) => {
557                    let filter_child_schema = filter_input.schema();
558                    let proj_exprs = projection_indices
559                        .iter()
560                        .map(|p| {
561                            let field = filter_child_schema.field(*p).clone();
562                            ProjectionExpr {
563                                expr: Arc::new(Column::new(field.name(), *p))
564                                    as Arc<dyn PhysicalExpr>,
565                                alias: field.name().to_string(),
566                            }
567                        })
568                        .collect::<Vec<_>>();
569                    Some(Arc::new(ProjectionExec::try_new(proj_exprs, filter_input)?)
570                        as Arc<dyn ExecutionPlan>)
571                }
572                None => {
573                    // No projection needed, just return the input
574                    Some(filter_input)
575                }
576            }
577        } else if new_predicate.eq(&self.predicate) {
578            // The new predicate is the same as our current predicate
579            None
580        } else {
581            // Create a new FilterExec with the new predicate
582            let new = FilterExec {
583                predicate: Arc::clone(&new_predicate),
584                input: Arc::clone(&filter_input),
585                metrics: self.metrics.clone(),
586                default_selectivity: self.default_selectivity,
587                cache: Self::compute_properties(
588                    &filter_input,
589                    &new_predicate,
590                    self.default_selectivity,
591                    self.projection.as_ref(),
592                )?,
593                projection: None,
594                batch_size: self.batch_size,
595                fetch: self.fetch,
596            };
597            Some(Arc::new(new) as _)
598        };
599
600        Ok(FilterPushdownPropagation {
601            filters: vec![PushedDown::Yes; child_pushdown_result.parent_filters.len()],
602            updated_node,
603        })
604    }
605
606    fn with_fetch(&self, fetch: Option<usize>) -> Option<Arc<dyn ExecutionPlan>> {
607        Some(Arc::new(Self {
608            predicate: Arc::clone(&self.predicate),
609            input: Arc::clone(&self.input),
610            metrics: self.metrics.clone(),
611            default_selectivity: self.default_selectivity,
612            cache: self.cache.clone(),
613            projection: self.projection.clone(),
614            batch_size: self.batch_size,
615            fetch,
616        }))
617    }
618}
619
impl EmbeddedProjection for FilterExec {
    // Delegates to the inherent `FilterExec::with_projection`.
    fn with_projection(&self, projection: Option<Vec<usize>>) -> Result<Self> {
        self.with_projection(projection)
    }
}
625
626/// This function ensures that all bounds in the `ExprBoundaries` vector are
627/// converted to closed bounds. If a lower/upper bound is initially open, it
628/// is adjusted by using the next/previous value for its data type to convert
629/// it into a closed bound.
630fn collect_new_statistics(
631    input_column_stats: &[ColumnStatistics],
632    analysis_boundaries: Vec<ExprBoundaries>,
633) -> Vec<ColumnStatistics> {
634    analysis_boundaries
635        .into_iter()
636        .enumerate()
637        .map(
638            |(
639                idx,
640                ExprBoundaries {
641                    interval,
642                    distinct_count,
643                    ..
644                },
645            )| {
646                let Some(interval) = interval else {
647                    // If the interval is `None`, we can say that there are no rows:
648                    return ColumnStatistics {
649                        null_count: Precision::Exact(0),
650                        max_value: Precision::Exact(ScalarValue::Null),
651                        min_value: Precision::Exact(ScalarValue::Null),
652                        sum_value: Precision::Exact(ScalarValue::Null),
653                        distinct_count: Precision::Exact(0),
654                        byte_size: input_column_stats[idx].byte_size,
655                    };
656                };
657                let (lower, upper) = interval.into_bounds();
658                let (min_value, max_value) = if lower.eq(&upper) {
659                    (Precision::Exact(lower), Precision::Exact(upper))
660                } else {
661                    (Precision::Inexact(lower), Precision::Inexact(upper))
662                };
663                ColumnStatistics {
664                    null_count: input_column_stats[idx].null_count.to_inexact(),
665                    max_value,
666                    min_value,
667                    sum_value: Precision::Absent,
668                    distinct_count: distinct_count.to_inexact(),
669                    byte_size: input_column_stats[idx].byte_size,
670                }
671            },
672        )
673        .collect()
674}
675
/// The FilterExec streams wraps the input iterator and applies the predicate expression to
/// determine which rows to include in its output batches
struct FilterExecStream {
    /// Output schema after the projection
    schema: SchemaRef,
    /// The expression to filter on. This expression must evaluate to a boolean value.
    predicate: Arc<dyn PhysicalExpr>,
    /// The input partition to filter.
    input: SendableRecordBatchStream,
    /// Runtime metrics recording
    metrics: FilterExecMetrics,
    /// The projection indices of the columns in the input schema
    projection: Option<Vec<usize>>,
    /// Batch coalescer to combine small batches (also enforces the fetch limit)
    batch_coalescer: LimitedBatchCoalescer,
}
692
/// The metrics for `FilterExec`
struct FilterExecMetrics {
    /// Common metrics for most operators
    baseline_metrics: BaselineMetrics,
    /// Selectivity of the filter, calculated as output_rows / input_rows
    selectivity: RatioMetrics,
    // Remember to update `docs/source/user-guide/metrics.md` when adding new metrics,
    // or modifying metrics comments
}
702
703impl FilterExecMetrics {
704    pub fn new(metrics: &ExecutionPlanMetricsSet, partition: usize) -> Self {
705        Self {
706            baseline_metrics: BaselineMetrics::new(metrics, partition),
707            selectivity: MetricBuilder::new(metrics)
708                .with_type(MetricType::SUMMARY)
709                .ratio_metrics("selectivity", partition),
710        }
711    }
712}
713
714impl FilterExecStream {
715    fn flush_remaining_batches(
716        &mut self,
717    ) -> Poll<Option<std::result::Result<RecordBatch, DataFusionError>>> {
718        // Flush any remaining buffered batch
719        match self.batch_coalescer.finish() {
720            Ok(()) => {
721                Poll::Ready(self.batch_coalescer.next_completed_batch().map(|batch| {
722                    self.metrics.selectivity.add_part(batch.num_rows());
723                    Ok(batch)
724                }))
725            }
726            Err(e) => Poll::Ready(Some(Err(e))),
727        }
728    }
729}
730
/// Filters `batch` by evaluating the boolean `predicate` against it,
/// returning a batch containing only the rows where the predicate is true.
/// Convenience wrapper over [`filter_and_project`] with no projection.
pub fn batch_filter(
    batch: &RecordBatch,
    predicate: &Arc<dyn PhysicalExpr>,
) -> Result<RecordBatch> {
    filter_and_project(batch, predicate, None)
}
737
738fn filter_and_project(
739    batch: &RecordBatch,
740    predicate: &Arc<dyn PhysicalExpr>,
741    projection: Option<&Vec<usize>>,
742) -> Result<RecordBatch> {
743    predicate
744        .evaluate(batch)
745        .and_then(|v| v.into_array(batch.num_rows()))
746        .and_then(|array| {
747            Ok(match (as_boolean_array(&array), projection) {
748                // Apply filter array to record batch
749                (Ok(filter_array), None) => filter_record_batch(batch, filter_array)?,
750                (Ok(filter_array), Some(projection)) => {
751                    let projected_batch = batch.project(projection)?;
752                    filter_record_batch(&projected_batch, filter_array)?
753                }
754                (Err(_), _) => {
755                    return internal_err!(
756                        "Cannot create filter_array from non-boolean predicates"
757                    );
758                }
759            })
760        })
761}
762
impl Stream for FilterExecStream {
    type Item = Result<RecordBatch>;

    /// Pulls batches from the input, evaluates the predicate on each one,
    /// pushes the surviving rows into `batch_coalescer`, and yields
    /// coalesced output batches.
    fn poll_next(
        mut self: Pin<&mut Self>,
        cx: &mut Context<'_>,
    ) -> Poll<Option<Self::Item>> {
        let poll;
        // Clone the elapsed-compute metric handle up front so it can be used
        // while `self` is mutably borrowed inside the loop body below.
        let elapsed_compute = self.metrics.baseline_metrics.elapsed_compute().clone();
        loop {
            match ready!(self.input.poll_next_unpin(cx)) {
                Some(Ok(batch)) => {
                    let timer = elapsed_compute.timer();
                    // Evaluate the predicate against the full (unprojected)
                    // input batch, then pair the resulting array with the
                    // (optionally projected) batch that will be filtered.
                    let status = self.predicate.as_ref()
                        .evaluate(&batch)
                        .and_then(|v| v.into_array(batch.num_rows()))
                        .and_then(|array| {
                            Ok(match self.projection {
                                Some(ref projection) => {
                                    let projected_batch = batch.project(projection)?;
                                    (array, projected_batch)
                                },
                                None => (array, batch)
                            })
                        }).and_then(|(array, batch)| {
                            match as_boolean_array(&array) {
                                Ok(filter_array) => {
                                    // Denominator of the selectivity ratio:
                                    // rows seen before filtering.
                                    self.metrics.selectivity.add_total(batch.num_rows());
                                    // TODO: support push_batch_with_filter in LimitedBatchCoalescer
                                    let batch = filter_record_batch(&batch, filter_array)?;
                                    let state = self.batch_coalescer.push_batch(batch)?;
                                    Ok(state)
                                }
                                Err(_) => {
                                    internal_err!(
                                        "Cannot create filter_array from non-boolean predicates"
                                    )
                                }
                            }
                        })?;
                    timer.done();

                    // The coalescer reported its row limit was reached:
                    // emit what is buffered and stop pulling more input.
                    if let LimitReached = status {
                        poll = self.flush_remaining_batches();
                        break;
                    }

                    if let Some(batch) = self.batch_coalescer.next_completed_batch() {
                        // Numerator of the selectivity ratio: rows emitted.
                        self.metrics.selectivity.add_part(batch.num_rows());
                        poll = Poll::Ready(Some(Ok(batch)));
                        break;
                    }
                    // Not enough buffered rows for a completed output batch
                    // yet; poll the input again.
                    continue;
                }
                None => {
                    // Input exhausted: flush any remaining buffered batch
                    match self.batch_coalescer.finish() {
                        Ok(()) => {
                            poll = self.flush_remaining_batches();
                        }
                        Err(e) => {
                            poll = Poll::Ready(Some(Err(e)));
                        }
                    }
                    break;
                }
                value => {
                    // Remaining case is `Some(Err(_))`: forward input errors
                    // unchanged.
                    poll = Poll::Ready(value);
                    break;
                }
            }
        }
        self.metrics.baseline_metrics.record_poll(poll)
    }

    fn size_hint(&self) -> (usize, Option<usize>) {
        // Same number of record batches
        self.input.size_hint()
    }
}
843impl RecordBatchStream for FilterExecStream {
844    fn schema(&self) -> SchemaRef {
845        Arc::clone(&self.schema)
846    }
847}
848
/// Return the equals Column-Pairs and Non-equals Column-Pairs
///
/// Deprecated public shim: simply delegates to
/// `collect_columns_from_predicate_inner`, which does the actual work.
#[deprecated(
    since = "51.0.0",
    note = "This function will be internal in the future"
)]
pub fn collect_columns_from_predicate(
    predicate: &'_ Arc<dyn PhysicalExpr>,
) -> EqualAndNonEqual<'_> {
    collect_columns_from_predicate_inner(predicate)
}
859
860fn collect_columns_from_predicate_inner(
861    predicate: &'_ Arc<dyn PhysicalExpr>,
862) -> EqualAndNonEqual<'_> {
863    let mut eq_predicate_columns = Vec::<PhysicalExprPairRef>::new();
864    let mut ne_predicate_columns = Vec::<PhysicalExprPairRef>::new();
865
866    let predicates = split_conjunction(predicate);
867    predicates.into_iter().for_each(|p| {
868        if let Some(binary) = p.as_any().downcast_ref::<BinaryExpr>() {
869            match binary.op() {
870                Operator::Eq => {
871                    eq_predicate_columns.push((binary.left(), binary.right()))
872                }
873                Operator::NotEq => {
874                    ne_predicate_columns.push((binary.left(), binary.right()))
875                }
876                _ => {}
877            }
878        }
879    });
880
881    (eq_predicate_columns, ne_predicate_columns)
882}
883
/// Pair of references to `Arc<dyn PhysicalExpr>`s, e.g. the left and right
/// operands of a binary comparison.
pub type PhysicalExprPairRef<'a> = (&'a Arc<dyn PhysicalExpr>, &'a Arc<dyn PhysicalExpr>);

/// The equals Column-Pairs and Non-equals Column-Pairs in the Predicates:
/// first element holds the `=` pairs, second element the `!=` pairs.
pub type EqualAndNonEqual<'a> =
    (Vec<PhysicalExprPairRef<'a>>, Vec<PhysicalExprPairRef<'a>>);
890
891#[cfg(test)]
892mod tests {
893    use super::*;
894    use crate::empty::EmptyExec;
895    use crate::expressions::*;
896    use crate::test;
897    use crate::test::exec::StatisticsExec;
898    use arrow::datatypes::{Field, Schema, UnionFields, UnionMode};
899    use datafusion_common::ScalarValue;
900
901    #[tokio::test]
902    async fn collect_columns_predicates() -> Result<()> {
903        let schema = test::aggr_test_schema();
904        let predicate: Arc<dyn PhysicalExpr> = binary(
905            binary(
906                binary(col("c2", &schema)?, Operator::GtEq, lit(1u32), &schema)?,
907                Operator::And,
908                binary(col("c2", &schema)?, Operator::Eq, lit(4u32), &schema)?,
909                &schema,
910            )?,
911            Operator::And,
912            binary(
913                binary(
914                    col("c2", &schema)?,
915                    Operator::Eq,
916                    col("c9", &schema)?,
917                    &schema,
918                )?,
919                Operator::And,
920                binary(
921                    col("c1", &schema)?,
922                    Operator::NotEq,
923                    col("c13", &schema)?,
924                    &schema,
925                )?,
926                &schema,
927            )?,
928            &schema,
929        )?;
930
931        let (equal_pairs, ne_pairs) = collect_columns_from_predicate_inner(&predicate);
932        assert_eq!(2, equal_pairs.len());
933        assert!(equal_pairs[0].0.eq(&col("c2", &schema)?));
934        assert!(equal_pairs[0].1.eq(&lit(4u32)));
935
936        assert!(equal_pairs[1].0.eq(&col("c2", &schema)?));
937        assert!(equal_pairs[1].1.eq(&col("c9", &schema)?));
938
939        assert_eq!(1, ne_pairs.len());
940        assert!(ne_pairs[0].0.eq(&col("c1", &schema)?));
941        assert!(ne_pairs[0].1.eq(&col("c13", &schema)?));
942
943        Ok(())
944    }
945
946    #[tokio::test]
947    async fn test_filter_statistics_basic_expr() -> Result<()> {
948        // Table:
949        //      a: min=1, max=100
950        let bytes_per_row = 4;
951        let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]);
952        let input = Arc::new(StatisticsExec::new(
953            Statistics {
954                num_rows: Precision::Inexact(100),
955                total_byte_size: Precision::Inexact(100 * bytes_per_row),
956                column_statistics: vec![ColumnStatistics {
957                    min_value: Precision::Inexact(ScalarValue::Int32(Some(1))),
958                    max_value: Precision::Inexact(ScalarValue::Int32(Some(100))),
959                    ..Default::default()
960                }],
961            },
962            schema.clone(),
963        ));
964
965        // a <= 25
966        let predicate: Arc<dyn PhysicalExpr> =
967            binary(col("a", &schema)?, Operator::LtEq, lit(25i32), &schema)?;
968
969        // WHERE a <= 25
970        let filter: Arc<dyn ExecutionPlan> =
971            Arc::new(FilterExec::try_new(predicate, input)?);
972
973        let statistics = filter.partition_statistics(None)?;
974        assert_eq!(statistics.num_rows, Precision::Inexact(25));
975        assert_eq!(
976            statistics.total_byte_size,
977            Precision::Inexact(25 * bytes_per_row)
978        );
979        assert_eq!(
980            statistics.column_statistics,
981            vec![ColumnStatistics {
982                min_value: Precision::Inexact(ScalarValue::Int32(Some(1))),
983                max_value: Precision::Inexact(ScalarValue::Int32(Some(25))),
984                ..Default::default()
985            }]
986        );
987
988        Ok(())
989    }
990
    #[tokio::test]
    async fn test_filter_statistics_column_level_nested() -> Result<()> {
        // Verify statistics propagate through a chain of nested FilterExec plans.
        // Table:
        //      a: min=1, max=100
        let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]);
        let input = Arc::new(StatisticsExec::new(
            Statistics {
                num_rows: Precision::Inexact(100),
                column_statistics: vec![ColumnStatistics {
                    min_value: Precision::Inexact(ScalarValue::Int32(Some(1))),
                    max_value: Precision::Inexact(ScalarValue::Int32(Some(100))),
                    ..Default::default()
                }],
                total_byte_size: Precision::Absent,
            },
            schema.clone(),
        ));

        // WHERE a <= 25
        let sub_filter: Arc<dyn ExecutionPlan> = Arc::new(FilterExec::try_new(
            binary(col("a", &schema)?, Operator::LtEq, lit(25i32), &schema)?,
            input,
        )?);

        // Nested filters (two separate physical plans, instead of AND chain in the expr)
        // WHERE a >= 10
        // WHERE a <= 25
        let filter: Arc<dyn ExecutionPlan> = Arc::new(FilterExec::try_new(
            binary(col("a", &schema)?, Operator::GtEq, lit(10i32), &schema)?,
            sub_filter,
        )?);

        let statistics = filter.partition_statistics(None)?;
        // Combined range [10, 25] out of [1, 100] -> 16 of 100 rows estimated.
        assert_eq!(statistics.num_rows, Precision::Inexact(16));
        assert_eq!(
            statistics.column_statistics,
            vec![ColumnStatistics {
                min_value: Precision::Inexact(ScalarValue::Int32(Some(10))),
                max_value: Precision::Inexact(ScalarValue::Int32(Some(25))),
                ..Default::default()
            }]
        );

        Ok(())
    }
1036
1037    #[tokio::test]
1038    async fn test_filter_statistics_column_level_nested_multiple() -> Result<()> {
1039        // Table:
1040        //      a: min=1, max=100
1041        //      b: min=1, max=50
1042        let schema = Schema::new(vec![
1043            Field::new("a", DataType::Int32, false),
1044            Field::new("b", DataType::Int32, false),
1045        ]);
1046        let input = Arc::new(StatisticsExec::new(
1047            Statistics {
1048                num_rows: Precision::Inexact(100),
1049                column_statistics: vec![
1050                    ColumnStatistics {
1051                        min_value: Precision::Inexact(ScalarValue::Int32(Some(1))),
1052                        max_value: Precision::Inexact(ScalarValue::Int32(Some(100))),
1053                        ..Default::default()
1054                    },
1055                    ColumnStatistics {
1056                        min_value: Precision::Inexact(ScalarValue::Int32(Some(1))),
1057                        max_value: Precision::Inexact(ScalarValue::Int32(Some(50))),
1058                        ..Default::default()
1059                    },
1060                ],
1061                total_byte_size: Precision::Absent,
1062            },
1063            schema.clone(),
1064        ));
1065
1066        // WHERE a <= 25
1067        let a_lte_25: Arc<dyn ExecutionPlan> = Arc::new(FilterExec::try_new(
1068            binary(col("a", &schema)?, Operator::LtEq, lit(25i32), &schema)?,
1069            input,
1070        )?);
1071
1072        // WHERE b > 45
1073        let b_gt_5: Arc<dyn ExecutionPlan> = Arc::new(FilterExec::try_new(
1074            binary(col("b", &schema)?, Operator::Gt, lit(45i32), &schema)?,
1075            a_lte_25,
1076        )?);
1077
1078        // WHERE a >= 10
1079        let filter: Arc<dyn ExecutionPlan> = Arc::new(FilterExec::try_new(
1080            binary(col("a", &schema)?, Operator::GtEq, lit(10i32), &schema)?,
1081            b_gt_5,
1082        )?);
1083        let statistics = filter.partition_statistics(None)?;
1084        // On a uniform distribution, only fifteen rows will satisfy the
1085        // filter that 'a' proposed (a >= 10 AND a <= 25) (15/100) and only
1086        // 5 rows will satisfy the filter that 'b' proposed (b > 45) (5/50).
1087        //
1088        // Which would result with a selectivity of  '15/100 * 5/50' or 0.015
1089        // and that means about %1.5 of the all rows (rounded up to 2 rows).
1090        assert_eq!(statistics.num_rows, Precision::Inexact(2));
1091        assert_eq!(
1092            statistics.column_statistics,
1093            vec![
1094                ColumnStatistics {
1095                    min_value: Precision::Inexact(ScalarValue::Int32(Some(10))),
1096                    max_value: Precision::Inexact(ScalarValue::Int32(Some(25))),
1097                    ..Default::default()
1098                },
1099                ColumnStatistics {
1100                    min_value: Precision::Inexact(ScalarValue::Int32(Some(46))),
1101                    max_value: Precision::Inexact(ScalarValue::Int32(Some(50))),
1102                    ..Default::default()
1103                }
1104            ]
1105        );
1106
1107        Ok(())
1108    }
1109
1110    #[tokio::test]
1111    async fn test_filter_statistics_when_input_stats_missing() -> Result<()> {
1112        // Table:
1113        //      a: min=???, max=??? (missing)
1114        let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]);
1115        let input = Arc::new(StatisticsExec::new(
1116            Statistics::new_unknown(&schema),
1117            schema.clone(),
1118        ));
1119
1120        // a <= 25
1121        let predicate: Arc<dyn PhysicalExpr> =
1122            binary(col("a", &schema)?, Operator::LtEq, lit(25i32), &schema)?;
1123
1124        // WHERE a <= 25
1125        let filter: Arc<dyn ExecutionPlan> =
1126            Arc::new(FilterExec::try_new(predicate, input)?);
1127
1128        let statistics = filter.partition_statistics(None)?;
1129        assert_eq!(statistics.num_rows, Precision::Absent);
1130
1131        Ok(())
1132    }
1133
1134    #[tokio::test]
1135    async fn test_filter_statistics_multiple_columns() -> Result<()> {
1136        // Table:
1137        //      a: min=1, max=100
1138        //      b: min=1, max=3
1139        //      c: min=1000.0  max=1100.0
1140        let schema = Schema::new(vec![
1141            Field::new("a", DataType::Int32, false),
1142            Field::new("b", DataType::Int32, false),
1143            Field::new("c", DataType::Float32, false),
1144        ]);
1145        let input = Arc::new(StatisticsExec::new(
1146            Statistics {
1147                num_rows: Precision::Inexact(1000),
1148                total_byte_size: Precision::Inexact(4000),
1149                column_statistics: vec![
1150                    ColumnStatistics {
1151                        min_value: Precision::Inexact(ScalarValue::Int32(Some(1))),
1152                        max_value: Precision::Inexact(ScalarValue::Int32(Some(100))),
1153                        ..Default::default()
1154                    },
1155                    ColumnStatistics {
1156                        min_value: Precision::Inexact(ScalarValue::Int32(Some(1))),
1157                        max_value: Precision::Inexact(ScalarValue::Int32(Some(3))),
1158                        ..Default::default()
1159                    },
1160                    ColumnStatistics {
1161                        min_value: Precision::Inexact(ScalarValue::Float32(Some(1000.0))),
1162                        max_value: Precision::Inexact(ScalarValue::Float32(Some(1100.0))),
1163                        ..Default::default()
1164                    },
1165                ],
1166            },
1167            schema,
1168        ));
1169        // WHERE a<=53 AND (b=3 AND (c<=1075.0 AND a>b))
1170        let predicate = Arc::new(BinaryExpr::new(
1171            Arc::new(BinaryExpr::new(
1172                Arc::new(Column::new("a", 0)),
1173                Operator::LtEq,
1174                Arc::new(Literal::new(ScalarValue::Int32(Some(53)))),
1175            )),
1176            Operator::And,
1177            Arc::new(BinaryExpr::new(
1178                Arc::new(BinaryExpr::new(
1179                    Arc::new(Column::new("b", 1)),
1180                    Operator::Eq,
1181                    Arc::new(Literal::new(ScalarValue::Int32(Some(3)))),
1182                )),
1183                Operator::And,
1184                Arc::new(BinaryExpr::new(
1185                    Arc::new(BinaryExpr::new(
1186                        Arc::new(Column::new("c", 2)),
1187                        Operator::LtEq,
1188                        Arc::new(Literal::new(ScalarValue::Float32(Some(1075.0)))),
1189                    )),
1190                    Operator::And,
1191                    Arc::new(BinaryExpr::new(
1192                        Arc::new(Column::new("a", 0)),
1193                        Operator::Gt,
1194                        Arc::new(Column::new("b", 1)),
1195                    )),
1196                )),
1197            )),
1198        ));
1199        let filter: Arc<dyn ExecutionPlan> =
1200            Arc::new(FilterExec::try_new(predicate, input)?);
1201        let statistics = filter.partition_statistics(None)?;
1202        // 0.5 (from a) * 0.333333... (from b) * 0.798387... (from c) ≈ 0.1330...
1203        // num_rows after ceil => 133.0... => 134
1204        // total_byte_size after ceil => 532.0... => 533
1205        assert_eq!(statistics.num_rows, Precision::Inexact(134));
1206        assert_eq!(statistics.total_byte_size, Precision::Inexact(533));
1207        let exp_col_stats = vec![
1208            ColumnStatistics {
1209                min_value: Precision::Inexact(ScalarValue::Int32(Some(4))),
1210                max_value: Precision::Inexact(ScalarValue::Int32(Some(53))),
1211                ..Default::default()
1212            },
1213            ColumnStatistics {
1214                min_value: Precision::Inexact(ScalarValue::Int32(Some(3))),
1215                max_value: Precision::Inexact(ScalarValue::Int32(Some(3))),
1216                ..Default::default()
1217            },
1218            ColumnStatistics {
1219                min_value: Precision::Inexact(ScalarValue::Float32(Some(1000.0))),
1220                max_value: Precision::Inexact(ScalarValue::Float32(Some(1075.0))),
1221                ..Default::default()
1222            },
1223        ];
1224        let _ = exp_col_stats
1225            .into_iter()
1226            .zip(statistics.column_statistics)
1227            .map(|(expected, actual)| {
1228                if let Some(val) = actual.min_value.get_value() {
1229                    if val.data_type().is_floating() {
1230                        // Windows rounds arithmetic operation results differently for floating point numbers.
1231                        // Therefore, we check if the actual values are in an epsilon range.
1232                        let actual_min = actual.min_value.get_value().unwrap();
1233                        let actual_max = actual.max_value.get_value().unwrap();
1234                        let expected_min = expected.min_value.get_value().unwrap();
1235                        let expected_max = expected.max_value.get_value().unwrap();
1236                        let eps = ScalarValue::Float32(Some(1e-6));
1237
1238                        assert!(actual_min.sub(expected_min).unwrap() < eps);
1239                        assert!(actual_min.sub(expected_min).unwrap() < eps);
1240
1241                        assert!(actual_max.sub(expected_max).unwrap() < eps);
1242                        assert!(actual_max.sub(expected_max).unwrap() < eps);
1243                    } else {
1244                        assert_eq!(actual, expected);
1245                    }
1246                } else {
1247                    assert_eq!(actual, expected);
1248                }
1249            });
1250
1251        Ok(())
1252    }
1253
    #[tokio::test]
    async fn test_filter_statistics_full_selective() -> Result<()> {
        // Table:
        //      a: min=1, max=100
        //      b: min=1, max=3
        let schema = Schema::new(vec![
            Field::new("a", DataType::Int32, false),
            Field::new("b", DataType::Int32, false),
        ]);
        let input = Arc::new(StatisticsExec::new(
            Statistics {
                num_rows: Precision::Inexact(1000),
                total_byte_size: Precision::Inexact(4000),
                column_statistics: vec![
                    ColumnStatistics {
                        min_value: Precision::Inexact(ScalarValue::Int32(Some(1))),
                        max_value: Precision::Inexact(ScalarValue::Int32(Some(100))),
                        ..Default::default()
                    },
                    ColumnStatistics {
                        min_value: Precision::Inexact(ScalarValue::Int32(Some(1))),
                        max_value: Precision::Inexact(ScalarValue::Int32(Some(3))),
                        ..Default::default()
                    },
                ],
            },
            schema,
        ));
        // WHERE a<200 AND 1<=b
        // Given the ranges above, both conjuncts hold for every row.
        let predicate = Arc::new(BinaryExpr::new(
            Arc::new(BinaryExpr::new(
                Arc::new(Column::new("a", 0)),
                Operator::Lt,
                Arc::new(Literal::new(ScalarValue::Int32(Some(200)))),
            )),
            Operator::And,
            Arc::new(BinaryExpr::new(
                Arc::new(Literal::new(ScalarValue::Int32(Some(1)))),
                Operator::LtEq,
                Arc::new(Column::new("b", 1)),
            )),
        ));
        // Since filter predicate passes all entries, statistics after filter shouldn't change.
        let expected = input.partition_statistics(None)?.column_statistics;
        let filter: Arc<dyn ExecutionPlan> =
            Arc::new(FilterExec::try_new(predicate, input)?);
        let statistics = filter.partition_statistics(None)?;

        assert_eq!(statistics.num_rows, Precision::Inexact(1000));
        assert_eq!(statistics.total_byte_size, Precision::Inexact(4000));
        assert_eq!(statistics.column_statistics, expected);

        Ok(())
    }
1308
    #[tokio::test]
    async fn test_filter_statistics_zero_selective() -> Result<()> {
        // Table:
        //      a: min=1, max=100
        //      b: min=1, max=3
        let schema = Schema::new(vec![
            Field::new("a", DataType::Int32, false),
            Field::new("b", DataType::Int32, false),
        ]);
        let input = Arc::new(StatisticsExec::new(
            Statistics {
                num_rows: Precision::Inexact(1000),
                total_byte_size: Precision::Inexact(4000),
                column_statistics: vec![
                    ColumnStatistics {
                        min_value: Precision::Inexact(ScalarValue::Int32(Some(1))),
                        max_value: Precision::Inexact(ScalarValue::Int32(Some(100))),
                        ..Default::default()
                    },
                    ColumnStatistics {
                        min_value: Precision::Inexact(ScalarValue::Int32(Some(1))),
                        max_value: Precision::Inexact(ScalarValue::Int32(Some(3))),
                        ..Default::default()
                    },
                ],
            },
            schema,
        ));
        // WHERE a>200 AND 1<=b
        // The first conjunct can never hold for a in [1, 100], so the filter
        // is expected to eliminate every row.
        let predicate = Arc::new(BinaryExpr::new(
            Arc::new(BinaryExpr::new(
                Arc::new(Column::new("a", 0)),
                Operator::Gt,
                Arc::new(Literal::new(ScalarValue::Int32(Some(200)))),
            )),
            Operator::And,
            Arc::new(BinaryExpr::new(
                Arc::new(Literal::new(ScalarValue::Int32(Some(1)))),
                Operator::LtEq,
                Arc::new(Column::new("b", 1)),
            )),
        ));
        let filter: Arc<dyn ExecutionPlan> =
            Arc::new(FilterExec::try_new(predicate, input)?);
        let statistics = filter.partition_statistics(None)?;

        // An empty result has exact statistics: zero rows, zero bytes, and
        // null min/max/sum with exact zero counts per column.
        assert_eq!(statistics.num_rows, Precision::Inexact(0));
        assert_eq!(statistics.total_byte_size, Precision::Inexact(0));
        assert_eq!(
            statistics.column_statistics,
            vec![
                ColumnStatistics {
                    min_value: Precision::Exact(ScalarValue::Null),
                    max_value: Precision::Exact(ScalarValue::Null),
                    sum_value: Precision::Exact(ScalarValue::Null),
                    distinct_count: Precision::Exact(0),
                    null_count: Precision::Exact(0),
                    byte_size: Precision::Absent,
                },
                ColumnStatistics {
                    min_value: Precision::Exact(ScalarValue::Null),
                    max_value: Precision::Exact(ScalarValue::Null),
                    sum_value: Precision::Exact(ScalarValue::Null),
                    distinct_count: Precision::Exact(0),
                    null_count: Precision::Exact(0),
                    byte_size: Precision::Absent,
                },
            ]
        );

        Ok(())
    }
1381
    #[tokio::test]
    async fn test_filter_statistics_more_inputs() -> Result<()> {
        // Two columns with identical ranges; the predicate only constrains `a`.
        let schema = Schema::new(vec![
            Field::new("a", DataType::Int32, false),
            Field::new("b", DataType::Int32, false),
        ]);
        let input = Arc::new(StatisticsExec::new(
            Statistics {
                num_rows: Precision::Inexact(1000),
                total_byte_size: Precision::Inexact(4000),
                column_statistics: vec![
                    ColumnStatistics {
                        min_value: Precision::Inexact(ScalarValue::Int32(Some(1))),
                        max_value: Precision::Inexact(ScalarValue::Int32(Some(100))),
                        ..Default::default()
                    },
                    ColumnStatistics {
                        min_value: Precision::Inexact(ScalarValue::Int32(Some(1))),
                        max_value: Precision::Inexact(ScalarValue::Int32(Some(100))),
                        ..Default::default()
                    },
                ],
            },
            schema,
        ));
        // WHERE a<50
        let predicate = Arc::new(BinaryExpr::new(
            Arc::new(Column::new("a", 0)),
            Operator::Lt,
            Arc::new(Literal::new(ScalarValue::Int32(Some(50)))),
        ));
        let filter: Arc<dyn ExecutionPlan> =
            Arc::new(FilterExec::try_new(predicate, input)?);
        let statistics = filter.partition_statistics(None)?;

        assert_eq!(statistics.num_rows, Precision::Inexact(490));
        assert_eq!(statistics.total_byte_size, Precision::Inexact(1960));
        assert_eq!(
            statistics.column_statistics,
            vec![
                // `a`'s max shrinks to 49 because of `a < 50`.
                ColumnStatistics {
                    min_value: Precision::Inexact(ScalarValue::Int32(Some(1))),
                    max_value: Precision::Inexact(ScalarValue::Int32(Some(49))),
                    ..Default::default()
                },
                // `b` is untouched by the predicate, so its range is unchanged.
                ColumnStatistics {
                    min_value: Precision::Inexact(ScalarValue::Int32(Some(1))),
                    max_value: Precision::Inexact(ScalarValue::Int32(Some(100))),
                    ..Default::default()
                },
            ]
        );

        Ok(())
    }
1437
    #[tokio::test]
    async fn test_empty_input_statistics() -> Result<()> {
        // Input statistics are entirely unknown; the predicate alone should
        // still yield min/max bounds for column `a`.
        let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]);
        let input = Arc::new(StatisticsExec::new(
            Statistics::new_unknown(&schema),
            schema,
        ));
        // WHERE a <= 10 AND 0 <= a - 5
        let predicate = Arc::new(BinaryExpr::new(
            Arc::new(BinaryExpr::new(
                Arc::new(Column::new("a", 0)),
                Operator::LtEq,
                Arc::new(Literal::new(ScalarValue::Int32(Some(10)))),
            )),
            Operator::And,
            Arc::new(BinaryExpr::new(
                Arc::new(Literal::new(ScalarValue::Int32(Some(0)))),
                Operator::LtEq,
                Arc::new(BinaryExpr::new(
                    Arc::new(Column::new("a", 0)),
                    Operator::Minus,
                    Arc::new(Literal::new(ScalarValue::Int32(Some(5)))),
                )),
            )),
        ));
        let filter: Arc<dyn ExecutionPlan> =
            Arc::new(FilterExec::try_new(predicate, input)?);
        let filter_statistics = filter.partition_statistics(None)?;

        // Row/byte counts stay absent, but the conjuncts imply a ∈ [5, 10].
        let expected_filter_statistics = Statistics {
            num_rows: Precision::Absent,
            total_byte_size: Precision::Absent,
            column_statistics: vec![ColumnStatistics {
                null_count: Precision::Absent,
                min_value: Precision::Inexact(ScalarValue::Int32(Some(5))),
                max_value: Precision::Inexact(ScalarValue::Int32(Some(10))),
                sum_value: Precision::Absent,
                distinct_count: Precision::Absent,
                byte_size: Precision::Absent,
            }],
        };

        assert_eq!(filter_statistics, expected_filter_statistics);

        Ok(())
    }
1484
1485    #[tokio::test]
1486    async fn test_statistics_with_constant_column() -> Result<()> {
1487        let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]);
1488        let input = Arc::new(StatisticsExec::new(
1489            Statistics::new_unknown(&schema),
1490            schema,
1491        ));
1492        // WHERE a = 10
1493        let predicate = Arc::new(BinaryExpr::new(
1494            Arc::new(Column::new("a", 0)),
1495            Operator::Eq,
1496            Arc::new(Literal::new(ScalarValue::Int32(Some(10)))),
1497        ));
1498        let filter: Arc<dyn ExecutionPlan> =
1499            Arc::new(FilterExec::try_new(predicate, input)?);
1500        let filter_statistics = filter.partition_statistics(None)?;
1501        // First column is "a", and it is a column with only one value after the filter.
1502        assert!(filter_statistics.column_statistics[0].is_singleton());
1503
1504        Ok(())
1505    }
1506
1507    #[tokio::test]
1508    async fn test_validation_filter_selectivity() -> Result<()> {
1509        let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]);
1510        let input = Arc::new(StatisticsExec::new(
1511            Statistics::new_unknown(&schema),
1512            schema,
1513        ));
1514        // WHERE a = 10
1515        let predicate = Arc::new(BinaryExpr::new(
1516            Arc::new(Column::new("a", 0)),
1517            Operator::Eq,
1518            Arc::new(Literal::new(ScalarValue::Int32(Some(10)))),
1519        ));
1520        let filter = FilterExec::try_new(predicate, input)?;
1521        assert!(filter.with_default_selectivity(120).is_err());
1522        Ok(())
1523    }
1524
1525    #[tokio::test]
1526    async fn test_custom_filter_selectivity() -> Result<()> {
1527        // Need a decimal to trigger inexact selectivity
1528        let schema =
1529            Schema::new(vec![Field::new("a", DataType::Decimal128(2, 3), false)]);
1530        let input = Arc::new(StatisticsExec::new(
1531            Statistics {
1532                num_rows: Precision::Inexact(1000),
1533                total_byte_size: Precision::Inexact(4000),
1534                column_statistics: vec![ColumnStatistics {
1535                    ..Default::default()
1536                }],
1537            },
1538            schema,
1539        ));
1540        // WHERE a = 10
1541        let predicate = Arc::new(BinaryExpr::new(
1542            Arc::new(Column::new("a", 0)),
1543            Operator::Eq,
1544            Arc::new(Literal::new(ScalarValue::Decimal128(Some(10), 10, 10))),
1545        ));
1546        let filter = FilterExec::try_new(predicate, input)?;
1547        let statistics = filter.partition_statistics(None)?;
1548        assert_eq!(statistics.num_rows, Precision::Inexact(200));
1549        assert_eq!(statistics.total_byte_size, Precision::Inexact(800));
1550        let filter = filter.with_default_selectivity(40)?;
1551        let statistics = filter.partition_statistics(None)?;
1552        assert_eq!(statistics.num_rows, Precision::Inexact(400));
1553        assert_eq!(statistics.total_byte_size, Precision::Inexact(1600));
1554        Ok(())
1555    }
1556
1557    #[test]
1558    fn test_equivalence_properties_union_type() -> Result<()> {
1559        let union_type = DataType::Union(
1560            UnionFields::new(
1561                vec![0, 1],
1562                vec![
1563                    Field::new("f1", DataType::Int32, true),
1564                    Field::new("f2", DataType::Utf8, true),
1565                ],
1566            ),
1567            UnionMode::Sparse,
1568        );
1569
1570        let schema = Arc::new(Schema::new(vec![
1571            Field::new("c1", DataType::Int32, true),
1572            Field::new("c2", union_type, true),
1573        ]));
1574
1575        let exec = FilterExec::try_new(
1576            binary(
1577                binary(col("c1", &schema)?, Operator::GtEq, lit(1i32), &schema)?,
1578                Operator::And,
1579                binary(col("c1", &schema)?, Operator::LtEq, lit(4i32), &schema)?,
1580                &schema,
1581            )?,
1582            Arc::new(EmptyExec::new(Arc::clone(&schema))),
1583        )?;
1584
1585        exec.partition_statistics(None).unwrap();
1586
1587        Ok(())
1588    }
1589}