Skip to main content

datafusion_physical_plan/
filter.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::any::Any;
19use std::pin::Pin;
20use std::sync::Arc;
21use std::task::{Context, Poll, ready};
22
23use itertools::Itertools;
24
25use super::{
26    ColumnStatistics, DisplayAs, ExecutionPlanProperties, PlanProperties,
27    RecordBatchStream, SendableRecordBatchStream, Statistics,
28};
29use crate::coalesce::{LimitedBatchCoalescer, PushBatchStatus};
30use crate::common::can_project;
31use crate::execution_plan::CardinalityEffect;
32use crate::filter_pushdown::{
33    ChildFilterDescription, ChildPushdownResult, FilterDescription, FilterPushdownPhase,
34    FilterPushdownPropagation, PushedDown,
35};
36use crate::metrics::{MetricBuilder, MetricType};
37use crate::projection::{
38    EmbeddedProjection, ProjectionExec, ProjectionExpr, make_with_child,
39    try_embed_projection, update_expr,
40};
41use crate::{
42    DisplayFormatType, ExecutionPlan,
43    metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet, RatioMetrics},
44};
45
46use arrow::compute::filter_record_batch;
47use arrow::datatypes::{DataType, SchemaRef};
48use arrow::record_batch::RecordBatch;
49use datafusion_common::cast::as_boolean_array;
50use datafusion_common::config::ConfigOptions;
51use datafusion_common::stats::Precision;
52use datafusion_common::{
53    DataFusionError, Result, ScalarValue, internal_err, plan_err, project_schema,
54};
55use datafusion_execution::TaskContext;
56use datafusion_expr::Operator;
57use datafusion_physical_expr::equivalence::ProjectionMapping;
58use datafusion_physical_expr::expressions::{BinaryExpr, Column, Literal, lit};
59use datafusion_physical_expr::intervals::utils::check_support;
60use datafusion_physical_expr::utils::{collect_columns, reassign_expr_columns};
61use datafusion_physical_expr::{
62    AcrossPartitions, AnalysisContext, ConstExpr, EquivalenceProperties, ExprBoundaries,
63    PhysicalExpr, analyze, conjunction, split_conjunction,
64};
65
66use datafusion_physical_expr_common::physical_expr::fmt_sql;
67use futures::stream::{Stream, StreamExt};
68use log::trace;
69
/// Default selectivity (percent of input rows assumed to pass the filter,
/// 0 = no rows, 100 = all rows) used for statistics estimation when the
/// predicate cannot be analyzed.
const FILTER_EXEC_DEFAULT_SELECTIVITY: u8 = 20;
/// Default target number of rows per output batch produced by `FilterExec`.
const FILTER_EXEC_DEFAULT_BATCH_SIZE: usize = 8192;
72
/// FilterExec evaluates a boolean predicate against all input batches to determine which rows to
/// include in its output batches.
#[derive(Debug, Clone)]
pub struct FilterExec {
    /// The expression to filter on. This expression must evaluate to a boolean value.
    predicate: Arc<dyn PhysicalExpr>,
    /// The input plan
    input: Arc<dyn ExecutionPlan>,
    /// Execution metrics
    metrics: ExecutionPlanMetricsSet,
    /// Selectivity for statistics. 0 = no rows, 100 = all rows
    default_selectivity: u8,
    /// Properties equivalence properties, partitioning, etc.
    cache: PlanProperties,
    /// The projection indices of the columns in the output schema of join
    projection: Option<Vec<usize>>,
    /// Target batch size for output batches
    batch_size: usize,
    /// Number of rows to fetch (limit applied after filtering); `None` means no limit
    fetch: Option<usize>,
}
94
impl FilterExec {
    /// Create a FilterExec on an input
    ///
    /// Returns a plan error if the predicate does not evaluate to a Boolean.
    #[expect(clippy::needless_pass_by_value)]
    pub fn try_new(
        predicate: Arc<dyn PhysicalExpr>,
        input: Arc<dyn ExecutionPlan>,
    ) -> Result<Self> {
        match predicate.data_type(input.schema().as_ref())? {
            DataType::Boolean => {
                let default_selectivity = FILTER_EXEC_DEFAULT_SELECTIVITY;
                let cache = Self::compute_properties(
                    &input,
                    &predicate,
                    default_selectivity,
                    None,
                )?;
                Ok(Self {
                    predicate,
                    input: Arc::clone(&input),
                    metrics: ExecutionPlanMetricsSet::new(),
                    default_selectivity,
                    cache,
                    projection: None,
                    batch_size: FILTER_EXEC_DEFAULT_BATCH_SIZE,
                    fetch: None,
                })
            }
            other => {
                plan_err!("Filter predicate must return BOOLEAN values, got {other:?}")
            }
        }
    }

    /// Set the default selectivity used for statistics estimation.
    ///
    /// `default_selectivity` is a percentage in `0..=100`; values above 100
    /// are rejected with a plan error.
    pub fn with_default_selectivity(
        mut self,
        default_selectivity: u8,
    ) -> Result<Self, DataFusionError> {
        if default_selectivity > 100 {
            return plan_err!(
                "Default filter selectivity value needs to be less than or equal to 100"
            );
        }
        self.default_selectivity = default_selectivity;
        Ok(self)
    }

    /// Return new instance of [FilterExec] with the given projection.
    pub fn with_projection(&self, projection: Option<Vec<usize>>) -> Result<Self> {
        //  Check if the projection is valid
        can_project(&self.schema(), projection.as_ref())?;

        // If a projection already exists, compose the new one with it: the
        // incoming indices are relative to the current (projected) output, so
        // map them through the existing projection back to input coordinates.
        let projection = match projection {
            Some(projection) => match &self.projection {
                Some(p) => Some(projection.iter().map(|i| p[*i]).collect()),
                None => Some(projection),
            },
            None => None,
        };

        let cache = Self::compute_properties(
            &self.input,
            &self.predicate,
            self.default_selectivity,
            projection.as_ref(),
        )?;
        Ok(Self {
            predicate: Arc::clone(&self.predicate),
            input: Arc::clone(&self.input),
            metrics: self.metrics.clone(),
            default_selectivity: self.default_selectivity,
            cache,
            projection,
            batch_size: self.batch_size,
            fetch: self.fetch,
        })
    }

    /// Return a new instance of [FilterExec] with the given target batch size.
    pub fn with_batch_size(&self, batch_size: usize) -> Result<Self> {
        Ok(Self {
            predicate: Arc::clone(&self.predicate),
            input: Arc::clone(&self.input),
            metrics: self.metrics.clone(),
            default_selectivity: self.default_selectivity,
            cache: self.cache.clone(),
            projection: self.projection.clone(),
            batch_size,
            fetch: self.fetch,
        })
    }

    /// The expression to filter on. This expression must evaluate to a boolean value.
    pub fn predicate(&self) -> &Arc<dyn PhysicalExpr> {
        &self.predicate
    }

    /// The input plan
    pub fn input(&self) -> &Arc<dyn ExecutionPlan> {
        &self.input
    }

    /// The default selectivity
    pub fn default_selectivity(&self) -> u8 {
        self.default_selectivity
    }

    /// Projection
    pub fn projection(&self) -> Option<&Vec<usize>> {
        self.projection.as_ref()
    }

    /// Calculates `Statistics` for `FilterExec`, by applying selectivity (either default, or estimated) to input statistics.
    fn statistics_helper(
        schema: &SchemaRef,
        input_stats: Statistics,
        predicate: &Arc<dyn PhysicalExpr>,
        default_selectivity: u8,
    ) -> Result<Statistics> {
        // If the predicate cannot be analyzed by interval arithmetic, fall
        // back to scaling the input statistics by the default selectivity.
        if !check_support(predicate, schema) {
            let selectivity = default_selectivity as f64 / 100.0;
            let mut stats = input_stats.to_inexact();
            stats.num_rows = stats.num_rows.with_estimated_selectivity(selectivity);
            stats.total_byte_size = stats
                .total_byte_size
                .with_estimated_selectivity(selectivity);
            return Ok(stats);
        }

        let num_rows = input_stats.num_rows;
        let total_byte_size = input_stats.total_byte_size;
        let input_analysis_ctx =
            AnalysisContext::try_from_statistics(schema, &input_stats.column_statistics)?;

        let analysis_ctx = analyze(predicate, input_analysis_ctx, schema)?;

        // Estimate (inexact) selectivity of predicate
        let selectivity = analysis_ctx.selectivity.unwrap_or(1.0);
        let num_rows = num_rows.with_estimated_selectivity(selectivity);
        let total_byte_size = total_byte_size.with_estimated_selectivity(selectivity);

        let column_statistics = collect_new_statistics(
            schema,
            &input_stats.column_statistics,
            analysis_ctx.boundaries,
        );
        Ok(Statistics {
            num_rows,
            total_byte_size,
            column_statistics,
        })
    }

