// datafusion_physical_plan/filter.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::any::Any;
19use std::pin::Pin;
20use std::sync::Arc;
21use std::task::{Context, Poll, ready};
22
23use datafusion_physical_expr::projection::{ProjectionRef, combine_projections};
24use itertools::Itertools;
25
26use super::{
27    ColumnStatistics, DisplayAs, ExecutionPlanProperties, PlanProperties,
28    RecordBatchStream, SendableRecordBatchStream, Statistics,
29};
30use crate::check_if_same_properties;
31use crate::coalesce::{LimitedBatchCoalescer, PushBatchStatus};
32use crate::common::can_project;
33use crate::execution_plan::CardinalityEffect;
34use crate::filter_pushdown::{
35    ChildFilterDescription, ChildPushdownResult, FilterDescription, FilterPushdownPhase,
36    FilterPushdownPropagation, PushedDown,
37};
38use crate::metrics::{MetricBuilder, MetricType};
39use crate::projection::{
40    EmbeddedProjection, ProjectionExec, ProjectionExpr, make_with_child,
41    try_embed_projection, update_expr,
42};
43use crate::{
44    DisplayFormatType, ExecutionPlan,
45    metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet, RatioMetrics},
46};
47
48use arrow::compute::filter_record_batch;
49use arrow::datatypes::{DataType, SchemaRef};
50use arrow::record_batch::RecordBatch;
51use datafusion_common::cast::as_boolean_array;
52use datafusion_common::config::ConfigOptions;
53use datafusion_common::stats::Precision;
54use datafusion_common::{
55    DataFusionError, Result, ScalarValue, internal_err, plan_err, project_schema,
56};
57use datafusion_execution::TaskContext;
58use datafusion_expr::Operator;
59use datafusion_physical_expr::equivalence::ProjectionMapping;
60use datafusion_physical_expr::expressions::{BinaryExpr, Column, Literal, lit};
61use datafusion_physical_expr::intervals::utils::check_support;
62use datafusion_physical_expr::utils::{collect_columns, reassign_expr_columns};
63use datafusion_physical_expr::{
64    AcrossPartitions, AnalysisContext, ConstExpr, EquivalenceProperties, ExprBoundaries,
65    PhysicalExpr, analyze, conjunction, split_conjunction,
66};
67
68use datafusion_physical_expr_common::physical_expr::fmt_sql;
69use futures::stream::{Stream, StreamExt};
70use log::trace;
71
/// Default assumed selectivity of a filter predicate, as a percentage:
/// 20 means "roughly 20% of input rows are expected to pass the filter".
/// Used for statistics when the predicate cannot be analyzed precisely.
const FILTER_EXEC_DEFAULT_SELECTIVITY: u8 = 20;
/// Default target number of rows per output batch produced by the coalescer.
const FILTER_EXEC_DEFAULT_BATCH_SIZE: usize = 8192;
74
/// FilterExec evaluates a boolean predicate against all input batches to determine which rows to
/// include in its output batches.
#[derive(Debug, Clone)]
pub struct FilterExec {
    /// The expression to filter on. This expression must evaluate to a boolean value.
    predicate: Arc<dyn PhysicalExpr>,
    /// The input plan
    input: Arc<dyn ExecutionPlan>,
    /// Execution metrics
    metrics: ExecutionPlanMetricsSet,
    /// Selectivity for statistics. 0 = no rows, 100 = all rows
    default_selectivity: u8,
    /// Properties equivalence properties, partitioning, etc.
    cache: Arc<PlanProperties>,
    /// Optional indices (into the input schema) of the columns this operator
    /// emits; `None` means all input columns pass through unchanged
    projection: Option<ProjectionRef>,
    /// Target batch size for output batches
    batch_size: usize,
    /// Maximum number of rows to produce (`None` = unlimited); enforced by
    /// the batch coalescer during execution
    fetch: Option<usize>,
}
96
/// Builder for [`FilterExec`] to set optional parameters
pub struct FilterExecBuilder {
    // Boolean predicate the filter evaluates (required)
    predicate: Arc<dyn PhysicalExpr>,
    // Input plan the filter reads from (required)
    input: Arc<dyn ExecutionPlan>,
    // Optional output projection over the input schema's columns
    projection: Option<ProjectionRef>,
    // Assumed selectivity percentage (0-100) used for statistics
    default_selectivity: u8,
    // Target output batch size in rows
    batch_size: usize,
    // Optional row limit applied to the filter output
    fetch: Option<usize>,
}
106
107impl FilterExecBuilder {
108    /// Create a new builder with required parameters (predicate and input)
109    pub fn new(predicate: Arc<dyn PhysicalExpr>, input: Arc<dyn ExecutionPlan>) -> Self {
110        Self {
111            predicate,
112            input,
113            projection: None,
114            default_selectivity: FILTER_EXEC_DEFAULT_SELECTIVITY,
115            batch_size: FILTER_EXEC_DEFAULT_BATCH_SIZE,
116            fetch: None,
117        }
118    }
119
120    /// Set the input execution plan
121    pub fn with_input(mut self, input: Arc<dyn ExecutionPlan>) -> Self {
122        self.input = input;
123        self
124    }
125
126    /// Set the predicate expression
127    pub fn with_predicate(mut self, predicate: Arc<dyn PhysicalExpr>) -> Self {
128        self.predicate = predicate;
129        self
130    }
131
132    /// Set the projection, composing with any existing projection.
133    ///
134    /// If a projection is already set, the new projection indices are mapped
135    /// through the existing projection. For example, if the current projection
136    /// is `[0, 2, 3]` and `apply_projection(Some(vec![0, 2]))` is called, the
137    /// resulting projection will be `[0, 3]` (indices 0 and 2 of `[0, 2, 3]`).
138    ///
139    /// If no projection is currently set, the new projection is used directly.
140    /// If `None` is passed, the projection is cleared.
141    pub fn apply_projection(self, projection: Option<Vec<usize>>) -> Result<Self> {
142        let projection = projection.map(Into::into);
143        self.apply_projection_by_ref(projection.as_ref())
144    }
145
146    /// The same as [`Self::apply_projection`] but takes projection shared reference.
147    pub fn apply_projection_by_ref(
148        mut self,
149        projection: Option<&ProjectionRef>,
150    ) -> Result<Self> {
151        // Check if the projection is valid against current output schema
152        can_project(&self.input.schema(), projection.map(AsRef::as_ref))?;
153        self.projection = combine_projections(projection, self.projection.as_ref())?;
154        Ok(self)
155    }
156
157    /// Set the default selectivity
158    pub fn with_default_selectivity(mut self, default_selectivity: u8) -> Self {
159        self.default_selectivity = default_selectivity;
160        self
161    }
162
163    /// Set the batch size
164    pub fn with_batch_size(mut self, batch_size: usize) -> Self {
165        self.batch_size = batch_size;
166        self
167    }
168
169    /// Set the fetch limit
170    pub fn with_fetch(mut self, fetch: Option<usize>) -> Self {
171        self.fetch = fetch;
172        self
173    }
174
175    /// Build the FilterExec, computing properties once with all configured parameters
176    pub fn build(self) -> Result<FilterExec> {
177        // Validate predicate type
178        match self.predicate.data_type(self.input.schema().as_ref())? {
179            DataType::Boolean => {}
180            other => {
181                return plan_err!(
182                    "Filter predicate must return BOOLEAN values, got {other:?}"
183                );
184            }
185        }
186
187        // Validate selectivity
188        if self.default_selectivity > 100 {
189            return plan_err!(
190                "Default filter selectivity value needs to be less than or equal to 100"
191            );
192        }
193
194        // Validate projection if provided
195        can_project(&self.input.schema(), self.projection.as_deref())?;
196
197        // Compute properties once with all parameters
198        let cache = FilterExec::compute_properties(
199            &self.input,
200            &self.predicate,
201            self.default_selectivity,
202            self.projection.as_deref(),
203        )?;
204
205        Ok(FilterExec {
206            predicate: self.predicate,
207            input: self.input,
208            metrics: ExecutionPlanMetricsSet::new(),
209            default_selectivity: self.default_selectivity,
210            cache: Arc::new(cache),
211            projection: self.projection,
212            batch_size: self.batch_size,
213            fetch: self.fetch,
214        })
215    }
216}
217
218impl From<&FilterExec> for FilterExecBuilder {
219    fn from(exec: &FilterExec) -> Self {
220        Self {
221            predicate: Arc::clone(&exec.predicate),
222            input: Arc::clone(&exec.input),
223            projection: exec.projection.clone(),
224            default_selectivity: exec.default_selectivity,
225            batch_size: exec.batch_size,
226            fetch: exec.fetch,
227            // We could cache / copy over PlanProperties
228            // here but that would require invalidating them in FilterExecBuilder::apply_projection, etc.
229            // and currently every call to this method ends up invalidating them anyway.
230            // If useful this can be added in the future as a non-breaking change.
231        }
232    }
233}
234
impl FilterExec {
    /// Create a FilterExec on an input using the builder pattern
    pub fn try_new(
        predicate: Arc<dyn PhysicalExpr>,
        input: Arc<dyn ExecutionPlan>,
    ) -> Result<Self> {
        FilterExecBuilder::new(predicate, input).build()
    }

    /// Get a batch size
    pub fn batch_size(&self) -> usize {
        self.batch_size
    }

    /// Set the default selectivity
    ///
    /// Returns an error if `default_selectivity` exceeds 100 (it is a percentage).
    /// NOTE(review): this mutates the selectivity without recomputing the cached
    /// plan properties — callers appear to rely on statistics being re-derived
    /// via `partition_statistics`; confirm before depending on `cache` here.
    pub fn with_default_selectivity(
        mut self,
        default_selectivity: u8,
    ) -> Result<Self, DataFusionError> {
        if default_selectivity > 100 {
            return plan_err!(
                "Default filter selectivity value needs to be less than or equal to 100"
            );
        }
        self.default_selectivity = default_selectivity;
        Ok(self)
    }

