Skip to main content

datafusion_physical_plan/
filter.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::any::Any;
19use std::pin::Pin;
20use std::sync::Arc;
21use std::task::{Context, Poll, ready};
22
23use itertools::Itertools;
24
25use super::{
26    ColumnStatistics, DisplayAs, ExecutionPlanProperties, PlanProperties,
27    RecordBatchStream, SendableRecordBatchStream, Statistics,
28};
29use crate::coalesce::{LimitedBatchCoalescer, PushBatchStatus};
30use crate::common::can_project;
31use crate::execution_plan::CardinalityEffect;
32use crate::filter_pushdown::{
33    ChildFilterDescription, ChildPushdownResult, FilterDescription, FilterPushdownPhase,
34    FilterPushdownPropagation, PushedDown,
35};
36use crate::metrics::{MetricBuilder, MetricType};
37use crate::projection::{
38    EmbeddedProjection, ProjectionExec, ProjectionExpr, make_with_child,
39    try_embed_projection, update_expr,
40};
41use crate::{
42    DisplayFormatType, ExecutionPlan,
43    metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet, RatioMetrics},
44};
45
46use arrow::compute::filter_record_batch;
47use arrow::datatypes::{DataType, SchemaRef};
48use arrow::record_batch::RecordBatch;
49use datafusion_common::cast::as_boolean_array;
50use datafusion_common::config::ConfigOptions;
51use datafusion_common::stats::Precision;
52use datafusion_common::{
53    DataFusionError, Result, ScalarValue, internal_err, plan_err, project_schema,
54};
55use datafusion_execution::TaskContext;
56use datafusion_expr::Operator;
57use datafusion_physical_expr::equivalence::ProjectionMapping;
58use datafusion_physical_expr::expressions::{BinaryExpr, Column, lit};
59use datafusion_physical_expr::intervals::utils::check_support;
60use datafusion_physical_expr::utils::{collect_columns, reassign_expr_columns};
61use datafusion_physical_expr::{
62    AcrossPartitions, AnalysisContext, ConstExpr, ExprBoundaries, PhysicalExpr, analyze,
63    conjunction, split_conjunction,
64};
65
66use datafusion_physical_expr_common::physical_expr::fmt_sql;
67use futures::stream::{Stream, StreamExt};
68use log::trace;
69
/// Default selectivity (in percent of rows kept) assumed for statistics when
/// the predicate cannot be analyzed: 20 means an estimated 20% of rows pass.
const FILTER_EXEC_DEFAULT_SELECTIVITY: u8 = 20;
/// Default target number of rows per output batch produced by the coalescer.
const FILTER_EXEC_DEFAULT_BATCH_SIZE: usize = 8192;
72
/// FilterExec evaluates a boolean predicate against all input batches to determine which rows to
/// include in its output batches.
#[derive(Debug, Clone)]
pub struct FilterExec {
    /// The expression to filter on. This expression must evaluate to a boolean value.
    predicate: Arc<dyn PhysicalExpr>,
    /// The input plan
    input: Arc<dyn ExecutionPlan>,
    /// Execution metrics
    metrics: ExecutionPlanMetricsSet,
    /// Selectivity for statistics. 0 = no rows, 100 = all rows.
    /// Used only when the predicate is not analyzable (see `statistics_helper`).
    default_selectivity: u8,
    /// Cached plan properties: equivalence properties, partitioning, etc.
    cache: PlanProperties,
    /// Optional projection: indices into the *input* schema selecting the
    /// columns that form this operator's output schema
    projection: Option<Vec<usize>>,
    /// Target batch size for output batches
    batch_size: usize,
    /// Maximum number of rows to emit (limit pushed into the filter), if any
    fetch: Option<usize>,
}
94
impl FilterExec {
    /// Create a FilterExec on an input.
    ///
    /// # Errors
    /// Returns a planning error if `predicate` does not evaluate to a
    /// `Boolean` value against the input's schema.
    #[expect(clippy::needless_pass_by_value)]
    pub fn try_new(
        predicate: Arc<dyn PhysicalExpr>,
        input: Arc<dyn ExecutionPlan>,
    ) -> Result<Self> {
        match predicate.data_type(input.schema().as_ref())? {
            DataType::Boolean => {
                let default_selectivity = FILTER_EXEC_DEFAULT_SELECTIVITY;
                let cache = Self::compute_properties(
                    &input,
                    &predicate,
                    default_selectivity,
                    None,
                )?;
                Ok(Self {
                    predicate,
                    input: Arc::clone(&input),
                    metrics: ExecutionPlanMetricsSet::new(),
                    default_selectivity,
                    cache,
                    projection: None,
                    batch_size: FILTER_EXEC_DEFAULT_BATCH_SIZE,
                    fetch: None,
                })
            }
            other => {
                plan_err!("Filter predicate must return BOOLEAN values, got {other:?}")
            }
        }
    }

    /// Override the selectivity (percentage of rows assumed to pass) used for
    /// statistics estimation when the predicate cannot be analyzed.
    ///
    /// # Errors
    /// Returns a planning error if `default_selectivity` is greater than 100.
    pub fn with_default_selectivity(
        mut self,
        default_selectivity: u8,
    ) -> Result<Self, DataFusionError> {
        if default_selectivity > 100 {
            return plan_err!(
                "Default filter selectivity value needs to be less than or equal to 100"
            );
        }
        self.default_selectivity = default_selectivity;
        Ok(self)
    }

    /// Return new instance of [FilterExec] with the given projection.
    pub fn with_projection(&self, projection: Option<Vec<usize>>) -> Result<Self> {
        //  Check if the projection is valid
        can_project(&self.schema(), projection.as_ref())?;

        // If a projection is already present, compose the new one on top of it
        // so the stored indices always refer to the *input* schema.
        let projection = match projection {
            Some(projection) => match &self.projection {
                Some(p) => Some(projection.iter().map(|i| p[*i]).collect()),
                None => Some(projection),
            },
            None => None,
        };

        let cache = Self::compute_properties(
            &self.input,
            &self.predicate,
            self.default_selectivity,
            projection.as_ref(),
        )?;
        Ok(Self {
            predicate: Arc::clone(&self.predicate),
            input: Arc::clone(&self.input),
            metrics: self.metrics.clone(),
            default_selectivity: self.default_selectivity,
            cache,
            projection,
            batch_size: self.batch_size,
            fetch: self.fetch,
        })
    }

    /// Return a copy of this `FilterExec` with the given target output batch
    /// size. The cached plan properties are unaffected by the batch size.
    pub fn with_batch_size(&self, batch_size: usize) -> Result<Self> {
        Ok(Self {
            predicate: Arc::clone(&self.predicate),
            input: Arc::clone(&self.input),
            metrics: self.metrics.clone(),
            default_selectivity: self.default_selectivity,
            cache: self.cache.clone(),
            projection: self.projection.clone(),
            batch_size,
            fetch: self.fetch,
        })
    }

    /// The expression to filter on. This expression must evaluate to a boolean value.
    pub fn predicate(&self) -> &Arc<dyn PhysicalExpr> {
        &self.predicate
    }

    /// The input plan
    pub fn input(&self) -> &Arc<dyn ExecutionPlan> {
        &self.input
    }

    /// The default selectivity
    pub fn default_selectivity(&self) -> u8 {
        self.default_selectivity
    }

    /// Projection indices into the input schema, if any
    pub fn projection(&self) -> Option<&Vec<usize>> {
        self.projection.as_ref()
    }