    /// Returns the `AcrossPartitions` value for `expr` if it is constant:
    /// either already known constant in `input_eqs`, or a `Literal`
    /// (which is inherently constant across all partitions).
    fn expr_constant_or_literal(
        expr: &Arc<dyn PhysicalExpr>,
        input_eqs: &EquivalenceProperties,
    ) -> Option<AcrossPartitions> {
        input_eqs.is_expr_constant(expr).or_else(|| {
            expr.as_any()
                .downcast_ref::<Literal>()
                .map(|l| AcrossPartitions::Uniform(Some(l.value().clone())))
        })
    }

    /// Derives constant expressions implied by equality conjuncts of
    /// `predicate` (e.g. `a = 1` makes `a` constant after filtering).
    fn extend_constants(
        input: &Arc<dyn ExecutionPlan>,
        predicate: &Arc<dyn PhysicalExpr>,
    ) -> Vec<ConstExpr> {
        let mut res_constants = Vec::new();
        let input_eqs = input.equivalence_properties();

        let conjunctions = split_conjunction(predicate);
        for conjunction in conjunctions {
            if let Some(binary) = conjunction.as_any().downcast_ref::<BinaryExpr>()
                && binary.op() == &Operator::Eq
            {
                // Check if either side is constant — either already known
                // constant from the input equivalence properties, or a literal
                // value (which is inherently constant across all partitions).
                let left_const = Self::expr_constant_or_literal(binary.left(), input_eqs);
                let right_const =
                    Self::expr_constant_or_literal(binary.right(), input_eqs);

                if let Some(left_across) = left_const {
                    // LEFT is constant, so RIGHT must also be constant.
                    // Use RIGHT's known across value if available, otherwise
                    // propagate LEFT's (e.g. Uniform from a literal).
                    let across = right_const.unwrap_or(left_across);
                    res_constants
                        .push(ConstExpr::new(Arc::clone(binary.right()), across));
                } else if let Some(right_across) = right_const {
                    // RIGHT is constant, so LEFT must also be constant.
                    res_constants
                        .push(ConstExpr::new(Arc::clone(binary.left()), right_across));
                }
            }
        }
        res_constants
    }
    /// This function creates the cache object that stores the plan properties such as schema, equivalence properties, ordering, partitioning, etc.
    fn compute_properties(
        input: &Arc<dyn ExecutionPlan>,
        predicate: &Arc<dyn PhysicalExpr>,
        default_selectivity: u8,
        projection: Option<&Vec<usize>>,
    ) -> Result<PlanProperties> {
        // Combine the equal predicates with the input equivalence properties
        // to construct the equivalence properties:
        let schema = input.schema();
        let stats = Self::statistics_helper(
            &schema,
            input.partition_statistics(None)?,
            predicate,
            default_selectivity,
        )?;
        let mut eq_properties = input.equivalence_properties().clone();
        let (equal_pairs, _) = collect_columns_from_predicate_inner(predicate);
        for (lhs, rhs) in equal_pairs {
            eq_properties.add_equal_conditions(Arc::clone(lhs), Arc::clone(rhs))?
        }
        // Add the columns that have only one viable value (singleton) after
        // filtering to constants.
        let constants = collect_columns(predicate)
            .into_iter()
            .filter(|column| stats.column_statistics[column.index()].is_singleton())
            .map(|column| {
                let value = stats.column_statistics[column.index()]
                    .min_value
                    .get_value();
                let expr = Arc::new(column) as _;
                ConstExpr::new(expr, AcrossPartitions::Uniform(value.cloned()))
            });
        // This is for statistics
        eq_properties.add_constants(constants)?;
        // This is for logical constant (for example: a = '1', then a could be marked as a constant)
        // to do: how to deal with multiple situation to represent = (for example c1 between 0 and 0)
        eq_properties.add_constants(Self::extend_constants(input, predicate))?;

        let mut output_partitioning = input.output_partitioning().clone();
        // If contains projection, update the PlanProperties.
        if let Some(projection) = projection {
            let schema = eq_properties.schema();
            let projection_mapping = ProjectionMapping::from_indices(projection, schema)?;
            let out_schema = project_schema(schema, Some(projection))?;
            output_partitioning =
                output_partitioning.project(&projection_mapping, &eq_properties);
            eq_properties = eq_properties.project(&projection_mapping, out_schema);
        }

        Ok(PlanProperties::new(
            eq_properties,
            output_partitioning,
            input.pipeline_behavior(),
            input.boundedness(),
        ))
    }
}
353
354impl DisplayAs for FilterExec {
355    fn fmt_as(
356        &self,
357        t: DisplayFormatType,
358        f: &mut std::fmt::Formatter,
359    ) -> std::fmt::Result {
360        match t {
361            DisplayFormatType::Default | DisplayFormatType::Verbose => {
362                let display_projections = if let Some(projection) =
363                    self.projection.as_ref()
364                {
365                    format!(
366                        ", projection=[{}]",
367                        projection
368                            .iter()
369                            .map(|index| format!(
370                                "{}@{}",
371                                self.input.schema().fields().get(*index).unwrap().name(),
372                                index
373                            ))
374                            .collect::<Vec<_>>()
375                            .join(", ")
376                    )
377                } else {
378                    "".to_string()
379                };
380                let fetch = self
381                    .fetch
382                    .map_or_else(|| "".to_string(), |f| format!(", fetch={f}"));
383                write!(
384                    f,
385                    "FilterExec: {}{}{}",
386                    self.predicate, display_projections, fetch
387                )
388            }
389            DisplayFormatType::TreeRender => {
390                write!(f, "predicate={}", fmt_sql(self.predicate.as_ref()))
391            }
392        }
393    }
394}
395
impl ExecutionPlan for FilterExec {
    fn name(&self) -> &'static str {
        "FilterExec"
    }

    /// Return a reference to Any that can be used for downcasting
    fn as_any(&self) -> &dyn Any {
        self
    }

    fn properties(&self) -> &PlanProperties {
        &self.cache
    }

    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
        vec![&self.input]
    }

    fn maintains_input_order(&self) -> Vec<bool> {
        // Tell optimizer this operator doesn't reorder its input
        vec![true]
    }

    fn with_new_children(
        self: Arc<Self>,
        mut children: Vec<Arc<dyn ExecutionPlan>>,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        // Rebuild from scratch on the new child, then re-apply this node's
        // selectivity, projection and fetch settings.
        FilterExec::try_new(Arc::clone(&self.predicate), children.swap_remove(0))
            .and_then(|e| {
                let selectivity = e.default_selectivity();
                e.with_default_selectivity(selectivity)
            })
            .and_then(|e| e.with_projection(self.projection().cloned()))
            // `with_fetch` always returns `Some`, so this unwrap cannot fail.
            .map(|e| e.with_fetch(self.fetch).unwrap())
    }

    fn execute(
        &self,
        partition: usize,
        context: Arc<TaskContext>,
    ) -> Result<SendableRecordBatchStream> {
        trace!(
            "Start FilterExec::execute for partition {} of context session_id {} and task_id {:?}",
            partition,
            context.session_id(),
            context.task_id()
        );
        let metrics = FilterExecMetrics::new(&self.metrics, partition);
        Ok(Box::pin(FilterExecStream {
            schema: self.schema(),
            predicate: Arc::clone(&self.predicate),
            input: self.input.execute(partition, context)?,
            metrics,
            projection: self.projection.clone(),
            // The coalescer combines small filtered batches up to
            // `batch_size` rows and enforces the optional `fetch` limit.
            batch_coalescer: LimitedBatchCoalescer::new(
                self.schema(),
                self.batch_size,
                self.fetch,
            ),
        }))
    }

    fn metrics(&self) -> Option<MetricsSet> {
        Some(self.metrics.clone_inner())
    }

    /// The output statistics of a filtering operation can be estimated if the
    /// predicate's selectivity value can be determined for the incoming data.
    fn statistics(&self) -> Result<Statistics> {
        self.partition_statistics(None)
    }

    fn partition_statistics(&self, partition: Option<usize>) -> Result<Statistics> {
        let input_stats = self.input.partition_statistics(partition)?;
        let schema = self.schema();
        let stats = Self::statistics_helper(
            &schema,
            input_stats,
            self.predicate(),
            self.default_selectivity,
        )?;
        // Restrict the statistics to the projected columns, if any.
        Ok(stats.project(self.projection.as_ref()))
    }

    fn cardinality_effect(&self) -> CardinalityEffect {
        // Filtering never adds rows; it can only keep or drop them.
        CardinalityEffect::LowerEqual
    }

    /// Tries to swap `projection` with its input (`filter`). If possible, performs
    /// the swap and returns [`FilterExec`] as the top plan. Otherwise, returns `None`.
    fn try_swapping_with_projection(
        &self,
        projection: &ProjectionExec,
    ) -> Result<Option<Arc<dyn ExecutionPlan>>> {
        // If the projection does not narrow the schema, we should not try to push it down:
        if projection.expr().len() < projection.input().schema().fields().len() {
            // Each column in the predicate expression must exist after the projection.
            if let Some(new_predicate) =
                update_expr(self.predicate(), projection.expr(), false)?
            {
                return FilterExec::try_new(
                    new_predicate,
                    make_with_child(projection, self.input())?,
                )
                .and_then(|e| {
                    let selectivity = self.default_selectivity();
                    e.with_default_selectivity(selectivity)
                })
                .map(|e| Some(Arc::new(e) as _));
            }
        }
        // Otherwise, try to embed the projection into this FilterExec.
        try_embed_projection(projection, self)
    }

    fn gather_filters_for_pushdown(
        &self,
        phase: FilterPushdownPhase,
        parent_filters: Vec<Arc<dyn PhysicalExpr>>,
        _config: &ConfigOptions,
    ) -> Result<FilterDescription> {
        if !matches!(phase, FilterPushdownPhase::Pre) {
            // For non-pre phase, filters pass through unchanged
            let child =
                ChildFilterDescription::from_child(&parent_filters, self.input())?;
            return Ok(FilterDescription::new().with_child(child));
        }

        // In the pre phase, also offer each conjunct of our own predicate to
        // the child as a candidate for pushdown.
        let child = ChildFilterDescription::from_child(&parent_filters, self.input())?
            .with_self_filters(
                split_conjunction(&self.predicate)
                    .into_iter()
                    .cloned()
                    .collect(),
            );

        Ok(FilterDescription::new().with_child(child))
    }

    fn handle_child_pushdown_result(
        &self,
        phase: FilterPushdownPhase,
        child_pushdown_result: ChildPushdownResult,
        _config: &ConfigOptions,
    ) -> Result<FilterPushdownPropagation<Arc<dyn ExecutionPlan>>> {
        if !matches!(phase, FilterPushdownPhase::Pre) {
            return Ok(FilterPushdownPropagation::if_all(child_pushdown_result));
        }
        // We absorb any parent filters that were not handled by our children
        let mut unsupported_parent_filters: Vec<Arc<dyn PhysicalExpr>> =
            child_pushdown_result
                .parent_filters
                .iter()
                .filter_map(|f| {
                    matches!(f.all(), PushedDown::No).then_some(Arc::clone(&f.filter))
                })
                .collect();

        // If this FilterExec has a projection, the unsupported parent filters
        // are in the output schema (after projection) coordinates. We need to
        // remap them to the input schema coordinates before combining with self filters.
        if self.projection.is_some() {
            let input_schema = self.input().schema();
            unsupported_parent_filters = unsupported_parent_filters
                .into_iter()
                .map(|expr| reassign_expr_columns(expr, &input_schema))
                .collect::<Result<Vec<_>>>()?;
        }

        // Conjuncts of our own predicate that the child refused to absorb.
        let unsupported_self_filters = child_pushdown_result
            .self_filters
            .first()
            .expect("we have exactly one child")
            .iter()
            .filter_map(|f| match f.discriminant {
                PushedDown::Yes => None,
                PushedDown::No => Some(&f.predicate),
            })
            .cloned();

        let unhandled_filters = unsupported_parent_filters
            .into_iter()
            .chain(unsupported_self_filters)
            .collect_vec();

        // If we have unhandled filters, we need to create a new FilterExec
        let filter_input = Arc::clone(self.input());
        let new_predicate = conjunction(unhandled_filters);
        let updated_node = if new_predicate.eq(&lit(true)) {
            // FilterExec is no longer needed, but we may need to leave a projection in place
            match self.projection() {
                Some(projection_indices) => {
                    let filter_child_schema = filter_input.schema();
                    let proj_exprs = projection_indices
                        .iter()
                        .map(|p| {
                            let field = filter_child_schema.field(*p).clone();
                            ProjectionExpr {
                                expr: Arc::new(Column::new(field.name(), *p))
                                    as Arc<dyn PhysicalExpr>,
                                alias: field.name().to_string(),
                            }
                        })
                        .collect::<Vec<_>>();
                    Some(Arc::new(ProjectionExec::try_new(proj_exprs, filter_input)?)
                        as Arc<dyn ExecutionPlan>)
                }
                None => {
                    // No projection needed, just return the input
                    Some(filter_input)
                }
            }
        } else if new_predicate.eq(&self.predicate) {
            // The new predicate is the same as our current predicate
            None
        } else {
            // Create a new FilterExec with the new predicate, preserving the projection
            let new = FilterExec {
                predicate: Arc::clone(&new_predicate),
                input: Arc::clone(&filter_input),
                metrics: self.metrics.clone(),
                default_selectivity: self.default_selectivity,
                cache: Self::compute_properties(
                    &filter_input,
                    &new_predicate,
                    self.default_selectivity,
                    self.projection.as_ref(),
                )?,
                projection: self.projection.clone(),
                batch_size: self.batch_size,
                fetch: self.fetch,
            };
            Some(Arc::new(new) as _)
        };

        // All parent filters are reported as handled: any the child rejected
        // were absorbed into this node's predicate above.
        Ok(FilterPushdownPropagation {
            filters: vec![PushedDown::Yes; child_pushdown_result.parent_filters.len()],
            updated_node,
        })
    }

    fn with_fetch(&self, fetch: Option<usize>) -> Option<Arc<dyn ExecutionPlan>> {
        Some(Arc::new(Self {
            predicate: Arc::clone(&self.predicate),
            input: Arc::clone(&self.input),
            metrics: self.metrics.clone(),
            default_selectivity: self.default_selectivity,
            cache: self.cache.clone(),
            projection: self.projection.clone(),
            batch_size: self.batch_size,
            fetch,
        }))
    }
}
649
impl EmbeddedProjection for FilterExec {
    // Delegates to the inherent `FilterExec::with_projection`.
    fn with_projection(&self, projection: Option<Vec<usize>>) -> Result<Self> {
        self.with_projection(projection)
    }
}
655
656/// This function ensures that all bounds in the `ExprBoundaries` vector are
657/// converted to closed bounds. If a lower/upper bound is initially open, it
658/// is adjusted by using the next/previous value for its data type to convert
659/// it into a closed bound.
660fn collect_new_statistics(
661    schema: &SchemaRef,
662    input_column_stats: &[ColumnStatistics],
663    analysis_boundaries: Vec<ExprBoundaries>,
664) -> Vec<ColumnStatistics> {
665    analysis_boundaries
666        .into_iter()
667        .enumerate()
668        .map(
669            |(
670                idx,
671                ExprBoundaries {
672                    interval,
673                    distinct_count,
674                    ..
675                },
676            )| {
677                let Some(interval) = interval else {
678                    // If the interval is `None`, we can say that there are no rows.
679                    // Use a typed null to preserve the column's data type, so that
680                    // downstream interval analysis can still intersect intervals
681                    // of the same type.
682                    let typed_null = ScalarValue::try_from(schema.field(idx).data_type())
683                        .unwrap_or(ScalarValue::Null);
684                    return ColumnStatistics {
685                        null_count: Precision::Exact(0),
686                        max_value: Precision::Exact(typed_null.clone()),
687                        min_value: Precision::Exact(typed_null.clone()),
688                        sum_value: Precision::Exact(typed_null),
689                        distinct_count: Precision::Exact(0),
690                        byte_size: input_column_stats[idx].byte_size,
691                    };
692                };
693                let (lower, upper) = interval.into_bounds();
694                let (min_value, max_value) = if lower.eq(&upper) {
695                    (Precision::Exact(lower), Precision::Exact(upper))
696                } else {
697                    (Precision::Inexact(lower), Precision::Inexact(upper))
698                };
699                ColumnStatistics {
700                    null_count: input_column_stats[idx].null_count.to_inexact(),
701                    max_value,
702                    min_value,
703                    sum_value: Precision::Absent,
704                    distinct_count: distinct_count.to_inexact(),
705                    byte_size: input_column_stats[idx].byte_size,
706                }
707            },
708        )
709        .collect()
710}
711
/// The FilterExec streams wraps the input iterator and applies the predicate expression to
/// determine which rows to include in its output batches
struct FilterExecStream {
    /// Output schema after the projection
    schema: SchemaRef,
    /// The expression to filter on. This expression must evaluate to a boolean value.
    predicate: Arc<dyn PhysicalExpr>,
    /// The input partition to filter.
    input: SendableRecordBatchStream,
    /// Runtime metrics recording
    metrics: FilterExecMetrics,
    /// The projection indices of the columns in the input schema
    projection: Option<Vec<usize>>,
    /// Batch coalescer to combine small batches (and enforce the fetch limit)
    batch_coalescer: LimitedBatchCoalescer,
}
728
/// The metrics for `FilterExec`
struct FilterExecMetrics {
    /// Common metrics for most operators
    baseline_metrics: BaselineMetrics,
    /// Selectivity of the filter, calculated as output_rows / input_rows
    selectivity: RatioMetrics,
    // Remember to update `docs/source/user-guide/metrics.md` when adding new metrics,
    // or modifying metrics comments
}
738
impl FilterExecMetrics {
    /// Creates the metric set for one partition of a `FilterExec`, registering
    /// the selectivity ratio as a summary metric.
    pub fn new(metrics: &ExecutionPlanMetricsSet, partition: usize) -> Self {
        Self {
            baseline_metrics: BaselineMetrics::new(metrics, partition),
            selectivity: MetricBuilder::new(metrics)
                .with_type(MetricType::SUMMARY)
                .ratio_metrics("selectivity", partition),
        }
    }
}
749
/// Filters `batch` by evaluating `predicate` against it, returning a batch
/// containing only the rows for which the predicate is true.
///
/// Convenience wrapper around [`filter_and_project`] with no projection.
pub fn batch_filter(
    batch: &RecordBatch,
    predicate: &Arc<dyn PhysicalExpr>,
) -> Result<RecordBatch> {
    filter_and_project(batch, predicate, None)
}
756
757fn filter_and_project(
758    batch: &RecordBatch,
759    predicate: &Arc<dyn PhysicalExpr>,
760    projection: Option<&Vec<usize>>,
761) -> Result<RecordBatch> {
762    predicate
763        .evaluate(batch)
764        .and_then(|v| v.into_array(batch.num_rows()))
765        .and_then(|array| {
766            Ok(match (as_boolean_array(&array), projection) {
767                // Apply filter array to record batch
768                (Ok(filter_array), None) => filter_record_batch(batch, filter_array)?,
769                (Ok(filter_array), Some(projection)) => {
770                    let projected_batch = batch.project(projection)?;
771                    filter_record_batch(&projected_batch, filter_array)?
772                }
773                (Err(_), _) => {
774                    return internal_err!(
775                        "Cannot create filter_array from non-boolean predicates"
776                    );
777                }
778            })
779        })
780}
781
impl Stream for FilterExecStream {
    type Item = Result<RecordBatch>;