    /// Return new instance of [FilterExec] with the given projection.
    ///
    /// # Deprecated
    /// Use [`FilterExecBuilder::apply_projection`] instead
    #[deprecated(
        since = "52.0.0",
        note = "Use FilterExecBuilder::apply_projection instead"
    )]
    pub fn with_projection(&self, projection: Option<Vec<usize>>) -> Result<Self> {
        let builder = FilterExecBuilder::from(self);
        builder.apply_projection(projection)?.build()
    }

    /// Set the batch size
    ///
    /// Only the batch size changes; predicate, projection, cached properties
    /// and metrics are shared/cloned from `self`.
    pub fn with_batch_size(&self, batch_size: usize) -> Result<Self> {
        Ok(Self {
            predicate: Arc::clone(&self.predicate),
            input: Arc::clone(&self.input),
            metrics: self.metrics.clone(),
            default_selectivity: self.default_selectivity,
            cache: Arc::clone(&self.cache),
            projection: self.projection.clone(),
            batch_size,
            fetch: self.fetch,
        })
    }

    /// The expression to filter on. This expression must evaluate to a boolean value.
    pub fn predicate(&self) -> &Arc<dyn PhysicalExpr> {
        &self.predicate
    }

    /// The input plan
    pub fn input(&self) -> &Arc<dyn ExecutionPlan> {
        &self.input
    }

    /// The default selectivity
    pub fn default_selectivity(&self) -> u8 {
        self.default_selectivity
    }

    /// Projection
    pub fn projection(&self) -> &Option<ProjectionRef> {
        &self.projection
    }

    /// Calculates `Statistics` for `FilterExec`, by applying selectivity (either default, or estimated) to input statistics.
    fn statistics_helper(
        schema: &SchemaRef,
        input_stats: Statistics,
        predicate: &Arc<dyn PhysicalExpr>,
        default_selectivity: u8,
    ) -> Result<Statistics> {
        // If interval analysis cannot handle this predicate, fall back to the
        // configured default selectivity and mark all statistics inexact.
        if !check_support(predicate, schema) {
            let selectivity = default_selectivity as f64 / 100.0;
            let mut stats = input_stats.to_inexact();
            stats.num_rows = stats.num_rows.with_estimated_selectivity(selectivity);
            stats.total_byte_size = stats
                .total_byte_size
                .with_estimated_selectivity(selectivity);
            return Ok(stats);
        }

        let num_rows = input_stats.num_rows;
        let total_byte_size = input_stats.total_byte_size;
        let input_analysis_ctx =
            AnalysisContext::try_from_statistics(schema, &input_stats.column_statistics)?;

        // Run interval analysis of the predicate over the input boundaries.
        let analysis_ctx = analyze(predicate, input_analysis_ctx, schema)?;

        // Estimate (inexact) selectivity of predicate
        let selectivity = analysis_ctx.selectivity.unwrap_or(1.0);
        let num_rows = num_rows.with_estimated_selectivity(selectivity);
        let total_byte_size = total_byte_size.with_estimated_selectivity(selectivity);

        // Shrink per-column min/max/distinct statistics to the post-filter bounds.
        let column_statistics = collect_new_statistics(
            schema,
            &input_stats.column_statistics,
            analysis_ctx.boundaries,
        );
        Ok(Statistics {
            num_rows,
            total_byte_size,
            column_statistics,
        })
    }

    /// Returns the `AcrossPartitions` value for `expr` if it is constant:
    /// either already known constant in `input_eqs`, or a `Literal`
    /// (which is inherently constant across all partitions).
    fn expr_constant_or_literal(
        expr: &Arc<dyn PhysicalExpr>,
        input_eqs: &EquivalenceProperties,
    ) -> Option<AcrossPartitions> {
        input_eqs.is_expr_constant(expr).or_else(|| {
            expr.as_any()
                .downcast_ref::<Literal>()
                .map(|l| AcrossPartitions::Uniform(Some(l.value().clone())))
        })
    }

    /// Scans the conjuncts of `predicate` for equality terms (`a = b`) where
    /// one side is constant; after filtering, the other side must then also be
    /// constant, so it is collected as a [`ConstExpr`].
    fn extend_constants(
        input: &Arc<dyn ExecutionPlan>,
        predicate: &Arc<dyn PhysicalExpr>,
    ) -> Vec<ConstExpr> {
        let mut res_constants = Vec::new();
        let input_eqs = input.equivalence_properties();

        let conjunctions = split_conjunction(predicate);
        for conjunction in conjunctions {
            // Only `=` comparisons can promote the other side to a constant.
            if let Some(binary) = conjunction.as_any().downcast_ref::<BinaryExpr>()
                && binary.op() == &Operator::Eq
            {
                // Check if either side is constant — either already known
                // constant from the input equivalence properties, or a literal
                // value (which is inherently constant across all partitions).
                let left_const = Self::expr_constant_or_literal(binary.left(), input_eqs);
                let right_const =
                    Self::expr_constant_or_literal(binary.right(), input_eqs);

                if let Some(left_across) = left_const {
                    // LEFT is constant, so RIGHT must also be constant.
                    // Use RIGHT's known across value if available, otherwise
                    // propagate LEFT's (e.g. Uniform from a literal).
                    let across = right_const.unwrap_or(left_across);
                    res_constants
                        .push(ConstExpr::new(Arc::clone(binary.right()), across));
                } else if let Some(right_across) = right_const {
                    // RIGHT is constant, so LEFT must also be constant.
                    res_constants
                        .push(ConstExpr::new(Arc::clone(binary.left()), right_across));
                }
            }
        }
        res_constants
    }

    /// This function creates the cache object that stores the plan properties such as schema, equivalence properties, ordering, partitioning, etc.
    fn compute_properties(
        input: &Arc<dyn ExecutionPlan>,
        predicate: &Arc<dyn PhysicalExpr>,
        default_selectivity: u8,
        projection: Option<&[usize]>,
    ) -> Result<PlanProperties> {
        // Combine the equal predicates with the input equivalence properties
        // to construct the equivalence properties:
        let schema = input.schema();
        let stats = Self::statistics_helper(
            &schema,
            input.partition_statistics(None)?,
            predicate,
            default_selectivity,
        )?;
        let mut eq_properties = input.equivalence_properties().clone();
        // Every `a = b` conjunct implies an equivalence between the two sides.
        let (equal_pairs, _) = collect_columns_from_predicate_inner(predicate);
        for (lhs, rhs) in equal_pairs {
            eq_properties.add_equal_conditions(Arc::clone(lhs), Arc::clone(rhs))?
        }
        // Add the columns that have only one viable value (singleton) after
        // filtering to constants.
        let constants = collect_columns(predicate)
            .into_iter()
            .filter(|column| stats.column_statistics[column.index()].is_singleton())
            .map(|column| {
                // min == max for a singleton, so min is the constant value.
                let value = stats.column_statistics[column.index()]
                    .min_value
                    .get_value();
                let expr = Arc::new(column) as _;
                ConstExpr::new(expr, AcrossPartitions::Uniform(value.cloned()))
            });
        // This is for statistics
        eq_properties.add_constants(constants)?;
        // This is for logical constant (for example: a = '1', then a could be marked as a constant)
        // to do: how to deal with multiple situation to represent = (for example c1 between 0 and 0)
        eq_properties.add_constants(Self::extend_constants(input, predicate))?;

        let mut output_partitioning = input.output_partitioning().clone();
        // If contains projection, update the PlanProperties.
        if let Some(projection) = projection {
            let schema = eq_properties.schema();
            let projection_mapping = ProjectionMapping::from_indices(projection, schema)?;
            let out_schema = project_schema(schema, Some(&projection))?;
            output_partitioning =
                output_partitioning.project(&projection_mapping, &eq_properties);
            eq_properties = eq_properties.project(&projection_mapping, out_schema);
        }

        Ok(PlanProperties::new(
            eq_properties,
            output_partitioning,
            input.pipeline_behavior(),
            input.boundedness(),
        ))
    }

    /// Replace the single child while reusing the cached `PlanProperties`
    /// verbatim; metrics are reset. The caller is responsible for ensuring the
    /// new child has the same plan properties as the old one.
    fn with_new_children_and_same_properties(
        &self,
        mut children: Vec<Arc<dyn ExecutionPlan>>,
    ) -> Self {
        Self {
            input: children.swap_remove(0),
            metrics: ExecutionPlanMetricsSet::new(),
            ..Self::clone(self)
        }
    }
}
469
470impl DisplayAs for FilterExec {
471    fn fmt_as(
472        &self,
473        t: DisplayFormatType,
474        f: &mut std::fmt::Formatter,
475    ) -> std::fmt::Result {
476        match t {
477            DisplayFormatType::Default | DisplayFormatType::Verbose => {
478                let display_projections = if let Some(projection) =
479                    self.projection.as_ref()
480                {
481                    format!(
482                        ", projection=[{}]",
483                        projection
484                            .iter()
485                            .map(|index| format!(
486                                "{}@{}",
487                                self.input.schema().fields().get(*index).unwrap().name(),
488                                index
489                            ))
490                            .collect::<Vec<_>>()
491                            .join(", ")
492                    )
493                } else {
494                    "".to_string()
495                };
496                let fetch = self
497                    .fetch
498                    .map_or_else(|| "".to_string(), |f| format!(", fetch={f}"));
499                write!(
500                    f,
501                    "FilterExec: {}{}{}",
502                    self.predicate, display_projections, fetch
503                )
504            }
505            DisplayFormatType::TreeRender => {
506                write!(f, "predicate={}", fmt_sql(self.predicate.as_ref()))
507            }
508        }
509    }
510}
511
impl ExecutionPlan for FilterExec {
    fn name(&self) -> &'static str {
        "FilterExec"
    }

    /// Return a reference to Any that can be used for downcasting
    fn as_any(&self) -> &dyn Any {
        self
    }

    /// Plan properties are computed once at construction and cached.
    fn properties(&self) -> &Arc<PlanProperties> {
        &self.cache
    }

    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
        vec![&self.input]
    }

    fn maintains_input_order(&self) -> Vec<bool> {
        // Tell optimizer this operator doesn't reorder its input
        vec![true]
    }

    fn with_new_children(
        self: Arc<Self>,
        mut children: Vec<Arc<dyn ExecutionPlan>>,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        // NOTE(review): presumably short-circuits (reusing cached properties)
        // when the new child's properties match — confirm macro semantics.
        check_if_same_properties!(self, children);
        let new_input = children.swap_remove(0);
        // Rebuild through the builder so plan properties are recomputed for
        // the new input.
        FilterExecBuilder::from(&*self)
            .with_input(new_input)
            .build()
            .map(|e| Arc::new(e) as _)
    }

    fn execute(
        &self,
        partition: usize,
        context: Arc<TaskContext>,
    ) -> Result<SendableRecordBatchStream> {
        trace!(
            "Start FilterExec::execute for partition {} of context session_id {} and task_id {:?}",
            partition,
            context.session_id(),
            context.task_id()
        );
        let metrics = FilterExecMetrics::new(&self.metrics, partition);
        // The stream evaluates the predicate per input batch and coalesces
        // surviving rows into batches of `batch_size`, stopping at `fetch`.
        Ok(Box::pin(FilterExecStream {
            schema: self.schema(),
            predicate: Arc::clone(&self.predicate),
            input: self.input.execute(partition, context)?,
            metrics,
            projection: self.projection.clone(),
            batch_coalescer: LimitedBatchCoalescer::new(
                self.schema(),
                self.batch_size,
                self.fetch,
            ),
        }))
    }

    fn metrics(&self) -> Option<MetricsSet> {
        Some(self.metrics.clone_inner())
    }

    /// The output statistics of a filtering operation can be estimated if the
    /// predicate's selectivity value can be determined for the incoming data.
    fn partition_statistics(&self, partition: Option<usize>) -> Result<Statistics> {
        let input_stats = self.input.partition_statistics(partition)?;
        let stats = Self::statistics_helper(
            &self.input.schema(),
            input_stats,
            self.predicate(),
            self.default_selectivity,
        )?;
        // Statistics are computed over the input schema; project them to the
        // output schema if a projection is configured.
        Ok(stats.project(self.projection.as_ref()))
    }

    /// A filter never adds rows; it can only keep them or drop them.
    fn cardinality_effect(&self) -> CardinalityEffect {
        CardinalityEffect::LowerEqual
    }

    /// Tries to swap `projection` with its input (`filter`). If possible, performs
    /// the swap and returns [`FilterExec`] as the top plan. Otherwise, returns `None`.
    fn try_swapping_with_projection(
        &self,
        projection: &ProjectionExec,
    ) -> Result<Option<Arc<dyn ExecutionPlan>>> {
        // If the projection does not narrow the schema, we should not try to push it down:
        if projection.expr().len() < projection.input().schema().fields().len() {
            // Each column in the predicate expression must exist after the projection.
            if let Some(new_predicate) =
                update_expr(self.predicate(), projection.expr(), false)?
            {
                return FilterExecBuilder::from(self)
                    .with_input(make_with_child(projection, self.input())?)
                    .with_predicate(new_predicate)
                    .build()
                    .map(|e| Some(Arc::new(e) as _));
            }
        }
        // Fall back to embedding the projection into this FilterExec.
        try_embed_projection(projection, self)
    }

    fn gather_filters_for_pushdown(
        &self,
        phase: FilterPushdownPhase,
        parent_filters: Vec<Arc<dyn PhysicalExpr>>,
        _config: &ConfigOptions,
    ) -> Result<FilterDescription> {
        // Outside the Pre phase we only forward the parent filters; our own
        // predicate conjuncts are not offered for pushdown.
        if phase != FilterPushdownPhase::Pre {
            let child =
                ChildFilterDescription::from_child(&parent_filters, self.input())?;
            return Ok(FilterDescription::new().with_child(child));
        }

        // In the Pre phase, also offer each conjunct of our own predicate to
        // the child as a "self" filter.
        let child = ChildFilterDescription::from_child(&parent_filters, self.input())?
            .with_self_filters(
                split_conjunction(&self.predicate)
                    .into_iter()
                    .cloned()
                    .collect(),
            );

        Ok(FilterDescription::new().with_child(child))
    }

    fn handle_child_pushdown_result(
        &self,
        phase: FilterPushdownPhase,
        child_pushdown_result: ChildPushdownResult,
        _config: &ConfigOptions,
    ) -> Result<FilterPushdownPropagation<Arc<dyn ExecutionPlan>>> {
        if phase != FilterPushdownPhase::Pre {
            return Ok(FilterPushdownPropagation::if_all(child_pushdown_result));
        }
        // We absorb any parent filters that were not handled by our children
        let mut unsupported_parent_filters: Vec<Arc<dyn PhysicalExpr>> =
            child_pushdown_result
                .parent_filters
                .iter()
                .filter_map(|f| {
                    matches!(f.all(), PushedDown::No).then_some(Arc::clone(&f.filter))
                })
                .collect();

        // If this FilterExec has a projection, the unsupported parent filters
        // are in the output schema (after projection) coordinates. We need to
        // remap them to the input schema coordinates before combining with self filters.
        if self.projection.is_some() {
            let input_schema = self.input().schema();
            unsupported_parent_filters = unsupported_parent_filters
                .into_iter()
                .map(|expr| reassign_expr_columns(expr, &input_schema))
                .collect::<Result<Vec<_>>>()?;
        }

        // Conjuncts of our own predicate that the child refused to absorb must
        // stay in this node's predicate.
        let unsupported_self_filters = child_pushdown_result
            .self_filters
            .first()
            .expect("we have exactly one child")
            .iter()
            .filter_map(|f| match f.discriminant {
                PushedDown::Yes => None,
                PushedDown::No => Some(&f.predicate),
            })
            .cloned();

        let unhandled_filters = unsupported_parent_filters
            .into_iter()
            .chain(unsupported_self_filters)
            .collect_vec();

        // If we have unhandled filters, we need to create a new FilterExec
        let filter_input = Arc::clone(self.input());
        let new_predicate = conjunction(unhandled_filters);
        let updated_node = if new_predicate.eq(&lit(true)) {
            // FilterExec is no longer needed, but we may need to leave a projection in place
            match self.projection().as_ref() {
                Some(projection_indices) => {
                    // Replace this filter with a bare ProjectionExec that
                    // reproduces the same column selection.
                    let filter_child_schema = filter_input.schema();
                    let proj_exprs = projection_indices
                        .iter()
                        .map(|p| {
                            let field = filter_child_schema.field(*p).clone();
                            ProjectionExpr {
                                expr: Arc::new(Column::new(field.name(), *p))
                                    as Arc<dyn PhysicalExpr>,
                                alias: field.name().to_string(),
                            }
                        })
                        .collect::<Vec<_>>();
                    Some(Arc::new(ProjectionExec::try_new(proj_exprs, filter_input)?)
                        as Arc<dyn ExecutionPlan>)
                }
                None => {
                    // No projection needed, just return the input
                    Some(filter_input)
                }
            }
        } else if new_predicate.eq(&self.predicate) {
            // The new predicate is the same as our current predicate
            None
        } else {
            // Create a new FilterExec with the new predicate, preserving the projection
            let new = FilterExec {
                predicate: Arc::clone(&new_predicate),
                input: Arc::clone(&filter_input),
                metrics: self.metrics.clone(),
                default_selectivity: self.default_selectivity,
                cache: Arc::new(Self::compute_properties(
                    &filter_input,
                    &new_predicate,
                    self.default_selectivity,
                    self.projection.as_deref(),
                )?),
                projection: self.projection.clone(),
                batch_size: self.batch_size,
                fetch: self.fetch,
            };
            Some(Arc::new(new) as _)
        };

        // All parent filters are reported as pushed down: any the child could
        // not handle were absorbed into this node's predicate above.
        Ok(FilterPushdownPropagation {
            filters: vec![PushedDown::Yes; child_pushdown_result.parent_filters.len()],
            updated_node,
        })
    }

    fn fetch(&self) -> Option<usize> {
        self.fetch
    }

    /// Returns a copy of this node with a new fetch limit; cached properties
    /// and metrics are shared with `self`.
    fn with_fetch(&self, fetch: Option<usize>) -> Option<Arc<dyn ExecutionPlan>> {
        Some(Arc::new(Self {
            predicate: Arc::clone(&self.predicate),
            input: Arc::clone(&self.input),
            metrics: self.metrics.clone(),
            default_selectivity: self.default_selectivity,
            cache: Arc::clone(&self.cache),
            projection: self.projection.clone(),
            batch_size: self.batch_size,
            fetch,
        }))
    }

    /// Delegates order preservation to the input; succeeds only if the input
    /// supports it, then rewraps this filter around the new input.
    fn with_preserve_order(
        &self,
        preserve_order: bool,
    ) -> Option<Arc<dyn ExecutionPlan>> {
        self.input
            .with_preserve_order(preserve_order)
            .and_then(|new_input| {
                Arc::new(self.clone())
                    .with_new_children(vec![new_input])
                    .ok()
            })
    }
}
771
772impl EmbeddedProjection for FilterExec {
773    fn with_projection(&self, projection: Option<Vec<usize>>) -> Result<Self> {
774        FilterExecBuilder::from(self)
775            .apply_projection(projection)?
776            .build()
777    }
778}
779
780/// Converts an interval bound to a [`Precision`] value. NULL bounds (which
781/// represent "unbounded" in the interval type) map to [`Precision::Absent`].
782fn interval_bound_to_precision(
783    bound: ScalarValue,
784    is_exact: bool,
785) -> Precision<ScalarValue> {
786    if bound.is_null() {
787        Precision::Absent
788    } else if is_exact {
789        Precision::Exact(bound)
790    } else {
791        Precision::Inexact(bound)
792    }
793}
794
795/// This function ensures that all bounds in the `ExprBoundaries` vector are
796/// converted to closed bounds. If a lower/upper bound is initially open, it
797/// is adjusted by using the next/previous value for its data type to convert
798/// it into a closed bound.
799fn collect_new_statistics(
800    schema: &SchemaRef,
801    input_column_stats: &[ColumnStatistics],
802    analysis_boundaries: Vec<ExprBoundaries>,
803) -> Vec<ColumnStatistics> {
804    analysis_boundaries
805        .into_iter()
806        .enumerate()
807        .map(
808            |(
809                idx,
810                ExprBoundaries {
811                    interval,
812                    distinct_count,
813                    ..
814                },
815            )| {
816                let Some(interval) = interval else {
817                    // If the interval is `None`, we can say that there are no rows.
818                    // Use a typed null to preserve the column's data type, so that
819                    // downstream interval analysis can still intersect intervals
820                    // of the same type.
821                    let typed_null = ScalarValue::try_from(schema.field(idx).data_type())
822                        .unwrap_or(ScalarValue::Null);
823                    return ColumnStatistics {
824                        null_count: Precision::Exact(0),
825                        max_value: Precision::Exact(typed_null.clone()),
826                        min_value: Precision::Exact(typed_null.clone()),
827                        sum_value: Precision::Exact(typed_null),
828                        distinct_count: Precision::Exact(0),
829                        byte_size: input_column_stats[idx].byte_size,
830                    };
831                };
832                let (lower, upper) = interval.into_bounds();
833                let is_exact = !lower.is_null() && !upper.is_null() && lower == upper;
834                let min_value = interval_bound_to_precision(lower, is_exact);
835                let max_value = interval_bound_to_precision(upper, is_exact);
836                ColumnStatistics {
837                    null_count: input_column_stats[idx].null_count.to_inexact(),
838                    max_value,
839                    min_value,
840                    sum_value: Precision::Absent,
841                    distinct_count: distinct_count.to_inexact(),
842                    byte_size: input_column_stats[idx].byte_size,
843                }
844            },
845        )
846        .collect()
847}
848
/// Stream for `FilterExec`: wraps the input stream and applies the predicate
/// expression to determine which rows to include in its output batches.
struct FilterExecStream {
    /// Output schema after the projection
    schema: SchemaRef,
    /// The expression to filter on. This expression must evaluate to a boolean value.
    predicate: Arc<dyn PhysicalExpr>,
    /// The input partition to filter.
    input: SendableRecordBatchStream,
    /// Runtime metrics recording (baseline + selectivity)
    metrics: FilterExecMetrics,
    /// The projection indices of the columns in the input schema; `None`
    /// means all input columns are passed through unchanged.
    projection: Option<ProjectionRef>,
    /// Batch coalescer to combine small post-filter batches; also signals
    /// when a row limit has been reached so the stream can stop early.
    batch_coalescer: LimitedBatchCoalescer,
}
865
/// The metrics for `FilterExec`
struct FilterExecMetrics {
    /// Common metrics for most operators (output rows, elapsed compute, ...)
    baseline_metrics: BaselineMetrics,
    /// Selectivity of the filter, calculated as output_rows / input_rows;
    /// updated via `add_part` / `add_total` in the stream's poll loop.
    selectivity: RatioMetrics,
    // Remember to update `docs/source/user-guide/metrics.md` when adding new metrics,
    // or modifying metrics comments
}
875
876impl FilterExecMetrics {
877    pub fn new(metrics: &ExecutionPlanMetricsSet, partition: usize) -> Self {
878        Self {
879            baseline_metrics: BaselineMetrics::new(metrics, partition),
880            selectivity: MetricBuilder::new(metrics)
881                .with_type(MetricType::SUMMARY)
882                .ratio_metrics("selectivity", partition),
883        }
884    }
885}
886
/// Evaluates `predicate` against `batch` and returns a new `RecordBatch`
/// containing only the rows for which the predicate evaluated to `true`.
///
/// Convenience wrapper around [`filter_and_project`] with no projection.
///
/// # Errors
/// Returns an internal error if the predicate does not evaluate to a
/// boolean array, and propagates any evaluation/filtering errors.
pub fn batch_filter(
    batch: &RecordBatch,
    predicate: &Arc<dyn PhysicalExpr>,
) -> Result<RecordBatch> {
    filter_and_project(batch, predicate, None)
}
893
894fn filter_and_project(
895    batch: &RecordBatch,
896    predicate: &Arc<dyn PhysicalExpr>,
897    projection: Option<&Vec<usize>>,
898) -> Result<RecordBatch> {
899    predicate
900        .evaluate(batch)
901        .and_then(|v| v.into_array(batch.num_rows()))
902        .and_then(|array| {
903            Ok(match (as_boolean_array(&array), projection) {
904                // Apply filter array to record batch
905                (Ok(filter_array), None) => filter_record_batch(batch, filter_array)?,
906                (Ok(filter_array), Some(projection)) => {
907                    let projected_batch = batch.project(projection)?;
908                    filter_record_batch(&projected_batch, filter_array)?
909                }
910                (Err(_), _) => {
911                    return internal_err!(
912                        "Cannot create filter_array from non-boolean predicates"
913                    );
914                }
915            })
916        })
917}
918
impl Stream for FilterExecStream {
    type Item = Result<RecordBatch>;