    /// Calculates `Statistics` for `FilterExec`, by applying selectivity (either default, or estimated) to input statistics.
    fn statistics_helper(
        schema: &SchemaRef,
        input_stats: Statistics,
        predicate: &Arc<dyn PhysicalExpr>,
        default_selectivity: u8,
    ) -> Result<Statistics> {
        // If interval analysis does not support this predicate, fall back to
        // scaling the input statistics by the configured default selectivity.
        if !check_support(predicate, schema) {
            let selectivity = default_selectivity as f64 / 100.0;
            let mut stats = input_stats.to_inexact();
            stats.num_rows = stats.num_rows.with_estimated_selectivity(selectivity);
            stats.total_byte_size = stats
                .total_byte_size
                .with_estimated_selectivity(selectivity);
            return Ok(stats);
        }

        let num_rows = input_stats.num_rows;
        let total_byte_size = input_stats.total_byte_size;
        let input_analysis_ctx =
            AnalysisContext::try_from_statistics(schema, &input_stats.column_statistics)?;

        let analysis_ctx = analyze(predicate, input_analysis_ctx, schema)?;

        // Estimate (inexact) selectivity of predicate
        let selectivity = analysis_ctx.selectivity.unwrap_or(1.0);
        let num_rows = num_rows.with_estimated_selectivity(selectivity);
        let total_byte_size = total_byte_size.with_estimated_selectivity(selectivity);

        // Narrow per-column min/max bounds using the analyzed intervals.
        let column_statistics = collect_new_statistics(
            &input_stats.column_statistics,
            analysis_ctx.boundaries,
        );
        Ok(Statistics {
            num_rows,
            total_byte_size,
            column_statistics,
        })
    }