    /// Drive the filter: drain completed batches from the coalescer first,
    /// otherwise pull from the input, evaluate the predicate, filter, and
    /// push the result into the coalescer. Errors short-circuit via `?`
    /// (converted through `Poll`'s `Try` impl) and end the stream.
    fn poll_next(
        mut self: Pin<&mut Self>,
        cx: &mut Context<'_>,
    ) -> Poll<Option<Self::Item>> {
        // Local handle to the compute-time metric so it can be used while
        // `self` is borrowed mutably inside the loop.
        let elapsed_compute = self.metrics.baseline_metrics.elapsed_compute().clone();
        loop {
            // If there is a completed batch ready, return it
            if let Some(batch) = self.batch_coalescer.next_completed_batch() {
                // Output rows feed the numerator of the selectivity ratio.
                self.metrics.selectivity.add_part(batch.num_rows());
                let poll = Poll::Ready(Some(Ok(batch)));
                return self.metrics.baseline_metrics.record_poll(poll);
            }

            if self.batch_coalescer.is_finished() {
                // If input is done and no batches are ready, return None to signal end of stream.
                return Poll::Ready(None);
            }

            // Attempt to pull the next batch from the input stream.
            match ready!(self.input.poll_next_unpin(cx)) {
                None => {
                    self.batch_coalescer.finish()?;
                    // continue draining the coalescer
                }
                Some(Ok(batch)) => {
                    let timer = elapsed_compute.timer();
                    // Note: the predicate is evaluated against the *full*
                    // input batch (its column indices refer to the input
                    // schema); the projection is only applied afterwards.
                    let status = self.predicate.as_ref()
                        .evaluate(&batch)
                        .and_then(|v| v.into_array(batch.num_rows()))
                        .and_then(|array| {
                            Ok(match self.projection {
                                Some(ref projection) => {
                                    let projected_batch = batch.project(projection)?;
                                    (array, projected_batch)
                                },
                                None => (array, batch)
                            })
                        }).and_then(|(array, batch)| {
                            match as_boolean_array(&array) {
                                Ok(filter_array) => {
                                    // Input rows feed the denominator of the
                                    // selectivity ratio.
                                    self.metrics.selectivity.add_total(batch.num_rows());
                                    // TODO: support push_batch_with_filter in LimitedBatchCoalescer
                                    let batch = filter_record_batch(&batch, filter_array)?;
                                    let state = self.batch_coalescer.push_batch(batch)?;
                                    Ok(state)
                                }
                                Err(_) => {
                                    internal_err!(
                                        "Cannot create filter_array from non-boolean predicates"
                                    )
                                }
                            }
                        })?;
                    timer.done();

                    match status {
                        PushBatchStatus::Continue => {
                            // Keep pushing more batches
                        }
                        PushBatchStatus::LimitReached => {
                            // limit was reached, so stop early
                            self.batch_coalescer.finish()?;
                            // continue draining the coalescer
                        }
                    }
                }

                // Error case
                other => return Poll::Ready(other),
            }
        }
    }