    /// Pulls batches from the input, evaluates the predicate, filters out
    /// non-matching rows, and coalesces the surviving (possibly small)
    /// batches before emitting them. Stops early once the coalescer reports
    /// that its row limit has been reached.
    fn poll_next(
        mut self: Pin<&mut Self>,
        cx: &mut Context<'_>,
    ) -> Poll<Option<Self::Item>> {
        let elapsed_compute = self.metrics.baseline_metrics.elapsed_compute().clone();
        loop {
            // If there is a completed batch ready, return it
            if let Some(batch) = self.batch_coalescer.next_completed_batch() {
                // Emitted rows feed the selectivity numerator.
                self.metrics.selectivity.add_part(batch.num_rows());
                let poll = Poll::Ready(Some(Ok(batch)));
                return self.metrics.baseline_metrics.record_poll(poll);
            }

            if self.batch_coalescer.is_finished() {
                // If input is done and no batches are ready, return None to signal end of stream.
                return Poll::Ready(None);
            }

            // Attempt to pull the next batch from the input stream.
            match ready!(self.input.poll_next_unpin(cx)) {
                None => {
                    self.batch_coalescer.finish()?;
                    // continue draining the coalescer
                }
                Some(Ok(batch)) => {
                    // Evaluate predicate -> (optionally) project -> filter ->
                    // push into the coalescer, all timed as compute work.
                    // Note the predicate is evaluated against the *unprojected*
                    // batch; the projection is applied before filtering.
                    let timer = elapsed_compute.timer();
                    let status = self.predicate.as_ref()
                        .evaluate(&batch)
                        .and_then(|v| v.into_array(batch.num_rows()))
                        .and_then(|array| {
                            Ok(match self.projection.as_ref()  {
                                Some(projection) => {
                                    let projected_batch = batch.project(projection)?;
                                    (array, projected_batch)
                                },
                                None => (array, batch)
                            })
                        }).and_then(|(array, batch)| {
                            match as_boolean_array(&array) {
                                Ok(filter_array) => {
                                    // Input rows feed the selectivity denominator.
                                    self.metrics.selectivity.add_total(batch.num_rows());
                                    // TODO: support push_batch_with_filter in LimitedBatchCoalescer
                                    let batch = filter_record_batch(&batch, filter_array)?;
                                    let state = self.batch_coalescer.push_batch(batch)?;
                                    Ok(state)
                                }
                                Err(_) => {
                                    internal_err!(
                                        "Cannot create filter_array from non-boolean predicates"
                                    )
                                }
                            }
                        })?;
                    timer.done();

                    match status {
                        PushBatchStatus::Continue => {
                            // Keep pushing more batches
                        }
                        PushBatchStatus::LimitReached => {
                            // limit was reached, so stop early
                            self.batch_coalescer.finish()?;
                            // continue draining the coalescer
                        }
                    }
                }

                // Error case
                other => return Poll::Ready(other),
            }
        }
    }