    /// Extracts expressions that become constant after this filter: for each
    /// `a = b` conjunct where one side is already constant in the input's
    /// equivalence properties, the other side is constant downstream too.
    fn extend_constants(
        input: &Arc<dyn ExecutionPlan>,
        predicate: &Arc<dyn PhysicalExpr>,
    ) -> Vec<ConstExpr> {
        let mut res_constants = Vec::new();
        let input_eqs = input.equivalence_properties();

        let conjunctions = split_conjunction(predicate);
        for conjunction in conjunctions {
            if let Some(binary) = conjunction.as_any().downcast_ref::<BinaryExpr>()
                && binary.op() == &Operator::Eq
            {
                // Filter evaluates to single value for all partitions
                if input_eqs.is_expr_constant(binary.left()).is_some() {
                    let across = input_eqs
                        .is_expr_constant(binary.right())
                        .unwrap_or_default();
                    res_constants
                        .push(ConstExpr::new(Arc::clone(binary.right()), across));
                } else if input_eqs.is_expr_constant(binary.right()).is_some() {
                    let across = input_eqs
                        .is_expr_constant(binary.left())
                        .unwrap_or_default();
                    res_constants.push(ConstExpr::new(Arc::clone(binary.left()), across));
                }
            }
        }
        res_constants
    }
    /// This function creates the cache object that stores the plan properties such as schema, equivalence properties, ordering, partitioning, etc.
    fn compute_properties(
        input: &Arc<dyn ExecutionPlan>,
        predicate: &Arc<dyn PhysicalExpr>,
        default_selectivity: u8,
        projection: Option<&Vec<usize>>,
    ) -> Result<PlanProperties> {
        // Combine the equal predicates with the input equivalence properties
        // to construct the equivalence properties:
        let schema = input.schema();
        let stats = Self::statistics_helper(
            &schema,
            input.partition_statistics(None)?,
            predicate,
            default_selectivity,
        )?;
        let mut eq_properties = input.equivalence_properties().clone();
        let (equal_pairs, _) = collect_columns_from_predicate_inner(predicate);
        for (lhs, rhs) in equal_pairs {
            eq_properties.add_equal_conditions(Arc::clone(lhs), Arc::clone(rhs))?
        }
        // Add the columns that have only one viable value (singleton) after
        // filtering to constants.
        let constants = collect_columns(predicate)
            .into_iter()
            .filter(|column| stats.column_statistics[column.index()].is_singleton())
            .map(|column| {
                let value = stats.column_statistics[column.index()]
                    .min_value
                    .get_value();
                let expr = Arc::new(column) as _;
                ConstExpr::new(expr, AcrossPartitions::Uniform(value.cloned()))
            });
        // This is for statistics
        eq_properties.add_constants(constants)?;
        // This is for logical constant (for example: a = '1', then a could be marked as a constant)
        // TODO: how to deal with multiple ways to express equality
        // (for example `c1 BETWEEN 0 AND 0`)
        eq_properties.add_constants(Self::extend_constants(input, predicate))?;

        let mut output_partitioning = input.output_partitioning().clone();
        // If contains projection, update the PlanProperties.
        if let Some(projection) = projection {
            let schema = eq_properties.schema();
            let projection_mapping = ProjectionMapping::from_indices(projection, schema)?;
            let out_schema = project_schema(schema, Some(projection))?;
            output_partitioning =
                output_partitioning.project(&projection_mapping, &eq_properties);
            eq_properties = eq_properties.project(&projection_mapping, out_schema);
        }

        Ok(PlanProperties::new(
            eq_properties,
            output_partitioning,
            input.pipeline_behavior(),
            input.boundedness(),
        ))
    }
}
332
333impl DisplayAs for FilterExec {
334    fn fmt_as(
335        &self,
336        t: DisplayFormatType,
337        f: &mut std::fmt::Formatter,
338    ) -> std::fmt::Result {
339        match t {
340            DisplayFormatType::Default | DisplayFormatType::Verbose => {
341                let display_projections = if let Some(projection) =
342                    self.projection.as_ref()
343                {
344                    format!(
345                        ", projection=[{}]",
346                        projection
347                            .iter()
348                            .map(|index| format!(
349                                "{}@{}",
350                                self.input.schema().fields().get(*index).unwrap().name(),
351                                index
352                            ))
353                            .collect::<Vec<_>>()
354                            .join(", ")
355                    )
356                } else {
357                    "".to_string()
358                };
359                let fetch = self
360                    .fetch
361                    .map_or_else(|| "".to_string(), |f| format!(", fetch={f}"));
362                write!(
363                    f,
364                    "FilterExec: {}{}{}",
365                    self.predicate, display_projections, fetch
366                )
367            }
368            DisplayFormatType::TreeRender => {
369                write!(f, "predicate={}", fmt_sql(self.predicate.as_ref()))
370            }
371        }
372    }
373}
374
375impl ExecutionPlan for FilterExec {
376    fn name(&self) -> &'static str {
377        "FilterExec"
378    }
379
380    /// Return a reference to Any that can be used for downcasting
381    fn as_any(&self) -> &dyn Any {
382        self
383    }
384
385    fn properties(&self) -> &PlanProperties {
386        &self.cache
387    }
388
389    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
390        vec![&self.input]
391    }
392
393    fn maintains_input_order(&self) -> Vec<bool> {
394        // Tell optimizer this operator doesn't reorder its input
395        vec![true]
396    }
397
398    fn with_new_children(
399        self: Arc<Self>,
400        mut children: Vec<Arc<dyn ExecutionPlan>>,
401    ) -> Result<Arc<dyn ExecutionPlan>> {
402        FilterExec::try_new(Arc::clone(&self.predicate), children.swap_remove(0))
403            .and_then(|e| {
404                let selectivity = e.default_selectivity();
405                e.with_default_selectivity(selectivity)
406            })
407            .and_then(|e| e.with_projection(self.projection().cloned()))
408            .map(|e| e.with_fetch(self.fetch).unwrap())
409    }
410
411    fn execute(
412        &self,
413        partition: usize,
414        context: Arc<TaskContext>,
415    ) -> Result<SendableRecordBatchStream> {
416        trace!(
417            "Start FilterExec::execute for partition {} of context session_id {} and task_id {:?}",
418            partition,
419            context.session_id(),
420            context.task_id()
421        );
422        let metrics = FilterExecMetrics::new(&self.metrics, partition);
423        Ok(Box::pin(FilterExecStream {
424            schema: self.schema(),
425            predicate: Arc::clone(&self.predicate),
426            input: self.input.execute(partition, context)?,
427            metrics,
428            projection: self.projection.clone(),
429            batch_coalescer: LimitedBatchCoalescer::new(
430                self.schema(),
431                self.batch_size,
432                self.fetch,
433            ),
434        }))
435    }
436
437    fn metrics(&self) -> Option<MetricsSet> {
438        Some(self.metrics.clone_inner())
439    }
440
441    /// The output statistics of a filtering operation can be estimated if the
442    /// predicate's selectivity value can be determined for the incoming data.
443    fn statistics(&self) -> Result<Statistics> {
444        self.partition_statistics(None)
445    }
446
447    fn partition_statistics(&self, partition: Option<usize>) -> Result<Statistics> {
448        let input_stats = self.input.partition_statistics(partition)?;
449        let schema = self.schema();
450        let stats = Self::statistics_helper(
451            &schema,
452            input_stats,
453            self.predicate(),
454            self.default_selectivity,
455        )?;
456        Ok(stats.project(self.projection.as_ref()))
457    }
458
459    fn cardinality_effect(&self) -> CardinalityEffect {
460        CardinalityEffect::LowerEqual
461    }
462
463    /// Tries to swap `projection` with its input (`filter`). If possible, performs
464    /// the swap and returns [`FilterExec`] as the top plan. Otherwise, returns `None`.
465    fn try_swapping_with_projection(
466        &self,
467        projection: &ProjectionExec,
468    ) -> Result<Option<Arc<dyn ExecutionPlan>>> {
469        // If the projection does not narrow the schema, we should not try to push it down:
470        if projection.expr().len() < projection.input().schema().fields().len() {
471            // Each column in the predicate expression must exist after the projection.
472            if let Some(new_predicate) =
473                update_expr(self.predicate(), projection.expr(), false)?
474            {
475                return FilterExec::try_new(
476                    new_predicate,
477                    make_with_child(projection, self.input())?,
478                )
479                .and_then(|e| {
480                    let selectivity = self.default_selectivity();
481                    e.with_default_selectivity(selectivity)
482                })
483                .map(|e| Some(Arc::new(e) as _));
484            }
485        }
486        try_embed_projection(projection, self)
487    }
488
489    fn gather_filters_for_pushdown(
490        &self,
491        phase: FilterPushdownPhase,
492        parent_filters: Vec<Arc<dyn PhysicalExpr>>,
493        _config: &ConfigOptions,
494    ) -> Result<FilterDescription> {
495        if !matches!(phase, FilterPushdownPhase::Pre) {
496            // For non-pre phase, filters pass through unchanged
497            let child =
498                ChildFilterDescription::from_child(&parent_filters, self.input())?;
499            return Ok(FilterDescription::new().with_child(child));
500        }
501
502        let child = ChildFilterDescription::from_child(&parent_filters, self.input())?
503            .with_self_filters(
504                split_conjunction(&self.predicate)
505                    .into_iter()
506                    .cloned()
507                    .collect(),
508            );
509
510        Ok(FilterDescription::new().with_child(child))
511    }
512
513    fn handle_child_pushdown_result(
514        &self,
515        phase: FilterPushdownPhase,
516        child_pushdown_result: ChildPushdownResult,
517        _config: &ConfigOptions,
518    ) -> Result<FilterPushdownPropagation<Arc<dyn ExecutionPlan>>> {
519        if !matches!(phase, FilterPushdownPhase::Pre) {
520            return Ok(FilterPushdownPropagation::if_all(child_pushdown_result));
521        }
522        // We absorb any parent filters that were not handled by our children
523        let mut unsupported_parent_filters: Vec<Arc<dyn PhysicalExpr>> =
524            child_pushdown_result
525                .parent_filters
526                .iter()
527                .filter_map(|f| {
528                    matches!(f.all(), PushedDown::No).then_some(Arc::clone(&f.filter))
529                })
530                .collect();
531
532        // If this FilterExec has a projection, the unsupported parent filters
533        // are in the output schema (after projection) coordinates. We need to
534        // remap them to the input schema coordinates before combining with self filters.
535        if self.projection.is_some() {
536            let input_schema = self.input().schema();
537            unsupported_parent_filters = unsupported_parent_filters
538                .into_iter()
539                .map(|expr| reassign_expr_columns(expr, &input_schema))
540                .collect::<Result<Vec<_>>>()?;
541        }
542
543        let unsupported_self_filters = child_pushdown_result
544            .self_filters
545            .first()
546            .expect("we have exactly one child")
547            .iter()
548            .filter_map(|f| match f.discriminant {
549                PushedDown::Yes => None,
550                PushedDown::No => Some(&f.predicate),
551            })
552            .cloned();
553
554        let unhandled_filters = unsupported_parent_filters
555            .into_iter()
556            .chain(unsupported_self_filters)
557            .collect_vec();
558
559        // If we have unhandled filters, we need to create a new FilterExec
560        let filter_input = Arc::clone(self.input());
561        let new_predicate = conjunction(unhandled_filters);
562        let updated_node = if new_predicate.eq(&lit(true)) {
563            // FilterExec is no longer needed, but we may need to leave a projection in place
564            match self.projection() {
565                Some(projection_indices) => {
566                    let filter_child_schema = filter_input.schema();
567                    let proj_exprs = projection_indices
568                        .iter()
569                        .map(|p| {
570                            let field = filter_child_schema.field(*p).clone();
571                            ProjectionExpr {
572                                expr: Arc::new(Column::new(field.name(), *p))
573                                    as Arc<dyn PhysicalExpr>,
574                                alias: field.name().to_string(),
575                            }
576                        })
577                        .collect::<Vec<_>>();
578                    Some(Arc::new(ProjectionExec::try_new(proj_exprs, filter_input)?)
579                        as Arc<dyn ExecutionPlan>)
580                }
581                None => {
582                    // No projection needed, just return the input
583                    Some(filter_input)
584                }
585            }
586        } else if new_predicate.eq(&self.predicate) {
587            // The new predicate is the same as our current predicate
588            None
589        } else {
590            // Create a new FilterExec with the new predicate, preserving the projection
591            let new = FilterExec {
592                predicate: Arc::clone(&new_predicate),
593                input: Arc::clone(&filter_input),
594                metrics: self.metrics.clone(),
595                default_selectivity: self.default_selectivity,
596                cache: Self::compute_properties(
597                    &filter_input,
598                    &new_predicate,
599                    self.default_selectivity,
600                    self.projection.as_ref(),
601                )?,
602                projection: self.projection.clone(),
603                batch_size: self.batch_size,
604                fetch: self.fetch,
605            };
606            Some(Arc::new(new) as _)
607        };
608
609        Ok(FilterPushdownPropagation {
610            filters: vec![PushedDown::Yes; child_pushdown_result.parent_filters.len()],
611            updated_node,
612        })
613    }
614
615    fn with_fetch(&self, fetch: Option<usize>) -> Option<Arc<dyn ExecutionPlan>> {
616        Some(Arc::new(Self {
617            predicate: Arc::clone(&self.predicate),
618            input: Arc::clone(&self.input),
619            metrics: self.metrics.clone(),
620            default_selectivity: self.default_selectivity,
621            cache: self.cache.clone(),
622            projection: self.projection.clone(),
623            batch_size: self.batch_size,
624            fetch,
625        }))
626    }
627}
628
impl EmbeddedProjection for FilterExec {
    /// Delegates to the inherent [`FilterExec::with_projection`].
    fn with_projection(&self, projection: Option<Vec<usize>>) -> Result<Self> {
        self.with_projection(projection)
    }
}
634
635/// This function ensures that all bounds in the `ExprBoundaries` vector are
636/// converted to closed bounds. If a lower/upper bound is initially open, it
637/// is adjusted by using the next/previous value for its data type to convert
638/// it into a closed bound.
639fn collect_new_statistics(
640    input_column_stats: &[ColumnStatistics],
641    analysis_boundaries: Vec<ExprBoundaries>,
642) -> Vec<ColumnStatistics> {
643    analysis_boundaries
644        .into_iter()
645        .enumerate()
646        .map(
647            |(
648                idx,
649                ExprBoundaries {
650                    interval,
651                    distinct_count,
652                    ..
653                },
654            )| {
655                let Some(interval) = interval else {
656                    // If the interval is `None`, we can say that there are no rows:
657                    return ColumnStatistics {
658                        null_count: Precision::Exact(0),
659                        max_value: Precision::Exact(ScalarValue::Null),
660                        min_value: Precision::Exact(ScalarValue::Null),
661                        sum_value: Precision::Exact(ScalarValue::Null),
662                        distinct_count: Precision::Exact(0),
663                        byte_size: input_column_stats[idx].byte_size,
664                    };
665                };
666                let (lower, upper) = interval.into_bounds();
667                let (min_value, max_value) = if lower.eq(&upper) {
668                    (Precision::Exact(lower), Precision::Exact(upper))
669                } else {
670                    (Precision::Inexact(lower), Precision::Inexact(upper))
671                };
672                ColumnStatistics {
673                    null_count: input_column_stats[idx].null_count.to_inexact(),
674                    max_value,
675                    min_value,
676                    sum_value: Precision::Absent,
677                    distinct_count: distinct_count.to_inexact(),
678                    byte_size: input_column_stats[idx].byte_size,
679                }
680            },
681        )
682        .collect()
683}
684
/// The stream produced by [`FilterExec`]: wraps the input stream and applies
/// the predicate expression to determine which rows to include in its output
/// batches.
struct FilterExecStream {
    /// Output schema after the optional projection
    schema: SchemaRef,
    /// The expression to filter on. This expression must evaluate to a boolean value.
    predicate: Arc<dyn PhysicalExpr>,
    /// The input partition to filter.
    input: SendableRecordBatchStream,
    /// Runtime metrics recording
    metrics: FilterExecMetrics,
    /// The projection indices of the columns in the input schema
    projection: Option<Vec<usize>>,
    /// Batch coalescer that combines small filtered batches and enforces the
    /// optional fetch limit
    batch_coalescer: LimitedBatchCoalescer,
}
701
/// The metrics for `FilterExec`
struct FilterExecMetrics {
    /// Common metrics for most operators
    baseline_metrics: BaselineMetrics,
    /// Selectivity of the filter, calculated as output_rows / input_rows
    selectivity: RatioMetrics,
    // Remember to update `docs/source/user-guide/metrics.md` when adding new metrics,
    // or modifying metrics comments
}
711
712impl FilterExecMetrics {
713    pub fn new(metrics: &ExecutionPlanMetricsSet, partition: usize) -> Self {
714        Self {
715            baseline_metrics: BaselineMetrics::new(metrics, partition),
716            selectivity: MetricBuilder::new(metrics)
717                .with_type(MetricType::SUMMARY)
718                .ratio_metrics("selectivity", partition),
719        }
720    }
721}
722
/// Evaluates `predicate` against `batch` and returns a batch containing only
/// the rows for which the predicate evaluated to `true`. No projection is
/// applied; see `filter_and_project` for the projecting variant.
pub fn batch_filter(
    batch: &RecordBatch,
    predicate: &Arc<dyn PhysicalExpr>,
) -> Result<RecordBatch> {
    filter_and_project(batch, predicate, None)
}
729
730fn filter_and_project(
731    batch: &RecordBatch,
732    predicate: &Arc<dyn PhysicalExpr>,
733    projection: Option<&Vec<usize>>,
734) -> Result<RecordBatch> {
735    predicate
736        .evaluate(batch)
737        .and_then(|v| v.into_array(batch.num_rows()))
738        .and_then(|array| {
739            Ok(match (as_boolean_array(&array), projection) {
740                // Apply filter array to record batch
741                (Ok(filter_array), None) => filter_record_batch(batch, filter_array)?,
742                (Ok(filter_array), Some(projection)) => {
743                    let projected_batch = batch.project(projection)?;
744                    filter_record_batch(&projected_batch, filter_array)?
745                }
746                (Err(_), _) => {
747                    return internal_err!(
748                        "Cannot create filter_array from non-boolean predicates"
749                    );
750                }
751            })
752        })
753}
754
impl Stream for FilterExecStream {
    type Item = Result<RecordBatch>;