    fn size_hint(&self) -> (usize, Option<usize>) {
        // Same number of record batches
        // NOTE(review): coalescing can merge small batches, so the input's
        // hint is only an estimate of the output batch count — confirm that
        // callers treat this as approximate.
        self.input.size_hint()
    }
}
impl RecordBatchStream for FilterExecStream {
    /// Schema of the batches this stream yields (the filter's output schema,
    /// i.e. the input schema after any embedded projection).
    fn schema(&self) -> SchemaRef {
        Arc::clone(&self.schema)
    }
}
868
/// Return the equals Column-Pairs and Non-equals Column-Pairs
///
/// Deprecated public shim that simply delegates to the crate-private
/// [`collect_columns_from_predicate_inner`]; kept only for API
/// compatibility until it can be made internal.
#[deprecated(
    since = "51.0.0",
    note = "This function will be internal in the future"
)]
pub fn collect_columns_from_predicate(
    predicate: &'_ Arc<dyn PhysicalExpr>,
) -> EqualAndNonEqual<'_> {
    collect_columns_from_predicate_inner(predicate)
}
879
880fn collect_columns_from_predicate_inner(
881    predicate: &'_ Arc<dyn PhysicalExpr>,
882) -> EqualAndNonEqual<'_> {
883    let mut eq_predicate_columns = Vec::<PhysicalExprPairRef>::new();
884    let mut ne_predicate_columns = Vec::<PhysicalExprPairRef>::new();
885
886    let predicates = split_conjunction(predicate);
887    predicates.into_iter().for_each(|p| {
888        if let Some(binary) = p.as_any().downcast_ref::<BinaryExpr>() {
889            // Only extract pairs where at least one side is a Column reference.
890            // Pairs like `complex_expr = literal` should not create equivalence
891            // classes — the literal could appear in many unrelated expressions
892            // (e.g. sort keys), and normalize_expr's deep traversal would
893            // replace those occurrences with the complex expression, corrupting
894            // sort orderings. Constant propagation for such pairs is handled
895            // separately by `extend_constants`.
896            let has_direct_column_operand =
897                binary.left().as_any().downcast_ref::<Column>().is_some()
898                    || binary.right().as_any().downcast_ref::<Column>().is_some();
899            if !has_direct_column_operand {
900                return;
901            }
902            match binary.op() {
903                Operator::Eq => {
904                    eq_predicate_columns.push((binary.left(), binary.right()))
905                }
906                Operator::NotEq => {
907                    ne_predicate_columns.push((binary.left(), binary.right()))
908                }
909                _ => {}
910            }
911        }
912    });
913
914    (eq_predicate_columns, ne_predicate_columns)
915}
916
/// Pair of `Arc<dyn PhysicalExpr>`s, borrowed from a predicate — typically
/// the left and right operands of a binary comparison.
pub type PhysicalExprPairRef<'a> = (&'a Arc<dyn PhysicalExpr>, &'a Arc<dyn PhysicalExpr>);

/// The equals Column-Pairs and Non-equals Column-Pairs in the Predicates,
/// as returned by [`collect_columns_from_predicate`]: `(eq_pairs, ne_pairs)`.
pub type EqualAndNonEqual<'a> =
    (Vec<PhysicalExprPairRef<'a>>, Vec<PhysicalExprPairRef<'a>>);