    fn size_hint(&self) -> (usize, Option<usize>) {
        // Reuse the input's hint. NOTE(review): coalescing/filtering can
        // change the number of output batches, but `size_hint` is only
        // advisory, so the input's estimate is used as-is.
        self.input.size_hint()
    }
}
impl RecordBatchStream for FilterExecStream {
    /// Schema of the batches this stream produces (i.e. the output schema
    /// after any embedded projection).
    fn schema(&self) -> SchemaRef {
        Arc::clone(&self.schema)
    }
}
1005
/// Return the equals Column-Pairs and Non-equals Column-Pairs
///
/// Splits `predicate` on `AND` and collects the operand pairs of `=` and
/// `!=` comparisons; see [`collect_columns_from_predicate_inner`] for the
/// exact rules on which pairs are extracted.
#[deprecated(
    since = "51.0.0",
    note = "This function will be internal in the future"
)]
pub fn collect_columns_from_predicate(
    predicate: &'_ Arc<dyn PhysicalExpr>,
) -> EqualAndNonEqual<'_> {
    collect_columns_from_predicate_inner(predicate)
}
1016
1017fn collect_columns_from_predicate_inner(
1018    predicate: &'_ Arc<dyn PhysicalExpr>,
1019) -> EqualAndNonEqual<'_> {
1020    let mut eq_predicate_columns = Vec::<PhysicalExprPairRef>::new();
1021    let mut ne_predicate_columns = Vec::<PhysicalExprPairRef>::new();
1022
1023    let predicates = split_conjunction(predicate);
1024    predicates.into_iter().for_each(|p| {
1025        if let Some(binary) = p.as_any().downcast_ref::<BinaryExpr>() {
1026            // Only extract pairs where at least one side is a Column reference.
1027            // Pairs like `complex_expr = literal` should not create equivalence
1028            // classes — the literal could appear in many unrelated expressions
1029            // (e.g. sort keys), and normalize_expr's deep traversal would
1030            // replace those occurrences with the complex expression, corrupting
1031            // sort orderings. Constant propagation for such pairs is handled
1032            // separately by `extend_constants`.
1033            let has_direct_column_operand =
1034                binary.left().as_any().downcast_ref::<Column>().is_some()
1035                    || binary.right().as_any().downcast_ref::<Column>().is_some();
1036            if !has_direct_column_operand {
1037                return;
1038            }
1039            match binary.op() {
1040                Operator::Eq => {
1041                    eq_predicate_columns.push((binary.left(), binary.right()))
1042                }
1043                Operator::NotEq => {
1044                    ne_predicate_columns.push((binary.left(), binary.right()))
1045                }
1046                _ => {}
1047            }
1048        }
1049    });
1050
1051    (eq_predicate_columns, ne_predicate_columns)
1052}
1053
/// Pair of references to `Arc<dyn PhysicalExpr>`s, e.g. the two operands of
/// a binary comparison.
pub type PhysicalExprPairRef<'a> = (&'a Arc<dyn PhysicalExpr>, &'a Arc<dyn PhysicalExpr>);

/// The equals Column-Pairs and Non-equals Column-Pairs in the Predicates,
/// as returned by [`collect_columns_from_predicate`].
pub type EqualAndNonEqual<'a> =
    (Vec<PhysicalExprPairRef<'a>>, Vec<PhysicalExprPairRef<'a>>);
1060
1061#[cfg(test)]
1062mod tests {
1063    use super::*;
1064    use crate::empty::EmptyExec;
1065    use crate::expressions::*;
1066    use crate::test;
1067    use crate::test::exec::StatisticsExec;
1068    use arrow::datatypes::{Field, Schema, UnionFields, UnionMode};
1069    use datafusion_common::ScalarValue;
1070
1071    #[tokio::test]
1072    async fn collect_columns_predicates() -> Result<()> {
1073        let schema = test::aggr_test_schema();
1074        let predicate: Arc<dyn PhysicalExpr> = binary(
1075            binary(
1076                binary(col("c2", &schema)?, Operator::GtEq, lit(1u32), &schema)?,
1077                Operator::And,
1078                binary(col("c2", &schema)?, Operator::Eq, lit(4u32), &schema)?,
1079                &schema,
1080            )?,
1081            Operator::And,
1082            binary(
1083                binary(
1084                    col("c2", &schema)?,
1085                    Operator::Eq,
1086                    col("c9", &schema)?,
1087                    &schema,
1088                )?,
1089                Operator::And,
1090                binary(
1091                    col("c1", &schema)?,
1092                    Operator::NotEq,
1093                    col("c13", &schema)?,
1094                    &schema,
1095                )?,
1096                &schema,
1097            )?,
1098            &schema,
1099        )?;
1100
1101        let (equal_pairs, ne_pairs) = collect_columns_from_predicate_inner(&predicate);
1102        assert_eq!(2, equal_pairs.len());
1103        assert!(equal_pairs[0].0.eq(&col("c2", &schema)?));
1104        assert!(equal_pairs[0].1.eq(&lit(4u32)));
1105
1106        assert!(equal_pairs[1].0.eq(&col("c2", &schema)?));
1107        assert!(equal_pairs[1].1.eq(&col("c9", &schema)?));
1108
1109        assert_eq!(1, ne_pairs.len());
1110        assert!(ne_pairs[0].0.eq(&col("c1", &schema)?));
1111        assert!(ne_pairs[0].1.eq(&col("c13", &schema)?));
1112
1113        Ok(())
1114    }
1115
1116    #[tokio::test]
1117    async fn test_filter_statistics_basic_expr() -> Result<()> {
1118        // Table:
1119        //      a: min=1, max=100
1120        let bytes_per_row = 4;
1121        let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]);
1122        let input = Arc::new(StatisticsExec::new(
1123            Statistics {
1124                num_rows: Precision::Inexact(100),
1125                total_byte_size: Precision::Inexact(100 * bytes_per_row),
1126                column_statistics: vec![ColumnStatistics {
1127                    min_value: Precision::Inexact(ScalarValue::Int32(Some(1))),
1128                    max_value: Precision::Inexact(ScalarValue::Int32(Some(100))),
1129                    ..Default::default()
1130                }],
1131            },
1132            schema.clone(),
1133        ));
1134
1135        // a <= 25
1136        let predicate: Arc<dyn PhysicalExpr> =
1137            binary(col("a", &schema)?, Operator::LtEq, lit(25i32), &schema)?;
1138
1139        // WHERE a <= 25
1140        let filter: Arc<dyn ExecutionPlan> =
1141            Arc::new(FilterExec::try_new(predicate, input)?);
1142
1143        let statistics = filter.partition_statistics(None)?;
1144        assert_eq!(statistics.num_rows, Precision::Inexact(25));
1145        assert_eq!(
1146            statistics.total_byte_size,
1147            Precision::Inexact(25 * bytes_per_row)
1148        );
1149        assert_eq!(
1150            statistics.column_statistics,
1151            vec![ColumnStatistics {
1152                min_value: Precision::Inexact(ScalarValue::Int32(Some(1))),
1153                max_value: Precision::Inexact(ScalarValue::Int32(Some(25))),
1154                ..Default::default()
1155            }]
1156        );
1157
1158        Ok(())
1159    }
1160
    /// Stacked `FilterExec`s must compose: the outer filter's estimate is
    /// computed from the inner filter's already-reduced statistics.
    #[tokio::test]
    async fn test_filter_statistics_column_level_nested() -> Result<()> {
        // Table:
        //      a: min=1, max=100
        let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]);
        let input = Arc::new(StatisticsExec::new(
            Statistics {
                num_rows: Precision::Inexact(100),
                column_statistics: vec![ColumnStatistics {
                    min_value: Precision::Inexact(ScalarValue::Int32(Some(1))),
                    max_value: Precision::Inexact(ScalarValue::Int32(Some(100))),
                    ..Default::default()
                }],
                total_byte_size: Precision::Absent,
            },
            schema.clone(),
        ));

        // WHERE a <= 25
        let sub_filter: Arc<dyn ExecutionPlan> = Arc::new(FilterExec::try_new(
            binary(col("a", &schema)?, Operator::LtEq, lit(25i32), &schema)?,
            input,
        )?);

        // Nested filters (two separate physical plans, instead of AND chain in the expr)
        // WHERE a >= 10
        // WHERE a <= 25
        let filter: Arc<dyn ExecutionPlan> = Arc::new(FilterExec::try_new(
            binary(col("a", &schema)?, Operator::GtEq, lit(10i32), &schema)?,
            sub_filter,
        )?);

        let statistics = filter.partition_statistics(None)?;
        // Combined range [10, 25] over [1, 100]: ~16 of 100 rows survive.
        assert_eq!(statistics.num_rows, Precision::Inexact(16));
        assert_eq!(
            statistics.column_statistics,
            vec![ColumnStatistics {
                min_value: Precision::Inexact(ScalarValue::Int32(Some(10))),
                max_value: Precision::Inexact(ScalarValue::Int32(Some(25))),
                ..Default::default()
            }]
        );

        Ok(())
    }