    /// Drives the filter: repeatedly pulls input batches, evaluates the
    /// predicate, pushes surviving rows into the coalescer, and yields
    /// completed output batches.
    fn poll_next(
        mut self: Pin<&mut Self>,
        cx: &mut Context<'_>,
    ) -> Poll<Option<Self::Item>> {
        // Clone the metric handle once so it can be used below while `self`
        // is mutably borrowed.
        let elapsed_compute = self.metrics.baseline_metrics.elapsed_compute().clone();
        loop {
            // If there is a completed batch ready, return it
            if let Some(batch) = self.batch_coalescer.next_completed_batch() {
                // Emitted rows feed the selectivity ratio's numerator.
                self.metrics.selectivity.add_part(batch.num_rows());
                let poll = Poll::Ready(Some(Ok(batch)));
                return self.metrics.baseline_metrics.record_poll(poll);
            }

            if self.batch_coalescer.is_finished() {
                // If input is done and no batches are ready, return None to signal end of stream.
                return Poll::Ready(None);
            }

            // Attempt to pull the next batch from the input stream.
            match ready!(self.input.poll_next_unpin(cx)) {
                None => {
                    // Input exhausted: flush any partially-built batch.
                    self.batch_coalescer.finish()?;
                    // continue draining the coalescer
                }
                Some(Ok(batch)) => {
                    let timer = elapsed_compute.timer();
                    // Evaluate the predicate against the *full* input batch,
                    // optionally project down to the output columns, then
                    // keep only the rows where the boolean predicate array
                    // is true.
                    let status = self.predicate.as_ref()
                        .evaluate(&batch)
                        .and_then(|v| v.into_array(batch.num_rows()))
                        .and_then(|array| {
                            Ok(match self.projection {
                                Some(ref projection) => {
                                    let projected_batch = batch.project(projection)?;
                                    (array, projected_batch)
                                },
                                None => (array, batch)
                            })
                        }).and_then(|(array, batch)| {
                            match as_boolean_array(&array) {
                                Ok(filter_array) => {
                                    // Input rows feed the selectivity
                                    // ratio's denominator.
                                    self.metrics.selectivity.add_total(batch.num_rows());
                                    // TODO: support push_batch_with_filter in LimitedBatchCoalescer
                                    let batch = filter_record_batch(&batch, filter_array)?;
                                    let state = self.batch_coalescer.push_batch(batch)?;
                                    Ok(state)
                                }
                                Err(_) => {
                                    internal_err!(
                                        "Cannot create filter_array from non-boolean predicates"
                                    )
                                }
                            }
                        })?;
                    timer.done();

                    match status {
                        PushBatchStatus::Continue => {
                            // Keep pushing more batches
                        }
                        PushBatchStatus::LimitReached => {
                            // limit was reached, so stop early
                            self.batch_coalescer.finish()?;
                            // continue draining the coalescer
                        }
                    }
                }

                // Error case
                other => return Poll::Ready(other),
            }
        }
    }

    fn size_hint(&self) -> (usize, Option<usize>) {
        // Same number of record batches
        self.input.size_hint()
    }
}
impl RecordBatchStream for FilterExecStream {
    /// Schema of the batches this stream produces (cheap ref-count clone
    /// of the schema captured at stream creation).
    fn schema(&self) -> SchemaRef {
        Arc::clone(&self.schema)
    }
}
841
/// Return the equals Column-Pairs and Non-equals Column-Pairs
///
/// Splits `predicate` into its conjuncts and collects the `(left, right)`
/// operands of each top-level `=` comparison into the first vector and of
/// each `!=` comparison into the second.
#[deprecated(
    since = "51.0.0",
    note = "This function will be internal in the future"
)]
pub fn collect_columns_from_predicate(
    predicate: &'_ Arc<dyn PhysicalExpr>,
) -> EqualAndNonEqual<'_> {
    // Thin deprecated shim: all logic lives in the private inner function.
    collect_columns_from_predicate_inner(predicate)
}
852
853fn collect_columns_from_predicate_inner(
854    predicate: &'_ Arc<dyn PhysicalExpr>,
855) -> EqualAndNonEqual<'_> {
856    let mut eq_predicate_columns = Vec::<PhysicalExprPairRef>::new();
857    let mut ne_predicate_columns = Vec::<PhysicalExprPairRef>::new();
858
859    let predicates = split_conjunction(predicate);
860    predicates.into_iter().for_each(|p| {
861        if let Some(binary) = p.as_any().downcast_ref::<BinaryExpr>() {
862            match binary.op() {
863                Operator::Eq => {
864                    eq_predicate_columns.push((binary.left(), binary.right()))
865                }
866                Operator::NotEq => {
867                    ne_predicate_columns.push((binary.left(), binary.right()))
868                }
869                _ => {}
870            }
871        }
872    });
873
874    (eq_predicate_columns, ne_predicate_columns)
875}
876
/// Pair of references to the left and right operand `Arc<dyn PhysicalExpr>`s
/// of a binary comparison
pub type PhysicalExprPairRef<'a> = (&'a Arc<dyn PhysicalExpr>, &'a Arc<dyn PhysicalExpr>);