923
924#[cfg(test)]
925mod tests {
926    use super::*;
927    use crate::empty::EmptyExec;
928    use crate::expressions::*;
929    use crate::test;
930    use crate::test::exec::StatisticsExec;
931    use arrow::datatypes::{Field, Schema, UnionFields, UnionMode};
932    use datafusion_common::ScalarValue;
933
934    #[tokio::test]
935    async fn collect_columns_predicates() -> Result<()> {
936        let schema = test::aggr_test_schema();
937        let predicate: Arc<dyn PhysicalExpr> = binary(
938            binary(
939                binary(col("c2", &schema)?, Operator::GtEq, lit(1u32), &schema)?,
940                Operator::And,
941                binary(col("c2", &schema)?, Operator::Eq, lit(4u32), &schema)?,
942                &schema,
943            )?,
944            Operator::And,
945            binary(
946                binary(
947                    col("c2", &schema)?,
948                    Operator::Eq,
949                    col("c9", &schema)?,
950                    &schema,
951                )?,
952                Operator::And,
953                binary(
954                    col("c1", &schema)?,
955                    Operator::NotEq,
956                    col("c13", &schema)?,
957                    &schema,
958                )?,
959                &schema,
960            )?,
961            &schema,
962        )?;
963
964        let (equal_pairs, ne_pairs) = collect_columns_from_predicate_inner(&predicate);
965        assert_eq!(2, equal_pairs.len());
966        assert!(equal_pairs[0].0.eq(&col("c2", &schema)?));
967        assert!(equal_pairs[0].1.eq(&lit(4u32)));
968
969        assert!(equal_pairs[1].0.eq(&col("c2", &schema)?));
970        assert!(equal_pairs[1].1.eq(&col("c9", &schema)?));
971
972        assert_eq!(1, ne_pairs.len());
973        assert!(ne_pairs[0].0.eq(&col("c1", &schema)?));
974        assert!(ne_pairs[0].1.eq(&col("c13", &schema)?));
975
976        Ok(())
977    }
978
979    #[tokio::test]
980    async fn test_filter_statistics_basic_expr() -> Result<()> {
981        // Table:
982        //      a: min=1, max=100
983        let bytes_per_row = 4;
984        let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]);
985        let input = Arc::new(StatisticsExec::new(
986            Statistics {
987                num_rows: Precision::Inexact(100),
988                total_byte_size: Precision::Inexact(100 * bytes_per_row),
989                column_statistics: vec![ColumnStatistics {
990                    min_value: Precision::Inexact(ScalarValue::Int32(Some(1))),
991                    max_value: Precision::Inexact(ScalarValue::Int32(Some(100))),
992                    ..Default::default()
993                }],
994            },
995            schema.clone(),
996        ));
997
998        // a <= 25
999        let predicate: Arc<dyn PhysicalExpr> =
1000            binary(col("a", &schema)?, Operator::LtEq, lit(25i32), &schema)?;
1001
1002        // WHERE a <= 25
1003        let filter: Arc<dyn ExecutionPlan> =
1004            Arc::new(FilterExec::try_new(predicate, input)?);
1005
1006        let statistics = filter.partition_statistics(None)?;
1007        assert_eq!(statistics.num_rows, Precision::Inexact(25));
1008        assert_eq!(
1009            statistics.total_byte_size,
1010            Precision::Inexact(25 * bytes_per_row)
1011        );
1012        assert_eq!(
1013            statistics.column_statistics,
1014            vec![ColumnStatistics {
1015                min_value: Precision::Inexact(ScalarValue::Int32(Some(1))),
1016                max_value: Precision::Inexact(ScalarValue::Int32(Some(25))),
1017                ..Default::default()
1018            }]
1019        );
1020
1021        Ok(())
1022    }
1023
    #[tokio::test]
    async fn test_filter_statistics_column_level_nested() -> Result<()> {
        // Table:
        //      a: min=1, max=100
        let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]);
        let input = Arc::new(StatisticsExec::new(
            Statistics {
                num_rows: Precision::Inexact(100),
                column_statistics: vec![ColumnStatistics {
                    min_value: Precision::Inexact(ScalarValue::Int32(Some(1))),
                    max_value: Precision::Inexact(ScalarValue::Int32(Some(100))),
                    ..Default::default()
                }],
                total_byte_size: Precision::Absent,
            },
            schema.clone(),
        ));

        // WHERE a <= 25
        let sub_filter: Arc<dyn ExecutionPlan> = Arc::new(FilterExec::try_new(
            binary(col("a", &schema)?, Operator::LtEq, lit(25i32), &schema)?,
            input,
        )?);

        // Nested filters (two separate physical plans, instead of AND chain in the expr)
        // WHERE a >= 10
        // WHERE a <= 25
        let filter: Arc<dyn ExecutionPlan> = Arc::new(FilterExec::try_new(
            binary(col("a", &schema)?, Operator::GtEq, lit(10i32), &schema)?,
            sub_filter,
        )?);

        let statistics = filter.partition_statistics(None)?;
        // The inner filter keeps 25/100 rows; over the reduced range [1, 25]
        // the outer `a >= 10` keeps 16/25, so 100 * 0.25 * 0.64 = 16 rows.
        assert_eq!(statistics.num_rows, Precision::Inexact(16));
        // Column bounds are clamped by both predicates: [10, 25].
        assert_eq!(
            statistics.column_statistics,
            vec![ColumnStatistics {
                min_value: Precision::Inexact(ScalarValue::Int32(Some(10))),
                max_value: Precision::Inexact(ScalarValue::Int32(Some(25))),
                ..Default::default()
            }]
        );

        Ok(())
    }