1206
1207    #[tokio::test]
1208    async fn test_filter_statistics_column_level_nested_multiple() -> Result<()> {
1209        // Table:
1210        //      a: min=1, max=100
1211        //      b: min=1, max=50
1212        let schema = Schema::new(vec![
1213            Field::new("a", DataType::Int32, false),
1214            Field::new("b", DataType::Int32, false),
1215        ]);
1216        let input = Arc::new(StatisticsExec::new(
1217            Statistics {
1218                num_rows: Precision::Inexact(100),
1219                column_statistics: vec![
1220                    ColumnStatistics {
1221                        min_value: Precision::Inexact(ScalarValue::Int32(Some(1))),
1222                        max_value: Precision::Inexact(ScalarValue::Int32(Some(100))),
1223                        ..Default::default()
1224                    },
1225                    ColumnStatistics {
1226                        min_value: Precision::Inexact(ScalarValue::Int32(Some(1))),
1227                        max_value: Precision::Inexact(ScalarValue::Int32(Some(50))),
1228                        ..Default::default()
1229                    },
1230                ],
1231                total_byte_size: Precision::Absent,
1232            },
1233            schema.clone(),
1234        ));
1235
1236        // WHERE a <= 25
1237        let a_lte_25: Arc<dyn ExecutionPlan> = Arc::new(FilterExec::try_new(
1238            binary(col("a", &schema)?, Operator::LtEq, lit(25i32), &schema)?,
1239            input,
1240        )?);
1241
1242        // WHERE b > 45
1243        let b_gt_5: Arc<dyn ExecutionPlan> = Arc::new(FilterExec::try_new(
1244            binary(col("b", &schema)?, Operator::Gt, lit(45i32), &schema)?,
1245            a_lte_25,
1246        )?);
1247
1248        // WHERE a >= 10
1249        let filter: Arc<dyn ExecutionPlan> = Arc::new(FilterExec::try_new(
1250            binary(col("a", &schema)?, Operator::GtEq, lit(10i32), &schema)?,
1251            b_gt_5,
1252        )?);
1253        let statistics = filter.partition_statistics(None)?;
1254        // On a uniform distribution, only fifteen rows will satisfy the
1255        // filter that 'a' proposed (a >= 10 AND a <= 25) (15/100) and only
1256        // 5 rows will satisfy the filter that 'b' proposed (b > 45) (5/50).
1257        //
1258        // Which would result with a selectivity of  '15/100 * 5/50' or 0.015
1259        // and that means about %1.5 of the all rows (rounded up to 2 rows).
1260        assert_eq!(statistics.num_rows, Precision::Inexact(2));
1261        assert_eq!(
1262            statistics.column_statistics,
1263            vec![
1264                ColumnStatistics {
1265                    min_value: Precision::Inexact(ScalarValue::Int32(Some(10))),
1266                    max_value: Precision::Inexact(ScalarValue::Int32(Some(25))),
1267                    ..Default::default()
1268                },
1269                ColumnStatistics {
1270                    min_value: Precision::Inexact(ScalarValue::Int32(Some(46))),
1271                    max_value: Precision::Inexact(ScalarValue::Int32(Some(50))),
1272                    ..Default::default()
1273                }
1274            ]
1275        );
1276
1277        Ok(())
1278    }
1279
1280    #[tokio::test]
1281    async fn test_filter_statistics_when_input_stats_missing() -> Result<()> {
1282        // Table:
1283        //      a: min=???, max=??? (missing)
1284        let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]);
1285        let input = Arc::new(StatisticsExec::new(
1286            Statistics::new_unknown(&schema),
1287            schema.clone(),
1288        ));
1289
1290        // a <= 25
1291        let predicate: Arc<dyn PhysicalExpr> =
1292            binary(col("a", &schema)?, Operator::LtEq, lit(25i32), &schema)?;
1293
1294        // WHERE a <= 25
1295        let filter: Arc<dyn ExecutionPlan> =
1296            Arc::new(FilterExec::try_new(predicate, input)?);
1297
1298        let statistics = filter.partition_statistics(None)?;
1299        assert_eq!(statistics.num_rows, Precision::Absent);
1300
1301        Ok(())
1302    }
1303
1304    #[tokio::test]
1305    async fn test_filter_statistics_multiple_columns() -> Result<()> {
1306        // Table:
1307        //      a: min=1, max=100
1308        //      b: min=1, max=3
1309        //      c: min=1000.0  max=1100.0
1310        let schema = Schema::new(vec![
1311            Field::new("a", DataType::Int32, false),
1312            Field::new("b", DataType::Int32, false),
1313            Field::new("c", DataType::Float32, false),
1314        ]);
1315        let input = Arc::new(StatisticsExec::new(
1316            Statistics {
1317                num_rows: Precision::Inexact(1000),
1318                total_byte_size: Precision::Inexact(4000),
1319                column_statistics: vec![
1320                    ColumnStatistics {
1321                        min_value: Precision::Inexact(ScalarValue::Int32(Some(1))),
1322                        max_value: Precision::Inexact(ScalarValue::Int32(Some(100))),
1323                        ..Default::default()
1324                    },
1325                    ColumnStatistics {
1326                        min_value: Precision::Inexact(ScalarValue::Int32(Some(1))),
1327                        max_value: Precision::Inexact(ScalarValue::Int32(Some(3))),
1328                        ..Default::default()
1329                    },
1330                    ColumnStatistics {
1331                        min_value: Precision::Inexact(ScalarValue::Float32(Some(1000.0))),
1332                        max_value: Precision::Inexact(ScalarValue::Float32(Some(1100.0))),
1333                        ..Default::default()
1334                    },
1335                ],
1336            },
1337            schema,
1338        ));
1339        // WHERE a<=53 AND (b=3 AND (c<=1075.0 AND a>b))
1340        let predicate = Arc::new(BinaryExpr::new(
1341            Arc::new(BinaryExpr::new(
1342                Arc::new(Column::new("a", 0)),
1343                Operator::LtEq,
1344                Arc::new(Literal::new(ScalarValue::Int32(Some(53)))),
1345            )),
1346            Operator::And,
1347            Arc::new(BinaryExpr::new(
1348                Arc::new(BinaryExpr::new(
1349                    Arc::new(Column::new("b", 1)),
1350                    Operator::Eq,
1351                    Arc::new(Literal::new(ScalarValue::Int32(Some(3)))),
1352                )),
1353                Operator::And,
1354                Arc::new(BinaryExpr::new(
1355                    Arc::new(BinaryExpr::new(
1356                        Arc::new(Column::new("c", 2)),
1357                        Operator::LtEq,
1358                        Arc::new(Literal::new(ScalarValue::Float32(Some(1075.0)))),
1359                    )),
1360                    Operator::And,
1361                    Arc::new(BinaryExpr::new(
1362                        Arc::new(Column::new("a", 0)),
1363                        Operator::Gt,
1364                        Arc::new(Column::new("b", 1)),
1365                    )),
1366                )),
1367            )),
1368        ));
1369        let filter: Arc<dyn ExecutionPlan> =
1370            Arc::new(FilterExec::try_new(predicate, input)?);
1371        let statistics = filter.partition_statistics(None)?;
1372        // 0.5 (from a) * 0.333333... (from b) * 0.798387... (from c) ≈ 0.1330...
1373        // num_rows after ceil => 133.0... => 134
1374        // total_byte_size after ceil => 532.0... => 533
1375        assert_eq!(statistics.num_rows, Precision::Inexact(134));
1376        assert_eq!(statistics.total_byte_size, Precision::Inexact(533));
1377        let exp_col_stats = vec![
1378            ColumnStatistics {
1379                min_value: Precision::Inexact(ScalarValue::Int32(Some(4))),
1380                max_value: Precision::Inexact(ScalarValue::Int32(Some(53))),
1381                ..Default::default()
1382            },
1383            ColumnStatistics {
1384                min_value: Precision::Inexact(ScalarValue::Int32(Some(3))),
1385                max_value: Precision::Inexact(ScalarValue::Int32(Some(3))),
1386                ..Default::default()
1387            },
1388            ColumnStatistics {
1389                min_value: Precision::Inexact(ScalarValue::Float32(Some(1000.0))),
1390                max_value: Precision::Inexact(ScalarValue::Float32(Some(1075.0))),
1391                ..Default::default()
1392            },
1393        ];
1394        let _ = exp_col_stats
1395            .into_iter()
1396            .zip(statistics.column_statistics)
1397            .map(|(expected, actual)| {
1398                if let Some(val) = actual.min_value.get_value() {
1399                    if val.data_type().is_floating() {
1400                        // Windows rounds arithmetic operation results differently for floating point numbers.
1401                        // Therefore, we check if the actual values are in an epsilon range.
1402                        let actual_min = actual.min_value.get_value().unwrap();
1403                        let actual_max = actual.max_value.get_value().unwrap();
1404                        let expected_min = expected.min_value.get_value().unwrap();
1405                        let expected_max = expected.max_value.get_value().unwrap();
1406                        let eps = ScalarValue::Float32(Some(1e-6));
1407
1408                        assert!(actual_min.sub(expected_min).unwrap() < eps);
1409                        assert!(actual_min.sub(expected_min).unwrap() < eps);
1410
1411                        assert!(actual_max.sub(expected_max).unwrap() < eps);
1412                        assert!(actual_max.sub(expected_max).unwrap() < eps);
1413                    } else {
1414                        assert_eq!(actual, expected);
1415                    }
1416                } else {
1417                    assert_eq!(actual, expected);
1418                }
1419            });
1420
1421        Ok(())
1422    }
1423
    /// A predicate that passes every row (bounds outside the data's range)
    /// must leave the input statistics unchanged.
    #[tokio::test]
    async fn test_filter_statistics_full_selective() -> Result<()> {
        // Table:
        //      a: min=1, max=100
        //      b: min=1, max=3
        let schema = Schema::new(vec![
            Field::new("a", DataType::Int32, false),
            Field::new("b", DataType::Int32, false),
        ]);
        let input = Arc::new(StatisticsExec::new(
            Statistics {
                num_rows: Precision::Inexact(1000),
                total_byte_size: Precision::Inexact(4000),
                column_statistics: vec![
                    ColumnStatistics {
                        min_value: Precision::Inexact(ScalarValue::Int32(Some(1))),
                        max_value: Precision::Inexact(ScalarValue::Int32(Some(100))),
                        ..Default::default()
                    },
                    ColumnStatistics {
                        min_value: Precision::Inexact(ScalarValue::Int32(Some(1))),
                        max_value: Precision::Inexact(ScalarValue::Int32(Some(3))),
                        ..Default::default()
                    },
                ],
            },
            schema,
        ));
        // WHERE a<200 AND 1<=b  — both conjuncts are satisfied by the whole
        // known ranges of `a` and `b`.
        let predicate = Arc::new(BinaryExpr::new(
            Arc::new(BinaryExpr::new(
                Arc::new(Column::new("a", 0)),
                Operator::Lt,
                Arc::new(Literal::new(ScalarValue::Int32(Some(200)))),
            )),
            Operator::And,
            Arc::new(BinaryExpr::new(
                Arc::new(Literal::new(ScalarValue::Int32(Some(1)))),
                Operator::LtEq,
                Arc::new(Column::new("b", 1)),
            )),
        ));
        // Since filter predicate passes all entries, statistics after filter shouldn't change.
        let expected = input.partition_statistics(None)?.column_statistics;
        let filter: Arc<dyn ExecutionPlan> =
            Arc::new(FilterExec::try_new(predicate, input)?);
        let statistics = filter.partition_statistics(None)?;

        assert_eq!(statistics.num_rows, Precision::Inexact(1000));
        assert_eq!(statistics.total_byte_size, Precision::Inexact(4000));
        assert_eq!(statistics.column_statistics, expected);

        Ok(())
    }