/// The equals Column-Pairs and Non-equals Column-Pairs in the Predicates
pub type EqualAndNonEqual<'a> =
    (Vec<PhysicalExprPairRef<'a>>, Vec<PhysicalExprPairRef<'a>>);
883
884#[cfg(test)]
885mod tests {
886    use super::*;
887    use crate::empty::EmptyExec;
888    use crate::expressions::*;
889    use crate::test;
890    use crate::test::exec::StatisticsExec;
891    use arrow::datatypes::{Field, Schema, UnionFields, UnionMode};
892    use datafusion_common::ScalarValue;
893
894    #[tokio::test]
895    async fn collect_columns_predicates() -> Result<()> {
896        let schema = test::aggr_test_schema();
897        let predicate: Arc<dyn PhysicalExpr> = binary(
898            binary(
899                binary(col("c2", &schema)?, Operator::GtEq, lit(1u32), &schema)?,
900                Operator::And,
901                binary(col("c2", &schema)?, Operator::Eq, lit(4u32), &schema)?,
902                &schema,
903            )?,
904            Operator::And,
905            binary(
906                binary(
907                    col("c2", &schema)?,
908                    Operator::Eq,
909                    col("c9", &schema)?,
910                    &schema,
911                )?,
912                Operator::And,
913                binary(
914                    col("c1", &schema)?,
915                    Operator::NotEq,
916                    col("c13", &schema)?,
917                    &schema,
918                )?,
919                &schema,
920            )?,
921            &schema,
922        )?;
923
924        let (equal_pairs, ne_pairs) = collect_columns_from_predicate_inner(&predicate);
925        assert_eq!(2, equal_pairs.len());
926        assert!(equal_pairs[0].0.eq(&col("c2", &schema)?));
927        assert!(equal_pairs[0].1.eq(&lit(4u32)));
928
929        assert!(equal_pairs[1].0.eq(&col("c2", &schema)?));
930        assert!(equal_pairs[1].1.eq(&col("c9", &schema)?));
931
932        assert_eq!(1, ne_pairs.len());
933        assert!(ne_pairs[0].0.eq(&col("c1", &schema)?));
934        assert!(ne_pairs[0].1.eq(&col("c13", &schema)?));
935
936        Ok(())
937    }
938
939    #[tokio::test]
940    async fn test_filter_statistics_basic_expr() -> Result<()> {
941        // Table:
942        //      a: min=1, max=100
943        let bytes_per_row = 4;
944        let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]);
945        let input = Arc::new(StatisticsExec::new(
946            Statistics {
947                num_rows: Precision::Inexact(100),
948                total_byte_size: Precision::Inexact(100 * bytes_per_row),
949                column_statistics: vec![ColumnStatistics {
950                    min_value: Precision::Inexact(ScalarValue::Int32(Some(1))),
951                    max_value: Precision::Inexact(ScalarValue::Int32(Some(100))),
952                    ..Default::default()
953                }],
954            },
955            schema.clone(),
956        ));
957
958        // a <= 25
959        let predicate: Arc<dyn PhysicalExpr> =
960            binary(col("a", &schema)?, Operator::LtEq, lit(25i32), &schema)?;
961
962        // WHERE a <= 25
963        let filter: Arc<dyn ExecutionPlan> =
964            Arc::new(FilterExec::try_new(predicate, input)?);
965
966        let statistics = filter.partition_statistics(None)?;
967        assert_eq!(statistics.num_rows, Precision::Inexact(25));
968        assert_eq!(
969            statistics.total_byte_size,
970            Precision::Inexact(25 * bytes_per_row)
971        );
972        assert_eq!(
973            statistics.column_statistics,
974            vec![ColumnStatistics {
975                min_value: Precision::Inexact(ScalarValue::Int32(Some(1))),
976                max_value: Precision::Inexact(ScalarValue::Int32(Some(25))),
977                ..Default::default()
978            }]
979        );
980
981        Ok(())
982    }
983
984    #[tokio::test]
985    async fn test_filter_statistics_column_level_nested() -> Result<()> {
986        // Table:
987        //      a: min=1, max=100
988        let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]);
989        let input = Arc::new(StatisticsExec::new(
990            Statistics {
991                num_rows: Precision::Inexact(100),
992                column_statistics: vec![ColumnStatistics {
993                    min_value: Precision::Inexact(ScalarValue::Int32(Some(1))),
994                    max_value: Precision::Inexact(ScalarValue::Int32(Some(100))),
995                    ..Default::default()
996                }],
997                total_byte_size: Precision::Absent,
998            },
999            schema.clone(),
1000        ));
1001
1002        // WHERE a <= 25
1003        let sub_filter: Arc<dyn ExecutionPlan> = Arc::new(FilterExec::try_new(
1004            binary(col("a", &schema)?, Operator::LtEq, lit(25i32), &schema)?,
1005            input,
1006        )?);
1007
1008        // Nested filters (two separate physical plans, instead of AND chain in the expr)
1009        // WHERE a >= 10
1010        // WHERE a <= 25
1011        let filter: Arc<dyn ExecutionPlan> = Arc::new(FilterExec::try_new(
1012            binary(col("a", &schema)?, Operator::GtEq, lit(10i32), &schema)?,
1013            sub_filter,
1014        )?);
1015
1016        let statistics = filter.partition_statistics(None)?;
1017        assert_eq!(statistics.num_rows, Precision::Inexact(16));
1018        assert_eq!(
1019            statistics.column_statistics,
1020            vec![ColumnStatistics {
1021                min_value: Precision::Inexact(ScalarValue::Int32(Some(10))),
1022                max_value: Precision::Inexact(ScalarValue::Int32(Some(25))),
1023                ..Default::default()
1024            }]
1025        );
1026
1027        Ok(())
1028    }
1029
1030    #[tokio::test]
1031    async fn test_filter_statistics_column_level_nested_multiple() -> Result<()> {
1032        // Table:
1033        //      a: min=1, max=100
1034        //      b: min=1, max=50
1035        let schema = Schema::new(vec![
1036            Field::new("a", DataType::Int32, false),
1037            Field::new("b", DataType::Int32, false),
1038        ]);
1039        let input = Arc::new(StatisticsExec::new(
1040            Statistics {
1041                num_rows: Precision::Inexact(100),
1042                column_statistics: vec![
1043                    ColumnStatistics {
1044                        min_value: Precision::Inexact(ScalarValue::Int32(Some(1))),
1045                        max_value: Precision::Inexact(ScalarValue::Int32(Some(100))),
1046                        ..Default::default()
1047                    },
1048                    ColumnStatistics {
1049                        min_value: Precision::Inexact(ScalarValue::Int32(Some(1))),
1050                        max_value: Precision::Inexact(ScalarValue::Int32(Some(50))),
1051                        ..Default::default()
1052                    },
1053                ],
1054                total_byte_size: Precision::Absent,
1055            },
1056            schema.clone(),
1057        ));
1058
1059        // WHERE a <= 25
1060        let a_lte_25: Arc<dyn ExecutionPlan> = Arc::new(FilterExec::try_new(
1061            binary(col("a", &schema)?, Operator::LtEq, lit(25i32), &schema)?,
1062            input,
1063        )?);
1064
1065        // WHERE b > 45
1066        let b_gt_5: Arc<dyn ExecutionPlan> = Arc::new(FilterExec::try_new(
1067            binary(col("b", &schema)?, Operator::Gt, lit(45i32), &schema)?,
1068            a_lte_25,
1069        )?);
1070
1071        // WHERE a >= 10
1072        let filter: Arc<dyn ExecutionPlan> = Arc::new(FilterExec::try_new(
1073            binary(col("a", &schema)?, Operator::GtEq, lit(10i32), &schema)?,
1074            b_gt_5,
1075        )?);
1076        let statistics = filter.partition_statistics(None)?;
1077        // On a uniform distribution, only fifteen rows will satisfy the
1078        // filter that 'a' proposed (a >= 10 AND a <= 25) (15/100) and only
1079        // 5 rows will satisfy the filter that 'b' proposed (b > 45) (5/50).
1080        //
1081        // Which would result with a selectivity of  '15/100 * 5/50' or 0.015
1082        // and that means about %1.5 of the all rows (rounded up to 2 rows).
1083        assert_eq!(statistics.num_rows, Precision::Inexact(2));
1084        assert_eq!(
1085            statistics.column_statistics,
1086            vec![
1087                ColumnStatistics {
1088                    min_value: Precision::Inexact(ScalarValue::Int32(Some(10))),
1089                    max_value: Precision::Inexact(ScalarValue::Int32(Some(25))),
1090                    ..Default::default()
1091                },
1092                ColumnStatistics {
1093                    min_value: Precision::Inexact(ScalarValue::Int32(Some(46))),
1094                    max_value: Precision::Inexact(ScalarValue::Int32(Some(50))),
1095                    ..Default::default()
1096                }
1097            ]
1098        );
1099
1100        Ok(())
1101    }
1102
1103    #[tokio::test]
1104    async fn test_filter_statistics_when_input_stats_missing() -> Result<()> {
1105        // Table:
1106        //      a: min=???, max=??? (missing)
1107        let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]);
1108        let input = Arc::new(StatisticsExec::new(
1109            Statistics::new_unknown(&schema),
1110            schema.clone(),
1111        ));
1112
1113        // a <= 25
1114        let predicate: Arc<dyn PhysicalExpr> =
1115            binary(col("a", &schema)?, Operator::LtEq, lit(25i32), &schema)?;
1116
1117        // WHERE a <= 25
1118        let filter: Arc<dyn ExecutionPlan> =
1119            Arc::new(FilterExec::try_new(predicate, input)?);
1120
1121        let statistics = filter.partition_statistics(None)?;
1122        assert_eq!(statistics.num_rows, Precision::Absent);
1123
1124        Ok(())
1125    }
1126
1127    #[tokio::test]
1128    async fn test_filter_statistics_multiple_columns() -> Result<()> {
1129        // Table:
1130        //      a: min=1, max=100
1131        //      b: min=1, max=3
1132        //      c: min=1000.0  max=1100.0
1133        let schema = Schema::new(vec![
1134            Field::new("a", DataType::Int32, false),
1135            Field::new("b", DataType::Int32, false),
1136            Field::new("c", DataType::Float32, false),
1137        ]);
1138        let input = Arc::new(StatisticsExec::new(
1139            Statistics {
1140                num_rows: Precision::Inexact(1000),
1141                total_byte_size: Precision::Inexact(4000),
1142                column_statistics: vec![
1143                    ColumnStatistics {
1144                        min_value: Precision::Inexact(ScalarValue::Int32(Some(1))),
1145                        max_value: Precision::Inexact(ScalarValue::Int32(Some(100))),
1146                        ..Default::default()
1147                    },
1148                    ColumnStatistics {
1149                        min_value: Precision::Inexact(ScalarValue::Int32(Some(1))),
1150                        max_value: Precision::Inexact(ScalarValue::Int32(Some(3))),
1151                        ..Default::default()
1152                    },
1153                    ColumnStatistics {
1154                        min_value: Precision::Inexact(ScalarValue::Float32(Some(1000.0))),
1155                        max_value: Precision::Inexact(ScalarValue::Float32(Some(1100.0))),
1156                        ..Default::default()
1157                    },
1158                ],
1159            },
1160            schema,
1161        ));
1162        // WHERE a<=53 AND (b=3 AND (c<=1075.0 AND a>b))
1163        let predicate = Arc::new(BinaryExpr::new(
1164            Arc::new(BinaryExpr::new(
1165                Arc::new(Column::new("a", 0)),
1166                Operator::LtEq,
1167                Arc::new(Literal::new(ScalarValue::Int32(Some(53)))),
1168            )),
1169            Operator::And,
1170            Arc::new(BinaryExpr::new(
1171                Arc::new(BinaryExpr::new(
1172                    Arc::new(Column::new("b", 1)),
1173                    Operator::Eq,
1174                    Arc::new(Literal::new(ScalarValue::Int32(Some(3)))),
1175                )),
1176                Operator::And,
1177                Arc::new(BinaryExpr::new(
1178                    Arc::new(BinaryExpr::new(
1179                        Arc::new(Column::new("c", 2)),
1180                        Operator::LtEq,
1181                        Arc::new(Literal::new(ScalarValue::Float32(Some(1075.0)))),
1182                    )),
1183                    Operator::And,
1184                    Arc::new(BinaryExpr::new(
1185                        Arc::new(Column::new("a", 0)),
1186                        Operator::Gt,
1187                        Arc::new(Column::new("b", 1)),
1188                    )),
1189                )),
1190            )),
1191        ));
1192        let filter: Arc<dyn ExecutionPlan> =
1193            Arc::new(FilterExec::try_new(predicate, input)?);
1194        let statistics = filter.partition_statistics(None)?;
1195        // 0.5 (from a) * 0.333333... (from b) * 0.798387... (from c) ≈ 0.1330...
1196        // num_rows after ceil => 133.0... => 134
1197        // total_byte_size after ceil => 532.0... => 533
1198        assert_eq!(statistics.num_rows, Precision::Inexact(134));
1199        assert_eq!(statistics.total_byte_size, Precision::Inexact(533));
1200        let exp_col_stats = vec![
1201            ColumnStatistics {
1202                min_value: Precision::Inexact(ScalarValue::Int32(Some(4))),
1203                max_value: Precision::Inexact(ScalarValue::Int32(Some(53))),
1204                ..Default::default()
1205            },
1206            ColumnStatistics {
1207                min_value: Precision::Inexact(ScalarValue::Int32(Some(3))),
1208                max_value: Precision::Inexact(ScalarValue::Int32(Some(3))),
1209                ..Default::default()
1210            },
1211            ColumnStatistics {
1212                min_value: Precision::Inexact(ScalarValue::Float32(Some(1000.0))),
1213                max_value: Precision::Inexact(ScalarValue::Float32(Some(1075.0))),
1214                ..Default::default()
1215            },
1216        ];
1217        let _ = exp_col_stats
1218            .into_iter()
1219            .zip(statistics.column_statistics)
1220            .map(|(expected, actual)| {
1221                if let Some(val) = actual.min_value.get_value() {
1222                    if val.data_type().is_floating() {
1223                        // Windows rounds arithmetic operation results differently for floating point numbers.
1224                        // Therefore, we check if the actual values are in an epsilon range.
1225                        let actual_min = actual.min_value.get_value().unwrap();
1226                        let actual_max = actual.max_value.get_value().unwrap();
1227                        let expected_min = expected.min_value.get_value().unwrap();
1228                        let expected_max = expected.max_value.get_value().unwrap();
1229                        let eps = ScalarValue::Float32(Some(1e-6));
1230
1231                        assert!(actual_min.sub(expected_min).unwrap() < eps);
1232                        assert!(actual_min.sub(expected_min).unwrap() < eps);
1233
1234                        assert!(actual_max.sub(expected_max).unwrap() < eps);
1235                        assert!(actual_max.sub(expected_max).unwrap() < eps);
1236                    } else {
1237                        assert_eq!(actual, expected);
1238                    }
1239                } else {
1240                    assert_eq!(actual, expected);
1241                }
1242            });
1243
1244        Ok(())
1245    }
1246
1247    #[tokio::test]
1248    async fn test_filter_statistics_full_selective() -> Result<()> {
1249        // Table:
1250        //      a: min=1, max=100
1251        //      b: min=1, max=3
1252        let schema = Schema::new(vec![
1253            Field::new("a", DataType::Int32, false),
1254            Field::new("b", DataType::Int32, false),
1255        ]);
1256        let input = Arc::new(StatisticsExec::new(
1257            Statistics {
1258                num_rows: Precision::Inexact(1000),
1259                total_byte_size: Precision::Inexact(4000),
1260                column_statistics: vec![
1261                    ColumnStatistics {
1262                        min_value: Precision::Inexact(ScalarValue::Int32(Some(1))),
1263                        max_value: Precision::Inexact(ScalarValue::Int32(Some(100))),
1264                        ..Default::default()
1265                    },
1266                    ColumnStatistics {
1267                        min_value: Precision::Inexact(ScalarValue::Int32(Some(1))),
1268                        max_value: Precision::Inexact(ScalarValue::Int32(Some(3))),
1269                        ..Default::default()
1270                    },
1271                ],
1272            },
1273            schema,
1274        ));
1275        // WHERE a<200 AND 1<=b
1276        let predicate = Arc::new(BinaryExpr::new(
1277            Arc::new(BinaryExpr::new(
1278                Arc::new(Column::new("a", 0)),
1279                Operator::Lt,
1280                Arc::new(Literal::new(ScalarValue::Int32(Some(200)))),
1281            )),
1282            Operator::And,
1283            Arc::new(BinaryExpr::new(
1284                Arc::new(Literal::new(ScalarValue::Int32(Some(1)))),
1285                Operator::LtEq,
1286                Arc::new(Column::new("b", 1)),
1287            )),
1288        ));
1289        // Since filter predicate passes all entries, statistics after filter shouldn't change.
1290        let expected = input.partition_statistics(None)?.column_statistics;
1291        let filter: Arc<dyn ExecutionPlan> =
1292            Arc::new(FilterExec::try_new(predicate, input)?);
1293        let statistics = filter.partition_statistics(None)?;
1294
1295        assert_eq!(statistics.num_rows, Precision::Inexact(1000));
1296        assert_eq!(statistics.total_byte_size, Precision::Inexact(4000));
1297        assert_eq!(statistics.column_statistics, expected);
1298
1299        Ok(())
1300    }
1301
1302    #[tokio::test]
1303    async fn test_filter_statistics_zero_selective() -> Result<()> {
1304        // Table:
1305        //      a: min=1, max=100
1306        //      b: min=1, max=3
1307        let schema = Schema::new(vec![
1308            Field::new("a", DataType::Int32, false),
1309            Field::new("b", DataType::Int32, false),
1310        ]);
1311        let input = Arc::new(StatisticsExec::new(
1312            Statistics {
1313                num_rows: Precision::Inexact(1000),
1314                total_byte_size: Precision::Inexact(4000),
1315                column_statistics: vec![
1316                    ColumnStatistics {
1317                        min_value: Precision::Inexact(ScalarValue::Int32(Some(1))),
1318                        max_value: Precision::Inexact(ScalarValue::Int32(Some(100))),
1319                        ..Default::default()
1320                    },
1321                    ColumnStatistics {
1322                        min_value: Precision::Inexact(ScalarValue::Int32(Some(1))),
1323                        max_value: Precision::Inexact(ScalarValue::Int32(Some(3))),
1324                        ..Default::default()
1325                    },
1326                ],
1327            },
1328            schema,
1329        ));
1330        // WHERE a>200 AND 1<=b
1331        let predicate = Arc::new(BinaryExpr::new(
1332            Arc::new(BinaryExpr::new(
1333                Arc::new(Column::new("a", 0)),
1334                Operator::Gt,
1335                Arc::new(Literal::new(ScalarValue::Int32(Some(200)))),
1336            )),
1337            Operator::And,
1338            Arc::new(BinaryExpr::new(
1339                Arc::new(Literal::new(ScalarValue::Int32(Some(1)))),
1340                Operator::LtEq,
1341                Arc::new(Column::new("b", 1)),
1342            )),
1343        ));
1344        let filter: Arc<dyn ExecutionPlan> =
1345            Arc::new(FilterExec::try_new(predicate, input)?);
1346        let statistics = filter.partition_statistics(None)?;
1347
1348        assert_eq!(statistics.num_rows, Precision::Inexact(0));
1349        assert_eq!(statistics.total_byte_size, Precision::Inexact(0));
1350        assert_eq!(
1351            statistics.column_statistics,
1352            vec![
1353                ColumnStatistics {
1354                    min_value: Precision::Exact(ScalarValue::Null),
1355                    max_value: Precision::Exact(ScalarValue::Null),
1356                    sum_value: Precision::Exact(ScalarValue::Null),
1357                    distinct_count: Precision::Exact(0),
1358                    null_count: Precision::Exact(0),
1359                    byte_size: Precision::Absent,
1360                },
1361                ColumnStatistics {
1362                    min_value: Precision::Exact(ScalarValue::Null),
1363                    max_value: Precision::Exact(ScalarValue::Null),
1364                    sum_value: Precision::Exact(ScalarValue::Null),
1365                    distinct_count: Precision::Exact(0),
1366                    null_count: Precision::Exact(0),
1367                    byte_size: Precision::Absent,
1368                },
1369            ]
1370        );
1371
1372        Ok(())
1373    }
1374
1375    #[tokio::test]
1376    async fn test_filter_statistics_more_inputs() -> Result<()> {
1377        let schema = Schema::new(vec![
1378            Field::new("a", DataType::Int32, false),
1379            Field::new("b", DataType::Int32, false),
1380        ]);
1381        let input = Arc::new(StatisticsExec::new(
1382            Statistics {
1383                num_rows: Precision::Inexact(1000),
1384                total_byte_size: Precision::Inexact(4000),
1385                column_statistics: vec![
1386                    ColumnStatistics {
1387                        min_value: Precision::Inexact(ScalarValue::Int32(Some(1))),
1388                        max_value: Precision::Inexact(ScalarValue::Int32(Some(100))),
1389                        ..Default::default()
1390                    },
1391                    ColumnStatistics {
1392                        min_value: Precision::Inexact(ScalarValue::Int32(Some(1))),
1393                        max_value: Precision::Inexact(ScalarValue::Int32(Some(100))),
1394                        ..Default::default()
1395                    },
1396                ],
1397            },
1398            schema,
1399        ));
1400        // WHERE a<50
1401        let predicate = Arc::new(BinaryExpr::new(
1402            Arc::new(Column::new("a", 0)),
1403            Operator::Lt,
1404            Arc::new(Literal::new(ScalarValue::Int32(Some(50)))),
1405        ));
1406        let filter: Arc<dyn ExecutionPlan> =
1407            Arc::new(FilterExec::try_new(predicate, input)?);
1408        let statistics = filter.partition_statistics(None)?;
1409
1410        assert_eq!(statistics.num_rows, Precision::Inexact(490));
1411        assert_eq!(statistics.total_byte_size, Precision::Inexact(1960));
1412        assert_eq!(
1413            statistics.column_statistics,
1414            vec![
1415                ColumnStatistics {
1416                    min_value: Precision::Inexact(ScalarValue::Int32(Some(1))),
1417                    max_value: Precision::Inexact(ScalarValue::Int32(Some(49))),
1418                    ..Default::default()
1419                },
1420                ColumnStatistics {
1421                    min_value: Precision::Inexact(ScalarValue::Int32(Some(1))),
1422                    max_value: Precision::Inexact(ScalarValue::Int32(Some(100))),
1423                    ..Default::default()
1424                },
1425            ]
1426        );
1427
1428        Ok(())
1429    }
1430
1431    #[tokio::test]
1432    async fn test_empty_input_statistics() -> Result<()> {
1433        let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]);
1434        let input = Arc::new(StatisticsExec::new(
1435            Statistics::new_unknown(&schema),
1436            schema,
1437        ));
1438        // WHERE a <= 10 AND 0 <= a - 5
1439        let predicate = Arc::new(BinaryExpr::new(
1440            Arc::new(BinaryExpr::new(
1441                Arc::new(Column::new("a", 0)),
1442                Operator::LtEq,
1443                Arc::new(Literal::new(ScalarValue::Int32(Some(10)))),
1444            )),
1445            Operator::And,
1446            Arc::new(BinaryExpr::new(
1447                Arc::new(Literal::new(ScalarValue::Int32(Some(0)))),
1448                Operator::LtEq,
1449                Arc::new(BinaryExpr::new(
1450                    Arc::new(Column::new("a", 0)),
1451                    Operator::Minus,
1452                    Arc::new(Literal::new(ScalarValue::Int32(Some(5)))),
1453                )),
1454            )),
1455        ));
1456        let filter: Arc<dyn ExecutionPlan> =
1457            Arc::new(FilterExec::try_new(predicate, input)?);
1458        let filter_statistics = filter.partition_statistics(None)?;
1459
1460        let expected_filter_statistics = Statistics {
1461            num_rows: Precision::Absent,
1462            total_byte_size: Precision::Absent,
1463            column_statistics: vec![ColumnStatistics {
1464                null_count: Precision::Absent,
1465                min_value: Precision::Inexact(ScalarValue::Int32(Some(5))),
1466                max_value: Precision::Inexact(ScalarValue::Int32(Some(10))),
1467                sum_value: Precision::Absent,
1468                distinct_count: Precision::Absent,
1469                byte_size: Precision::Absent,
1470            }],
1471        };
1472
1473        assert_eq!(filter_statistics, expected_filter_statistics);
1474
1475        Ok(())
1476    }
1477
1478    #[tokio::test]
1479    async fn test_statistics_with_constant_column() -> Result<()> {
1480        let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]);
1481        let input = Arc::new(StatisticsExec::new(
1482            Statistics::new_unknown(&schema),
1483            schema,
1484        ));
1485        // WHERE a = 10
1486        let predicate = Arc::new(BinaryExpr::new(
1487            Arc::new(Column::new("a", 0)),
1488            Operator::Eq,
1489            Arc::new(Literal::new(ScalarValue::Int32(Some(10)))),
1490        ));
1491        let filter: Arc<dyn ExecutionPlan> =
1492            Arc::new(FilterExec::try_new(predicate, input)?);
1493        let filter_statistics = filter.partition_statistics(None)?;
1494        // First column is "a", and it is a column with only one value after the filter.
1495        assert!(filter_statistics.column_statistics[0].is_singleton());
1496
1497        Ok(())
1498    }
1499
1500    #[tokio::test]
1501    async fn test_validation_filter_selectivity() -> Result<()> {
1502        let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]);
1503        let input = Arc::new(StatisticsExec::new(
1504            Statistics::new_unknown(&schema),
1505            schema,
1506        ));
1507        // WHERE a = 10
1508        let predicate = Arc::new(BinaryExpr::new(
1509            Arc::new(Column::new("a", 0)),
1510            Operator::Eq,
1511            Arc::new(Literal::new(ScalarValue::Int32(Some(10)))),
1512        ));
1513        let filter = FilterExec::try_new(predicate, input)?;
1514        assert!(filter.with_default_selectivity(120).is_err());
1515        Ok(())
1516    }
1517
1518    #[tokio::test]
1519    async fn test_custom_filter_selectivity() -> Result<()> {
1520        // Need a decimal to trigger inexact selectivity
1521        let schema =
1522            Schema::new(vec![Field::new("a", DataType::Decimal128(2, 3), false)]);
1523        let input = Arc::new(StatisticsExec::new(
1524            Statistics {
1525                num_rows: Precision::Inexact(1000),
1526                total_byte_size: Precision::Inexact(4000),
1527                column_statistics: vec![ColumnStatistics {
1528                    ..Default::default()
1529                }],
1530            },
1531            schema,
1532        ));
1533        // WHERE a = 10
1534        let predicate = Arc::new(BinaryExpr::new(
1535            Arc::new(Column::new("a", 0)),
1536            Operator::Eq,
1537            Arc::new(Literal::new(ScalarValue::Decimal128(Some(10), 10, 10))),
1538        ));
1539        let filter = FilterExec::try_new(predicate, input)?;
1540        let statistics = filter.partition_statistics(None)?;
1541        assert_eq!(statistics.num_rows, Precision::Inexact(200));
1542        assert_eq!(statistics.total_byte_size, Precision::Inexact(800));
1543        let filter = filter.with_default_selectivity(40)?;
1544        let statistics = filter.partition_statistics(None)?;
1545        assert_eq!(statistics.num_rows, Precision::Inexact(400));
1546        assert_eq!(statistics.total_byte_size, Precision::Inexact(1600));
1547        Ok(())
1548    }
1549
1550    #[test]
1551    fn test_equivalence_properties_union_type() -> Result<()> {
1552        let union_type = DataType::Union(
1553            UnionFields::new(
1554                vec![0, 1],
1555                vec![
1556                    Field::new("f1", DataType::Int32, true),
1557                    Field::new("f2", DataType::Utf8, true),
1558                ],
1559            ),
1560            UnionMode::Sparse,
1561        );
1562
1563        let schema = Arc::new(Schema::new(vec![
1564            Field::new("c1", DataType::Int32, true),
1565            Field::new("c2", union_type, true),
1566        ]));
1567
1568        let exec = FilterExec::try_new(
1569            binary(
1570                binary(col("c1", &schema)?, Operator::GtEq, lit(1i32), &schema)?,
1571                Operator::And,
1572                binary(col("c1", &schema)?, Operator::LtEq, lit(4i32), &schema)?,
1573                &schema,
1574            )?,
1575            Arc::new(EmptyExec::new(Arc::clone(&schema))),
1576        )?;
1577
1578        exec.partition_statistics(None).unwrap();
1579
1580        Ok(())
1581    }
1582
1583    #[test]
1584    fn test_filter_with_projection_remaps_post_phase_parent_filters() -> Result<()> {
1585        // Test that FilterExec with a projection must remap parent dynamic
1586        // filter column indices from its output schema to the input schema
1587        // before passing them to the child.
1588        let input_schema = Arc::new(Schema::new(vec![
1589            Field::new("a", DataType::Int32, false),
1590            Field::new("b", DataType::Utf8, false),
1591            Field::new("c", DataType::Float64, false),
1592        ]));
1593        let input = Arc::new(EmptyExec::new(Arc::clone(&input_schema)));
1594
1595        // FilterExec: a > 0, projection=[c@2]
1596        let predicate = Arc::new(BinaryExpr::new(
1597            Arc::new(Column::new("a", 0)),
1598            Operator::Gt,
1599            Arc::new(Literal::new(ScalarValue::Int32(Some(0)))),
1600        ));
1601        let filter =
1602            FilterExec::try_new(predicate, input)?.with_projection(Some(vec![2]))?;
1603
1604        // Output schema should be [c:Float64]
1605        let output_schema = filter.schema();
1606        assert_eq!(output_schema.fields().len(), 1);
1607        assert_eq!(output_schema.field(0).name(), "c");
1608
1609        // Simulate a parent dynamic filter referencing output column c@0
1610        let parent_filter: Arc<dyn PhysicalExpr> = Arc::new(Column::new("c", 0));
1611
1612        let config = ConfigOptions::new();
1613        let desc = filter.gather_filters_for_pushdown(
1614            FilterPushdownPhase::Post,
1615            vec![parent_filter],
1616            &config,
1617        )?;
1618
1619        // The filter pushed to the child must reference c@2 (input schema),
1620        // not c@0 (output schema).
1621        let parent_filters = desc.parent_filters();
1622        assert_eq!(parent_filters.len(), 1); // one child
1623        assert_eq!(parent_filters[0].len(), 1); // one filter
1624        let remapped = &parent_filters[0][0].predicate;
1625        let display = format!("{remapped}");
1626        assert_eq!(
1627            display, "c@2",
1628            "Post-phase parent filter column index must be remapped \
1629             from output schema (c@0) to input schema (c@2)"
1630        );
1631
1632        Ok(())
1633    }
1634}