1069
1070    #[tokio::test]
1071    async fn test_filter_statistics_column_level_nested_multiple() -> Result<()> {
1072        // Table:
1073        //      a: min=1, max=100
1074        //      b: min=1, max=50
1075        let schema = Schema::new(vec![
1076            Field::new("a", DataType::Int32, false),
1077            Field::new("b", DataType::Int32, false),
1078        ]);
1079        let input = Arc::new(StatisticsExec::new(
1080            Statistics {
1081                num_rows: Precision::Inexact(100),
1082                column_statistics: vec![
1083                    ColumnStatistics {
1084                        min_value: Precision::Inexact(ScalarValue::Int32(Some(1))),
1085                        max_value: Precision::Inexact(ScalarValue::Int32(Some(100))),
1086                        ..Default::default()
1087                    },
1088                    ColumnStatistics {
1089                        min_value: Precision::Inexact(ScalarValue::Int32(Some(1))),
1090                        max_value: Precision::Inexact(ScalarValue::Int32(Some(50))),
1091                        ..Default::default()
1092                    },
1093                ],
1094                total_byte_size: Precision::Absent,
1095            },
1096            schema.clone(),
1097        ));
1098
1099        // WHERE a <= 25
1100        let a_lte_25: Arc<dyn ExecutionPlan> = Arc::new(FilterExec::try_new(
1101            binary(col("a", &schema)?, Operator::LtEq, lit(25i32), &schema)?,
1102            input,
1103        )?);
1104
1105        // WHERE b > 45
1106        let b_gt_5: Arc<dyn ExecutionPlan> = Arc::new(FilterExec::try_new(
1107            binary(col("b", &schema)?, Operator::Gt, lit(45i32), &schema)?,
1108            a_lte_25,
1109        )?);
1110
1111        // WHERE a >= 10
1112        let filter: Arc<dyn ExecutionPlan> = Arc::new(FilterExec::try_new(
1113            binary(col("a", &schema)?, Operator::GtEq, lit(10i32), &schema)?,
1114            b_gt_5,
1115        )?);
1116        let statistics = filter.partition_statistics(None)?;
1117        // On a uniform distribution, only fifteen rows will satisfy the
1118        // filter that 'a' proposed (a >= 10 AND a <= 25) (15/100) and only
1119        // 5 rows will satisfy the filter that 'b' proposed (b > 45) (5/50).
1120        //
1121        // Which would result with a selectivity of  '15/100 * 5/50' or 0.015
1122        // and that means about %1.5 of the all rows (rounded up to 2 rows).
1123        assert_eq!(statistics.num_rows, Precision::Inexact(2));
1124        assert_eq!(
1125            statistics.column_statistics,
1126            vec![
1127                ColumnStatistics {
1128                    min_value: Precision::Inexact(ScalarValue::Int32(Some(10))),
1129                    max_value: Precision::Inexact(ScalarValue::Int32(Some(25))),
1130                    ..Default::default()
1131                },
1132                ColumnStatistics {
1133                    min_value: Precision::Inexact(ScalarValue::Int32(Some(46))),
1134                    max_value: Precision::Inexact(ScalarValue::Int32(Some(50))),
1135                    ..Default::default()
1136                }
1137            ]
1138        );
1139
1140        Ok(())
1141    }
1142
1143    #[tokio::test]
1144    async fn test_filter_statistics_when_input_stats_missing() -> Result<()> {
1145        // Table:
1146        //      a: min=???, max=??? (missing)
1147        let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]);
1148        let input = Arc::new(StatisticsExec::new(
1149            Statistics::new_unknown(&schema),
1150            schema.clone(),
1151        ));
1152
1153        // a <= 25
1154        let predicate: Arc<dyn PhysicalExpr> =
1155            binary(col("a", &schema)?, Operator::LtEq, lit(25i32), &schema)?;
1156
1157        // WHERE a <= 25
1158        let filter: Arc<dyn ExecutionPlan> =
1159            Arc::new(FilterExec::try_new(predicate, input)?);
1160
1161        let statistics = filter.partition_statistics(None)?;
1162        assert_eq!(statistics.num_rows, Precision::Absent);
1163
1164        Ok(())
1165    }
1166
1167    #[tokio::test]
1168    async fn test_filter_statistics_multiple_columns() -> Result<()> {
1169        // Table:
1170        //      a: min=1, max=100
1171        //      b: min=1, max=3
1172        //      c: min=1000.0  max=1100.0
1173        let schema = Schema::new(vec![
1174            Field::new("a", DataType::Int32, false),
1175            Field::new("b", DataType::Int32, false),
1176            Field::new("c", DataType::Float32, false),
1177        ]);
1178        let input = Arc::new(StatisticsExec::new(
1179            Statistics {
1180                num_rows: Precision::Inexact(1000),
1181                total_byte_size: Precision::Inexact(4000),
1182                column_statistics: vec![
1183                    ColumnStatistics {
1184                        min_value: Precision::Inexact(ScalarValue::Int32(Some(1))),
1185                        max_value: Precision::Inexact(ScalarValue::Int32(Some(100))),
1186                        ..Default::default()
1187                    },
1188                    ColumnStatistics {
1189                        min_value: Precision::Inexact(ScalarValue::Int32(Some(1))),
1190                        max_value: Precision::Inexact(ScalarValue::Int32(Some(3))),
1191                        ..Default::default()
1192                    },
1193                    ColumnStatistics {
1194                        min_value: Precision::Inexact(ScalarValue::Float32(Some(1000.0))),
1195                        max_value: Precision::Inexact(ScalarValue::Float32(Some(1100.0))),
1196                        ..Default::default()
1197                    },
1198                ],
1199            },
1200            schema,
1201        ));
1202        // WHERE a<=53 AND (b=3 AND (c<=1075.0 AND a>b))
1203        let predicate = Arc::new(BinaryExpr::new(
1204            Arc::new(BinaryExpr::new(
1205                Arc::new(Column::new("a", 0)),
1206                Operator::LtEq,
1207                Arc::new(Literal::new(ScalarValue::Int32(Some(53)))),
1208            )),
1209            Operator::And,
1210            Arc::new(BinaryExpr::new(
1211                Arc::new(BinaryExpr::new(
1212                    Arc::new(Column::new("b", 1)),
1213                    Operator::Eq,
1214                    Arc::new(Literal::new(ScalarValue::Int32(Some(3)))),
1215                )),
1216                Operator::And,
1217                Arc::new(BinaryExpr::new(
1218                    Arc::new(BinaryExpr::new(
1219                        Arc::new(Column::new("c", 2)),
1220                        Operator::LtEq,
1221                        Arc::new(Literal::new(ScalarValue::Float32(Some(1075.0)))),
1222                    )),
1223                    Operator::And,
1224                    Arc::new(BinaryExpr::new(
1225                        Arc::new(Column::new("a", 0)),
1226                        Operator::Gt,
1227                        Arc::new(Column::new("b", 1)),
1228                    )),
1229                )),
1230            )),
1231        ));
1232        let filter: Arc<dyn ExecutionPlan> =
1233            Arc::new(FilterExec::try_new(predicate, input)?);
1234        let statistics = filter.partition_statistics(None)?;
1235        // 0.5 (from a) * 0.333333... (from b) * 0.798387... (from c) ≈ 0.1330...
1236        // num_rows after ceil => 133.0... => 134
1237        // total_byte_size after ceil => 532.0... => 533
1238        assert_eq!(statistics.num_rows, Precision::Inexact(134));
1239        assert_eq!(statistics.total_byte_size, Precision::Inexact(533));
1240        let exp_col_stats = vec![
1241            ColumnStatistics {
1242                min_value: Precision::Inexact(ScalarValue::Int32(Some(4))),
1243                max_value: Precision::Inexact(ScalarValue::Int32(Some(53))),
1244                ..Default::default()
1245            },
1246            ColumnStatistics {
1247                min_value: Precision::Inexact(ScalarValue::Int32(Some(3))),
1248                max_value: Precision::Inexact(ScalarValue::Int32(Some(3))),
1249                ..Default::default()
1250            },
1251            ColumnStatistics {
1252                min_value: Precision::Inexact(ScalarValue::Float32(Some(1000.0))),
1253                max_value: Precision::Inexact(ScalarValue::Float32(Some(1075.0))),
1254                ..Default::default()
1255            },
1256        ];
1257        let _ = exp_col_stats
1258            .into_iter()
1259            .zip(statistics.column_statistics)
1260            .map(|(expected, actual)| {
1261                if let Some(val) = actual.min_value.get_value() {
1262                    if val.data_type().is_floating() {
1263                        // Windows rounds arithmetic operation results differently for floating point numbers.
1264                        // Therefore, we check if the actual values are in an epsilon range.
1265                        let actual_min = actual.min_value.get_value().unwrap();
1266                        let actual_max = actual.max_value.get_value().unwrap();
1267                        let expected_min = expected.min_value.get_value().unwrap();
1268                        let expected_max = expected.max_value.get_value().unwrap();
1269                        let eps = ScalarValue::Float32(Some(1e-6));
1270
1271                        assert!(actual_min.sub(expected_min).unwrap() < eps);
1272                        assert!(actual_min.sub(expected_min).unwrap() < eps);
1273
1274                        assert!(actual_max.sub(expected_max).unwrap() < eps);
1275                        assert!(actual_max.sub(expected_max).unwrap() < eps);
1276                    } else {
1277                        assert_eq!(actual, expected);
1278                    }
1279                } else {
1280                    assert_eq!(actual, expected);
1281                }
1282            });
1283
1284        Ok(())
1285    }
1286
    #[tokio::test]
    async fn test_filter_statistics_full_selective() -> Result<()> {
        // Table:
        //      a: min=1, max=100
        //      b: min=1, max=3
        let schema = Schema::new(vec![
            Field::new("a", DataType::Int32, false),
            Field::new("b", DataType::Int32, false),
        ]);
        let input = Arc::new(StatisticsExec::new(
            Statistics {
                num_rows: Precision::Inexact(1000),
                total_byte_size: Precision::Inexact(4000),
                column_statistics: vec![
                    ColumnStatistics {
                        min_value: Precision::Inexact(ScalarValue::Int32(Some(1))),
                        max_value: Precision::Inexact(ScalarValue::Int32(Some(100))),
                        ..Default::default()
                    },
                    ColumnStatistics {
                        min_value: Precision::Inexact(ScalarValue::Int32(Some(1))),
                        max_value: Precision::Inexact(ScalarValue::Int32(Some(3))),
                        ..Default::default()
                    },
                ],
            },
            schema,
        ));
        // WHERE a<200 AND 1<=b
        // `a < 200` covers all of [1, 100] and `1 <= b` covers all of [1, 3],
        // so the predicate cannot eliminate any rows.
        let predicate = Arc::new(BinaryExpr::new(
            Arc::new(BinaryExpr::new(
                Arc::new(Column::new("a", 0)),
                Operator::Lt,
                Arc::new(Literal::new(ScalarValue::Int32(Some(200)))),
            )),
            Operator::And,
            Arc::new(BinaryExpr::new(
                Arc::new(Literal::new(ScalarValue::Int32(Some(1)))),
                Operator::LtEq,
                Arc::new(Column::new("b", 1)),
            )),
        ));
        // Since filter predicate passes all entries, statistics after filter shouldn't change.
        let expected = input.partition_statistics(None)?.column_statistics;
        let filter: Arc<dyn ExecutionPlan> =
            Arc::new(FilterExec::try_new(predicate, input)?);
        let statistics = filter.partition_statistics(None)?;

        assert_eq!(statistics.num_rows, Precision::Inexact(1000));
        assert_eq!(statistics.total_byte_size, Precision::Inexact(4000));
        assert_eq!(statistics.column_statistics, expected);

        Ok(())
    }