1478
    /// A predicate that provably matches no rows (`a > 200` when `a <= 100`)
    /// must produce zero-row statistics with *typed* null min/max values
    /// (see `collect_new_statistics`).
    #[tokio::test]
    async fn test_filter_statistics_zero_selective() -> Result<()> {
        // Table:
        //      a: min=1, max=100
        //      b: min=1, max=3
        let schema = Schema::new(vec![
            Field::new("a", DataType::Int32, false),
            Field::new("b", DataType::Int32, false),
        ]);
        let input = Arc::new(StatisticsExec::new(
            Statistics {
                num_rows: Precision::Inexact(1000),
                total_byte_size: Precision::Inexact(4000),
                column_statistics: vec![
                    ColumnStatistics {
                        min_value: Precision::Inexact(ScalarValue::Int32(Some(1))),
                        max_value: Precision::Inexact(ScalarValue::Int32(Some(100))),
                        ..Default::default()
                    },
                    ColumnStatistics {
                        min_value: Precision::Inexact(ScalarValue::Int32(Some(1))),
                        max_value: Precision::Inexact(ScalarValue::Int32(Some(3))),
                        ..Default::default()
                    },
                ],
            },
            schema,
        ));
        // WHERE a>200 AND 1<=b — the first conjunct is unsatisfiable.
        let predicate = Arc::new(BinaryExpr::new(
            Arc::new(BinaryExpr::new(
                Arc::new(Column::new("a", 0)),
                Operator::Gt,
                Arc::new(Literal::new(ScalarValue::Int32(Some(200)))),
            )),
            Operator::And,
            Arc::new(BinaryExpr::new(
                Arc::new(Literal::new(ScalarValue::Int32(Some(1)))),
                Operator::LtEq,
                Arc::new(Column::new("b", 1)),
            )),
        ));
        let filter: Arc<dyn ExecutionPlan> =
            Arc::new(FilterExec::try_new(predicate, input)?);
        let statistics = filter.partition_statistics(None)?;

        assert_eq!(statistics.num_rows, Precision::Inexact(0));
        assert_eq!(statistics.total_byte_size, Precision::Inexact(0));
        // Min/max/sum are Int32(None) — typed nulls, not ScalarValue::Null —
        // so downstream interval analysis can still intersect by type.
        assert_eq!(
            statistics.column_statistics,
            vec![
                ColumnStatistics {
                    min_value: Precision::Exact(ScalarValue::Int32(None)),
                    max_value: Precision::Exact(ScalarValue::Int32(None)),
                    sum_value: Precision::Exact(ScalarValue::Int32(None)),
                    distinct_count: Precision::Exact(0),
                    null_count: Precision::Exact(0),
                    byte_size: Precision::Absent,
                },
                ColumnStatistics {
                    min_value: Precision::Exact(ScalarValue::Int32(None)),
                    max_value: Precision::Exact(ScalarValue::Int32(None)),
                    sum_value: Precision::Exact(ScalarValue::Int32(None)),
                    distinct_count: Precision::Exact(0),
                    null_count: Precision::Exact(0),
                    byte_size: Precision::Absent,
                },
            ]
        );

        Ok(())
    }
