datafusion_physical_plan/
filter.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::any::Any;
19use std::pin::Pin;
20use std::sync::Arc;
21use std::task::{Context, Poll, ready};
22
23use itertools::Itertools;
24
25use super::{
26    ColumnStatistics, DisplayAs, ExecutionPlanProperties, PlanProperties,
27    RecordBatchStream, SendableRecordBatchStream, Statistics,
28};
29use crate::coalesce::{LimitedBatchCoalescer, PushBatchStatus};
30use crate::common::can_project;
31use crate::execution_plan::CardinalityEffect;
32use crate::filter_pushdown::{
33    ChildFilterDescription, ChildPushdownResult, FilterDescription, FilterPushdownPhase,
34    FilterPushdownPropagation, PushedDown, PushedDownPredicate,
35};
36use crate::metrics::{MetricBuilder, MetricType};
37use crate::projection::{
38    EmbeddedProjection, ProjectionExec, ProjectionExpr, make_with_child,
39    try_embed_projection, update_expr,
40};
41use crate::{
42    DisplayFormatType, ExecutionPlan,
43    metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet, RatioMetrics},
44};
45
46use arrow::compute::filter_record_batch;
47use arrow::datatypes::{DataType, SchemaRef};
48use arrow::record_batch::RecordBatch;
49use datafusion_common::cast::as_boolean_array;
50use datafusion_common::config::ConfigOptions;
51use datafusion_common::stats::Precision;
52use datafusion_common::{
53    DataFusionError, Result, ScalarValue, internal_err, plan_err, project_schema,
54};
55use datafusion_execution::TaskContext;
56use datafusion_expr::Operator;
57use datafusion_physical_expr::equivalence::ProjectionMapping;
58use datafusion_physical_expr::expressions::{BinaryExpr, Column, lit};
59use datafusion_physical_expr::intervals::utils::check_support;
60use datafusion_physical_expr::utils::collect_columns;
61use datafusion_physical_expr::{
62    AcrossPartitions, AnalysisContext, ConstExpr, ExprBoundaries, PhysicalExpr, analyze,
63    conjunction, split_conjunction,
64};
65
66use datafusion_physical_expr_common::physical_expr::fmt_sql;
67use futures::stream::{Stream, StreamExt};
68use log::trace;
69
/// Default selectivity, in percent (0 = no rows, 100 = all rows), assigned to a
/// new `FilterExec` and used to scale row count / byte size estimates when the
/// predicate cannot be analyzed.
const FILTER_EXEC_DEFAULT_SELECTIVITY: u8 = 20;
/// Default `batch_size` of a new `FilterExec`; handed to the output batch
/// coalescer as its target batch size.
const FILTER_EXEC_DEFAULT_BATCH_SIZE: usize = 8192;
72
/// FilterExec evaluates a boolean predicate against all input batches to determine which rows to
/// include in its output batches.
///
/// Optionally, the node also applies an embedded column projection to its output
/// and stops after a fixed number of output rows (`fetch`).
#[derive(Debug, Clone)]
pub struct FilterExec {
    /// The expression to filter on. This expression must evaluate to a boolean value.
    predicate: Arc<dyn PhysicalExpr>,
    /// The input plan
    input: Arc<dyn ExecutionPlan>,
    /// Execution metrics
    metrics: ExecutionPlanMetricsSet,
    /// Selectivity (in percent) assumed for statistics when the predicate cannot
    /// be analyzed. 0 = no rows, 100 = all rows
    default_selectivity: u8,
    /// Cached plan properties: equivalence properties, partitioning, etc.
    cache: PlanProperties,
    /// Optional embedded projection: indices of the *input* schema columns that
    /// make up this node's output, in output order.
    projection: Option<Vec<usize>>,
    /// Target batch size for output batches (passed to the output batch coalescer)
    batch_size: usize,
    /// Maximum number of rows to produce, if any (passed to the output batch
    /// coalescer as its limit)
    fetch: Option<usize>,
}
94
95impl FilterExec {
96    /// Create a FilterExec on an input
97    #[expect(clippy::needless_pass_by_value)]
98    pub fn try_new(
99        predicate: Arc<dyn PhysicalExpr>,
100        input: Arc<dyn ExecutionPlan>,
101    ) -> Result<Self> {
102        match predicate.data_type(input.schema().as_ref())? {
103            DataType::Boolean => {
104                let default_selectivity = FILTER_EXEC_DEFAULT_SELECTIVITY;
105                let cache = Self::compute_properties(
106                    &input,
107                    &predicate,
108                    default_selectivity,
109                    None,
110                )?;
111                Ok(Self {
112                    predicate,
113                    input: Arc::clone(&input),
114                    metrics: ExecutionPlanMetricsSet::new(),
115                    default_selectivity,
116                    cache,
117                    projection: None,
118                    batch_size: FILTER_EXEC_DEFAULT_BATCH_SIZE,
119                    fetch: None,
120                })
121            }
122            other => {
123                plan_err!("Filter predicate must return BOOLEAN values, got {other:?}")
124            }
125        }
126    }
127
128    pub fn with_default_selectivity(
129        mut self,
130        default_selectivity: u8,
131    ) -> Result<Self, DataFusionError> {
132        if default_selectivity > 100 {
133            return plan_err!(
134                "Default filter selectivity value needs to be less than or equal to 100"
135            );
136        }
137        self.default_selectivity = default_selectivity;
138        Ok(self)
139    }
140
141    /// Return new instance of [FilterExec] with the given projection.
142    pub fn with_projection(&self, projection: Option<Vec<usize>>) -> Result<Self> {
143        //  Check if the projection is valid
144        can_project(&self.schema(), projection.as_ref())?;
145
146        let projection = match projection {
147            Some(projection) => match &self.projection {
148                Some(p) => Some(projection.iter().map(|i| p[*i]).collect()),
149                None => Some(projection),
150            },
151            None => None,
152        };
153
154        let cache = Self::compute_properties(
155            &self.input,
156            &self.predicate,
157            self.default_selectivity,
158            projection.as_ref(),
159        )?;
160        Ok(Self {
161            predicate: Arc::clone(&self.predicate),
162            input: Arc::clone(&self.input),
163            metrics: self.metrics.clone(),
164            default_selectivity: self.default_selectivity,
165            cache,
166            projection,
167            batch_size: self.batch_size,
168            fetch: self.fetch,
169        })
170    }
171
172    pub fn with_batch_size(&self, batch_size: usize) -> Result<Self> {
173        Ok(Self {
174            predicate: Arc::clone(&self.predicate),
175            input: Arc::clone(&self.input),
176            metrics: self.metrics.clone(),
177            default_selectivity: self.default_selectivity,
178            cache: self.cache.clone(),
179            projection: self.projection.clone(),
180            batch_size,
181            fetch: self.fetch,
182        })
183    }
184
185    /// The expression to filter on. This expression must evaluate to a boolean value.
186    pub fn predicate(&self) -> &Arc<dyn PhysicalExpr> {
187        &self.predicate
188    }
189
190    /// The input plan
191    pub fn input(&self) -> &Arc<dyn ExecutionPlan> {
192        &self.input
193    }
194
195    /// The default selectivity
196    pub fn default_selectivity(&self) -> u8 {
197        self.default_selectivity
198    }
199
200    /// Projection
201    pub fn projection(&self) -> Option<&Vec<usize>> {
202        self.projection.as_ref()
203    }
204
205    /// Calculates `Statistics` for `FilterExec`, by applying selectivity (either default, or estimated) to input statistics.
206    fn statistics_helper(
207        schema: &SchemaRef,
208        input_stats: Statistics,
209        predicate: &Arc<dyn PhysicalExpr>,
210        default_selectivity: u8,
211    ) -> Result<Statistics> {
212        if !check_support(predicate, schema) {
213            let selectivity = default_selectivity as f64 / 100.0;
214            let mut stats = input_stats.to_inexact();
215            stats.num_rows = stats.num_rows.with_estimated_selectivity(selectivity);
216            stats.total_byte_size = stats
217                .total_byte_size
218                .with_estimated_selectivity(selectivity);
219            return Ok(stats);
220        }
221
222        let num_rows = input_stats.num_rows;
223        let total_byte_size = input_stats.total_byte_size;
224        let input_analysis_ctx =
225            AnalysisContext::try_from_statistics(schema, &input_stats.column_statistics)?;
226
227        let analysis_ctx = analyze(predicate, input_analysis_ctx, schema)?;
228
229        // Estimate (inexact) selectivity of predicate
230        let selectivity = analysis_ctx.selectivity.unwrap_or(1.0);
231        let num_rows = num_rows.with_estimated_selectivity(selectivity);
232        let total_byte_size = total_byte_size.with_estimated_selectivity(selectivity);
233
234        let column_statistics = collect_new_statistics(
235            &input_stats.column_statistics,
236            analysis_ctx.boundaries,
237        );
238        Ok(Statistics {
239            num_rows,
240            total_byte_size,
241            column_statistics,
242        })
243    }
244
245    fn extend_constants(
246        input: &Arc<dyn ExecutionPlan>,
247        predicate: &Arc<dyn PhysicalExpr>,
248    ) -> Vec<ConstExpr> {
249        let mut res_constants = Vec::new();
250        let input_eqs = input.equivalence_properties();
251
252        let conjunctions = split_conjunction(predicate);
253        for conjunction in conjunctions {
254            if let Some(binary) = conjunction.as_any().downcast_ref::<BinaryExpr>()
255                && binary.op() == &Operator::Eq
256            {
257                // Filter evaluates to single value for all partitions
258                if input_eqs.is_expr_constant(binary.left()).is_some() {
259                    let across = input_eqs
260                        .is_expr_constant(binary.right())
261                        .unwrap_or_default();
262                    res_constants
263                        .push(ConstExpr::new(Arc::clone(binary.right()), across));
264                } else if input_eqs.is_expr_constant(binary.right()).is_some() {
265                    let across = input_eqs
266                        .is_expr_constant(binary.left())
267                        .unwrap_or_default();
268                    res_constants.push(ConstExpr::new(Arc::clone(binary.left()), across));
269                }
270            }
271        }
272        res_constants
273    }
274    /// This function creates the cache object that stores the plan properties such as schema, equivalence properties, ordering, partitioning, etc.
275    fn compute_properties(
276        input: &Arc<dyn ExecutionPlan>,
277        predicate: &Arc<dyn PhysicalExpr>,
278        default_selectivity: u8,
279        projection: Option<&Vec<usize>>,
280    ) -> Result<PlanProperties> {
281        // Combine the equal predicates with the input equivalence properties
282        // to construct the equivalence properties:
283        let schema = input.schema();
284        let stats = Self::statistics_helper(
285            &schema,
286            input.partition_statistics(None)?,
287            predicate,
288            default_selectivity,
289        )?;
290        let mut eq_properties = input.equivalence_properties().clone();
291        let (equal_pairs, _) = collect_columns_from_predicate_inner(predicate);
292        for (lhs, rhs) in equal_pairs {
293            eq_properties.add_equal_conditions(Arc::clone(lhs), Arc::clone(rhs))?
294        }
295        // Add the columns that have only one viable value (singleton) after
296        // filtering to constants.
297        let constants = collect_columns(predicate)
298            .into_iter()
299            .filter(|column| stats.column_statistics[column.index()].is_singleton())
300            .map(|column| {
301                let value = stats.column_statistics[column.index()]
302                    .min_value
303                    .get_value();
304                let expr = Arc::new(column) as _;
305                ConstExpr::new(expr, AcrossPartitions::Uniform(value.cloned()))
306            });
307        // This is for statistics
308        eq_properties.add_constants(constants)?;
309        // This is for logical constant (for example: a = '1', then a could be marked as a constant)
310        // to do: how to deal with multiple situation to represent = (for example c1 between 0 and 0)
311        eq_properties.add_constants(Self::extend_constants(input, predicate))?;
312
313        let mut output_partitioning = input.output_partitioning().clone();
314        // If contains projection, update the PlanProperties.
315        if let Some(projection) = projection {
316            let schema = eq_properties.schema();
317            let projection_mapping = ProjectionMapping::from_indices(projection, schema)?;
318            let out_schema = project_schema(schema, Some(projection))?;
319            output_partitioning =
320                output_partitioning.project(&projection_mapping, &eq_properties);
321            eq_properties = eq_properties.project(&projection_mapping, out_schema);
322        }
323
324        Ok(PlanProperties::new(
325            eq_properties,
326            output_partitioning,
327            input.pipeline_behavior(),
328            input.boundedness(),
329        ))
330    }
331}
332
333impl DisplayAs for FilterExec {
334    fn fmt_as(
335        &self,
336        t: DisplayFormatType,
337        f: &mut std::fmt::Formatter,
338    ) -> std::fmt::Result {
339        match t {
340            DisplayFormatType::Default | DisplayFormatType::Verbose => {
341                let display_projections = if let Some(projection) =
342                    self.projection.as_ref()
343                {
344                    format!(
345                        ", projection=[{}]",
346                        projection
347                            .iter()
348                            .map(|index| format!(
349                                "{}@{}",
350                                self.input.schema().fields().get(*index).unwrap().name(),
351                                index
352                            ))
353                            .collect::<Vec<_>>()
354                            .join(", ")
355                    )
356                } else {
357                    "".to_string()
358                };
359                let fetch = self
360                    .fetch
361                    .map_or_else(|| "".to_string(), |f| format!(", fetch={f}"));
362                write!(
363                    f,
364                    "FilterExec: {}{}{}",
365                    self.predicate, display_projections, fetch
366                )
367            }
368            DisplayFormatType::TreeRender => {
369                write!(f, "predicate={}", fmt_sql(self.predicate.as_ref()))
370            }
371        }
372    }
373}
374
375impl ExecutionPlan for FilterExec {
376    fn name(&self) -> &'static str {
377        "FilterExec"
378    }
379
380    /// Return a reference to Any that can be used for downcasting
381    fn as_any(&self) -> &dyn Any {
382        self
383    }
384
385    fn properties(&self) -> &PlanProperties {
386        &self.cache
387    }
388
389    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
390        vec![&self.input]
391    }
392
393    fn maintains_input_order(&self) -> Vec<bool> {
394        // Tell optimizer this operator doesn't reorder its input
395        vec![true]
396    }
397
398    fn with_new_children(
399        self: Arc<Self>,
400        mut children: Vec<Arc<dyn ExecutionPlan>>,
401    ) -> Result<Arc<dyn ExecutionPlan>> {
402        FilterExec::try_new(Arc::clone(&self.predicate), children.swap_remove(0))
403            .and_then(|e| {
404                let selectivity = e.default_selectivity();
405                e.with_default_selectivity(selectivity)
406            })
407            .and_then(|e| e.with_projection(self.projection().cloned()))
408            .map(|e| e.with_fetch(self.fetch).unwrap())
409    }
410
411    fn execute(
412        &self,
413        partition: usize,
414        context: Arc<TaskContext>,
415    ) -> Result<SendableRecordBatchStream> {
416        trace!(
417            "Start FilterExec::execute for partition {} of context session_id {} and task_id {:?}",
418            partition,
419            context.session_id(),
420            context.task_id()
421        );
422        let metrics = FilterExecMetrics::new(&self.metrics, partition);
423        Ok(Box::pin(FilterExecStream {
424            schema: self.schema(),
425            predicate: Arc::clone(&self.predicate),
426            input: self.input.execute(partition, context)?,
427            metrics,
428            projection: self.projection.clone(),
429            batch_coalescer: LimitedBatchCoalescer::new(
430                self.schema(),
431                self.batch_size,
432                self.fetch,
433            ),
434        }))
435    }
436
437    fn metrics(&self) -> Option<MetricsSet> {
438        Some(self.metrics.clone_inner())
439    }
440
441    /// The output statistics of a filtering operation can be estimated if the
442    /// predicate's selectivity value can be determined for the incoming data.
443    fn statistics(&self) -> Result<Statistics> {
444        self.partition_statistics(None)
445    }
446
447    fn partition_statistics(&self, partition: Option<usize>) -> Result<Statistics> {
448        let input_stats = self.input.partition_statistics(partition)?;
449        let schema = self.schema();
450        let stats = Self::statistics_helper(
451            &schema,
452            input_stats,
453            self.predicate(),
454            self.default_selectivity,
455        )?;
456        Ok(stats.project(self.projection.as_ref()))
457    }
458
459    fn cardinality_effect(&self) -> CardinalityEffect {
460        CardinalityEffect::LowerEqual
461    }
462
463    /// Tries to swap `projection` with its input (`filter`). If possible, performs
464    /// the swap and returns [`FilterExec`] as the top plan. Otherwise, returns `None`.
465    fn try_swapping_with_projection(
466        &self,
467        projection: &ProjectionExec,
468    ) -> Result<Option<Arc<dyn ExecutionPlan>>> {
469        // If the projection does not narrow the schema, we should not try to push it down:
470        if projection.expr().len() < projection.input().schema().fields().len() {
471            // Each column in the predicate expression must exist after the projection.
472            if let Some(new_predicate) =
473                update_expr(self.predicate(), projection.expr(), false)?
474            {
475                return FilterExec::try_new(
476                    new_predicate,
477                    make_with_child(projection, self.input())?,
478                )
479                .and_then(|e| {
480                    let selectivity = self.default_selectivity();
481                    e.with_default_selectivity(selectivity)
482                })
483                .map(|e| Some(Arc::new(e) as _));
484            }
485        }
486        try_embed_projection(projection, self)
487    }
488
489    fn gather_filters_for_pushdown(
490        &self,
491        phase: FilterPushdownPhase,
492        parent_filters: Vec<Arc<dyn PhysicalExpr>>,
493        _config: &ConfigOptions,
494    ) -> Result<FilterDescription> {
495        if !matches!(phase, FilterPushdownPhase::Pre) {
496            // For non-pre phase, filters pass through unchanged
497            let filter_supports = parent_filters
498                .into_iter()
499                .map(PushedDownPredicate::supported)
500                .collect();
501
502            return Ok(FilterDescription::new().with_child(ChildFilterDescription {
503                parent_filters: filter_supports,
504                self_filters: vec![],
505            }));
506        }
507
508        let child = ChildFilterDescription::from_child(&parent_filters, self.input())?
509            .with_self_filters(
510                split_conjunction(&self.predicate)
511                    .into_iter()
512                    .cloned()
513                    .collect(),
514            );
515
516        Ok(FilterDescription::new().with_child(child))
517    }
518
519    fn handle_child_pushdown_result(
520        &self,
521        phase: FilterPushdownPhase,
522        child_pushdown_result: ChildPushdownResult,
523        _config: &ConfigOptions,
524    ) -> Result<FilterPushdownPropagation<Arc<dyn ExecutionPlan>>> {
525        if !matches!(phase, FilterPushdownPhase::Pre) {
526            return Ok(FilterPushdownPropagation::if_all(child_pushdown_result));
527        }
528        // We absorb any parent filters that were not handled by our children
529        let unsupported_parent_filters =
530            child_pushdown_result.parent_filters.iter().filter_map(|f| {
531                matches!(f.all(), PushedDown::No).then_some(Arc::clone(&f.filter))
532            });
533        let unsupported_self_filters = child_pushdown_result
534            .self_filters
535            .first()
536            .expect("we have exactly one child")
537            .iter()
538            .filter_map(|f| match f.discriminant {
539                PushedDown::Yes => None,
540                PushedDown::No => Some(&f.predicate),
541            })
542            .cloned();
543
544        let unhandled_filters = unsupported_parent_filters
545            .into_iter()
546            .chain(unsupported_self_filters)
547            .collect_vec();
548
549        // If we have unhandled filters, we need to create a new FilterExec
550        let filter_input = Arc::clone(self.input());
551        let new_predicate = conjunction(unhandled_filters);
552        let updated_node = if new_predicate.eq(&lit(true)) {
553            // FilterExec is no longer needed, but we may need to leave a projection in place
554            match self.projection() {
555                Some(projection_indices) => {
556                    let filter_child_schema = filter_input.schema();
557                    let proj_exprs = projection_indices
558                        .iter()
559                        .map(|p| {
560                            let field = filter_child_schema.field(*p).clone();
561                            ProjectionExpr {
562                                expr: Arc::new(Column::new(field.name(), *p))
563                                    as Arc<dyn PhysicalExpr>,
564                                alias: field.name().to_string(),
565                            }
566                        })
567                        .collect::<Vec<_>>();
568                    Some(Arc::new(ProjectionExec::try_new(proj_exprs, filter_input)?)
569                        as Arc<dyn ExecutionPlan>)
570                }
571                None => {
572                    // No projection needed, just return the input
573                    Some(filter_input)
574                }
575            }
576        } else if new_predicate.eq(&self.predicate) {
577            // The new predicate is the same as our current predicate
578            None
579        } else {
580            // Create a new FilterExec with the new predicate
581            let new = FilterExec {
582                predicate: Arc::clone(&new_predicate),
583                input: Arc::clone(&filter_input),
584                metrics: self.metrics.clone(),
585                default_selectivity: self.default_selectivity,
586                cache: Self::compute_properties(
587                    &filter_input,
588                    &new_predicate,
589                    self.default_selectivity,
590                    self.projection.as_ref(),
591                )?,
592                projection: None,
593                batch_size: self.batch_size,
594                fetch: self.fetch,
595            };
596            Some(Arc::new(new) as _)
597        };
598
599        Ok(FilterPushdownPropagation {
600            filters: vec![PushedDown::Yes; child_pushdown_result.parent_filters.len()],
601            updated_node,
602        })
603    }
604
605    fn with_fetch(&self, fetch: Option<usize>) -> Option<Arc<dyn ExecutionPlan>> {
606        Some(Arc::new(Self {
607            predicate: Arc::clone(&self.predicate),
608            input: Arc::clone(&self.input),
609            metrics: self.metrics.clone(),
610            default_selectivity: self.default_selectivity,
611            cache: self.cache.clone(),
612            projection: self.projection.clone(),
613            batch_size: self.batch_size,
614            fetch,
615        }))
616    }
617}
618
619impl EmbeddedProjection for FilterExec {
620    fn with_projection(&self, projection: Option<Vec<usize>>) -> Result<Self> {
621        self.with_projection(projection)
622    }
623}
624
625/// This function ensures that all bounds in the `ExprBoundaries` vector are
626/// converted to closed bounds. If a lower/upper bound is initially open, it
627/// is adjusted by using the next/previous value for its data type to convert
628/// it into a closed bound.
629fn collect_new_statistics(
630    input_column_stats: &[ColumnStatistics],
631    analysis_boundaries: Vec<ExprBoundaries>,
632) -> Vec<ColumnStatistics> {
633    analysis_boundaries
634        .into_iter()
635        .enumerate()
636        .map(
637            |(
638                idx,
639                ExprBoundaries {
640                    interval,
641                    distinct_count,
642                    ..
643                },
644            )| {
645                let Some(interval) = interval else {
646                    // If the interval is `None`, we can say that there are no rows:
647                    return ColumnStatistics {
648                        null_count: Precision::Exact(0),
649                        max_value: Precision::Exact(ScalarValue::Null),
650                        min_value: Precision::Exact(ScalarValue::Null),
651                        sum_value: Precision::Exact(ScalarValue::Null),
652                        distinct_count: Precision::Exact(0),
653                        byte_size: input_column_stats[idx].byte_size,
654                    };
655                };
656                let (lower, upper) = interval.into_bounds();
657                let (min_value, max_value) = if lower.eq(&upper) {
658                    (Precision::Exact(lower), Precision::Exact(upper))
659                } else {
660                    (Precision::Inexact(lower), Precision::Inexact(upper))
661                };
662                ColumnStatistics {
663                    null_count: input_column_stats[idx].null_count.to_inexact(),
664                    max_value,
665                    min_value,
666                    sum_value: Precision::Absent,
667                    distinct_count: distinct_count.to_inexact(),
668                    byte_size: input_column_stats[idx].byte_size,
669                }
670            },
671        )
672        .collect()
673}
674
/// The FilterExec stream wraps the input stream and applies the predicate expression to
/// determine which rows to include in its output batches. Filtered batches are
/// fed to a batch coalescer, which also enforces the optional fetch limit.
struct FilterExecStream {
    /// Output schema after the projection
    schema: SchemaRef,
    /// The expression to filter on. This expression must evaluate to a boolean value.
    predicate: Arc<dyn PhysicalExpr>,
    /// The input partition to filter.
    input: SendableRecordBatchStream,
    /// Runtime metrics recording
    metrics: FilterExecMetrics,
    /// The projection indices of the columns in the input schema
    projection: Option<Vec<usize>>,
    /// Batch coalescer that combines small filtered batches up to the target
    /// batch size and tracks the fetch limit
    batch_coalescer: LimitedBatchCoalescer,
}
691
/// The per-partition runtime metrics for `FilterExec`
struct FilterExecMetrics {
    /// Common metrics for most operators
    baseline_metrics: BaselineMetrics,
    /// Selectivity of the filter, calculated as output_rows / input_rows
    selectivity: RatioMetrics,
    // Remember to update `docs/source/user-guide/metrics.md` when adding new metrics,
    // or modifying metrics comments
}
701
702impl FilterExecMetrics {
703    pub fn new(metrics: &ExecutionPlanMetricsSet, partition: usize) -> Self {
704        Self {
705            baseline_metrics: BaselineMetrics::new(metrics, partition),
706            selectivity: MetricBuilder::new(metrics)
707                .with_type(MetricType::SUMMARY)
708                .ratio_metrics("selectivity", partition),
709        }
710    }
711}
712
713pub fn batch_filter(
714    batch: &RecordBatch,
715    predicate: &Arc<dyn PhysicalExpr>,
716) -> Result<RecordBatch> {
717    filter_and_project(batch, predicate, None)
718}
719
720fn filter_and_project(
721    batch: &RecordBatch,
722    predicate: &Arc<dyn PhysicalExpr>,
723    projection: Option<&Vec<usize>>,
724) -> Result<RecordBatch> {
725    predicate
726        .evaluate(batch)
727        .and_then(|v| v.into_array(batch.num_rows()))
728        .and_then(|array| {
729            Ok(match (as_boolean_array(&array), projection) {
730                // Apply filter array to record batch
731                (Ok(filter_array), None) => filter_record_batch(batch, filter_array)?,
732                (Ok(filter_array), Some(projection)) => {
733                    let projected_batch = batch.project(projection)?;
734                    filter_record_batch(&projected_batch, filter_array)?
735                }
736                (Err(_), _) => {
737                    return internal_err!(
738                        "Cannot create filter_array from non-boolean predicates"
739                    );
740                }
741            })
742        })
743}
744
745impl Stream for FilterExecStream {
746    type Item = Result<RecordBatch>;
747
748    fn poll_next(
749        mut self: Pin<&mut Self>,
750        cx: &mut Context<'_>,
751    ) -> Poll<Option<Self::Item>> {
752        let elapsed_compute = self.metrics.baseline_metrics.elapsed_compute().clone();
753        loop {
754            // If there is a completed batch ready, return it
755            if let Some(batch) = self.batch_coalescer.next_completed_batch() {
756                self.metrics.selectivity.add_part(batch.num_rows());
757                let poll = Poll::Ready(Some(Ok(batch)));
758                return self.metrics.baseline_metrics.record_poll(poll);
759            }
760
761            if self.batch_coalescer.is_finished() {
762                // If input is done and no batches are ready, return None to signal end of stream.
763                return Poll::Ready(None);
764            }
765
766            // Attempt to pull the next batch from the input stream.
767            match ready!(self.input.poll_next_unpin(cx)) {
768                None => {
769                    self.batch_coalescer.finish()?;
770                    // continue draining the coalescer
771                }
772                Some(Ok(batch)) => {
773                    let timer = elapsed_compute.timer();
774                    let status = self.predicate.as_ref()
775                        .evaluate(&batch)
776                        .and_then(|v| v.into_array(batch.num_rows()))
777                        .and_then(|array| {
778                            Ok(match self.projection {
779                                Some(ref projection) => {
780                                    let projected_batch = batch.project(projection)?;
781                                    (array, projected_batch)
782                                },
783                                None => (array, batch)
784                            })
785                        }).and_then(|(array, batch)| {
786                            match as_boolean_array(&array) {
787                                Ok(filter_array) => {
788                                    self.metrics.selectivity.add_total(batch.num_rows());
789                                    // TODO: support push_batch_with_filter in LimitedBatchCoalescer
790                                    let batch = filter_record_batch(&batch, filter_array)?;
791                                    let state = self.batch_coalescer.push_batch(batch)?;
792                                    Ok(state)
793                                }
794                                Err(_) => {
795                                    internal_err!(
796                                        "Cannot create filter_array from non-boolean predicates"
797                                    )
798                                }
799                            }
800                        })?;
801                    timer.done();
802
803                    match status {
804                        PushBatchStatus::Continue => {
805                            // Keep pushing more batches
806                        }
807                        PushBatchStatus::LimitReached => {
808                            // limit was reached, so stop early
809                            self.batch_coalescer.finish()?;
810                            // continue draining the coalescer
811                        }
812                    }
813                }
814
815                // Error case
816                other => return Poll::Ready(other),
817            }
818        }
819    }
820
821    fn size_hint(&self) -> (usize, Option<usize>) {
822        // Same number of record batches
823        self.input.size_hint()
824    }
825}
826impl RecordBatchStream for FilterExecStream {
827    fn schema(&self) -> SchemaRef {
828        Arc::clone(&self.schema)
829    }
830}
831
832/// Return the equals Column-Pairs and Non-equals Column-Pairs
833#[deprecated(
834    since = "51.0.0",
835    note = "This function will be internal in the future"
836)]
837pub fn collect_columns_from_predicate(
838    predicate: &'_ Arc<dyn PhysicalExpr>,
839) -> EqualAndNonEqual<'_> {
840    collect_columns_from_predicate_inner(predicate)
841}
842
843fn collect_columns_from_predicate_inner(
844    predicate: &'_ Arc<dyn PhysicalExpr>,
845) -> EqualAndNonEqual<'_> {
846    let mut eq_predicate_columns = Vec::<PhysicalExprPairRef>::new();
847    let mut ne_predicate_columns = Vec::<PhysicalExprPairRef>::new();
848
849    let predicates = split_conjunction(predicate);
850    predicates.into_iter().for_each(|p| {
851        if let Some(binary) = p.as_any().downcast_ref::<BinaryExpr>() {
852            match binary.op() {
853                Operator::Eq => {
854                    eq_predicate_columns.push((binary.left(), binary.right()))
855                }
856                Operator::NotEq => {
857                    ne_predicate_columns.push((binary.left(), binary.right()))
858                }
859                _ => {}
860            }
861        }
862    });
863
864    (eq_predicate_columns, ne_predicate_columns)
865}
866
867/// Pair of `Arc<dyn PhysicalExpr>`s
868pub type PhysicalExprPairRef<'a> = (&'a Arc<dyn PhysicalExpr>, &'a Arc<dyn PhysicalExpr>);
869
870/// The equals Column-Pairs and Non-equals Column-Pairs in the Predicates
871pub type EqualAndNonEqual<'a> =
872    (Vec<PhysicalExprPairRef<'a>>, Vec<PhysicalExprPairRef<'a>>);
873
874#[cfg(test)]
875mod tests {
876    use super::*;
877    use crate::empty::EmptyExec;
878    use crate::expressions::*;
879    use crate::test;
880    use crate::test::exec::StatisticsExec;
881    use arrow::datatypes::{Field, Schema, UnionFields, UnionMode};
882    use datafusion_common::ScalarValue;
883
884    #[tokio::test]
885    async fn collect_columns_predicates() -> Result<()> {
886        let schema = test::aggr_test_schema();
887        let predicate: Arc<dyn PhysicalExpr> = binary(
888            binary(
889                binary(col("c2", &schema)?, Operator::GtEq, lit(1u32), &schema)?,
890                Operator::And,
891                binary(col("c2", &schema)?, Operator::Eq, lit(4u32), &schema)?,
892                &schema,
893            )?,
894            Operator::And,
895            binary(
896                binary(
897                    col("c2", &schema)?,
898                    Operator::Eq,
899                    col("c9", &schema)?,
900                    &schema,
901                )?,
902                Operator::And,
903                binary(
904                    col("c1", &schema)?,
905                    Operator::NotEq,
906                    col("c13", &schema)?,
907                    &schema,
908                )?,
909                &schema,
910            )?,
911            &schema,
912        )?;
913
914        let (equal_pairs, ne_pairs) = collect_columns_from_predicate_inner(&predicate);
915        assert_eq!(2, equal_pairs.len());
916        assert!(equal_pairs[0].0.eq(&col("c2", &schema)?));
917        assert!(equal_pairs[0].1.eq(&lit(4u32)));
918
919        assert!(equal_pairs[1].0.eq(&col("c2", &schema)?));
920        assert!(equal_pairs[1].1.eq(&col("c9", &schema)?));
921
922        assert_eq!(1, ne_pairs.len());
923        assert!(ne_pairs[0].0.eq(&col("c1", &schema)?));
924        assert!(ne_pairs[0].1.eq(&col("c13", &schema)?));
925
926        Ok(())
927    }
928
929    #[tokio::test]
930    async fn test_filter_statistics_basic_expr() -> Result<()> {
931        // Table:
932        //      a: min=1, max=100
933        let bytes_per_row = 4;
934        let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]);
935        let input = Arc::new(StatisticsExec::new(
936            Statistics {
937                num_rows: Precision::Inexact(100),
938                total_byte_size: Precision::Inexact(100 * bytes_per_row),
939                column_statistics: vec![ColumnStatistics {
940                    min_value: Precision::Inexact(ScalarValue::Int32(Some(1))),
941                    max_value: Precision::Inexact(ScalarValue::Int32(Some(100))),
942                    ..Default::default()
943                }],
944            },
945            schema.clone(),
946        ));
947
948        // a <= 25
949        let predicate: Arc<dyn PhysicalExpr> =
950            binary(col("a", &schema)?, Operator::LtEq, lit(25i32), &schema)?;
951
952        // WHERE a <= 25
953        let filter: Arc<dyn ExecutionPlan> =
954            Arc::new(FilterExec::try_new(predicate, input)?);
955
956        let statistics = filter.partition_statistics(None)?;
957        assert_eq!(statistics.num_rows, Precision::Inexact(25));
958        assert_eq!(
959            statistics.total_byte_size,
960            Precision::Inexact(25 * bytes_per_row)
961        );
962        assert_eq!(
963            statistics.column_statistics,
964            vec![ColumnStatistics {
965                min_value: Precision::Inexact(ScalarValue::Int32(Some(1))),
966                max_value: Precision::Inexact(ScalarValue::Int32(Some(25))),
967                ..Default::default()
968            }]
969        );
970
971        Ok(())
972    }
973
974    #[tokio::test]
975    async fn test_filter_statistics_column_level_nested() -> Result<()> {
976        // Table:
977        //      a: min=1, max=100
978        let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]);
979        let input = Arc::new(StatisticsExec::new(
980            Statistics {
981                num_rows: Precision::Inexact(100),
982                column_statistics: vec![ColumnStatistics {
983                    min_value: Precision::Inexact(ScalarValue::Int32(Some(1))),
984                    max_value: Precision::Inexact(ScalarValue::Int32(Some(100))),
985                    ..Default::default()
986                }],
987                total_byte_size: Precision::Absent,
988            },
989            schema.clone(),
990        ));
991
992        // WHERE a <= 25
993        let sub_filter: Arc<dyn ExecutionPlan> = Arc::new(FilterExec::try_new(
994            binary(col("a", &schema)?, Operator::LtEq, lit(25i32), &schema)?,
995            input,
996        )?);
997
998        // Nested filters (two separate physical plans, instead of AND chain in the expr)
999        // WHERE a >= 10
1000        // WHERE a <= 25
1001        let filter: Arc<dyn ExecutionPlan> = Arc::new(FilterExec::try_new(
1002            binary(col("a", &schema)?, Operator::GtEq, lit(10i32), &schema)?,
1003            sub_filter,
1004        )?);
1005
1006        let statistics = filter.partition_statistics(None)?;
1007        assert_eq!(statistics.num_rows, Precision::Inexact(16));
1008        assert_eq!(
1009            statistics.column_statistics,
1010            vec![ColumnStatistics {
1011                min_value: Precision::Inexact(ScalarValue::Int32(Some(10))),
1012                max_value: Precision::Inexact(ScalarValue::Int32(Some(25))),
1013                ..Default::default()
1014            }]
1015        );
1016
1017        Ok(())
1018    }
1019
1020    #[tokio::test]
1021    async fn test_filter_statistics_column_level_nested_multiple() -> Result<()> {
1022        // Table:
1023        //      a: min=1, max=100
1024        //      b: min=1, max=50
1025        let schema = Schema::new(vec![
1026            Field::new("a", DataType::Int32, false),
1027            Field::new("b", DataType::Int32, false),
1028        ]);
1029        let input = Arc::new(StatisticsExec::new(
1030            Statistics {
1031                num_rows: Precision::Inexact(100),
1032                column_statistics: vec![
1033                    ColumnStatistics {
1034                        min_value: Precision::Inexact(ScalarValue::Int32(Some(1))),
1035                        max_value: Precision::Inexact(ScalarValue::Int32(Some(100))),
1036                        ..Default::default()
1037                    },
1038                    ColumnStatistics {
1039                        min_value: Precision::Inexact(ScalarValue::Int32(Some(1))),
1040                        max_value: Precision::Inexact(ScalarValue::Int32(Some(50))),
1041                        ..Default::default()
1042                    },
1043                ],
1044                total_byte_size: Precision::Absent,
1045            },
1046            schema.clone(),
1047        ));
1048
1049        // WHERE a <= 25
1050        let a_lte_25: Arc<dyn ExecutionPlan> = Arc::new(FilterExec::try_new(
1051            binary(col("a", &schema)?, Operator::LtEq, lit(25i32), &schema)?,
1052            input,
1053        )?);
1054
1055        // WHERE b > 45
1056        let b_gt_5: Arc<dyn ExecutionPlan> = Arc::new(FilterExec::try_new(
1057            binary(col("b", &schema)?, Operator::Gt, lit(45i32), &schema)?,
1058            a_lte_25,
1059        )?);
1060
1061        // WHERE a >= 10
1062        let filter: Arc<dyn ExecutionPlan> = Arc::new(FilterExec::try_new(
1063            binary(col("a", &schema)?, Operator::GtEq, lit(10i32), &schema)?,
1064            b_gt_5,
1065        )?);
1066        let statistics = filter.partition_statistics(None)?;
1067        // On a uniform distribution, only fifteen rows will satisfy the
1068        // filter that 'a' proposed (a >= 10 AND a <= 25) (15/100) and only
1069        // 5 rows will satisfy the filter that 'b' proposed (b > 45) (5/50).
1070        //
1071        // Which would result with a selectivity of  '15/100 * 5/50' or 0.015
1072        // and that means about %1.5 of the all rows (rounded up to 2 rows).
1073        assert_eq!(statistics.num_rows, Precision::Inexact(2));
1074        assert_eq!(
1075            statistics.column_statistics,
1076            vec![
1077                ColumnStatistics {
1078                    min_value: Precision::Inexact(ScalarValue::Int32(Some(10))),
1079                    max_value: Precision::Inexact(ScalarValue::Int32(Some(25))),
1080                    ..Default::default()
1081                },
1082                ColumnStatistics {
1083                    min_value: Precision::Inexact(ScalarValue::Int32(Some(46))),
1084                    max_value: Precision::Inexact(ScalarValue::Int32(Some(50))),
1085                    ..Default::default()
1086                }
1087            ]
1088        );
1089
1090        Ok(())
1091    }
1092
1093    #[tokio::test]
1094    async fn test_filter_statistics_when_input_stats_missing() -> Result<()> {
1095        // Table:
1096        //      a: min=???, max=??? (missing)
1097        let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]);
1098        let input = Arc::new(StatisticsExec::new(
1099            Statistics::new_unknown(&schema),
1100            schema.clone(),
1101        ));
1102
1103        // a <= 25
1104        let predicate: Arc<dyn PhysicalExpr> =
1105            binary(col("a", &schema)?, Operator::LtEq, lit(25i32), &schema)?;
1106
1107        // WHERE a <= 25
1108        let filter: Arc<dyn ExecutionPlan> =
1109            Arc::new(FilterExec::try_new(predicate, input)?);
1110
1111        let statistics = filter.partition_statistics(None)?;
1112        assert_eq!(statistics.num_rows, Precision::Absent);
1113
1114        Ok(())
1115    }
1116
1117    #[tokio::test]
1118    async fn test_filter_statistics_multiple_columns() -> Result<()> {
1119        // Table:
1120        //      a: min=1, max=100
1121        //      b: min=1, max=3
1122        //      c: min=1000.0  max=1100.0
1123        let schema = Schema::new(vec![
1124            Field::new("a", DataType::Int32, false),
1125            Field::new("b", DataType::Int32, false),
1126            Field::new("c", DataType::Float32, false),
1127        ]);
1128        let input = Arc::new(StatisticsExec::new(
1129            Statistics {
1130                num_rows: Precision::Inexact(1000),
1131                total_byte_size: Precision::Inexact(4000),
1132                column_statistics: vec![
1133                    ColumnStatistics {
1134                        min_value: Precision::Inexact(ScalarValue::Int32(Some(1))),
1135                        max_value: Precision::Inexact(ScalarValue::Int32(Some(100))),
1136                        ..Default::default()
1137                    },
1138                    ColumnStatistics {
1139                        min_value: Precision::Inexact(ScalarValue::Int32(Some(1))),
1140                        max_value: Precision::Inexact(ScalarValue::Int32(Some(3))),
1141                        ..Default::default()
1142                    },
1143                    ColumnStatistics {
1144                        min_value: Precision::Inexact(ScalarValue::Float32(Some(1000.0))),
1145                        max_value: Precision::Inexact(ScalarValue::Float32(Some(1100.0))),
1146                        ..Default::default()
1147                    },
1148                ],
1149            },
1150            schema,
1151        ));
1152        // WHERE a<=53 AND (b=3 AND (c<=1075.0 AND a>b))
1153        let predicate = Arc::new(BinaryExpr::new(
1154            Arc::new(BinaryExpr::new(
1155                Arc::new(Column::new("a", 0)),
1156                Operator::LtEq,
1157                Arc::new(Literal::new(ScalarValue::Int32(Some(53)))),
1158            )),
1159            Operator::And,
1160            Arc::new(BinaryExpr::new(
1161                Arc::new(BinaryExpr::new(
1162                    Arc::new(Column::new("b", 1)),
1163                    Operator::Eq,
1164                    Arc::new(Literal::new(ScalarValue::Int32(Some(3)))),
1165                )),
1166                Operator::And,
1167                Arc::new(BinaryExpr::new(
1168                    Arc::new(BinaryExpr::new(
1169                        Arc::new(Column::new("c", 2)),
1170                        Operator::LtEq,
1171                        Arc::new(Literal::new(ScalarValue::Float32(Some(1075.0)))),
1172                    )),
1173                    Operator::And,
1174                    Arc::new(BinaryExpr::new(
1175                        Arc::new(Column::new("a", 0)),
1176                        Operator::Gt,
1177                        Arc::new(Column::new("b", 1)),
1178                    )),
1179                )),
1180            )),
1181        ));
1182        let filter: Arc<dyn ExecutionPlan> =
1183            Arc::new(FilterExec::try_new(predicate, input)?);
1184        let statistics = filter.partition_statistics(None)?;
1185        // 0.5 (from a) * 0.333333... (from b) * 0.798387... (from c) ≈ 0.1330...
1186        // num_rows after ceil => 133.0... => 134
1187        // total_byte_size after ceil => 532.0... => 533
1188        assert_eq!(statistics.num_rows, Precision::Inexact(134));
1189        assert_eq!(statistics.total_byte_size, Precision::Inexact(533));
1190        let exp_col_stats = vec![
1191            ColumnStatistics {
1192                min_value: Precision::Inexact(ScalarValue::Int32(Some(4))),
1193                max_value: Precision::Inexact(ScalarValue::Int32(Some(53))),
1194                ..Default::default()
1195            },
1196            ColumnStatistics {
1197                min_value: Precision::Inexact(ScalarValue::Int32(Some(3))),
1198                max_value: Precision::Inexact(ScalarValue::Int32(Some(3))),
1199                ..Default::default()
1200            },
1201            ColumnStatistics {
1202                min_value: Precision::Inexact(ScalarValue::Float32(Some(1000.0))),
1203                max_value: Precision::Inexact(ScalarValue::Float32(Some(1075.0))),
1204                ..Default::default()
1205            },
1206        ];
1207        let _ = exp_col_stats
1208            .into_iter()
1209            .zip(statistics.column_statistics)
1210            .map(|(expected, actual)| {
1211                if let Some(val) = actual.min_value.get_value() {
1212                    if val.data_type().is_floating() {
1213                        // Windows rounds arithmetic operation results differently for floating point numbers.
1214                        // Therefore, we check if the actual values are in an epsilon range.
1215                        let actual_min = actual.min_value.get_value().unwrap();
1216                        let actual_max = actual.max_value.get_value().unwrap();
1217                        let expected_min = expected.min_value.get_value().unwrap();
1218                        let expected_max = expected.max_value.get_value().unwrap();
1219                        let eps = ScalarValue::Float32(Some(1e-6));
1220
1221                        assert!(actual_min.sub(expected_min).unwrap() < eps);
1222                        assert!(actual_min.sub(expected_min).unwrap() < eps);
1223
1224                        assert!(actual_max.sub(expected_max).unwrap() < eps);
1225                        assert!(actual_max.sub(expected_max).unwrap() < eps);
1226                    } else {
1227                        assert_eq!(actual, expected);
1228                    }
1229                } else {
1230                    assert_eq!(actual, expected);
1231                }
1232            });
1233
1234        Ok(())
1235    }
1236
1237    #[tokio::test]
1238    async fn test_filter_statistics_full_selective() -> Result<()> {
1239        // Table:
1240        //      a: min=1, max=100
1241        //      b: min=1, max=3
1242        let schema = Schema::new(vec![
1243            Field::new("a", DataType::Int32, false),
1244            Field::new("b", DataType::Int32, false),
1245        ]);
1246        let input = Arc::new(StatisticsExec::new(
1247            Statistics {
1248                num_rows: Precision::Inexact(1000),
1249                total_byte_size: Precision::Inexact(4000),
1250                column_statistics: vec![
1251                    ColumnStatistics {
1252                        min_value: Precision::Inexact(ScalarValue::Int32(Some(1))),
1253                        max_value: Precision::Inexact(ScalarValue::Int32(Some(100))),
1254                        ..Default::default()
1255                    },
1256                    ColumnStatistics {
1257                        min_value: Precision::Inexact(ScalarValue::Int32(Some(1))),
1258                        max_value: Precision::Inexact(ScalarValue::Int32(Some(3))),
1259                        ..Default::default()
1260                    },
1261                ],
1262            },
1263            schema,
1264        ));
1265        // WHERE a<200 AND 1<=b
1266        let predicate = Arc::new(BinaryExpr::new(
1267            Arc::new(BinaryExpr::new(
1268                Arc::new(Column::new("a", 0)),
1269                Operator::Lt,
1270                Arc::new(Literal::new(ScalarValue::Int32(Some(200)))),
1271            )),
1272            Operator::And,
1273            Arc::new(BinaryExpr::new(
1274                Arc::new(Literal::new(ScalarValue::Int32(Some(1)))),
1275                Operator::LtEq,
1276                Arc::new(Column::new("b", 1)),
1277            )),
1278        ));
1279        // Since filter predicate passes all entries, statistics after filter shouldn't change.
1280        let expected = input.partition_statistics(None)?.column_statistics;
1281        let filter: Arc<dyn ExecutionPlan> =
1282            Arc::new(FilterExec::try_new(predicate, input)?);
1283        let statistics = filter.partition_statistics(None)?;
1284
1285        assert_eq!(statistics.num_rows, Precision::Inexact(1000));
1286        assert_eq!(statistics.total_byte_size, Precision::Inexact(4000));
1287        assert_eq!(statistics.column_statistics, expected);
1288
1289        Ok(())
1290    }
1291
1292    #[tokio::test]
1293    async fn test_filter_statistics_zero_selective() -> Result<()> {
1294        // Table:
1295        //      a: min=1, max=100
1296        //      b: min=1, max=3
1297        let schema = Schema::new(vec![
1298            Field::new("a", DataType::Int32, false),
1299            Field::new("b", DataType::Int32, false),
1300        ]);
1301        let input = Arc::new(StatisticsExec::new(
1302            Statistics {
1303                num_rows: Precision::Inexact(1000),
1304                total_byte_size: Precision::Inexact(4000),
1305                column_statistics: vec![
1306                    ColumnStatistics {
1307                        min_value: Precision::Inexact(ScalarValue::Int32(Some(1))),
1308                        max_value: Precision::Inexact(ScalarValue::Int32(Some(100))),
1309                        ..Default::default()
1310                    },
1311                    ColumnStatistics {
1312                        min_value: Precision::Inexact(ScalarValue::Int32(Some(1))),
1313                        max_value: Precision::Inexact(ScalarValue::Int32(Some(3))),
1314                        ..Default::default()
1315                    },
1316                ],
1317            },
1318            schema,
1319        ));
1320        // WHERE a>200 AND 1<=b
1321        let predicate = Arc::new(BinaryExpr::new(
1322            Arc::new(BinaryExpr::new(
1323                Arc::new(Column::new("a", 0)),
1324                Operator::Gt,
1325                Arc::new(Literal::new(ScalarValue::Int32(Some(200)))),
1326            )),
1327            Operator::And,
1328            Arc::new(BinaryExpr::new(
1329                Arc::new(Literal::new(ScalarValue::Int32(Some(1)))),
1330                Operator::LtEq,
1331                Arc::new(Column::new("b", 1)),
1332            )),
1333        ));
1334        let filter: Arc<dyn ExecutionPlan> =
1335            Arc::new(FilterExec::try_new(predicate, input)?);
1336        let statistics = filter.partition_statistics(None)?;
1337
1338        assert_eq!(statistics.num_rows, Precision::Inexact(0));
1339        assert_eq!(statistics.total_byte_size, Precision::Inexact(0));
1340        assert_eq!(
1341            statistics.column_statistics,
1342            vec![
1343                ColumnStatistics {
1344                    min_value: Precision::Exact(ScalarValue::Null),
1345                    max_value: Precision::Exact(ScalarValue::Null),
1346                    sum_value: Precision::Exact(ScalarValue::Null),
1347                    distinct_count: Precision::Exact(0),
1348                    null_count: Precision::Exact(0),
1349                    byte_size: Precision::Absent,
1350                },
1351                ColumnStatistics {
1352                    min_value: Precision::Exact(ScalarValue::Null),
1353                    max_value: Precision::Exact(ScalarValue::Null),
1354                    sum_value: Precision::Exact(ScalarValue::Null),
1355                    distinct_count: Precision::Exact(0),
1356                    null_count: Precision::Exact(0),
1357                    byte_size: Precision::Absent,
1358                },
1359            ]
1360        );
1361
1362        Ok(())
1363    }
1364
1365    #[tokio::test]
1366    async fn test_filter_statistics_more_inputs() -> Result<()> {
1367        let schema = Schema::new(vec![
1368            Field::new("a", DataType::Int32, false),
1369            Field::new("b", DataType::Int32, false),
1370        ]);
1371        let input = Arc::new(StatisticsExec::new(
1372            Statistics {
1373                num_rows: Precision::Inexact(1000),
1374                total_byte_size: Precision::Inexact(4000),
1375                column_statistics: vec![
1376                    ColumnStatistics {
1377                        min_value: Precision::Inexact(ScalarValue::Int32(Some(1))),
1378                        max_value: Precision::Inexact(ScalarValue::Int32(Some(100))),
1379                        ..Default::default()
1380                    },
1381                    ColumnStatistics {
1382                        min_value: Precision::Inexact(ScalarValue::Int32(Some(1))),
1383                        max_value: Precision::Inexact(ScalarValue::Int32(Some(100))),
1384                        ..Default::default()
1385                    },
1386                ],
1387            },
1388            schema,
1389        ));
1390        // WHERE a<50
1391        let predicate = Arc::new(BinaryExpr::new(
1392            Arc::new(Column::new("a", 0)),
1393            Operator::Lt,
1394            Arc::new(Literal::new(ScalarValue::Int32(Some(50)))),
1395        ));
1396        let filter: Arc<dyn ExecutionPlan> =
1397            Arc::new(FilterExec::try_new(predicate, input)?);
1398        let statistics = filter.partition_statistics(None)?;
1399
1400        assert_eq!(statistics.num_rows, Precision::Inexact(490));
1401        assert_eq!(statistics.total_byte_size, Precision::Inexact(1960));
1402        assert_eq!(
1403            statistics.column_statistics,
1404            vec![
1405                ColumnStatistics {
1406                    min_value: Precision::Inexact(ScalarValue::Int32(Some(1))),
1407                    max_value: Precision::Inexact(ScalarValue::Int32(Some(49))),
1408                    ..Default::default()
1409                },
1410                ColumnStatistics {
1411                    min_value: Precision::Inexact(ScalarValue::Int32(Some(1))),
1412                    max_value: Precision::Inexact(ScalarValue::Int32(Some(100))),
1413                    ..Default::default()
1414                },
1415            ]
1416        );
1417
1418        Ok(())
1419    }
1420
1421    #[tokio::test]
1422    async fn test_empty_input_statistics() -> Result<()> {
1423        let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]);
1424        let input = Arc::new(StatisticsExec::new(
1425            Statistics::new_unknown(&schema),
1426            schema,
1427        ));
1428        // WHERE a <= 10 AND 0 <= a - 5
1429        let predicate = Arc::new(BinaryExpr::new(
1430            Arc::new(BinaryExpr::new(
1431                Arc::new(Column::new("a", 0)),
1432                Operator::LtEq,
1433                Arc::new(Literal::new(ScalarValue::Int32(Some(10)))),
1434            )),
1435            Operator::And,
1436            Arc::new(BinaryExpr::new(
1437                Arc::new(Literal::new(ScalarValue::Int32(Some(0)))),
1438                Operator::LtEq,
1439                Arc::new(BinaryExpr::new(
1440                    Arc::new(Column::new("a", 0)),
1441                    Operator::Minus,
1442                    Arc::new(Literal::new(ScalarValue::Int32(Some(5)))),
1443                )),
1444            )),
1445        ));
1446        let filter: Arc<dyn ExecutionPlan> =
1447            Arc::new(FilterExec::try_new(predicate, input)?);
1448        let filter_statistics = filter.partition_statistics(None)?;
1449
1450        let expected_filter_statistics = Statistics {
1451            num_rows: Precision::Absent,
1452            total_byte_size: Precision::Absent,
1453            column_statistics: vec![ColumnStatistics {
1454                null_count: Precision::Absent,
1455                min_value: Precision::Inexact(ScalarValue::Int32(Some(5))),
1456                max_value: Precision::Inexact(ScalarValue::Int32(Some(10))),
1457                sum_value: Precision::Absent,
1458                distinct_count: Precision::Absent,
1459                byte_size: Precision::Absent,
1460            }],
1461        };
1462
1463        assert_eq!(filter_statistics, expected_filter_statistics);
1464
1465        Ok(())
1466    }
1467
1468    #[tokio::test]
1469    async fn test_statistics_with_constant_column() -> Result<()> {
1470        let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]);
1471        let input = Arc::new(StatisticsExec::new(
1472            Statistics::new_unknown(&schema),
1473            schema,
1474        ));
1475        // WHERE a = 10
1476        let predicate = Arc::new(BinaryExpr::new(
1477            Arc::new(Column::new("a", 0)),
1478            Operator::Eq,
1479            Arc::new(Literal::new(ScalarValue::Int32(Some(10)))),
1480        ));
1481        let filter: Arc<dyn ExecutionPlan> =
1482            Arc::new(FilterExec::try_new(predicate, input)?);
1483        let filter_statistics = filter.partition_statistics(None)?;
1484        // First column is "a", and it is a column with only one value after the filter.
1485        assert!(filter_statistics.column_statistics[0].is_singleton());
1486
1487        Ok(())
1488    }
1489
1490    #[tokio::test]
1491    async fn test_validation_filter_selectivity() -> Result<()> {
1492        let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]);
1493        let input = Arc::new(StatisticsExec::new(
1494            Statistics::new_unknown(&schema),
1495            schema,
1496        ));
1497        // WHERE a = 10
1498        let predicate = Arc::new(BinaryExpr::new(
1499            Arc::new(Column::new("a", 0)),
1500            Operator::Eq,
1501            Arc::new(Literal::new(ScalarValue::Int32(Some(10)))),
1502        ));
1503        let filter = FilterExec::try_new(predicate, input)?;
1504        assert!(filter.with_default_selectivity(120).is_err());
1505        Ok(())
1506    }
1507
1508    #[tokio::test]
1509    async fn test_custom_filter_selectivity() -> Result<()> {
1510        // Need a decimal to trigger inexact selectivity
1511        let schema =
1512            Schema::new(vec![Field::new("a", DataType::Decimal128(2, 3), false)]);
1513        let input = Arc::new(StatisticsExec::new(
1514            Statistics {
1515                num_rows: Precision::Inexact(1000),
1516                total_byte_size: Precision::Inexact(4000),
1517                column_statistics: vec![ColumnStatistics {
1518                    ..Default::default()
1519                }],
1520            },
1521            schema,
1522        ));
1523        // WHERE a = 10
1524        let predicate = Arc::new(BinaryExpr::new(
1525            Arc::new(Column::new("a", 0)),
1526            Operator::Eq,
1527            Arc::new(Literal::new(ScalarValue::Decimal128(Some(10), 10, 10))),
1528        ));
1529        let filter = FilterExec::try_new(predicate, input)?;
1530        let statistics = filter.partition_statistics(None)?;
1531        assert_eq!(statistics.num_rows, Precision::Inexact(200));
1532        assert_eq!(statistics.total_byte_size, Precision::Inexact(800));
1533        let filter = filter.with_default_selectivity(40)?;
1534        let statistics = filter.partition_statistics(None)?;
1535        assert_eq!(statistics.num_rows, Precision::Inexact(400));
1536        assert_eq!(statistics.total_byte_size, Precision::Inexact(1600));
1537        Ok(())
1538    }
1539
1540    #[test]
1541    fn test_equivalence_properties_union_type() -> Result<()> {
1542        let union_type = DataType::Union(
1543            UnionFields::new(
1544                vec![0, 1],
1545                vec![
1546                    Field::new("f1", DataType::Int32, true),
1547                    Field::new("f2", DataType::Utf8, true),
1548                ],
1549            ),
1550            UnionMode::Sparse,
1551        );
1552
1553        let schema = Arc::new(Schema::new(vec![
1554            Field::new("c1", DataType::Int32, true),
1555            Field::new("c2", union_type, true),
1556        ]));
1557
1558        let exec = FilterExec::try_new(
1559            binary(
1560                binary(col("c1", &schema)?, Operator::GtEq, lit(1i32), &schema)?,
1561                Operator::And,
1562                binary(col("c1", &schema)?, Operator::LtEq, lit(4i32), &schema)?,
1563                &schema,
1564            )?,
1565            Arc::new(EmptyExec::new(Arc::clone(&schema))),
1566        )?;
1567
1568        exec.partition_statistics(None).unwrap();
1569
1570        Ok(())
1571    }
1572}