1341
    #[tokio::test]
    async fn test_filter_statistics_zero_selective() -> Result<()> {
        // Table:
        //      a: min=1, max=100
        //      b: min=1, max=3
        let schema = Schema::new(vec![
            Field::new("a", DataType::Int32, false),
            Field::new("b", DataType::Int32, false),
        ]);
        let input = Arc::new(StatisticsExec::new(
            Statistics {
                num_rows: Precision::Inexact(1000),
                total_byte_size: Precision::Inexact(4000),
                column_statistics: vec![
                    ColumnStatistics {
                        min_value: Precision::Inexact(ScalarValue::Int32(Some(1))),
                        max_value: Precision::Inexact(ScalarValue::Int32(Some(100))),
                        ..Default::default()
                    },
                    ColumnStatistics {
                        min_value: Precision::Inexact(ScalarValue::Int32(Some(1))),
                        max_value: Precision::Inexact(ScalarValue::Int32(Some(3))),
                        ..Default::default()
                    },
                ],
            },
            schema,
        ));
        // WHERE a>200 AND 1<=b
        // `a > 200` is impossible given a's max of 100, so no row can match.
        let predicate = Arc::new(BinaryExpr::new(
            Arc::new(BinaryExpr::new(
                Arc::new(Column::new("a", 0)),
                Operator::Gt,
                Arc::new(Literal::new(ScalarValue::Int32(Some(200)))),
            )),
            Operator::And,
            Arc::new(BinaryExpr::new(
                Arc::new(Literal::new(ScalarValue::Int32(Some(1)))),
                Operator::LtEq,
                Arc::new(Column::new("b", 1)),
            )),
        ));
        let filter: Arc<dyn ExecutionPlan> =
            Arc::new(FilterExec::try_new(predicate, input)?);
        let statistics = filter.partition_statistics(None)?;

        assert_eq!(statistics.num_rows, Precision::Inexact(0));
        assert_eq!(statistics.total_byte_size, Precision::Inexact(0));
        // With provably zero output, the column statistics become *exact*
        // "empty" statistics: typed null min/max/sum and zero counts.
        assert_eq!(
            statistics.column_statistics,
            vec![
                ColumnStatistics {
                    min_value: Precision::Exact(ScalarValue::Int32(None)),
                    max_value: Precision::Exact(ScalarValue::Int32(None)),
                    sum_value: Precision::Exact(ScalarValue::Int32(None)),
                    distinct_count: Precision::Exact(0),
                    null_count: Precision::Exact(0),
                    byte_size: Precision::Absent,
                },
                ColumnStatistics {
                    min_value: Precision::Exact(ScalarValue::Int32(None)),
                    max_value: Precision::Exact(ScalarValue::Int32(None)),
                    sum_value: Precision::Exact(ScalarValue::Int32(None)),
                    distinct_count: Precision::Exact(0),
                    null_count: Precision::Exact(0),
                    byte_size: Precision::Absent,
                },
            ]
        );

        Ok(())
    }
1414
1415    /// Regression test: stacking two FilterExecs where the inner filter
1416    /// proves zero selectivity should not panic with a type mismatch
1417    /// during interval intersection.
1418    ///
1419    /// Previously, when a filter proved no rows could match, the column
1420    /// statistics used untyped `ScalarValue::Null` (data type `Null`).
1421    /// If an outer FilterExec then tried to analyze its own predicate
1422    /// against those statistics, `Interval::intersect` would fail with:
1423    ///   "Only intervals with the same data type are intersectable, lhs:Null, rhs:Int32"
    #[tokio::test]
    async fn test_nested_filter_with_zero_selectivity_inner() -> Result<()> {
        // Inner table: a: [1, 100], b: [1, 3]
        let schema = Schema::new(vec![
            Field::new("a", DataType::Int32, false),
            Field::new("b", DataType::Int32, false),
        ]);
        let input = Arc::new(StatisticsExec::new(
            Statistics {
                num_rows: Precision::Inexact(1000),
                total_byte_size: Precision::Inexact(4000),
                column_statistics: vec![
                    ColumnStatistics {
                        min_value: Precision::Inexact(ScalarValue::Int32(Some(1))),
                        max_value: Precision::Inexact(ScalarValue::Int32(Some(100))),
                        ..Default::default()
                    },
                    ColumnStatistics {
                        min_value: Precision::Inexact(ScalarValue::Int32(Some(1))),
                        max_value: Precision::Inexact(ScalarValue::Int32(Some(3))),
                        ..Default::default()
                    },
                ],
            },
            schema,
        ));

        // Inner filter: a > 200 (impossible given a max=100 → zero selectivity)
        let inner_predicate: Arc<dyn PhysicalExpr> = Arc::new(BinaryExpr::new(
            Arc::new(Column::new("a", 0)),
            Operator::Gt,
            Arc::new(Literal::new(ScalarValue::Int32(Some(200)))),
        ));
        let inner_filter: Arc<dyn ExecutionPlan> =
            Arc::new(FilterExec::try_new(inner_predicate, input)?);

        // Outer filter: a = 50
        // Before the fix, this would panic because the inner filter's
        // zero-selectivity statistics produced Null-typed intervals for
        // column `a`, which couldn't intersect with the Int32 literal.
        let outer_predicate: Arc<dyn PhysicalExpr> = Arc::new(BinaryExpr::new(
            Arc::new(Column::new("a", 0)),
            Operator::Eq,
            Arc::new(Literal::new(ScalarValue::Int32(Some(50)))),
        ));
        let outer_filter: Arc<dyn ExecutionPlan> =
            Arc::new(FilterExec::try_new(outer_predicate, inner_filter)?);

        // Should succeed without error; zero rows survive the impossible
        // inner predicate, so the outer estimate stays at zero.
        let statistics = outer_filter.partition_statistics(None)?;
        assert_eq!(statistics.num_rows, Precision::Inexact(0));

        Ok(())
    }
1478
    #[tokio::test]
    async fn test_filter_statistics_more_inputs() -> Result<()> {
        // Table: two columns, both a: [1, 100], 1000 rows / 4000 bytes.
        let schema = Schema::new(vec![
            Field::new("a", DataType::Int32, false),
            Field::new("b", DataType::Int32, false),
        ]);
        let input = Arc::new(StatisticsExec::new(
            Statistics {
                num_rows: Precision::Inexact(1000),
                total_byte_size: Precision::Inexact(4000),
                column_statistics: vec![
                    ColumnStatistics {
                        min_value: Precision::Inexact(ScalarValue::Int32(Some(1))),
                        max_value: Precision::Inexact(ScalarValue::Int32(Some(100))),
                        ..Default::default()
                    },
                    ColumnStatistics {
                        min_value: Precision::Inexact(ScalarValue::Int32(Some(1))),
                        max_value: Precision::Inexact(ScalarValue::Int32(Some(100))),
                        ..Default::default()
                    },
                ],
            },
            schema,
        ));
        // WHERE a<50
        let predicate = Arc::new(BinaryExpr::new(
            Arc::new(Column::new("a", 0)),
            Operator::Lt,
            Arc::new(Literal::new(ScalarValue::Int32(Some(50)))),
        ));
        let filter: Arc<dyn ExecutionPlan> =
            Arc::new(FilterExec::try_new(predicate, input)?);
        let statistics = filter.partition_statistics(None)?;

        // `a < 50` keeps 49 of the 100 values in [1, 100] => selectivity 0.49,
        // so 1000 * 0.49 = 490 rows and 4000 * 0.49 = 1960 bytes.
        assert_eq!(statistics.num_rows, Precision::Inexact(490));
        assert_eq!(statistics.total_byte_size, Precision::Inexact(1960));
        // Only column `a` (referenced by the predicate) has its max clamped;
        // column `b` keeps its original bounds.
        assert_eq!(
            statistics.column_statistics,
            vec![
                ColumnStatistics {
                    min_value: Precision::Inexact(ScalarValue::Int32(Some(1))),
                    max_value: Precision::Inexact(ScalarValue::Int32(Some(49))),
                    ..Default::default()
                },
                ColumnStatistics {
                    min_value: Precision::Inexact(ScalarValue::Int32(Some(1))),
                    max_value: Precision::Inexact(ScalarValue::Int32(Some(100))),
                    ..Default::default()
                },
            ]
        );

        Ok(())
    }