1551
1552    /// Regression test: stacking two FilterExecs where the inner filter
1553    /// proves zero selectivity should not panic with a type mismatch
1554    /// during interval intersection.
1555    ///
1556    /// Previously, when a filter proved no rows could match, the column
1557    /// statistics used untyped `ScalarValue::Null` (data type `Null`).
1558    /// If an outer FilterExec then tried to analyze its own predicate
1559    /// against those statistics, `Interval::intersect` would fail with:
1560    ///   "Only intervals with the same data type are intersectable, lhs:Null, rhs:Int32"
1561    #[tokio::test]
1562    async fn test_nested_filter_with_zero_selectivity_inner() -> Result<()> {
1563        // Inner table: a: [1, 100], b: [1, 3]
1564        let schema = Schema::new(vec![
1565            Field::new("a", DataType::Int32, false),
1566            Field::new("b", DataType::Int32, false),
1567        ]);
1568        let input = Arc::new(StatisticsExec::new(
1569            Statistics {
1570                num_rows: Precision::Inexact(1000),
1571                total_byte_size: Precision::Inexact(4000),
1572                column_statistics: vec![
1573                    ColumnStatistics {
1574                        min_value: Precision::Inexact(ScalarValue::Int32(Some(1))),
1575                        max_value: Precision::Inexact(ScalarValue::Int32(Some(100))),
1576                        ..Default::default()
1577                    },
1578                    ColumnStatistics {
1579                        min_value: Precision::Inexact(ScalarValue::Int32(Some(1))),
1580                        max_value: Precision::Inexact(ScalarValue::Int32(Some(3))),
1581                        ..Default::default()
1582                    },
1583                ],
1584            },
1585            schema,
1586        ));
1587
1588        // Inner filter: a > 200 (impossible given a max=100 → zero selectivity)
1589        let inner_predicate: Arc<dyn PhysicalExpr> = Arc::new(BinaryExpr::new(
1590            Arc::new(Column::new("a", 0)),
1591            Operator::Gt,
1592            Arc::new(Literal::new(ScalarValue::Int32(Some(200)))),
1593        ));
1594        let inner_filter: Arc<dyn ExecutionPlan> =
1595            Arc::new(FilterExec::try_new(inner_predicate, input)?);
1596
1597        // Outer filter: a = 50
1598        // Before the fix, this would panic because the inner filter's
1599        // zero-selectivity statistics produced Null-typed intervals for
1600        // column `a`, which couldn't intersect with the Int32 literal.
1601        let outer_predicate: Arc<dyn PhysicalExpr> = Arc::new(BinaryExpr::new(
1602            Arc::new(Column::new("a", 0)),
1603            Operator::Eq,
1604            Arc::new(Literal::new(ScalarValue::Int32(Some(50)))),
1605        ));
1606        let outer_filter: Arc<dyn ExecutionPlan> =
1607            Arc::new(FilterExec::try_new(outer_predicate, inner_filter)?);
1608
1609        // Should succeed without error
1610        let statistics = outer_filter.partition_statistics(None)?;
1611        assert_eq!(statistics.num_rows, Precision::Inexact(0));
1612
1613        Ok(())
1614    }
1615
1616    #[tokio::test]
1617    async fn test_filter_statistics_more_inputs() -> Result<()> {
1618        let schema = Schema::new(vec![
1619            Field::new("a", DataType::Int32, false),
1620            Field::new("b", DataType::Int32, false),
1621        ]);
1622        let input = Arc::new(StatisticsExec::new(
1623            Statistics {
1624                num_rows: Precision::Inexact(1000),
1625                total_byte_size: Precision::Inexact(4000),
1626                column_statistics: vec![
1627                    ColumnStatistics {
1628                        min_value: Precision::Inexact(ScalarValue::Int32(Some(1))),
1629                        max_value: Precision::Inexact(ScalarValue::Int32(Some(100))),
1630                        ..Default::default()
1631                    },
1632                    ColumnStatistics {
1633                        min_value: Precision::Inexact(ScalarValue::Int32(Some(1))),
1634                        max_value: Precision::Inexact(ScalarValue::Int32(Some(100))),
1635                        ..Default::default()
1636                    },
1637                ],
1638            },
1639            schema,
1640        ));
1641        // WHERE a<50
1642        let predicate = Arc::new(BinaryExpr::new(
1643            Arc::new(Column::new("a", 0)),
1644            Operator::Lt,
1645            Arc::new(Literal::new(ScalarValue::Int32(Some(50)))),
1646        ));
1647        let filter: Arc<dyn ExecutionPlan> =
1648            Arc::new(FilterExec::try_new(predicate, input)?);
1649        let statistics = filter.partition_statistics(None)?;
1650
1651        assert_eq!(statistics.num_rows, Precision::Inexact(490));
1652        assert_eq!(statistics.total_byte_size, Precision::Inexact(1960));
1653        assert_eq!(
1654            statistics.column_statistics,
1655            vec![
1656                ColumnStatistics {
1657                    min_value: Precision::Inexact(ScalarValue::Int32(Some(1))),
1658                    max_value: Precision::Inexact(ScalarValue::Int32(Some(49))),
1659                    ..Default::default()
1660                },
1661                ColumnStatistics {
1662                    min_value: Precision::Inexact(ScalarValue::Int32(Some(1))),
1663                    max_value: Precision::Inexact(ScalarValue::Int32(Some(100))),
1664                    ..Default::default()
1665                },
1666            ]
1667        );
1668
1669        Ok(())
1670    }
1671
1672    #[tokio::test]
1673    async fn test_empty_input_statistics() -> Result<()> {
1674        let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]);
1675        let input = Arc::new(StatisticsExec::new(
1676            Statistics::new_unknown(&schema),
1677            schema,
1678        ));
1679        // WHERE a <= 10 AND 0 <= a - 5
1680        let predicate = Arc::new(BinaryExpr::new(
1681            Arc::new(BinaryExpr::new(
1682                Arc::new(Column::new("a", 0)),
1683                Operator::LtEq,
1684                Arc::new(Literal::new(ScalarValue::Int32(Some(10)))),
1685            )),
1686            Operator::And,
1687            Arc::new(BinaryExpr::new(
1688                Arc::new(Literal::new(ScalarValue::Int32(Some(0)))),
1689                Operator::LtEq,
1690                Arc::new(BinaryExpr::new(
1691                    Arc::new(Column::new("a", 0)),
1692                    Operator::Minus,
1693                    Arc::new(Literal::new(ScalarValue::Int32(Some(5)))),
1694                )),
1695            )),
1696        ));
1697        let filter: Arc<dyn ExecutionPlan> =
1698            Arc::new(FilterExec::try_new(predicate, input)?);
1699        let filter_statistics = filter.partition_statistics(None)?;
1700
1701        let expected_filter_statistics = Statistics {
1702            num_rows: Precision::Absent,
1703            total_byte_size: Precision::Absent,
1704            column_statistics: vec![ColumnStatistics {
1705                null_count: Precision::Absent,
1706                min_value: Precision::Inexact(ScalarValue::Int32(Some(5))),
1707                max_value: Precision::Inexact(ScalarValue::Int32(Some(10))),
1708                sum_value: Precision::Absent,
1709                distinct_count: Precision::Absent,
1710                byte_size: Precision::Absent,
1711            }],
1712        };
1713
1714        assert_eq!(filter_statistics, expected_filter_statistics);
1715
1716        Ok(())
1717    }
1718
1719    #[tokio::test]
1720    async fn test_statistics_with_constant_column() -> Result<()> {
1721        let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]);
1722        let input = Arc::new(StatisticsExec::new(
1723            Statistics::new_unknown(&schema),
1724            schema,
1725        ));
1726        // WHERE a = 10
1727        let predicate = Arc::new(BinaryExpr::new(
1728            Arc::new(Column::new("a", 0)),
1729            Operator::Eq,
1730            Arc::new(Literal::new(ScalarValue::Int32(Some(10)))),
1731        ));
1732        let filter: Arc<dyn ExecutionPlan> =
1733            Arc::new(FilterExec::try_new(predicate, input)?);
1734        let filter_statistics = filter.partition_statistics(None)?;
1735        // First column is "a", and it is a column with only one value after the filter.
1736        assert!(filter_statistics.column_statistics[0].is_singleton());
1737
1738        Ok(())
1739    }
1740
1741    #[tokio::test]
1742    async fn test_validation_filter_selectivity() -> Result<()> {
1743        let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]);
1744        let input = Arc::new(StatisticsExec::new(
1745            Statistics::new_unknown(&schema),
1746            schema,
1747        ));
1748        // WHERE a = 10
1749        let predicate = Arc::new(BinaryExpr::new(
1750            Arc::new(Column::new("a", 0)),
1751            Operator::Eq,
1752            Arc::new(Literal::new(ScalarValue::Int32(Some(10)))),
1753        ));
1754        let filter = FilterExec::try_new(predicate, input)?;
1755        assert!(filter.with_default_selectivity(120).is_err());
1756        Ok(())
1757    }
1758
1759    #[tokio::test]
1760    async fn test_custom_filter_selectivity() -> Result<()> {
1761        // Need a decimal to trigger inexact selectivity
1762        let schema =
1763            Schema::new(vec![Field::new("a", DataType::Decimal128(2, 3), false)]);
1764        let input = Arc::new(StatisticsExec::new(
1765            Statistics {
1766                num_rows: Precision::Inexact(1000),
1767                total_byte_size: Precision::Inexact(4000),
1768                column_statistics: vec![ColumnStatistics {
1769                    ..Default::default()
1770                }],
1771            },
1772            schema,
1773        ));
1774        // WHERE a = 10
1775        let predicate = Arc::new(BinaryExpr::new(
1776            Arc::new(Column::new("a", 0)),
1777            Operator::Eq,
1778            Arc::new(Literal::new(ScalarValue::Decimal128(Some(10), 10, 10))),
1779        ));
1780        let filter = FilterExec::try_new(predicate, input)?;
1781        let statistics = filter.partition_statistics(None)?;
1782        assert_eq!(statistics.num_rows, Precision::Inexact(200));
1783        assert_eq!(statistics.total_byte_size, Precision::Inexact(800));
1784        let filter = filter.with_default_selectivity(40)?;
1785        let statistics = filter.partition_statistics(None)?;
1786        assert_eq!(statistics.num_rows, Precision::Inexact(400));
1787        assert_eq!(statistics.total_byte_size, Precision::Inexact(1600));
1788        Ok(())
1789    }
1790
1791    #[test]
1792    fn test_equivalence_properties_union_type() -> Result<()> {
1793        let union_type = DataType::Union(
1794            UnionFields::try_new(
1795                vec![0, 1],
1796                vec![
1797                    Field::new("f1", DataType::Int32, true),
1798                    Field::new("f2", DataType::Utf8, true),
1799                ],
1800            )
1801            .unwrap(),
1802            UnionMode::Sparse,
1803        );
1804
1805        let schema = Arc::new(Schema::new(vec![
1806            Field::new("c1", DataType::Int32, true),
1807            Field::new("c2", union_type, true),
1808        ]));
1809
1810        let exec = FilterExec::try_new(
1811            binary(
1812                binary(col("c1", &schema)?, Operator::GtEq, lit(1i32), &schema)?,
1813                Operator::And,
1814                binary(col("c1", &schema)?, Operator::LtEq, lit(4i32), &schema)?,
1815                &schema,
1816            )?,
1817            Arc::new(EmptyExec::new(Arc::clone(&schema))),
1818        )?;
1819
1820        exec.partition_statistics(None).unwrap();
1821
1822        Ok(())
1823    }
1824
1825    #[tokio::test]
1826    async fn test_builder_with_projection() -> Result<()> {
1827        // Create a schema with multiple columns
1828        let schema = Arc::new(Schema::new(vec![
1829            Field::new("a", DataType::Int32, false),
1830            Field::new("b", DataType::Int32, false),
1831            Field::new("c", DataType::Int32, false),
1832        ]));
1833
1834        let input = Arc::new(EmptyExec::new(Arc::clone(&schema)));
1835
1836        // Create a filter predicate: a > 10
1837        let predicate = Arc::new(BinaryExpr::new(
1838            Arc::new(Column::new("a", 0)),
1839            Operator::Gt,
1840            Arc::new(Literal::new(ScalarValue::Int32(Some(10)))),
1841        ));
1842
1843        // Create filter with projection [0, 2] (columns a and c) using builder
1844        let projection = Some(vec![0, 2]);
1845        let filter = FilterExecBuilder::new(predicate, input)
1846            .apply_projection(projection.clone())
1847            .unwrap()
1848            .build()?;
1849
1850        // Verify projection is set correctly
1851        assert_eq!(filter.projection(), &Some([0, 2].into()));
1852
1853        // Verify schema contains only projected columns
1854        let output_schema = filter.schema();
1855        assert_eq!(output_schema.fields().len(), 2);
1856        assert_eq!(output_schema.field(0).name(), "a");
1857        assert_eq!(output_schema.field(1).name(), "c");
1858
1859        Ok(())
1860    }
1861
1862    #[tokio::test]
1863    async fn test_builder_without_projection() -> Result<()> {
1864        let schema = Arc::new(Schema::new(vec![
1865            Field::new("a", DataType::Int32, false),
1866            Field::new("b", DataType::Int32, false),
1867        ]));
1868
1869        let input = Arc::new(EmptyExec::new(Arc::clone(&schema)));
1870
1871        let predicate = Arc::new(BinaryExpr::new(
1872            Arc::new(Column::new("a", 0)),
1873            Operator::Gt,
1874            Arc::new(Literal::new(ScalarValue::Int32(Some(5)))),
1875        ));
1876
1877        // Create filter without projection using builder
1878        let filter = FilterExecBuilder::new(predicate, input).build()?;
1879
1880        // Verify no projection is set
1881        assert!(filter.projection().is_none());
1882
1883        // Verify schema contains all columns
1884        let output_schema = filter.schema();
1885        assert_eq!(output_schema.fields().len(), 2);
1886
1887        Ok(())
1888    }
1889
1890    #[tokio::test]
1891    async fn test_builder_invalid_projection() -> Result<()> {
1892        let schema = Arc::new(Schema::new(vec![
1893            Field::new("a", DataType::Int32, false),
1894            Field::new("b", DataType::Int32, false),
1895        ]));
1896
1897        let input = Arc::new(EmptyExec::new(Arc::clone(&schema)));
1898
1899        let predicate = Arc::new(BinaryExpr::new(
1900            Arc::new(Column::new("a", 0)),
1901            Operator::Gt,
1902            Arc::new(Literal::new(ScalarValue::Int32(Some(5)))),
1903        ));
1904
1905        // Try to create filter with invalid projection (index out of bounds) using builder
1906        let result =
1907            FilterExecBuilder::new(predicate, input).apply_projection(Some(vec![0, 5])); // 5 is out of bounds
1908
1909        // Should return an error
1910        assert!(result.is_err());
1911
1912        Ok(())
1913    }
1914
    #[tokio::test]
    async fn test_builder_vs_with_projection() -> Result<()> {
        // This test verifies that the builder with projection produces the same result
        // as try_new().with_projection(), but more efficiently (one compute_properties call)
        let schema = Schema::new(vec![
            Field::new("a", DataType::Int32, false),
            Field::new("b", DataType::Int32, false),
            Field::new("c", DataType::Int32, false),
            Field::new("d", DataType::Int32, false),
        ]);

        // Only column `a` carries bounds; the rest have unknown statistics.
        let input = Arc::new(StatisticsExec::new(
            Statistics {
                num_rows: Precision::Inexact(1000),
                total_byte_size: Precision::Inexact(4000),
                column_statistics: vec![
                    ColumnStatistics {
                        min_value: Precision::Inexact(ScalarValue::Int32(Some(1))),
                        max_value: Precision::Inexact(ScalarValue::Int32(Some(100))),
                        ..Default::default()
                    },
                    ColumnStatistics {
                        ..Default::default()
                    },
                    ColumnStatistics {
                        ..Default::default()
                    },
                    ColumnStatistics {
                        ..Default::default()
                    },
                ],
            },
            schema,
        ));
        // Upcast to a trait object so the same handle can feed both builders.
        let input: Arc<dyn ExecutionPlan> = input;

        // Predicate: a < 50
        let predicate: Arc<dyn PhysicalExpr> = Arc::new(BinaryExpr::new(
            Arc::new(Column::new("a", 0)),
            Operator::Lt,
            Arc::new(Literal::new(ScalarValue::Int32(Some(50)))),
        ));

        // Keep only columns a and c.
        let projection = Some(vec![0, 2]);

        // Method 1: Builder with projection (one call to compute_properties)
        let filter1 = FilterExecBuilder::new(Arc::clone(&predicate), Arc::clone(&input))
            .apply_projection(projection.clone())
            .unwrap()
            .build()?;

        // Method 2: Also using builder for comparison (deprecated try_new().with_projection() removed)
        let filter2 = FilterExecBuilder::new(predicate, input)
            .apply_projection(projection)
            .unwrap()
            .build()?;

        // Both methods should produce equivalent results
        assert_eq!(filter1.schema(), filter2.schema());
        assert_eq!(filter1.projection(), filter2.projection());

        // Verify statistics are the same
        let stats1 = filter1.partition_statistics(None)?;
        let stats2 = filter2.partition_statistics(None)?;
        assert_eq!(stats1.num_rows, stats2.num_rows);
        assert_eq!(stats1.total_byte_size, stats2.total_byte_size);

        Ok(())
    }