1534
1535    #[tokio::test]
1536    async fn test_empty_input_statistics() -> Result<()> {
1537        let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]);
1538        let input = Arc::new(StatisticsExec::new(
1539            Statistics::new_unknown(&schema),
1540            schema,
1541        ));
1542        // WHERE a <= 10 AND 0 <= a - 5
1543        let predicate = Arc::new(BinaryExpr::new(
1544            Arc::new(BinaryExpr::new(
1545                Arc::new(Column::new("a", 0)),
1546                Operator::LtEq,
1547                Arc::new(Literal::new(ScalarValue::Int32(Some(10)))),
1548            )),
1549            Operator::And,
1550            Arc::new(BinaryExpr::new(
1551                Arc::new(Literal::new(ScalarValue::Int32(Some(0)))),
1552                Operator::LtEq,
1553                Arc::new(BinaryExpr::new(
1554                    Arc::new(Column::new("a", 0)),
1555                    Operator::Minus,
1556                    Arc::new(Literal::new(ScalarValue::Int32(Some(5)))),
1557                )),
1558            )),
1559        ));
1560        let filter: Arc<dyn ExecutionPlan> =
1561            Arc::new(FilterExec::try_new(predicate, input)?);
1562        let filter_statistics = filter.partition_statistics(None)?;
1563
1564        let expected_filter_statistics = Statistics {
1565            num_rows: Precision::Absent,
1566            total_byte_size: Precision::Absent,
1567            column_statistics: vec![ColumnStatistics {
1568                null_count: Precision::Absent,
1569                min_value: Precision::Inexact(ScalarValue::Int32(Some(5))),
1570                max_value: Precision::Inexact(ScalarValue::Int32(Some(10))),
1571                sum_value: Precision::Absent,
1572                distinct_count: Precision::Absent,
1573                byte_size: Precision::Absent,
1574            }],
1575        };
1576
1577        assert_eq!(filter_statistics, expected_filter_statistics);
1578
1579        Ok(())
1580    }
1581
1582    #[tokio::test]
1583    async fn test_statistics_with_constant_column() -> Result<()> {
1584        let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]);
1585        let input = Arc::new(StatisticsExec::new(
1586            Statistics::new_unknown(&schema),
1587            schema,
1588        ));
1589        // WHERE a = 10
1590        let predicate = Arc::new(BinaryExpr::new(
1591            Arc::new(Column::new("a", 0)),
1592            Operator::Eq,
1593            Arc::new(Literal::new(ScalarValue::Int32(Some(10)))),
1594        ));
1595        let filter: Arc<dyn ExecutionPlan> =
1596            Arc::new(FilterExec::try_new(predicate, input)?);
1597        let filter_statistics = filter.partition_statistics(None)?;
1598        // First column is "a", and it is a column with only one value after the filter.
1599        assert!(filter_statistics.column_statistics[0].is_singleton());
1600
1601        Ok(())
1602    }
1603
1604    #[tokio::test]
1605    async fn test_validation_filter_selectivity() -> Result<()> {
1606        let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]);
1607        let input = Arc::new(StatisticsExec::new(
1608            Statistics::new_unknown(&schema),
1609            schema,
1610        ));
1611        // WHERE a = 10
1612        let predicate = Arc::new(BinaryExpr::new(
1613            Arc::new(Column::new("a", 0)),
1614            Operator::Eq,
1615            Arc::new(Literal::new(ScalarValue::Int32(Some(10)))),
1616        ));
1617        let filter = FilterExec::try_new(predicate, input)?;
1618        assert!(filter.with_default_selectivity(120).is_err());
1619        Ok(())
1620    }
1621
1622    #[tokio::test]
1623    async fn test_custom_filter_selectivity() -> Result<()> {
1624        // Need a decimal to trigger inexact selectivity
1625        let schema =
1626            Schema::new(vec![Field::new("a", DataType::Decimal128(2, 3), false)]);
1627        let input = Arc::new(StatisticsExec::new(
1628            Statistics {
1629                num_rows: Precision::Inexact(1000),
1630                total_byte_size: Precision::Inexact(4000),
1631                column_statistics: vec![ColumnStatistics {
1632                    ..Default::default()
1633                }],
1634            },
1635            schema,
1636        ));
1637        // WHERE a = 10
1638        let predicate = Arc::new(BinaryExpr::new(
1639            Arc::new(Column::new("a", 0)),
1640            Operator::Eq,
1641            Arc::new(Literal::new(ScalarValue::Decimal128(Some(10), 10, 10))),
1642        ));
1643        let filter = FilterExec::try_new(predicate, input)?;
1644        let statistics = filter.partition_statistics(None)?;
1645        assert_eq!(statistics.num_rows, Precision::Inexact(200));
1646        assert_eq!(statistics.total_byte_size, Precision::Inexact(800));
1647        let filter = filter.with_default_selectivity(40)?;
1648        let statistics = filter.partition_statistics(None)?;
1649        assert_eq!(statistics.num_rows, Precision::Inexact(400));
1650        assert_eq!(statistics.total_byte_size, Precision::Inexact(1600));
1651        Ok(())
1652    }
1653
1654    #[test]
1655    fn test_equivalence_properties_union_type() -> Result<()> {
1656        let union_type = DataType::Union(
1657            UnionFields::new(
1658                vec![0, 1],
1659                vec![
1660                    Field::new("f1", DataType::Int32, true),
1661                    Field::new("f2", DataType::Utf8, true),
1662                ],
1663            ),
1664            UnionMode::Sparse,
1665        );
1666
1667        let schema = Arc::new(Schema::new(vec![
1668            Field::new("c1", DataType::Int32, true),
1669            Field::new("c2", union_type, true),
1670        ]));
1671
1672        let exec = FilterExec::try_new(
1673            binary(
1674                binary(col("c1", &schema)?, Operator::GtEq, lit(1i32), &schema)?,
1675                Operator::And,
1676                binary(col("c1", &schema)?, Operator::LtEq, lit(4i32), &schema)?,
1677                &schema,
1678            )?,
1679            Arc::new(EmptyExec::new(Arc::clone(&schema))),
1680        )?;
1681
1682        exec.partition_statistics(None).unwrap();
1683
1684        Ok(())
1685    }
1686
1687    #[test]
1688    fn test_filter_with_projection_remaps_post_phase_parent_filters() -> Result<()> {
1689        // Test that FilterExec with a projection must remap parent dynamic
1690        // filter column indices from its output schema to the input schema
1691        // before passing them to the child.
1692        let input_schema = Arc::new(Schema::new(vec![
1693            Field::new("a", DataType::Int32, false),
1694            Field::new("b", DataType::Utf8, false),
1695            Field::new("c", DataType::Float64, false),
1696        ]));
1697        let input = Arc::new(EmptyExec::new(Arc::clone(&input_schema)));
1698
1699        // FilterExec: a > 0, projection=[c@2]
1700        let predicate = Arc::new(BinaryExpr::new(
1701            Arc::new(Column::new("a", 0)),
1702            Operator::Gt,
1703            Arc::new(Literal::new(ScalarValue::Int32(Some(0)))),
1704        ));
1705        let filter =
1706            FilterExec::try_new(predicate, input)?.with_projection(Some(vec![2]))?;
1707
1708        // Output schema should be [c:Float64]
1709        let output_schema = filter.schema();
1710        assert_eq!(output_schema.fields().len(), 1);
1711        assert_eq!(output_schema.field(0).name(), "c");
1712
1713        // Simulate a parent dynamic filter referencing output column c@0
1714        let parent_filter: Arc<dyn PhysicalExpr> = Arc::new(Column::new("c", 0));
1715
1716        let config = ConfigOptions::new();
1717        let desc = filter.gather_filters_for_pushdown(
1718            FilterPushdownPhase::Post,
1719            vec![parent_filter],
1720            &config,
1721        )?;
1722
1723        // The filter pushed to the child must reference c@2 (input schema),
1724        // not c@0 (output schema).
1725        let parent_filters = desc.parent_filters();
1726        assert_eq!(parent_filters.len(), 1); // one child
1727        assert_eq!(parent_filters[0].len(), 1); // one filter
1728        let remapped = &parent_filters[0][0].predicate;
1729        let display = format!("{remapped}");
1730        assert_eq!(
1731            display, "c@2",
1732            "Post-phase parent filter column index must be remapped \
1733             from output schema (c@0) to input schema (c@2)"
1734        );
1735
1736        Ok(())
1737    }
1738    /// Regression test for https://github.com/apache/datafusion/issues/20194
1739    ///
1740    /// `collect_columns_from_predicate_inner` should only extract equality
1741    /// pairs where at least one side is a Column. Pairs like
1742    /// `complex_expr = literal` must not create equivalence classes because
1743    /// `normalize_expr`'s deep traversal would replace the literal inside
1744    /// unrelated expressions (e.g. sort keys) with the complex expression.
1745    #[test]
1746    fn test_collect_columns_skips_non_column_pairs() -> Result<()> {
1747        let schema = test::aggr_test_schema();
1748
1749        // Simulate: nvl(c2, 0) = 0  →  (c2 IS DISTINCT FROM 0) = 0
1750        // Neither side is a Column, so this should NOT be extracted.
1751        let complex_expr: Arc<dyn PhysicalExpr> = binary(
1752            col("c2", &schema)?,
1753            Operator::IsDistinctFrom,
1754            lit(0u32),
1755            &schema,
1756        )?;
1757        let predicate: Arc<dyn PhysicalExpr> =
1758            binary(complex_expr, Operator::Eq, lit(0u32), &schema)?;
1759
1760        let (equal_pairs, _) = collect_columns_from_predicate_inner(&predicate);
1761        assert_eq!(
1762            0,
1763            equal_pairs.len(),
1764            "Should not extract equality pairs where neither side is a Column"
1765        );
1766
1767        // But col = literal should still be extracted
1768        let predicate: Arc<dyn PhysicalExpr> =
1769            binary(col("c2", &schema)?, Operator::Eq, lit(0u32), &schema)?;
1770        let (equal_pairs, _) = collect_columns_from_predicate_inner(&predicate);
1771        assert_eq!(
1772            1,
1773            equal_pairs.len(),
1774            "Should extract equality pairs where one side is a Column"
1775        );
1776
1777        Ok(())
1778    }
1779}