1983
    #[tokio::test]
    async fn test_builder_statistics_with_projection() -> Result<()> {
        // Test that statistics are correctly computed when using builder with projection
        let schema = Schema::new(vec![
            Field::new("a", DataType::Int32, false),
            Field::new("b", DataType::Int32, false),
            Field::new("c", DataType::Int32, false),
        ]);

        // All three columns carry (inexact) min/max bounds:
        //   a: [1, 100], b: [10, 200], c: [5, 50]
        let input = Arc::new(StatisticsExec::new(
            Statistics {
                num_rows: Precision::Inexact(1000),
                total_byte_size: Precision::Inexact(12000),
                column_statistics: vec![
                    ColumnStatistics {
                        min_value: Precision::Inexact(ScalarValue::Int32(Some(1))),
                        max_value: Precision::Inexact(ScalarValue::Int32(Some(100))),
                        ..Default::default()
                    },
                    ColumnStatistics {
                        min_value: Precision::Inexact(ScalarValue::Int32(Some(10))),
                        max_value: Precision::Inexact(ScalarValue::Int32(Some(200))),
                        ..Default::default()
                    },
                    ColumnStatistics {
                        min_value: Precision::Inexact(ScalarValue::Int32(Some(5))),
                        max_value: Precision::Inexact(ScalarValue::Int32(Some(50))),
                        ..Default::default()
                    },
                ],
            },
            schema,
        ));

        // Filter: a < 50, Project: [0, 2]
        let predicate = Arc::new(BinaryExpr::new(
            Arc::new(Column::new("a", 0)),
            Operator::Lt,
            Arc::new(Literal::new(ScalarValue::Int32(Some(50)))),
        ));

        let filter = FilterExecBuilder::new(predicate, input)
            .apply_projection(Some(vec![0, 2]))
            .unwrap()
            .build()?;

        let statistics = filter.partition_statistics(None)?;

        // Verify statistics reflect both filtering and projection
        assert!(matches!(statistics.num_rows, Precision::Inexact(_)));

        // Schema should only have 2 columns after projection
        assert_eq!(filter.schema().fields().len(), 2);

        Ok(())
    }
2040
2041    #[test]
2042    fn test_builder_predicate_validation() -> Result<()> {
2043        // Test that builder validates predicate type correctly
2044        let schema = Arc::new(Schema::new(vec![
2045            Field::new("a", DataType::Int32, false),
2046            Field::new("b", DataType::Int32, false),
2047        ]));
2048
2049        let input = Arc::new(EmptyExec::new(Arc::clone(&schema)));
2050
2051        // Create a predicate that doesn't return boolean (returns Int32)
2052        let invalid_predicate = Arc::new(Column::new("a", 0));
2053
2054        // Should fail because predicate doesn't return boolean
2055        let result = FilterExecBuilder::new(invalid_predicate, input)
2056            .apply_projection(Some(vec![0]))
2057            .unwrap()
2058            .build();
2059
2060        assert!(result.is_err());
2061
2062        Ok(())
2063    }
2064
2065    #[tokio::test]
2066    async fn test_builder_projection_composition() -> Result<()> {
2067        // Test that calling apply_projection multiple times composes projections
2068        // If initial projection is [0, 2, 3] and we call apply_projection([0, 2]),
2069        // the result should be [0, 3] (indices 0 and 2 of [0, 2, 3])
2070        let schema = Arc::new(Schema::new(vec![
2071            Field::new("a", DataType::Int32, false),
2072            Field::new("b", DataType::Int32, false),
2073            Field::new("c", DataType::Int32, false),
2074            Field::new("d", DataType::Int32, false),
2075        ]));
2076
2077        let input = Arc::new(EmptyExec::new(Arc::clone(&schema)));
2078
2079        // Create a filter predicate: a > 10
2080        let predicate = Arc::new(BinaryExpr::new(
2081            Arc::new(Column::new("a", 0)),
2082            Operator::Gt,
2083            Arc::new(Literal::new(ScalarValue::Int32(Some(10)))),
2084        ));
2085
2086        // First projection: [0, 2, 3] -> select columns a, c, d
2087        // Second projection: [0, 2] -> select indices 0 and 2 of [0, 2, 3] -> [0, 3]
2088        // Final result: columns a and d
2089        let filter = FilterExecBuilder::new(predicate, input)
2090            .apply_projection(Some(vec![0, 2, 3]))?
2091            .apply_projection(Some(vec![0, 2]))?
2092            .build()?;
2093
2094        // Verify composed projection is [0, 3]
2095        assert_eq!(filter.projection(), &Some([0, 3].into()));
2096
2097        // Verify schema contains only columns a and d
2098        let output_schema = filter.schema();
2099        assert_eq!(output_schema.fields().len(), 2);
2100        assert_eq!(output_schema.field(0).name(), "a");
2101        assert_eq!(output_schema.field(1).name(), "d");
2102
2103        Ok(())
2104    }
2105
2106    #[tokio::test]
2107    async fn test_builder_projection_composition_none_clears() -> Result<()> {
2108        // Test that passing None clears the projection
2109        let schema = Arc::new(Schema::new(vec![
2110            Field::new("a", DataType::Int32, false),
2111            Field::new("b", DataType::Int32, false),
2112        ]));
2113
2114        let input = Arc::new(EmptyExec::new(Arc::clone(&schema)));
2115
2116        let predicate = Arc::new(BinaryExpr::new(
2117            Arc::new(Column::new("a", 0)),
2118            Operator::Gt,
2119            Arc::new(Literal::new(ScalarValue::Int32(Some(10)))),
2120        ));
2121
2122        // Set a projection then clear it with None
2123        let filter = FilterExecBuilder::new(predicate, input)
2124            .apply_projection(Some(vec![0]))?
2125            .apply_projection(None)?
2126            .build()?;
2127
2128        // Projection should be cleared
2129        assert_eq!(filter.projection(), &None);
2130
2131        // Schema should have all columns
2132        let output_schema = filter.schema();
2133        assert_eq!(output_schema.fields().len(), 2);
2134
2135        Ok(())
2136    }
2137
    #[test]
    fn test_filter_with_projection_remaps_post_phase_parent_filters() -> Result<()> {
        // Test that FilterExec with a projection must remap parent dynamic
        // filter column indices from its output schema to the input schema
        // before passing them to the child.
        let input_schema = Arc::new(Schema::new(vec![
            Field::new("a", DataType::Int32, false),
            Field::new("b", DataType::Utf8, false),
            Field::new("c", DataType::Float64, false),
        ]));
        let input = Arc::new(EmptyExec::new(Arc::clone(&input_schema)));

        // FilterExec: a > 0, projection=[c@2]
        let predicate = Arc::new(BinaryExpr::new(
            Arc::new(Column::new("a", 0)),
            Operator::Gt,
            Arc::new(Literal::new(ScalarValue::Int32(Some(0)))),
        ));
        let filter = FilterExecBuilder::new(predicate, input)
            .apply_projection(Some(vec![2]))?
            .build()?;

        // Output schema should be [c:Float64]
        let output_schema = filter.schema();
        assert_eq!(output_schema.fields().len(), 1);
        assert_eq!(output_schema.field(0).name(), "c");

        // Simulate a parent dynamic filter referencing output column c@0
        // (index 0 because `c` is the only column the filter emits).
        let parent_filter: Arc<dyn PhysicalExpr> = Arc::new(Column::new("c", 0));

        let config = ConfigOptions::new();
        let desc = filter.gather_filters_for_pushdown(
            FilterPushdownPhase::Post,
            vec![parent_filter],
            &config,
        )?;

        // The filter pushed to the child must reference c@2 (input schema),
        // not c@0 (output schema).
        let parent_filters = desc.parent_filters();
        assert_eq!(parent_filters.len(), 1); // one child
        assert_eq!(parent_filters[0].len(), 1); // one filter
        let remapped = &parent_filters[0][0].predicate;
        // Rely on the expression's Display form to check the column index.
        let display = format!("{remapped}");
        assert_eq!(
            display, "c@2",
            "Post-phase parent filter column index must be remapped \
             from output schema (c@0) to input schema (c@2)"
        );

        Ok(())
    }
2190
2191    /// Regression test for https://github.com/apache/datafusion/issues/20194
2192    ///
2193    /// `collect_columns_from_predicate_inner` should only extract equality
2194    /// pairs where at least one side is a Column. Pairs like
2195    /// `complex_expr = literal` must not create equivalence classes because
2196    /// `normalize_expr`'s deep traversal would replace the literal inside
2197    /// unrelated expressions (e.g. sort keys) with the complex expression.
2198    #[test]
2199    fn test_collect_columns_skips_non_column_pairs() -> Result<()> {
2200        let schema = test::aggr_test_schema();
2201
2202        // Simulate: nvl(c2, 0) = 0  →  (c2 IS DISTINCT FROM 0) = 0
2203        // Neither side is a Column, so this should NOT be extracted.
2204        let complex_expr: Arc<dyn PhysicalExpr> = binary(
2205            col("c2", &schema)?,
2206            Operator::IsDistinctFrom,
2207            lit(0u32),
2208            &schema,
2209        )?;
2210        let predicate: Arc<dyn PhysicalExpr> =
2211            binary(complex_expr, Operator::Eq, lit(0u32), &schema)?;
2212
2213        let (equal_pairs, _) = collect_columns_from_predicate_inner(&predicate);
2214        assert_eq!(
2215            0,
2216            equal_pairs.len(),
2217            "Should not extract equality pairs where neither side is a Column"
2218        );
2219
2220        // But col = literal should still be extracted
2221        let predicate: Arc<dyn PhysicalExpr> =
2222            binary(col("c2", &schema)?, Operator::Eq, lit(0u32), &schema)?;
2223        let (equal_pairs, _) = collect_columns_from_predicate_inner(&predicate);
2224        assert_eq!(
2225            1,
2226            equal_pairs.len(),
2227            "Should extract equality pairs where one side is a Column"
2228        );
2229
2230        Ok(())
2231    }
2232
2233    /// Columns with Absent min/max statistics should remain Absent after
2234    /// FilterExec.
2235    #[tokio::test]
2236    async fn test_filter_statistics_absent_columns_stay_absent() -> Result<()> {
2237        let schema = Schema::new(vec![
2238            Field::new("a", DataType::Int32, false),
2239            Field::new("b", DataType::Int32, false),
2240        ]);
2241        let input = Arc::new(StatisticsExec::new(
2242            Statistics {
2243                num_rows: Precision::Inexact(1000),
2244                total_byte_size: Precision::Absent,
2245                column_statistics: vec![
2246                    ColumnStatistics::default(),
2247                    ColumnStatistics::default(),
2248                ],
2249            },
2250            schema.clone(),
2251        ));
2252
2253        let predicate = Arc::new(BinaryExpr::new(
2254            Arc::new(Column::new("a", 0)),
2255            Operator::Eq,
2256            Arc::new(Literal::new(ScalarValue::Int32(Some(42)))),
2257        ));
2258        let filter: Arc<dyn ExecutionPlan> =
2259            Arc::new(FilterExec::try_new(predicate, input)?);
2260
2261        let statistics = filter.partition_statistics(None)?;
2262        let col_b_stats = &statistics.column_statistics[1];
2263        assert_eq!(col_b_stats.min_value, Precision::Absent);
2264        assert_eq!(col_b_stats.max_value, Precision::Absent);
2265
2266        Ok(())
2267    }
2268}