Skip to main content

datafusion_physical_plan/
filter.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::collections::hash_map::Entry;
19use std::collections::{HashMap, HashSet};
20use std::pin::Pin;
21use std::sync::Arc;
22use std::task::{Context, Poll, ready};
23
24use datafusion_physical_expr::projection::{ProjectionRef, combine_projections};
25use itertools::Itertools;
26
27use super::{
28    ColumnStatistics, DisplayAs, ExecutionPlanProperties, PlanProperties,
29    RecordBatchStream, SendableRecordBatchStream, Statistics,
30};
31use crate::check_if_same_properties;
32use crate::coalesce::{LimitedBatchCoalescer, PushBatchStatus};
33use crate::common::can_project;
34use crate::execution_plan::CardinalityEffect;
35use crate::filter_pushdown::{
36    ChildFilterDescription, ChildPushdownResult, FilterDescription, FilterPushdownPhase,
37    FilterPushdownPropagation, PushedDown,
38};
39use crate::limit::LocalLimitExec;
40use crate::metrics::{MetricBuilder, MetricType};
41use crate::projection::{
42    EmbeddedProjection, ProjectionExec, ProjectionExpr, make_with_child,
43    try_embed_projection, update_expr,
44};
45use crate::stream::EmptyRecordBatchStream;
46use crate::{
47    DisplayFormatType, ExecutionPlan,
48    metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet, RatioMetrics},
49};
50
51use arrow::compute::filter_record_batch;
52use arrow::datatypes::{DataType, SchemaRef};
53use arrow::record_batch::RecordBatch;
54use datafusion_common::cast::as_boolean_array;
55use datafusion_common::config::ConfigOptions;
56use datafusion_common::stats::Precision;
57use datafusion_common::{
58    DataFusionError, Result, ScalarValue, internal_err, plan_err, project_schema,
59};
60use datafusion_execution::TaskContext;
61use datafusion_expr::Operator;
62use datafusion_physical_expr::equivalence::ProjectionMapping;
63use datafusion_physical_expr::expressions::{BinaryExpr, Column, Literal, lit};
64use datafusion_physical_expr::intervals::utils::check_support;
65use datafusion_physical_expr::utils::{collect_columns, reassign_expr_columns};
66use datafusion_physical_expr::{
67    AcrossPartitions, AnalysisContext, ConstExpr, ExprBoundaries, PhysicalExpr, analyze,
68    conjunction, split_conjunction,
69};
70
71use datafusion_physical_expr_common::physical_expr::fmt_sql;
72use futures::stream::{Stream, StreamExt};
73use log::trace;
74
/// Selectivity, in percent (0 = no rows, 100 = all rows), that
/// `FilterExecBuilder::new` starts from.
const FILTER_EXEC_DEFAULT_SELECTIVITY: u8 = 20;
/// Target output batch size that `FilterExecBuilder::new` starts from.
const FILTER_EXEC_DEFAULT_BATCH_SIZE: usize = 8192;
77
/// FilterExec evaluates a boolean predicate against all input batches to determine which rows to
/// include in its output batches.
///
/// Instances are created with [`FilterExec::try_new`] or [`FilterExecBuilder`].
#[derive(Debug, Clone)]
pub struct FilterExec {
    /// The expression to filter on. This expression must evaluate to a boolean value.
    predicate: Arc<dyn PhysicalExpr>,
    /// The input plan
    input: Arc<dyn ExecutionPlan>,
    /// Execution metrics
    metrics: ExecutionPlanMetricsSet,
    /// Selectivity for statistics, as a percentage. 0 = no rows, 100 = all rows
    default_selectivity: u8,
    /// Cached plan properties (equivalence properties, partitioning, etc.)
    cache: Arc<PlanProperties>,
    /// Optional projection: indices of the input schema columns to emit
    projection: Option<ProjectionRef>,
    /// Target batch size for output batches
    batch_size: usize,
    /// Number of rows to fetch
    fetch: Option<usize>,
}
99
/// Builder for [`FilterExec`] to set optional parameters
///
/// The predicate and input are required; everything else has a default
/// (see [`FilterExecBuilder::new`]). Validation happens in
/// [`FilterExecBuilder::build`].
pub struct FilterExecBuilder {
    /// The expression to filter on; checked to be boolean in `build`.
    predicate: Arc<dyn PhysicalExpr>,
    /// The input plan the filter reads from.
    input: Arc<dyn ExecutionPlan>,
    /// Optional projection applied to the filter output.
    projection: Option<ProjectionRef>,
    /// Selectivity for statistics, as a percentage; `build` rejects values above 100.
    default_selectivity: u8,
    /// Target batch size for output batches.
    batch_size: usize,
    /// Optional fetch limit.
    fetch: Option<usize>,
}
109
110impl FilterExecBuilder {
111    /// Create a new builder with required parameters (predicate and input)
112    pub fn new(predicate: Arc<dyn PhysicalExpr>, input: Arc<dyn ExecutionPlan>) -> Self {
113        Self {
114            predicate,
115            input,
116            projection: None,
117            default_selectivity: FILTER_EXEC_DEFAULT_SELECTIVITY,
118            batch_size: FILTER_EXEC_DEFAULT_BATCH_SIZE,
119            fetch: None,
120        }
121    }
122
123    /// Set the input execution plan
124    pub fn with_input(mut self, input: Arc<dyn ExecutionPlan>) -> Self {
125        self.input = input;
126        self
127    }
128
129    /// Set the predicate expression
130    pub fn with_predicate(mut self, predicate: Arc<dyn PhysicalExpr>) -> Self {
131        self.predicate = predicate;
132        self
133    }
134
135    /// Set the projection, composing with any existing projection.
136    ///
137    /// If a projection is already set, the new projection indices are mapped
138    /// through the existing projection. For example, if the current projection
139    /// is `[0, 2, 3]` and `apply_projection(Some(vec![0, 2]))` is called, the
140    /// resulting projection will be `[0, 3]` (indices 0 and 2 of `[0, 2, 3]`).
141    ///
142    /// If no projection is currently set, the new projection is used directly.
143    /// If `None` is passed, the projection is cleared.
144    pub fn apply_projection(self, projection: Option<Vec<usize>>) -> Result<Self> {
145        let projection = projection.map(Into::into);
146        self.apply_projection_by_ref(projection.as_ref())
147    }
148
149    /// The same as [`Self::apply_projection`] but takes projection shared reference.
150    pub fn apply_projection_by_ref(
151        mut self,
152        projection: Option<&ProjectionRef>,
153    ) -> Result<Self> {
154        // Check if the projection is valid against current output schema
155        can_project(&self.input.schema(), projection.map(AsRef::as_ref))?;
156        self.projection = combine_projections(projection, self.projection.as_ref())?;
157        Ok(self)
158    }
159
160    /// Set the default selectivity
161    pub fn with_default_selectivity(mut self, default_selectivity: u8) -> Self {
162        self.default_selectivity = default_selectivity;
163        self
164    }
165
166    /// Set the batch size
167    pub fn with_batch_size(mut self, batch_size: usize) -> Self {
168        self.batch_size = batch_size;
169        self
170    }
171
172    /// Set the fetch limit
173    pub fn with_fetch(mut self, fetch: Option<usize>) -> Self {
174        self.fetch = fetch;
175        self
176    }
177
178    /// Build the FilterExec, computing properties once with all configured parameters
179    pub fn build(self) -> Result<FilterExec> {
180        // Validate predicate type
181        match self.predicate.data_type(self.input.schema().as_ref())? {
182            DataType::Boolean => {}
183            other => {
184                return plan_err!(
185                    "Filter predicate must return BOOLEAN values, got {other:?}"
186                );
187            }
188        }
189
190        // Validate selectivity
191        if self.default_selectivity > 100 {
192            return plan_err!(
193                "Default filter selectivity value needs to be less than or equal to 100"
194            );
195        }
196
197        // Validate projection if provided
198        can_project(&self.input.schema(), self.projection.as_deref())?;
199
200        // Compute properties once with all parameters
201        let cache = FilterExec::compute_properties(
202            &self.input,
203            &self.predicate,
204            self.default_selectivity,
205            self.projection.as_deref(),
206        )?;
207
208        Ok(FilterExec {
209            predicate: self.predicate,
210            input: self.input,
211            metrics: ExecutionPlanMetricsSet::new(),
212            default_selectivity: self.default_selectivity,
213            cache: Arc::new(cache),
214            projection: self.projection,
215            batch_size: self.batch_size,
216            fetch: self.fetch,
217        })
218    }
219}
220
221impl From<&FilterExec> for FilterExecBuilder {
222    fn from(exec: &FilterExec) -> Self {
223        Self {
224            predicate: Arc::clone(&exec.predicate),
225            input: Arc::clone(&exec.input),
226            projection: exec.projection.clone(),
227            default_selectivity: exec.default_selectivity,
228            batch_size: exec.batch_size,
229            fetch: exec.fetch,
230            // We could cache / copy over PlanProperties
231            // here but that would require invalidating them in FilterExecBuilder::apply_projection, etc.
232            // and currently every call to this method ends up invalidating them anyway.
233            // If useful this can be added in the future as a non-breaking change.
234        }
235    }
236}
237
238impl FilterExec {
239    /// Create a FilterExec on an input using the builder pattern
240    pub fn try_new(
241        predicate: Arc<dyn PhysicalExpr>,
242        input: Arc<dyn ExecutionPlan>,
243    ) -> Result<Self> {
244        FilterExecBuilder::new(predicate, input).build()
245    }
246
    /// The target batch size for output batches
    pub fn batch_size(&self) -> usize {
        self.batch_size
    }
251
252    /// Set the default selectivity
253    pub fn with_default_selectivity(
254        mut self,
255        default_selectivity: u8,
256    ) -> Result<Self, DataFusionError> {
257        if default_selectivity > 100 {
258            return plan_err!(
259                "Default filter selectivity value needs to be less than or equal to 100"
260            );
261        }
262        self.default_selectivity = default_selectivity;
263        Ok(self)
264    }
265
266    /// Return new instance of [FilterExec] with the given projection.
267    ///
268    /// # Deprecated
269    /// Use [`FilterExecBuilder::apply_projection`] instead
270    #[deprecated(
271        since = "52.0.0",
272        note = "Use FilterExecBuilder::apply_projection instead"
273    )]
274    pub fn with_projection(&self, projection: Option<Vec<usize>>) -> Result<Self> {
275        let builder = FilterExecBuilder::from(self);
276        builder.apply_projection(projection)?.build()
277    }
278
279    /// Set the batch size
280    pub fn with_batch_size(&self, batch_size: usize) -> Result<Self> {
281        Ok(Self {
282            predicate: Arc::clone(&self.predicate),
283            input: Arc::clone(&self.input),
284            metrics: self.metrics.clone(),
285            default_selectivity: self.default_selectivity,
286            cache: Arc::clone(&self.cache),
287            projection: self.projection.clone(),
288            batch_size,
289            fetch: self.fetch,
290        })
291    }
292
    /// The expression to filter on. This expression must evaluate to a boolean value
    /// (checked by [`FilterExecBuilder::build`]).
    pub fn predicate(&self) -> &Arc<dyn PhysicalExpr> {
        &self.predicate
    }
297
    /// The input plan whose batches are filtered
    pub fn input(&self) -> &Arc<dyn ExecutionPlan> {
        &self.input
    }
302
    /// The default selectivity, as a percentage (0 = no rows, 100 = all rows)
    pub fn default_selectivity(&self) -> u8 {
        self.default_selectivity
    }
307
    /// The optional projection (indices into the input schema) applied to the
    /// filter output; `None` when no projection is set
    pub fn projection(&self) -> &Option<ProjectionRef> {
        &self.projection
    }
312
313    /// Calculates `Statistics` for `FilterExec`, by applying selectivity
314    /// (either default, or estimated) to input statistics.
315    ///
316    /// Equality predicates (`col = literal`) set NDV to `Exact(1)`, or
317    /// `Exact(0)` when the predicate is contradictory (e.g. `a = 1 AND a = 2`).
318    pub(crate) fn statistics_helper(
319        schema: &SchemaRef,
320        input_stats: Statistics,
321        predicate: &Arc<dyn PhysicalExpr>,
322        default_selectivity: u8,
323    ) -> Result<Statistics> {
324        let (eq_columns, is_infeasible) = collect_equality_columns(predicate);
325
326        let input_num_rows = input_stats.num_rows;
327        let input_total_byte_size = input_stats.total_byte_size;
328
329        let (selectivity, num_rows, column_statistics) = if is_infeasible {
330            // Contradictory predicate: zero rows, and null/min/max are
331            // undefined on an empty column.
332            let mut cs = input_stats.to_inexact().column_statistics;
333            for col_stat in &mut cs {
334                col_stat.distinct_count = Precision::Exact(0);
335                col_stat.null_count = Precision::Exact(0);
336                col_stat.min_value = Precision::Absent;
337                col_stat.max_value = Precision::Absent;
338                col_stat.sum_value = Precision::Absent;
339                col_stat.byte_size = Precision::Exact(0);
340            }
341            (0.0, Precision::Exact(0), cs)
342        } else if !check_support(predicate, schema) {
343            // Interval analysis is not applicable; fall back to the default
344            // selectivity but still pin NDV=1 for every `col = literal` column.
345            let selectivity = default_selectivity as f64 / 100.0;
346            let mut cs = input_stats.to_inexact().column_statistics;
347            for &idx in &eq_columns {
348                if idx < cs.len() && cs[idx].distinct_count != Precision::Exact(0) {
349                    cs[idx].distinct_count = Precision::Exact(1);
350                }
351            }
352            (
353                selectivity,
354                input_num_rows.with_estimated_selectivity(selectivity),
355                cs,
356            )
357        } else {
358            // Interval-analysis path. `collect_new_statistics` already sets
359            // distinct_count = Exact(1) when an interval collapses to a single
360            // value, so no post-fix is needed here.
361            let input_analysis_ctx = AnalysisContext::try_from_statistics(
362                schema,
363                &input_stats.column_statistics,
364            )?;
365            let analysis_ctx = analyze(predicate, input_analysis_ctx, schema)?;
366            let selectivity = analysis_ctx.selectivity.unwrap_or(1.0);
367            let filtered_num_rows =
368                input_num_rows.with_estimated_selectivity(selectivity);
369            let cs = collect_new_statistics(
370                schema,
371                &input_stats.column_statistics,
372                analysis_ctx.boundaries,
373                match &filtered_num_rows {
374                    Precision::Absent => None,
375                    p => Some(*p),
376                },
377            );
378            (selectivity, filtered_num_rows, cs)
379        };
380
381        let total_byte_size =
382            input_total_byte_size.with_estimated_selectivity(selectivity);
383
384        Ok(Statistics {
385            num_rows,
386            total_byte_size,
387            column_statistics,
388        })
389    }
390
391    /// This function creates the cache object that stores the plan properties such as schema, equivalence properties, ordering, partitioning, etc.
392    fn compute_properties(
393        input: &Arc<dyn ExecutionPlan>,
394        predicate: &Arc<dyn PhysicalExpr>,
395        default_selectivity: u8,
396        projection: Option<&[usize]>,
397    ) -> Result<PlanProperties> {
398        // Combine the equal predicates with the input equivalence properties
399        // to construct the equivalence properties:
400        let schema = input.schema();
401        let stats = Self::statistics_helper(
402            &schema,
403            Arc::unwrap_or_clone(input.partition_statistics(None)?),
404            predicate,
405            default_selectivity,
406        )?;
407        let mut eq_properties = input.equivalence_properties().clone();
408        let (equal_pairs, _) = collect_columns_from_predicate_inner(predicate);
409        for (lhs, rhs) in equal_pairs {
410            eq_properties.add_equal_conditions(Arc::clone(lhs), Arc::clone(rhs))?
411        }
412        // Add the columns that have only one viable value (singleton) after
413        // filtering to constants.
414        let constants = collect_columns(predicate)
415            .into_iter()
416            .filter(|column| stats.column_statistics[column.index()].is_singleton())
417            .map(|column| {
418                let value = stats.column_statistics[column.index()]
419                    .min_value
420                    .get_value();
421                let expr = Arc::new(column) as _;
422                ConstExpr::new(expr, AcrossPartitions::Uniform(value.cloned()))
423            });
424        // This is for statistics
425        eq_properties.add_constants(constants)?;
426        // This is for logical constant (for example: a = '1', then a could be marked as a constant)
427        // to do: how to deal with multiple situation to represent = (for example c1 between 0 and 0)
428        eq_properties.add_constants(ConstExpr::collect_predicate_constants(
429            input.equivalence_properties(),
430            predicate,
431        ))?;
432
433        let mut output_partitioning = input.output_partitioning().clone();
434        // If contains projection, update the PlanProperties.
435        if let Some(projection) = projection {
436            let schema = eq_properties.schema();
437            let projection_mapping = ProjectionMapping::from_indices(projection, schema)?;
438            let out_schema = project_schema(schema, Some(&projection))?;
439            output_partitioning =
440                output_partitioning.project(&projection_mapping, &eq_properties);
441            eq_properties = eq_properties.project(&projection_mapping, out_schema);
442        }
443
444        Ok(PlanProperties::new(
445            eq_properties,
446            output_partitioning,
447            input.pipeline_behavior(),
448            input.boundedness(),
449        ))
450    }
451
452    fn with_new_children_and_same_properties(
453        &self,
454        mut children: Vec<Arc<dyn ExecutionPlan>>,
455    ) -> Self {
456        Self {
457            input: children.swap_remove(0),
458            metrics: ExecutionPlanMetricsSet::new(),
459            ..Self::clone(self)
460        }
461    }
462}
463
464impl DisplayAs for FilterExec {
465    fn fmt_as(
466        &self,
467        t: DisplayFormatType,
468        f: &mut std::fmt::Formatter,
469    ) -> std::fmt::Result {
470        match t {
471            DisplayFormatType::Default | DisplayFormatType::Verbose => {
472                let display_projections = if let Some(projection) =
473                    self.projection.as_ref()
474                {
475                    format!(
476                        ", projection=[{}]",
477                        projection
478                            .iter()
479                            .map(|index| format!(
480                                "{}@{}",
481                                self.input.schema().fields().get(*index).unwrap().name(),
482                                index
483                            ))
484                            .collect::<Vec<_>>()
485                            .join(", ")
486                    )
487                } else {
488                    "".to_string()
489                };
490                let fetch = self
491                    .fetch
492                    .map_or_else(|| "".to_string(), |f| format!(", fetch={f}"));
493                write!(
494                    f,
495                    "FilterExec: {}{}{}",
496                    self.predicate, display_projections, fetch
497                )
498            }
499            DisplayFormatType::TreeRender => {
500                if let Some(fetch) = self.fetch {
501                    writeln!(f, "fetch={fetch}")?;
502                }
503                write!(f, "predicate={}", fmt_sql(self.predicate.as_ref()))
504            }
505        }
506    }
507}
508
509impl ExecutionPlan for FilterExec {
    /// The static name of this operator, `"FilterExec"`
    fn name(&self) -> &'static str {
        "FilterExec"
    }
513
    /// Return the cached plan properties of this node
    fn properties(&self) -> &Arc<PlanProperties> {
        &self.cache
    }
518
    /// The single child of this node: the input plan
    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
        vec![&self.input]
    }
522
    /// Reports, for the single input, that its order is preserved
    fn maintains_input_order(&self) -> Vec<bool> {
        // Tell optimizer this operator doesn't reorder its input
        vec![true]
    }
527
528    fn with_new_children(
529        self: Arc<Self>,
530        mut children: Vec<Arc<dyn ExecutionPlan>>,
531    ) -> Result<Arc<dyn ExecutionPlan>> {
532        check_if_same_properties!(self, children);
533        let new_input = children.swap_remove(0);
534        FilterExecBuilder::from(&*self)
535            .with_input(new_input)
536            .build()
537            .map(|e| Arc::new(e) as _)
538    }
539
540    fn execute(
541        &self,
542        partition: usize,
543        context: Arc<TaskContext>,
544    ) -> Result<SendableRecordBatchStream> {
545        trace!(
546            "Start FilterExec::execute for partition {} of context session_id {} and task_id {:?}",
547            partition,
548            context.session_id(),
549            context.task_id()
550        );
551        let metrics = FilterExecMetrics::new(&self.metrics, partition);
552        Ok(Box::pin(FilterExecStream {
553            schema: self.schema(),
554            predicate: Arc::clone(&self.predicate),
555            input: self.input.execute(partition, context)?,
556            metrics,
557            projection: self.projection.clone(),
558            batch_coalescer: LimitedBatchCoalescer::new(
559                self.schema(),
560                self.batch_size,
561                self.fetch,
562            ),
563        }))
564    }
565
    /// Return the metrics collected for this node (always `Some`), obtained
    /// through `ExecutionPlanMetricsSet::clone_inner`
    fn metrics(&self) -> Option<MetricsSet> {
        Some(self.metrics.clone_inner())
    }
569
570    /// The output statistics of a filtering operation can be estimated if the
571    /// predicate's selectivity value can be determined for the incoming data.
572    fn partition_statistics(&self, partition: Option<usize>) -> Result<Arc<Statistics>> {
573        let input_stats =
574            Arc::unwrap_or_clone(self.input.partition_statistics(partition)?);
575        let stats = Self::statistics_helper(
576            &self.input.schema(),
577            input_stats,
578            self.predicate(),
579            self.default_selectivity,
580        )?;
581        Ok(Arc::new(stats.project(self.projection.as_ref())))
582    }
583
    /// Reports [`CardinalityEffect::LowerEqual`]: a filter only decides which
    /// input rows to include in its output
    fn cardinality_effect(&self) -> CardinalityEffect {
        CardinalityEffect::LowerEqual
    }
587
588    /// Tries to swap `projection` with its input (`filter`). If possible, performs
589    /// the swap and returns [`FilterExec`] as the top plan. Otherwise, returns `None`.
590    fn try_swapping_with_projection(
591        &self,
592        projection: &ProjectionExec,
593    ) -> Result<Option<Arc<dyn ExecutionPlan>>> {
594        // If the projection does not narrow the schema, we should not try to push it down:
595        if projection.expr().len() < projection.input().schema().fields().len() {
596            // Each column in the predicate expression must exist after the projection.
597            if let Some(new_predicate) =
598                update_expr(self.predicate(), projection.expr(), false)?
599            {
600                return FilterExecBuilder::from(self)
601                    .with_input(make_with_child(projection, self.input())?)
602                    .with_predicate(new_predicate)
603                    // The original FilterExec projection referenced columns from its old
604                    // input. After the swap the new input is the ProjectionExec which
605                    // already handles column selection, so clear the projection here.
606                    .apply_projection(None)?
607                    .build()
608                    .map(|e| Some(Arc::new(e) as _));
609            }
610        }
611        try_embed_projection(projection, self)
612    }
613
614    fn gather_filters_for_pushdown(
615        &self,
616        phase: FilterPushdownPhase,
617        parent_filters: Vec<Arc<dyn PhysicalExpr>>,
618        _config: &ConfigOptions,
619    ) -> Result<FilterDescription> {
620        if phase != FilterPushdownPhase::Pre {
621            let child =
622                ChildFilterDescription::from_child(&parent_filters, self.input())?;
623            return Ok(FilterDescription::new().with_child(child));
624        }
625
626        let child = ChildFilterDescription::from_child(&parent_filters, self.input())?
627            .with_self_filters(
628                split_conjunction(&self.predicate)
629                    .into_iter()
630                    .cloned()
631                    .collect(),
632            );
633
634        Ok(FilterDescription::new().with_child(child))
635    }
636
    /// React to the outcome of pushing filters into the input.
    ///
    /// Outside the `Pre` phase the result is simply forwarded through
    /// `FilterPushdownPropagation::if_all`.
    ///
    /// In the `Pre` phase this node keeps whatever the child did not accept:
    /// parent filters marked `PushedDown::No` plus its own conjuncts marked
    /// `PushedDown::No`. Depending on the resulting predicate the node is
    /// - replaced by its input (wrapped in a `ProjectionExec` when a
    ///   projection is set, and with the fetch limit moved onto the input)
    ///   when nothing is left to filter,
    /// - left untouched (`updated_node` is `None`) when the predicate is
    ///   unchanged, or
    /// - rebuilt with the new predicate, keeping projection, batch size,
    ///   fetch and metrics.
    ///
    /// Every parent filter is reported back as `PushedDown::Yes`, because
    /// those the child rejected have been absorbed here.
    fn handle_child_pushdown_result(
        &self,
        phase: FilterPushdownPhase,
        child_pushdown_result: ChildPushdownResult,
        _config: &ConfigOptions,
    ) -> Result<FilterPushdownPropagation<Arc<dyn ExecutionPlan>>> {
        if phase != FilterPushdownPhase::Pre {
            return Ok(FilterPushdownPropagation::if_all(child_pushdown_result));
        }
        // We absorb any parent filters that were not handled by our children
        let mut unsupported_parent_filters: Vec<Arc<dyn PhysicalExpr>> =
            child_pushdown_result
                .parent_filters
                .iter()
                .filter_map(|f| {
                    matches!(f.all(), PushedDown::No).then_some(Arc::clone(&f.filter))
                })
                .collect();

        // If this FilterExec has a projection, the unsupported parent filters
        // are in the output schema (after projection) coordinates. We need to
        // remap them to the input schema coordinates before combining with self filters.
        if self.projection.is_some() {
            let input_schema = self.input().schema();
            unsupported_parent_filters = unsupported_parent_filters
                .into_iter()
                .map(|expr| reassign_expr_columns(expr, &input_schema))
                .collect::<Result<Vec<_>>>()?;
        }

        // Our own conjuncts that the (single) child did not accept.
        let unsupported_self_filters = child_pushdown_result
            .self_filters
            .first()
            .expect("we have exactly one child")
            .iter()
            .filter_map(|f| match f.discriminant {
                PushedDown::Yes => None,
                PushedDown::No => Some(&f.predicate),
            })
            .cloned();

        let unhandled_filters = unsupported_parent_filters
            .into_iter()
            .chain(unsupported_self_filters)
            .collect_vec();

        // If we have unhandled filters, we need to create a new FilterExec
        let filter_input = Arc::clone(self.input());
        let new_predicate = conjunction(unhandled_filters);
        let updated_node = if new_predicate.eq(&lit(true)) {
            // FilterExec is no longer needed, but we may need to leave a projection in place.
            // If this FilterExec had a fetch limit, propagate it to the child.
            // When the child also has a fetch, use the minimum of both to preserve
            // the tighter constraint.
            let filter_input = if let Some(outer_fetch) = self.fetch {
                let effective_fetch = match filter_input.fetch() {
                    Some(inner_fetch) => outer_fetch.min(inner_fetch),
                    None => outer_fetch,
                };
                // Use the child's own fetch support when it has it; otherwise
                // wrap the child in a LocalLimitExec.
                match filter_input.with_fetch(Some(effective_fetch)) {
                    Some(node) => node,
                    None => Arc::new(LocalLimitExec::new(filter_input, effective_fetch)),
                }
            } else {
                filter_input
            };
            match self.projection().as_ref() {
                Some(projection_indices) => {
                    // Re-express the embedded projection as a standalone
                    // ProjectionExec of plain column references.
                    let filter_child_schema = filter_input.schema();
                    let proj_exprs = projection_indices
                        .iter()
                        .map(|p| {
                            let field = filter_child_schema.field(*p).clone();
                            ProjectionExpr {
                                expr: Arc::new(Column::new(field.name(), *p))
                                    as Arc<dyn PhysicalExpr>,
                                alias: field.name().to_string(),
                            }
                        })
                        .collect::<Vec<_>>();
                    Some(Arc::new(ProjectionExec::try_new(proj_exprs, filter_input)?)
                        as Arc<dyn ExecutionPlan>)
                }
                None => {
                    // No projection needed, just return the input
                    Some(filter_input)
                }
            }
        } else if new_predicate.eq(&self.predicate) {
            // The new predicate is the same as our current predicate
            None
        } else {
            // Create a new FilterExec with the new predicate, preserving the projection
            let new = FilterExec {
                predicate: Arc::clone(&new_predicate),
                input: Arc::clone(&filter_input),
                metrics: self.metrics.clone(),
                default_selectivity: self.default_selectivity,
                // Properties depend on the predicate, so they are recomputed.
                cache: Arc::new(Self::compute_properties(
                    &filter_input,
                    &new_predicate,
                    self.default_selectivity,
                    self.projection.as_deref(),
                )?),
                projection: self.projection.clone(),
                batch_size: self.batch_size,
                fetch: self.fetch,
            };
            Some(Arc::new(new) as _)
        };

        Ok(FilterPushdownPropagation {
            filters: vec![PushedDown::Yes; child_pushdown_result.parent_filters.len()],
            updated_node,
        })
    }
753
    /// The fetch limit of this node, if any
    fn fetch(&self) -> Option<usize> {
        self.fetch
    }
757
758    fn with_fetch(&self, fetch: Option<usize>) -> Option<Arc<dyn ExecutionPlan>> {
759        Some(Arc::new(Self {
760            predicate: Arc::clone(&self.predicate),
761            input: Arc::clone(&self.input),
762            metrics: self.metrics.clone(),
763            default_selectivity: self.default_selectivity,
764            cache: Arc::clone(&self.cache),
765            projection: self.projection.clone(),
766            batch_size: self.batch_size,
767            fetch,
768        }))
769    }
770
771    fn with_preserve_order(
772        &self,
773        preserve_order: bool,
774    ) -> Option<Arc<dyn ExecutionPlan>> {
775        self.input
776            .with_preserve_order(preserve_order)
777            .and_then(|new_input| {
778                Arc::new(self.clone())
779                    .with_new_children(vec![new_input])
780                    .ok()
781            })
782    }
783}
784
785impl EmbeddedProjection for FilterExec {
786    fn with_projection(&self, projection: Option<Vec<usize>>) -> Result<Self> {
787        FilterExecBuilder::from(self)
788            .apply_projection(projection)?
789            .build()
790    }
791}
792
793/// Collects column equality information from `col = literal` predicates in a
794/// conjunction.
795///
796/// Returns `(eq_columns, is_infeasible)`:
797/// - `eq_columns`: set of column indices constrained to a single literal value.
798/// - `is_infeasible`: `true` when the same column is equated to two different
799///   non-null literals (e.g. `name = 'alice' AND name = 'bob'`), which is
800///   always unsatisfiable.
801///
802/// Only AND conjunctions are traversed; OR is intentionally skipped
803/// since `a = 1 OR a = 2` does not pin NDV to 1.
804fn collect_equality_columns(predicate: &Arc<dyn PhysicalExpr>) -> (HashSet<usize>, bool) {
805    let mut eq_values: HashMap<usize, ScalarValue> = HashMap::new();
806    let mut infeasible = false;
807
808    for expr in split_conjunction(predicate) {
809        let Some(binary) = expr.downcast_ref::<BinaryExpr>() else {
810            continue;
811        };
812        if *binary.op() != Operator::Eq {
813            continue;
814        }
815        let left = binary.left();
816        let right = binary.right();
817        let pair = if let Some(col) = left.downcast_ref::<Column>()
818            && let Some(lit) = right.downcast_ref::<Literal>()
819            && !lit.value().is_null()
820        {
821            Some((col.index(), lit.value().clone()))
822        } else if let Some(col) = right.downcast_ref::<Column>()
823            && let Some(lit) = left.downcast_ref::<Literal>()
824            && !lit.value().is_null()
825        {
826            Some((col.index(), lit.value().clone()))
827        } else {
828            None
829        };
830
831        if let Some((idx, value)) = pair {
832            match eq_values.entry(idx) {
833                Entry::Occupied(prev) => {
834                    if *prev.get() != value {
835                        infeasible = true;
836                        break;
837                    }
838                }
839                Entry::Vacant(slot) => {
840                    slot.insert(value);
841                }
842            }
843        }
844    }
845
846    (eq_values.into_keys().collect(), infeasible)
847}
848
849/// Converts an interval bound to a [`Precision`] value. NULL bounds (which
850/// represent "unbounded" in the interval type) map to [`Precision::Absent`].
851fn interval_bound_to_precision(
852    bound: ScalarValue,
853    is_exact: bool,
854) -> Precision<ScalarValue> {
855    if bound.is_null() {
856        Precision::Absent
857    } else if is_exact {
858        Precision::Exact(bound)
859    } else {
860        Precision::Inexact(bound)
861    }
862}
863
864/// This function ensures that all bounds in the `ExprBoundaries` vector are
865/// converted to closed bounds. If a lower/upper bound is initially open, it
866/// is adjusted by using the next/previous value for its data type to convert
867/// it into a closed bound.
868fn collect_new_statistics(
869    schema: &SchemaRef,
870    input_column_stats: &[ColumnStatistics],
871    analysis_boundaries: Vec<ExprBoundaries>,
872    filtered_num_rows: Option<Precision<usize>>,
873) -> Vec<ColumnStatistics> {
874    analysis_boundaries
875        .into_iter()
876        .enumerate()
877        .map(
878            |(
879                idx,
880                ExprBoundaries {
881                    interval,
882                    distinct_count,
883                    ..
884                },
885            )| {
886                let Some(interval) = interval else {
887                    // If the interval is `None`, we can say that there are no rows.
888                    // Use a typed null to preserve the column's data type, so that
889                    // downstream interval analysis can still intersect intervals
890                    // of the same type.
891                    let typed_null = ScalarValue::try_from(schema.field(idx).data_type())
892                        .unwrap_or(ScalarValue::Null);
893                    return ColumnStatistics {
894                        null_count: Precision::Exact(0),
895                        max_value: Precision::Exact(typed_null.clone()),
896                        min_value: Precision::Exact(typed_null.clone()),
897                        sum_value: Precision::Exact(typed_null),
898                        distinct_count: Precision::Exact(0),
899                        byte_size: Precision::Exact(0),
900                    };
901                };
902                let (lower, upper) = interval.into_bounds();
903                let is_single_value =
904                    !lower.is_null() && !upper.is_null() && lower == upper;
905                let min_value = interval_bound_to_precision(lower, is_single_value);
906                let max_value = interval_bound_to_precision(upper, is_single_value);
907                // When the interval collapses to a single value (equality
908                // predicate), the column has exactly 1 distinct value.
909                // Otherwise, cap NDV at the filtered row count.
910                let capped_distinct_count = if is_single_value {
911                    Precision::Exact(1)
912                } else {
913                    match filtered_num_rows {
914                        Some(rows) => distinct_count.to_inexact().min(&rows),
915                        None => distinct_count.to_inexact(),
916                    }
917                };
918                ColumnStatistics {
919                    null_count: input_column_stats[idx].null_count.to_inexact(),
920                    max_value,
921                    min_value,
922                    sum_value: Precision::Absent,
923                    distinct_count: capped_distinct_count,
924                    byte_size: input_column_stats[idx].byte_size,
925                }
926            },
927        )
928        .collect()
929}
930
/// Stream that wraps an input record-batch stream and applies the predicate
/// expression to decide which rows are kept in its output batches.
///
/// Filtered batches are pushed through a [`LimitedBatchCoalescer`] before
/// being returned.
struct FilterExecStream {
    /// Output schema after the projection
    schema: SchemaRef,
    /// The expression to filter on. This expression must evaluate to a boolean value.
    predicate: Arc<dyn PhysicalExpr>,
    /// The input partition to filter.
    input: SendableRecordBatchStream,
    /// Runtime metrics recording
    metrics: FilterExecMetrics,
    /// The projection indices of the columns in the input schema
    projection: Option<ProjectionRef>,
    /// Batch coalescer to combine small batches
    batch_coalescer: LimitedBatchCoalescer,
}
947
/// The metrics recorded by a `FilterExec` stream for one partition.
struct FilterExecMetrics {
    /// Common metrics for most operators
    baseline_metrics: BaselineMetrics,
    /// Selectivity of the filter, calculated as output_rows / input_rows
    selectivity: RatioMetrics,
    // Remember to update `docs/source/user-guide/metrics.md` when adding new metrics,
    // or modifying metrics comments
}
957
958impl FilterExecMetrics {
959    pub fn new(metrics: &ExecutionPlanMetricsSet, partition: usize) -> Self {
960        Self {
961            baseline_metrics: BaselineMetrics::new(metrics, partition),
962            selectivity: MetricBuilder::new(metrics)
963                .with_type(MetricType::Summary)
964                .ratio_metrics("selectivity", partition),
965        }
966    }
967}
968
969pub fn batch_filter(
970    batch: &RecordBatch,
971    predicate: &Arc<dyn PhysicalExpr>,
972) -> Result<RecordBatch> {
973    filter_and_project(batch, predicate, None)
974}
975
976fn filter_and_project(
977    batch: &RecordBatch,
978    predicate: &Arc<dyn PhysicalExpr>,
979    projection: Option<&Vec<usize>>,
980) -> Result<RecordBatch> {
981    predicate
982        .evaluate(batch)
983        .and_then(|v| v.into_array(batch.num_rows()))
984        .and_then(|array| {
985            Ok(match (as_boolean_array(&array), projection) {
986                // Apply filter array to record batch
987                (Ok(filter_array), None) => filter_record_batch(batch, filter_array)?,
988                (Ok(filter_array), Some(projection)) => {
989                    let projected_batch = batch.project(projection)?;
990                    filter_record_batch(&projected_batch, filter_array)?
991                }
992                (Err(_), _) => {
993                    return internal_err!(
994                        "Cannot create filter_array from non-boolean predicates"
995                    );
996                }
997            })
998        })
999}
1000
1001impl Stream for FilterExecStream {
1002    type Item = Result<RecordBatch>;
1003
1004    fn poll_next(
1005        mut self: Pin<&mut Self>,
1006        cx: &mut Context<'_>,
1007    ) -> Poll<Option<Self::Item>> {
1008        let elapsed_compute = self.metrics.baseline_metrics.elapsed_compute().clone();
1009        loop {
1010            // If there is a completed batch ready, return it
1011            if let Some(batch) = self.batch_coalescer.next_completed_batch() {
1012                self.metrics.selectivity.add_part(batch.num_rows());
1013                let poll = Poll::Ready(Some(Ok(batch)));
1014                return self.metrics.baseline_metrics.record_poll(poll);
1015            }
1016
1017            if self.batch_coalescer.is_finished() {
1018                // If input is done and no batches are ready, return None to signal end of stream.
1019                return Poll::Ready(None);
1020            }
1021
1022            // Attempt to pull the next batch from the input stream.
1023            match ready!(self.input.poll_next_unpin(cx)) {
1024                None => {
1025                    self.batch_coalescer.finish()?;
1026                    // Release the input pipeline's resources.
1027                    let input_schema = self.input.schema();
1028                    self.input = Box::pin(EmptyRecordBatchStream::new(input_schema));
1029                    // continue draining the coalescer
1030                }
1031                Some(Ok(batch)) => {
1032                    let timer = elapsed_compute.timer();
1033                    let status = self.predicate.as_ref()
1034                        .evaluate(&batch)
1035                        .and_then(|v| v.into_array(batch.num_rows()))
1036                        .and_then(|array| {
1037                            Ok(match self.projection.as_ref()  {
1038                                Some(projection) => {
1039                                    let projected_batch = batch.project(projection)?;
1040                                    (array, projected_batch)
1041                                },
1042                                None => (array, batch)
1043                            })
1044                        }).and_then(|(array, batch)| {
1045                            match as_boolean_array(&array) {
1046                                Ok(filter_array) => {
1047                                    self.metrics.selectivity.add_total(batch.num_rows());
1048                                    // TODO: support push_batch_with_filter in LimitedBatchCoalescer
1049                                    let batch = filter_record_batch(&batch, filter_array)?;
1050                                    let state = self.batch_coalescer.push_batch(batch)?;
1051                                    Ok(state)
1052                                }
1053                                Err(_) => {
1054                                    internal_err!(
1055                                        "Cannot create filter_array from non-boolean predicates"
1056                                    )
1057                                }
1058                            }
1059                        })?;
1060                    timer.done();
1061
1062                    match status {
1063                        PushBatchStatus::Continue => {
1064                            // Keep pushing more batches
1065                        }
1066                        PushBatchStatus::LimitReached => {
1067                            // limit was reached, so stop early
1068                            self.batch_coalescer.finish()?;
1069                            // Release the input pipeline's resources.
1070                            let input_schema = self.input.schema();
1071                            self.input =
1072                                Box::pin(EmptyRecordBatchStream::new(input_schema));
1073                            // continue draining the coalescer
1074                        }
1075                    }
1076                }
1077
1078                // Error case
1079                other => return Poll::Ready(other),
1080            }
1081        }
1082    }
1083
1084    fn size_hint(&self) -> (usize, Option<usize>) {
1085        // Same number of record batches
1086        self.input.size_hint()
1087    }
1088}
impl RecordBatchStream for FilterExecStream {
    /// Returns a shared handle to the stream's stored output schema.
    fn schema(&self) -> SchemaRef {
        Arc::clone(&self.schema)
    }
}
1094
/// Returns the operand pairs of the `=` comparisons and of the `!=`
/// comparisons found in `predicate`, in that order.
///
/// This is a public wrapper that forwards to
/// `collect_columns_from_predicate_inner` without changing its result.
#[deprecated(
    since = "51.0.0",
    note = "This function will be internal in the future"
)]
pub fn collect_columns_from_predicate(
    predicate: &'_ Arc<dyn PhysicalExpr>,
) -> EqualAndNonEqual<'_> {
    collect_columns_from_predicate_inner(predicate)
}
1105
1106fn collect_columns_from_predicate_inner(
1107    predicate: &'_ Arc<dyn PhysicalExpr>,
1108) -> EqualAndNonEqual<'_> {
1109    let mut eq_predicate_columns = Vec::<PhysicalExprPairRef>::new();
1110    let mut ne_predicate_columns = Vec::<PhysicalExprPairRef>::new();
1111
1112    let predicates = split_conjunction(predicate);
1113    predicates.into_iter().for_each(|p| {
1114        if let Some(binary) = p.downcast_ref::<BinaryExpr>() {
1115            // Only extract pairs where at least one side is a Column reference.
1116            // Pairs like `complex_expr = literal` should not create equivalence
1117            // classes — the literal could appear in many unrelated expressions
1118            // (e.g. sort keys), and normalize_expr's deep traversal would
1119            // replace those occurrences with the complex expression, corrupting
1120            // sort orderings. Constant propagation for such pairs is handled
1121            // separately by `extend_constants`.
1122            let has_direct_column_operand =
1123                binary.left().downcast_ref::<Column>().is_some()
1124                    || binary.right().downcast_ref::<Column>().is_some();
1125            if !has_direct_column_operand {
1126                return;
1127            }
1128            match binary.op() {
1129                Operator::Eq => {
1130                    eq_predicate_columns.push((binary.left(), binary.right()))
1131                }
1132                Operator::NotEq => {
1133                    ne_predicate_columns.push((binary.left(), binary.right()))
1134                }
1135                _ => {}
1136            }
1137        }
1138    });
1139
1140    (eq_predicate_columns, ne_predicate_columns)
1141}
1142
/// A pair of borrowed `Arc<dyn PhysicalExpr>`s, e.g. the left and right
/// operands of a binary comparison.
pub type PhysicalExprPairRef<'a> = (&'a Arc<dyn PhysicalExpr>, &'a Arc<dyn PhysicalExpr>);
1145
/// The operand pairs found in a predicate, as a tuple: first the pairs
/// compared with `=`, then the pairs compared with `!=`.
pub type EqualAndNonEqual<'a> =
    (Vec<PhysicalExprPairRef<'a>>, Vec<PhysicalExprPairRef<'a>>);
1149
1150#[cfg(test)]
1151mod tests {
1152    use super::*;
1153    use crate::empty::EmptyExec;
1154    use crate::expressions::*;
1155    use crate::test;
1156    use crate::test::exec::StatisticsExec;
1157    use arrow::datatypes::{Field, Schema, UnionFields, UnionMode};
1158
1159    #[tokio::test]
1160    async fn collect_columns_predicates() -> Result<()> {
1161        let schema = test::aggr_test_schema();
1162        let predicate: Arc<dyn PhysicalExpr> = binary(
1163            binary(
1164                binary(col("c2", &schema)?, Operator::GtEq, lit(1u32), &schema)?,
1165                Operator::And,
1166                binary(col("c2", &schema)?, Operator::Eq, lit(4u32), &schema)?,
1167                &schema,
1168            )?,
1169            Operator::And,
1170            binary(
1171                binary(
1172                    col("c2", &schema)?,
1173                    Operator::Eq,
1174                    col("c9", &schema)?,
1175                    &schema,
1176                )?,
1177                Operator::And,
1178                binary(
1179                    col("c1", &schema)?,
1180                    Operator::NotEq,
1181                    col("c13", &schema)?,
1182                    &schema,
1183                )?,
1184                &schema,
1185            )?,
1186            &schema,
1187        )?;
1188
1189        let (equal_pairs, ne_pairs) = collect_columns_from_predicate_inner(&predicate);
1190        assert_eq!(2, equal_pairs.len());
1191        assert!(equal_pairs[0].0.eq(&col("c2", &schema)?));
1192        assert!(equal_pairs[0].1.eq(&lit(4u32)));
1193
1194        assert!(equal_pairs[1].0.eq(&col("c2", &schema)?));
1195        assert!(equal_pairs[1].1.eq(&col("c9", &schema)?));
1196
1197        assert_eq!(1, ne_pairs.len());
1198        assert!(ne_pairs[0].0.eq(&col("c1", &schema)?));
1199        assert!(ne_pairs[0].1.eq(&col("c13", &schema)?));
1200
1201        Ok(())
1202    }
1203
1204    #[tokio::test]
1205    async fn test_filter_statistics_basic_expr() -> Result<()> {
1206        // Table:
1207        //      a: min=1, max=100
1208        let bytes_per_row = 4;
1209        let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]);
1210        let input = Arc::new(StatisticsExec::new(
1211            Statistics {
1212                num_rows: Precision::Inexact(100),
1213                total_byte_size: Precision::Inexact(100 * bytes_per_row),
1214                column_statistics: vec![ColumnStatistics {
1215                    min_value: Precision::Inexact(ScalarValue::Int32(Some(1))),
1216                    max_value: Precision::Inexact(ScalarValue::Int32(Some(100))),
1217                    ..Default::default()
1218                }],
1219            },
1220            schema.clone(),
1221        ));
1222
1223        // a <= 25
1224        let predicate: Arc<dyn PhysicalExpr> =
1225            binary(col("a", &schema)?, Operator::LtEq, lit(25i32), &schema)?;
1226
1227        // WHERE a <= 25
1228        let filter: Arc<dyn ExecutionPlan> =
1229            Arc::new(FilterExec::try_new(predicate, input)?);
1230
1231        let statistics = filter.partition_statistics(None)?;
1232        assert_eq!(statistics.num_rows, Precision::Inexact(25));
1233        assert_eq!(
1234            statistics.total_byte_size,
1235            Precision::Inexact(25 * bytes_per_row)
1236        );
1237        assert_eq!(
1238            statistics.column_statistics,
1239            vec![ColumnStatistics {
1240                min_value: Precision::Inexact(ScalarValue::Int32(Some(1))),
1241                max_value: Precision::Inexact(ScalarValue::Int32(Some(25))),
1242                ..Default::default()
1243            }]
1244        );
1245
1246        Ok(())
1247    }
1248
1249    #[tokio::test]
1250    async fn test_filter_statistics_column_level_nested() -> Result<()> {
1251        // Table:
1252        //      a: min=1, max=100
1253        let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]);
1254        let input = Arc::new(StatisticsExec::new(
1255            Statistics {
1256                num_rows: Precision::Inexact(100),
1257                column_statistics: vec![ColumnStatistics {
1258                    min_value: Precision::Inexact(ScalarValue::Int32(Some(1))),
1259                    max_value: Precision::Inexact(ScalarValue::Int32(Some(100))),
1260                    ..Default::default()
1261                }],
1262                total_byte_size: Precision::Absent,
1263            },
1264            schema.clone(),
1265        ));
1266
1267        // WHERE a <= 25
1268        let sub_filter: Arc<dyn ExecutionPlan> = Arc::new(FilterExec::try_new(
1269            binary(col("a", &schema)?, Operator::LtEq, lit(25i32), &schema)?,
1270            input,
1271        )?);
1272
1273        // Nested filters (two separate physical plans, instead of AND chain in the expr)
1274        // WHERE a >= 10
1275        // WHERE a <= 25
1276        let filter: Arc<dyn ExecutionPlan> = Arc::new(FilterExec::try_new(
1277            binary(col("a", &schema)?, Operator::GtEq, lit(10i32), &schema)?,
1278            sub_filter,
1279        )?);
1280
1281        let statistics = filter.partition_statistics(None)?;
1282        assert_eq!(statistics.num_rows, Precision::Inexact(16));
1283        assert_eq!(
1284            statistics.column_statistics,
1285            vec![ColumnStatistics {
1286                min_value: Precision::Inexact(ScalarValue::Int32(Some(10))),
1287                max_value: Precision::Inexact(ScalarValue::Int32(Some(25))),
1288                ..Default::default()
1289            }]
1290        );
1291
1292        Ok(())
1293    }
1294
1295    #[tokio::test]
1296    async fn test_filter_statistics_column_level_nested_multiple() -> Result<()> {
1297        // Table:
1298        //      a: min=1, max=100
1299        //      b: min=1, max=50
1300        let schema = Schema::new(vec![
1301            Field::new("a", DataType::Int32, false),
1302            Field::new("b", DataType::Int32, false),
1303        ]);
1304        let input = Arc::new(StatisticsExec::new(
1305            Statistics {
1306                num_rows: Precision::Inexact(100),
1307                column_statistics: vec![
1308                    ColumnStatistics {
1309                        min_value: Precision::Inexact(ScalarValue::Int32(Some(1))),
1310                        max_value: Precision::Inexact(ScalarValue::Int32(Some(100))),
1311                        ..Default::default()
1312                    },
1313                    ColumnStatistics {
1314                        min_value: Precision::Inexact(ScalarValue::Int32(Some(1))),
1315                        max_value: Precision::Inexact(ScalarValue::Int32(Some(50))),
1316                        ..Default::default()
1317                    },
1318                ],
1319                total_byte_size: Precision::Absent,
1320            },
1321            schema.clone(),
1322        ));
1323
1324        // WHERE a <= 25
1325        let a_lte_25: Arc<dyn ExecutionPlan> = Arc::new(FilterExec::try_new(
1326            binary(col("a", &schema)?, Operator::LtEq, lit(25i32), &schema)?,
1327            input,
1328        )?);
1329
1330        // WHERE b > 45
1331        let b_gt_5: Arc<dyn ExecutionPlan> = Arc::new(FilterExec::try_new(
1332            binary(col("b", &schema)?, Operator::Gt, lit(45i32), &schema)?,
1333            a_lte_25,
1334        )?);
1335
1336        // WHERE a >= 10
1337        let filter: Arc<dyn ExecutionPlan> = Arc::new(FilterExec::try_new(
1338            binary(col("a", &schema)?, Operator::GtEq, lit(10i32), &schema)?,
1339            b_gt_5,
1340        )?);
1341        let statistics = filter.partition_statistics(None)?;
1342        // On a uniform distribution, only fifteen rows will satisfy the
1343        // filter that 'a' proposed (a >= 10 AND a <= 25) (15/100) and only
1344        // 5 rows will satisfy the filter that 'b' proposed (b > 45) (5/50).
1345        //
1346        // Which would result with a selectivity of  '15/100 * 5/50' or 0.015
1347        // and that means about %1.5 of the all rows (rounded up to 2 rows).
1348        assert_eq!(statistics.num_rows, Precision::Inexact(2));
1349        assert_eq!(
1350            statistics.column_statistics,
1351            vec![
1352                ColumnStatistics {
1353                    min_value: Precision::Inexact(ScalarValue::Int32(Some(10))),
1354                    max_value: Precision::Inexact(ScalarValue::Int32(Some(25))),
1355                    ..Default::default()
1356                },
1357                ColumnStatistics {
1358                    min_value: Precision::Inexact(ScalarValue::Int32(Some(46))),
1359                    max_value: Precision::Inexact(ScalarValue::Int32(Some(50))),
1360                    ..Default::default()
1361                }
1362            ]
1363        );
1364
1365        Ok(())
1366    }
1367
1368    #[tokio::test]
1369    async fn test_filter_statistics_when_input_stats_missing() -> Result<()> {
1370        // Table:
1371        //      a: min=???, max=??? (missing)
1372        let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]);
1373        let input = Arc::new(StatisticsExec::new(
1374            Statistics::new_unknown(&schema),
1375            schema.clone(),
1376        ));
1377
1378        // a <= 25
1379        let predicate: Arc<dyn PhysicalExpr> =
1380            binary(col("a", &schema)?, Operator::LtEq, lit(25i32), &schema)?;
1381
1382        // WHERE a <= 25
1383        let filter: Arc<dyn ExecutionPlan> =
1384            Arc::new(FilterExec::try_new(predicate, input)?);
1385
1386        let statistics = filter.partition_statistics(None)?;
1387        assert_eq!(statistics.num_rows, Precision::Absent);
1388
1389        Ok(())
1390    }
1391
1392    #[tokio::test]
1393    async fn test_filter_statistics_multiple_columns() -> Result<()> {
1394        // Table:
1395        //      a: min=1, max=100
1396        //      b: min=1, max=3
1397        //      c: min=1000.0  max=1100.0
1398        let schema = Schema::new(vec![
1399            Field::new("a", DataType::Int32, false),
1400            Field::new("b", DataType::Int32, false),
1401            Field::new("c", DataType::Float32, false),
1402        ]);
1403        let input = Arc::new(StatisticsExec::new(
1404            Statistics {
1405                num_rows: Precision::Inexact(1000),
1406                total_byte_size: Precision::Inexact(4000),
1407                column_statistics: vec![
1408                    ColumnStatistics {
1409                        min_value: Precision::Inexact(ScalarValue::Int32(Some(1))),
1410                        max_value: Precision::Inexact(ScalarValue::Int32(Some(100))),
1411                        ..Default::default()
1412                    },
1413                    ColumnStatistics {
1414                        min_value: Precision::Inexact(ScalarValue::Int32(Some(1))),
1415                        max_value: Precision::Inexact(ScalarValue::Int32(Some(3))),
1416                        ..Default::default()
1417                    },
1418                    ColumnStatistics {
1419                        min_value: Precision::Inexact(ScalarValue::Float32(Some(1000.0))),
1420                        max_value: Precision::Inexact(ScalarValue::Float32(Some(1100.0))),
1421                        ..Default::default()
1422                    },
1423                ],
1424            },
1425            schema,
1426        ));
1427        // WHERE a<=53 AND (b=3 AND (c<=1075.0 AND a>b))
1428        let predicate = Arc::new(BinaryExpr::new(
1429            Arc::new(BinaryExpr::new(
1430                Arc::new(Column::new("a", 0)),
1431                Operator::LtEq,
1432                Arc::new(Literal::new(ScalarValue::Int32(Some(53)))),
1433            )),
1434            Operator::And,
1435            Arc::new(BinaryExpr::new(
1436                Arc::new(BinaryExpr::new(
1437                    Arc::new(Column::new("b", 1)),
1438                    Operator::Eq,
1439                    Arc::new(Literal::new(ScalarValue::Int32(Some(3)))),
1440                )),
1441                Operator::And,
1442                Arc::new(BinaryExpr::new(
1443                    Arc::new(BinaryExpr::new(
1444                        Arc::new(Column::new("c", 2)),
1445                        Operator::LtEq,
1446                        Arc::new(Literal::new(ScalarValue::Float32(Some(1075.0)))),
1447                    )),
1448                    Operator::And,
1449                    Arc::new(BinaryExpr::new(
1450                        Arc::new(Column::new("a", 0)),
1451                        Operator::Gt,
1452                        Arc::new(Column::new("b", 1)),
1453                    )),
1454                )),
1455            )),
1456        ));
1457        let filter: Arc<dyn ExecutionPlan> =
1458            Arc::new(FilterExec::try_new(predicate, input)?);
1459        let statistics = filter.partition_statistics(None)?;
1460        // 0.5 (from a) * 0.333333... (from b) * 0.798387... (from c) ≈ 0.1330...
1461        // num_rows after ceil => 133.0... => 134
1462        // total_byte_size after ceil => 532.0... => 533
1463        assert_eq!(statistics.num_rows, Precision::Inexact(134));
1464        assert_eq!(statistics.total_byte_size, Precision::Inexact(533));
1465        let exp_col_stats = vec![
1466            ColumnStatistics {
1467                min_value: Precision::Inexact(ScalarValue::Int32(Some(4))),
1468                max_value: Precision::Inexact(ScalarValue::Int32(Some(53))),
1469                ..Default::default()
1470            },
1471            ColumnStatistics {
1472                min_value: Precision::Inexact(ScalarValue::Int32(Some(3))),
1473                max_value: Precision::Inexact(ScalarValue::Int32(Some(3))),
1474                ..Default::default()
1475            },
1476            ColumnStatistics {
1477                min_value: Precision::Inexact(ScalarValue::Float32(Some(1000.0))),
1478                max_value: Precision::Inexact(ScalarValue::Float32(Some(1075.0))),
1479                ..Default::default()
1480            },
1481        ];
1482        let _ = exp_col_stats
1483            .into_iter()
1484            .zip(statistics.column_statistics.clone())
1485            .map(|(expected, actual)| {
1486                if let Some(val) = actual.min_value.get_value() {
1487                    if val.data_type().is_floating() {
1488                        // Windows rounds arithmetic operation results differently for floating point numbers.
1489                        // Therefore, we check if the actual values are in an epsilon range.
1490                        let actual_min = actual.min_value.get_value().unwrap();
1491                        let actual_max = actual.max_value.get_value().unwrap();
1492                        let expected_min = expected.min_value.get_value().unwrap();
1493                        let expected_max = expected.max_value.get_value().unwrap();
1494                        let eps = ScalarValue::Float32(Some(1e-6));
1495
1496                        assert!(actual_min.sub(expected_min).unwrap() < eps);
1497                        assert!(actual_min.sub(expected_min).unwrap() < eps);
1498
1499                        assert!(actual_max.sub(expected_max).unwrap() < eps);
1500                        assert!(actual_max.sub(expected_max).unwrap() < eps);
1501                    } else {
1502                        assert_eq!(actual, expected);
1503                    }
1504                } else {
1505                    assert_eq!(actual, expected);
1506                }
1507            });
1508
1509        Ok(())
1510    }
1511
1512    #[tokio::test]
1513    async fn test_filter_statistics_full_selective() -> Result<()> {
1514        // Table:
1515        //      a: min=1, max=100
1516        //      b: min=1, max=3
1517        let schema = Schema::new(vec![
1518            Field::new("a", DataType::Int32, false),
1519            Field::new("b", DataType::Int32, false),
1520        ]);
1521        let input = Arc::new(StatisticsExec::new(
1522            Statistics {
1523                num_rows: Precision::Inexact(1000),
1524                total_byte_size: Precision::Inexact(4000),
1525                column_statistics: vec![
1526                    ColumnStatistics {
1527                        min_value: Precision::Inexact(ScalarValue::Int32(Some(1))),
1528                        max_value: Precision::Inexact(ScalarValue::Int32(Some(100))),
1529                        ..Default::default()
1530                    },
1531                    ColumnStatistics {
1532                        min_value: Precision::Inexact(ScalarValue::Int32(Some(1))),
1533                        max_value: Precision::Inexact(ScalarValue::Int32(Some(3))),
1534                        ..Default::default()
1535                    },
1536                ],
1537            },
1538            schema,
1539        ));
1540        // WHERE a<200 AND 1<=b
1541        let predicate = Arc::new(BinaryExpr::new(
1542            Arc::new(BinaryExpr::new(
1543                Arc::new(Column::new("a", 0)),
1544                Operator::Lt,
1545                Arc::new(Literal::new(ScalarValue::Int32(Some(200)))),
1546            )),
1547            Operator::And,
1548            Arc::new(BinaryExpr::new(
1549                Arc::new(Literal::new(ScalarValue::Int32(Some(1)))),
1550                Operator::LtEq,
1551                Arc::new(Column::new("b", 1)),
1552            )),
1553        ));
1554        // Since filter predicate passes all entries, statistics after filter shouldn't change.
1555        let expected = input.partition_statistics(None)?.column_statistics.clone();
1556        let filter: Arc<dyn ExecutionPlan> =
1557            Arc::new(FilterExec::try_new(predicate, input)?);
1558        let statistics = filter.partition_statistics(None)?;
1559
1560        assert_eq!(statistics.num_rows, Precision::Inexact(1000));
1561        assert_eq!(statistics.total_byte_size, Precision::Inexact(4000));
1562        assert_eq!(statistics.column_statistics, expected);
1563
1564        Ok(())
1565    }
1566
1567    #[tokio::test]
1568    async fn test_filter_statistics_zero_selective() -> Result<()> {
1569        // Table:
1570        //      a: min=1, max=100
1571        //      b: min=1, max=3
1572        let schema = Schema::new(vec![
1573            Field::new("a", DataType::Int32, false),
1574            Field::new("b", DataType::Int32, false),
1575        ]);
1576        let input = Arc::new(StatisticsExec::new(
1577            Statistics {
1578                num_rows: Precision::Inexact(1000),
1579                total_byte_size: Precision::Inexact(4000),
1580                column_statistics: vec![
1581                    ColumnStatistics {
1582                        min_value: Precision::Inexact(ScalarValue::Int32(Some(1))),
1583                        max_value: Precision::Inexact(ScalarValue::Int32(Some(100))),
1584                        ..Default::default()
1585                    },
1586                    ColumnStatistics {
1587                        min_value: Precision::Inexact(ScalarValue::Int32(Some(1))),
1588                        max_value: Precision::Inexact(ScalarValue::Int32(Some(3))),
1589                        ..Default::default()
1590                    },
1591                ],
1592            },
1593            schema,
1594        ));
1595        // WHERE a>200 AND 1<=b
1596        let predicate = Arc::new(BinaryExpr::new(
1597            Arc::new(BinaryExpr::new(
1598                Arc::new(Column::new("a", 0)),
1599                Operator::Gt,
1600                Arc::new(Literal::new(ScalarValue::Int32(Some(200)))),
1601            )),
1602            Operator::And,
1603            Arc::new(BinaryExpr::new(
1604                Arc::new(Literal::new(ScalarValue::Int32(Some(1)))),
1605                Operator::LtEq,
1606                Arc::new(Column::new("b", 1)),
1607            )),
1608        ));
1609        let filter: Arc<dyn ExecutionPlan> =
1610            Arc::new(FilterExec::try_new(predicate, input)?);
1611        let statistics = filter.partition_statistics(None)?;
1612
1613        assert_eq!(statistics.num_rows, Precision::Inexact(0));
1614        assert_eq!(statistics.total_byte_size, Precision::Inexact(0));
1615        assert_eq!(
1616            statistics.column_statistics,
1617            vec![
1618                ColumnStatistics {
1619                    min_value: Precision::Exact(ScalarValue::Int32(None)),
1620                    max_value: Precision::Exact(ScalarValue::Int32(None)),
1621                    sum_value: Precision::Exact(ScalarValue::Int32(None)),
1622                    distinct_count: Precision::Exact(0),
1623                    null_count: Precision::Exact(0),
1624                    byte_size: Precision::Exact(0),
1625                },
1626                ColumnStatistics {
1627                    min_value: Precision::Exact(ScalarValue::Int32(None)),
1628                    max_value: Precision::Exact(ScalarValue::Int32(None)),
1629                    sum_value: Precision::Exact(ScalarValue::Int32(None)),
1630                    distinct_count: Precision::Exact(0),
1631                    null_count: Precision::Exact(0),
1632                    byte_size: Precision::Exact(0),
1633                },
1634            ]
1635        );
1636
1637        Ok(())
1638    }
1639
1640    /// Regression test: stacking two FilterExecs where the inner filter
1641    /// proves zero selectivity should not panic with a type mismatch
1642    /// during interval intersection.
1643    ///
1644    /// Previously, when a filter proved no rows could match, the column
1645    /// statistics used untyped `ScalarValue::Null` (data type `Null`).
1646    /// If an outer FilterExec then tried to analyze its own predicate
1647    /// against those statistics, `Interval::intersect` would fail with:
1648    ///   "Only intervals with the same data type are intersectable, lhs:Null, rhs:Int32"
1649    #[tokio::test]
1650    async fn test_nested_filter_with_zero_selectivity_inner() -> Result<()> {
1651        // Inner table: a: [1, 100], b: [1, 3]
1652        let schema = Schema::new(vec![
1653            Field::new("a", DataType::Int32, false),
1654            Field::new("b", DataType::Int32, false),
1655        ]);
1656        let input = Arc::new(StatisticsExec::new(
1657            Statistics {
1658                num_rows: Precision::Inexact(1000),
1659                total_byte_size: Precision::Inexact(4000),
1660                column_statistics: vec![
1661                    ColumnStatistics {
1662                        min_value: Precision::Inexact(ScalarValue::Int32(Some(1))),
1663                        max_value: Precision::Inexact(ScalarValue::Int32(Some(100))),
1664                        ..Default::default()
1665                    },
1666                    ColumnStatistics {
1667                        min_value: Precision::Inexact(ScalarValue::Int32(Some(1))),
1668                        max_value: Precision::Inexact(ScalarValue::Int32(Some(3))),
1669                        ..Default::default()
1670                    },
1671                ],
1672            },
1673            schema,
1674        ));
1675
1676        // Inner filter: a > 200 (impossible given a max=100 → zero selectivity)
1677        let inner_predicate: Arc<dyn PhysicalExpr> = Arc::new(BinaryExpr::new(
1678            Arc::new(Column::new("a", 0)),
1679            Operator::Gt,
1680            Arc::new(Literal::new(ScalarValue::Int32(Some(200)))),
1681        ));
1682        let inner_filter: Arc<dyn ExecutionPlan> =
1683            Arc::new(FilterExec::try_new(inner_predicate, input)?);
1684
1685        // Outer filter: a = 50
1686        // Before the fix, this would panic because the inner filter's
1687        // zero-selectivity statistics produced Null-typed intervals for
1688        // column `a`, which couldn't intersect with the Int32 literal.
1689        let outer_predicate: Arc<dyn PhysicalExpr> = Arc::new(BinaryExpr::new(
1690            Arc::new(Column::new("a", 0)),
1691            Operator::Eq,
1692            Arc::new(Literal::new(ScalarValue::Int32(Some(50)))),
1693        ));
1694        let outer_filter: Arc<dyn ExecutionPlan> =
1695            Arc::new(FilterExec::try_new(outer_predicate, inner_filter)?);
1696
1697        // Should succeed without error
1698        let statistics = outer_filter.partition_statistics(None)?;
1699        assert_eq!(statistics.num_rows, Precision::Inexact(0));
1700
1701        Ok(())
1702    }
1703
1704    #[tokio::test]
1705    async fn test_filter_statistics_more_inputs() -> Result<()> {
1706        let schema = Schema::new(vec![
1707            Field::new("a", DataType::Int32, false),
1708            Field::new("b", DataType::Int32, false),
1709        ]);
1710        let input = Arc::new(StatisticsExec::new(
1711            Statistics {
1712                num_rows: Precision::Inexact(1000),
1713                total_byte_size: Precision::Inexact(4000),
1714                column_statistics: vec![
1715                    ColumnStatistics {
1716                        min_value: Precision::Inexact(ScalarValue::Int32(Some(1))),
1717                        max_value: Precision::Inexact(ScalarValue::Int32(Some(100))),
1718                        ..Default::default()
1719                    },
1720                    ColumnStatistics {
1721                        min_value: Precision::Inexact(ScalarValue::Int32(Some(1))),
1722                        max_value: Precision::Inexact(ScalarValue::Int32(Some(100))),
1723                        ..Default::default()
1724                    },
1725                ],
1726            },
1727            schema,
1728        ));
1729        // WHERE a<50
1730        let predicate = Arc::new(BinaryExpr::new(
1731            Arc::new(Column::new("a", 0)),
1732            Operator::Lt,
1733            Arc::new(Literal::new(ScalarValue::Int32(Some(50)))),
1734        ));
1735        let filter: Arc<dyn ExecutionPlan> =
1736            Arc::new(FilterExec::try_new(predicate, input)?);
1737        let statistics = filter.partition_statistics(None)?;
1738
1739        assert_eq!(statistics.num_rows, Precision::Inexact(490));
1740        assert_eq!(statistics.total_byte_size, Precision::Inexact(1960));
1741        assert_eq!(
1742            statistics.column_statistics,
1743            vec![
1744                ColumnStatistics {
1745                    min_value: Precision::Inexact(ScalarValue::Int32(Some(1))),
1746                    max_value: Precision::Inexact(ScalarValue::Int32(Some(49))),
1747                    ..Default::default()
1748                },
1749                ColumnStatistics {
1750                    min_value: Precision::Inexact(ScalarValue::Int32(Some(1))),
1751                    max_value: Precision::Inexact(ScalarValue::Int32(Some(100))),
1752                    ..Default::default()
1753                },
1754            ]
1755        );
1756
1757        Ok(())
1758    }
1759
1760    #[tokio::test]
1761    async fn test_empty_input_statistics() -> Result<()> {
1762        let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]);
1763        let input = Arc::new(StatisticsExec::new(
1764            Statistics::new_unknown(&schema),
1765            schema,
1766        ));
1767        // WHERE a <= 10 AND 0 <= a - 5
1768        let predicate = Arc::new(BinaryExpr::new(
1769            Arc::new(BinaryExpr::new(
1770                Arc::new(Column::new("a", 0)),
1771                Operator::LtEq,
1772                Arc::new(Literal::new(ScalarValue::Int32(Some(10)))),
1773            )),
1774            Operator::And,
1775            Arc::new(BinaryExpr::new(
1776                Arc::new(Literal::new(ScalarValue::Int32(Some(0)))),
1777                Operator::LtEq,
1778                Arc::new(BinaryExpr::new(
1779                    Arc::new(Column::new("a", 0)),
1780                    Operator::Minus,
1781                    Arc::new(Literal::new(ScalarValue::Int32(Some(5)))),
1782                )),
1783            )),
1784        ));
1785        let filter: Arc<dyn ExecutionPlan> =
1786            Arc::new(FilterExec::try_new(predicate, input)?);
1787        let filter_statistics = filter.partition_statistics(None)?;
1788
1789        let expected_filter_statistics = Statistics {
1790            num_rows: Precision::Absent,
1791            total_byte_size: Precision::Absent,
1792            column_statistics: vec![ColumnStatistics {
1793                null_count: Precision::Absent,
1794                min_value: Precision::Inexact(ScalarValue::Int32(Some(5))),
1795                max_value: Precision::Inexact(ScalarValue::Int32(Some(10))),
1796                sum_value: Precision::Absent,
1797                distinct_count: Precision::Absent,
1798                byte_size: Precision::Absent,
1799            }],
1800        };
1801
1802        assert_eq!(*filter_statistics, expected_filter_statistics);
1803
1804        Ok(())
1805    }
1806
1807    #[tokio::test]
1808    async fn test_statistics_with_constant_column() -> Result<()> {
1809        let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]);
1810        let input = Arc::new(StatisticsExec::new(
1811            Statistics::new_unknown(&schema),
1812            schema,
1813        ));
1814        // WHERE a = 10
1815        let predicate = Arc::new(BinaryExpr::new(
1816            Arc::new(Column::new("a", 0)),
1817            Operator::Eq,
1818            Arc::new(Literal::new(ScalarValue::Int32(Some(10)))),
1819        ));
1820        let filter: Arc<dyn ExecutionPlan> =
1821            Arc::new(FilterExec::try_new(predicate, input)?);
1822        let filter_statistics = filter.partition_statistics(None)?;
1823        // First column is "a", and it is a column with only one value after the filter.
1824        assert!(filter_statistics.column_statistics[0].is_singleton());
1825
1826        Ok(())
1827    }
1828
1829    #[tokio::test]
1830    async fn test_validation_filter_selectivity() -> Result<()> {
1831        let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]);
1832        let input = Arc::new(StatisticsExec::new(
1833            Statistics::new_unknown(&schema),
1834            schema,
1835        ));
1836        // WHERE a = 10
1837        let predicate = Arc::new(BinaryExpr::new(
1838            Arc::new(Column::new("a", 0)),
1839            Operator::Eq,
1840            Arc::new(Literal::new(ScalarValue::Int32(Some(10)))),
1841        ));
1842        let filter = FilterExec::try_new(predicate, input)?;
1843        assert!(filter.with_default_selectivity(120).is_err());
1844        Ok(())
1845    }
1846
1847    #[tokio::test]
1848    async fn test_custom_filter_selectivity() -> Result<()> {
1849        // Need a decimal to trigger inexact selectivity
1850        let schema =
1851            Schema::new(vec![Field::new("a", DataType::Decimal128(2, 3), false)]);
1852        let input = Arc::new(StatisticsExec::new(
1853            Statistics {
1854                num_rows: Precision::Inexact(1000),
1855                total_byte_size: Precision::Inexact(4000),
1856                column_statistics: vec![ColumnStatistics {
1857                    ..Default::default()
1858                }],
1859            },
1860            schema,
1861        ));
1862        // WHERE a = 10
1863        let predicate = Arc::new(BinaryExpr::new(
1864            Arc::new(Column::new("a", 0)),
1865            Operator::Eq,
1866            Arc::new(Literal::new(ScalarValue::Decimal128(Some(10), 10, 10))),
1867        ));
1868        let filter = FilterExec::try_new(predicate, input)?;
1869        let statistics = filter.partition_statistics(None)?;
1870        assert_eq!(statistics.num_rows, Precision::Inexact(200));
1871        assert_eq!(statistics.total_byte_size, Precision::Inexact(800));
1872        let filter = filter.with_default_selectivity(40)?;
1873        let statistics = filter.partition_statistics(None)?;
1874        assert_eq!(statistics.num_rows, Precision::Inexact(400));
1875        assert_eq!(statistics.total_byte_size, Precision::Inexact(1600));
1876        Ok(())
1877    }
1878
1879    #[test]
1880    fn test_equivalence_properties_union_type() -> Result<()> {
1881        let union_type = DataType::Union(
1882            UnionFields::try_new(
1883                vec![0, 1],
1884                vec![
1885                    Field::new("f1", DataType::Int32, true),
1886                    Field::new("f2", DataType::Utf8, true),
1887                ],
1888            )
1889            .unwrap(),
1890            UnionMode::Sparse,
1891        );
1892
1893        let schema = Arc::new(Schema::new(vec![
1894            Field::new("c1", DataType::Int32, true),
1895            Field::new("c2", union_type, true),
1896        ]));
1897
1898        let exec = FilterExec::try_new(
1899            binary(
1900                binary(col("c1", &schema)?, Operator::GtEq, lit(1i32), &schema)?,
1901                Operator::And,
1902                binary(col("c1", &schema)?, Operator::LtEq, lit(4i32), &schema)?,
1903                &schema,
1904            )?,
1905            Arc::new(EmptyExec::new(Arc::clone(&schema))),
1906        )?;
1907
1908        exec.partition_statistics(None).unwrap();
1909
1910        Ok(())
1911    }
1912
1913    #[tokio::test]
1914    async fn test_builder_with_projection() -> Result<()> {
1915        // Create a schema with multiple columns
1916        let schema = Arc::new(Schema::new(vec![
1917            Field::new("a", DataType::Int32, false),
1918            Field::new("b", DataType::Int32, false),
1919            Field::new("c", DataType::Int32, false),
1920        ]));
1921
1922        let input = Arc::new(EmptyExec::new(Arc::clone(&schema)));
1923
1924        // Create a filter predicate: a > 10
1925        let predicate = Arc::new(BinaryExpr::new(
1926            Arc::new(Column::new("a", 0)),
1927            Operator::Gt,
1928            Arc::new(Literal::new(ScalarValue::Int32(Some(10)))),
1929        ));
1930
1931        // Create filter with projection [0, 2] (columns a and c) using builder
1932        let projection = Some(vec![0, 2]);
1933        let filter = FilterExecBuilder::new(predicate, input)
1934            .apply_projection(projection.clone())
1935            .unwrap()
1936            .build()?;
1937
1938        // Verify projection is set correctly
1939        assert_eq!(filter.projection(), &Some([0, 2].into()));
1940
1941        // Verify schema contains only projected columns
1942        let output_schema = filter.schema();
1943        assert_eq!(output_schema.fields().len(), 2);
1944        assert_eq!(output_schema.field(0).name(), "a");
1945        assert_eq!(output_schema.field(1).name(), "c");
1946
1947        Ok(())
1948    }
1949
1950    #[tokio::test]
1951    async fn test_builder_without_projection() -> Result<()> {
1952        let schema = Arc::new(Schema::new(vec![
1953            Field::new("a", DataType::Int32, false),
1954            Field::new("b", DataType::Int32, false),
1955        ]));
1956
1957        let input = Arc::new(EmptyExec::new(Arc::clone(&schema)));
1958
1959        let predicate = Arc::new(BinaryExpr::new(
1960            Arc::new(Column::new("a", 0)),
1961            Operator::Gt,
1962            Arc::new(Literal::new(ScalarValue::Int32(Some(5)))),
1963        ));
1964
1965        // Create filter without projection using builder
1966        let filter = FilterExecBuilder::new(predicate, input).build()?;
1967
1968        // Verify no projection is set
1969        assert!(filter.projection().is_none());
1970
1971        // Verify schema contains all columns
1972        let output_schema = filter.schema();
1973        assert_eq!(output_schema.fields().len(), 2);
1974
1975        Ok(())
1976    }
1977
1978    #[tokio::test]
1979    async fn test_builder_invalid_projection() -> Result<()> {
1980        let schema = Arc::new(Schema::new(vec![
1981            Field::new("a", DataType::Int32, false),
1982            Field::new("b", DataType::Int32, false),
1983        ]));
1984
1985        let input = Arc::new(EmptyExec::new(Arc::clone(&schema)));
1986
1987        let predicate = Arc::new(BinaryExpr::new(
1988            Arc::new(Column::new("a", 0)),
1989            Operator::Gt,
1990            Arc::new(Literal::new(ScalarValue::Int32(Some(5)))),
1991        ));
1992
1993        // Try to create filter with invalid projection (index out of bounds) using builder
1994        let result =
1995            FilterExecBuilder::new(predicate, input).apply_projection(Some(vec![0, 5])); // 5 is out of bounds
1996
1997        // Should return an error
1998        assert!(result.is_err());
1999
2000        Ok(())
2001    }
2002
2003    #[tokio::test]
2004    async fn test_builder_vs_with_projection() -> Result<()> {
2005        // This test verifies that the builder with projection produces the same result
2006        // as try_new().with_projection(), but more efficiently (one compute_properties call)
2007        let schema = Schema::new(vec![
2008            Field::new("a", DataType::Int32, false),
2009            Field::new("b", DataType::Int32, false),
2010            Field::new("c", DataType::Int32, false),
2011            Field::new("d", DataType::Int32, false),
2012        ]);
2013
2014        let input = Arc::new(StatisticsExec::new(
2015            Statistics {
2016                num_rows: Precision::Inexact(1000),
2017                total_byte_size: Precision::Inexact(4000),
2018                column_statistics: vec![
2019                    ColumnStatistics {
2020                        min_value: Precision::Inexact(ScalarValue::Int32(Some(1))),
2021                        max_value: Precision::Inexact(ScalarValue::Int32(Some(100))),
2022                        ..Default::default()
2023                    },
2024                    ColumnStatistics {
2025                        ..Default::default()
2026                    },
2027                    ColumnStatistics {
2028                        ..Default::default()
2029                    },
2030                    ColumnStatistics {
2031                        ..Default::default()
2032                    },
2033                ],
2034            },
2035            schema,
2036        ));
2037        let input: Arc<dyn ExecutionPlan> = input;
2038
2039        let predicate: Arc<dyn PhysicalExpr> = Arc::new(BinaryExpr::new(
2040            Arc::new(Column::new("a", 0)),
2041            Operator::Lt,
2042            Arc::new(Literal::new(ScalarValue::Int32(Some(50)))),
2043        ));
2044
2045        let projection = Some(vec![0, 2]);
2046
2047        // Method 1: Builder with projection (one call to compute_properties)
2048        let filter1 = FilterExecBuilder::new(Arc::clone(&predicate), Arc::clone(&input))
2049            .apply_projection(projection.clone())
2050            .unwrap()
2051            .build()?;
2052
2053        // Method 2: Also using builder for comparison (deprecated try_new().with_projection() removed)
2054        let filter2 = FilterExecBuilder::new(predicate, input)
2055            .apply_projection(projection)
2056            .unwrap()
2057            .build()?;
2058
2059        // Both methods should produce equivalent results
2060        assert_eq!(filter1.schema(), filter2.schema());
2061        assert_eq!(filter1.projection(), filter2.projection());
2062
2063        // Verify statistics are the same
2064        let stats1 = filter1.partition_statistics(None)?;
2065        let stats2 = filter2.partition_statistics(None)?;
2066        assert_eq!(stats1.num_rows, stats2.num_rows);
2067        assert_eq!(stats1.total_byte_size, stats2.total_byte_size);
2068
2069        Ok(())
2070    }
2071
2072    #[tokio::test]
2073    async fn test_builder_statistics_with_projection() -> Result<()> {
2074        // Test that statistics are correctly computed when using builder with projection
2075        let schema = Schema::new(vec![
2076            Field::new("a", DataType::Int32, false),
2077            Field::new("b", DataType::Int32, false),
2078            Field::new("c", DataType::Int32, false),
2079        ]);
2080
2081        let input = Arc::new(StatisticsExec::new(
2082            Statistics {
2083                num_rows: Precision::Inexact(1000),
2084                total_byte_size: Precision::Inexact(12000),
2085                column_statistics: vec![
2086                    ColumnStatistics {
2087                        min_value: Precision::Inexact(ScalarValue::Int32(Some(1))),
2088                        max_value: Precision::Inexact(ScalarValue::Int32(Some(100))),
2089                        ..Default::default()
2090                    },
2091                    ColumnStatistics {
2092                        min_value: Precision::Inexact(ScalarValue::Int32(Some(10))),
2093                        max_value: Precision::Inexact(ScalarValue::Int32(Some(200))),
2094                        ..Default::default()
2095                    },
2096                    ColumnStatistics {
2097                        min_value: Precision::Inexact(ScalarValue::Int32(Some(5))),
2098                        max_value: Precision::Inexact(ScalarValue::Int32(Some(50))),
2099                        ..Default::default()
2100                    },
2101                ],
2102            },
2103            schema,
2104        ));
2105
2106        // Filter: a < 50, Project: [0, 2]
2107        let predicate = Arc::new(BinaryExpr::new(
2108            Arc::new(Column::new("a", 0)),
2109            Operator::Lt,
2110            Arc::new(Literal::new(ScalarValue::Int32(Some(50)))),
2111        ));
2112
2113        let filter = FilterExecBuilder::new(predicate, input)
2114            .apply_projection(Some(vec![0, 2]))
2115            .unwrap()
2116            .build()?;
2117
2118        let statistics = filter.partition_statistics(None)?;
2119
2120        // Verify statistics reflect both filtering and projection
2121        assert!(matches!(statistics.num_rows, Precision::Inexact(_)));
2122
2123        // Schema should only have 2 columns after projection
2124        assert_eq!(filter.schema().fields().len(), 2);
2125
2126        Ok(())
2127    }
2128
2129    #[test]
2130    fn test_builder_predicate_validation() -> Result<()> {
2131        // Test that builder validates predicate type correctly
2132        let schema = Arc::new(Schema::new(vec![
2133            Field::new("a", DataType::Int32, false),
2134            Field::new("b", DataType::Int32, false),
2135        ]));
2136
2137        let input = Arc::new(EmptyExec::new(Arc::clone(&schema)));
2138
2139        // Create a predicate that doesn't return boolean (returns Int32)
2140        let invalid_predicate = Arc::new(Column::new("a", 0));
2141
2142        // Should fail because predicate doesn't return boolean
2143        let result = FilterExecBuilder::new(invalid_predicate, input)
2144            .apply_projection(Some(vec![0]))
2145            .unwrap()
2146            .build();
2147
2148        assert!(result.is_err());
2149
2150        Ok(())
2151    }
2152
2153    #[tokio::test]
2154    async fn test_builder_projection_composition() -> Result<()> {
2155        // Test that calling apply_projection multiple times composes projections
2156        // If initial projection is [0, 2, 3] and we call apply_projection([0, 2]),
2157        // the result should be [0, 3] (indices 0 and 2 of [0, 2, 3])
2158        let schema = Arc::new(Schema::new(vec![
2159            Field::new("a", DataType::Int32, false),
2160            Field::new("b", DataType::Int32, false),
2161            Field::new("c", DataType::Int32, false),
2162            Field::new("d", DataType::Int32, false),
2163        ]));
2164
2165        let input = Arc::new(EmptyExec::new(Arc::clone(&schema)));
2166
2167        // Create a filter predicate: a > 10
2168        let predicate = Arc::new(BinaryExpr::new(
2169            Arc::new(Column::new("a", 0)),
2170            Operator::Gt,
2171            Arc::new(Literal::new(ScalarValue::Int32(Some(10)))),
2172        ));
2173
2174        // First projection: [0, 2, 3] -> select columns a, c, d
2175        // Second projection: [0, 2] -> select indices 0 and 2 of [0, 2, 3] -> [0, 3]
2176        // Final result: columns a and d
2177        let filter = FilterExecBuilder::new(predicate, input)
2178            .apply_projection(Some(vec![0, 2, 3]))?
2179            .apply_projection(Some(vec![0, 2]))?
2180            .build()?;
2181
2182        // Verify composed projection is [0, 3]
2183        assert_eq!(filter.projection(), &Some([0, 3].into()));
2184
2185        // Verify schema contains only columns a and d
2186        let output_schema = filter.schema();
2187        assert_eq!(output_schema.fields().len(), 2);
2188        assert_eq!(output_schema.field(0).name(), "a");
2189        assert_eq!(output_schema.field(1).name(), "d");
2190
2191        Ok(())
2192    }
2193
2194    #[tokio::test]
2195    async fn test_builder_projection_composition_none_clears() -> Result<()> {
2196        // Test that passing None clears the projection
2197        let schema = Arc::new(Schema::new(vec![
2198            Field::new("a", DataType::Int32, false),
2199            Field::new("b", DataType::Int32, false),
2200        ]));
2201
2202        let input = Arc::new(EmptyExec::new(Arc::clone(&schema)));
2203
2204        let predicate = Arc::new(BinaryExpr::new(
2205            Arc::new(Column::new("a", 0)),
2206            Operator::Gt,
2207            Arc::new(Literal::new(ScalarValue::Int32(Some(10)))),
2208        ));
2209
2210        // Set a projection then clear it with None
2211        let filter = FilterExecBuilder::new(predicate, input)
2212            .apply_projection(Some(vec![0]))?
2213            .apply_projection(None)?
2214            .build()?;
2215
2216        // Projection should be cleared
2217        assert_eq!(filter.projection(), &None);
2218
2219        // Schema should have all columns
2220        let output_schema = filter.schema();
2221        assert_eq!(output_schema.fields().len(), 2);
2222
2223        Ok(())
2224    }
2225
#[test]
fn test_filter_with_projection_remaps_post_phase_parent_filters() -> Result<()> {
    // A FilterExec that carries a projection exposes a narrower schema than
    // its input. Dynamic filters handed down by the parent are written
    // against that output schema, so their column indices have to be
    // translated back to input-schema positions before reaching the child.
    let fields = vec![
        Field::new("a", DataType::Int32, false),
        Field::new("b", DataType::Utf8, false),
        Field::new("c", DataType::Float64, false),
    ];
    let child = Arc::new(EmptyExec::new(Arc::new(Schema::new(fields))));

    // FilterExec: a > 0, projection=[c@2]
    let a_col: Arc<dyn PhysicalExpr> = Arc::new(Column::new("a", 0));
    let zero: Arc<dyn PhysicalExpr> =
        Arc::new(Literal::new(ScalarValue::Int32(Some(0))));
    let a_gt_zero = Arc::new(BinaryExpr::new(a_col, Operator::Gt, zero));
    let filter = FilterExecBuilder::new(a_gt_zero, child)
        .apply_projection(Some(vec![2]))?
        .build()?;

    // After projection only `c` is visible, at output position 0.
    let projected = filter.schema();
    assert_eq!(projected.fields().len(), 1);
    assert_eq!(projected.field(0).name(), "c");

    // The parent's dynamic filter refers to the output column c@0.
    let from_parent: Arc<dyn PhysicalExpr> = Arc::new(Column::new("c", 0));
    let options = ConfigOptions::new();
    let description = filter.gather_filters_for_pushdown(
        FilterPushdownPhase::Post,
        vec![from_parent],
        &options,
    )?;

    // Exactly one child receives exactly one filter, and that filter must
    // now point at c@2 (input schema) instead of c@0 (output schema).
    let per_child = description.parent_filters();
    assert_eq!(per_child.len(), 1); // one child
    assert_eq!(per_child[0].len(), 1); // one filter
    let rendered = per_child[0][0].predicate.to_string();
    assert_eq!(
        rendered, "c@2",
        "Post-phase parent filter column index must be remapped \
         from output schema (c@0) to input schema (c@2)"
    );

    Ok(())
}
2278
/// Regression test for https://github.com/apache/datafusion/issues/20194
///
/// An equality is only useful to `collect_columns_from_predicate_inner`
/// when at least one operand is a plain Column. If `complex_expr = literal`
/// were turned into an equivalence class, the deep traversal done by
/// `normalize_expr` would substitute the complex expression for that
/// literal inside unrelated expressions (sort keys, for example).
#[test]
fn test_collect_columns_skips_non_column_pairs() -> Result<()> {
    let schema = test::aggr_test_schema();

    // Simulate: nvl(c2, 0) = 0  →  (c2 IS DISTINCT FROM 0) = 0
    // Neither operand of the outer `=` is a Column.
    let c2_is_distinct_from_zero = binary(
        col("c2", &schema)?,
        Operator::IsDistinctFrom,
        lit(0u32),
        &schema,
    )?;
    let complex_eq_literal =
        binary(c2_is_distinct_from_zero, Operator::Eq, lit(0u32), &schema)?;

    // Plain `c2 = 0`: the left operand is a Column.
    let column_eq_literal =
        binary(col("c2", &schema)?, Operator::Eq, lit(0u32), &schema)?;

    let scenarios = [
        (
            complex_eq_literal,
            0,
            "Should not extract equality pairs where neither side is a Column",
        ),
        (
            column_eq_literal,
            1,
            "Should extract equality pairs where one side is a Column",
        ),
    ];
    for (predicate, expected_pairs, message) in scenarios {
        let (equal_pairs, _) = collect_columns_from_predicate_inner(&predicate);
        assert_eq!(expected_pairs, equal_pairs.len(), "{message}");
    }

    Ok(())
}
2320
2321    /// Columns with Absent min/max statistics should remain Absent after
2322    /// FilterExec.
2323    #[tokio::test]
2324    async fn test_filter_statistics_absent_columns_stay_absent() -> Result<()> {
2325        let schema = Schema::new(vec![
2326            Field::new("a", DataType::Int32, false),
2327            Field::new("b", DataType::Int32, false),
2328        ]);
2329        let input = Arc::new(StatisticsExec::new(
2330            Statistics {
2331                num_rows: Precision::Inexact(1000),
2332                total_byte_size: Precision::Absent,
2333                column_statistics: vec![
2334                    ColumnStatistics::default(),
2335                    ColumnStatistics::default(),
2336                ],
2337            },
2338            schema.clone(),
2339        ));
2340
2341        let predicate = Arc::new(BinaryExpr::new(
2342            Arc::new(Column::new("a", 0)),
2343            Operator::Eq,
2344            Arc::new(Literal::new(ScalarValue::Int32(Some(42)))),
2345        ));
2346        let filter: Arc<dyn ExecutionPlan> =
2347            Arc::new(FilterExec::try_new(predicate, input)?);
2348
2349        let statistics = filter.partition_statistics(None)?;
2350        let col_b_stats = &statistics.column_statistics[1];
2351        assert_eq!(col_b_stats.min_value, Precision::Absent);
2352        assert_eq!(col_b_stats.max_value, Precision::Absent);
2353
2354        Ok(())
2355    }
2356
2357    #[tokio::test]
2358    async fn test_filter_statistics_equality_ndv() -> Result<()> {
2359        #[expect(clippy::type_complexity)]
2360        let cases: Vec<(
2361            &str,
2362            Vec<Field>,
2363            Vec<ColumnStatistics>,
2364            Arc<dyn PhysicalExpr>,
2365            Vec<Precision<usize>>,
2366        )> = vec![
2367            (
2368                "utf8 equality",
2369                vec![Field::new("name", DataType::Utf8, false)],
2370                vec![ColumnStatistics {
2371                    distinct_count: Precision::Inexact(50),
2372                    ..Default::default()
2373                }],
2374                Arc::new(BinaryExpr::new(
2375                    Arc::new(Column::new("name", 0)),
2376                    Operator::Eq,
2377                    Arc::new(Literal::new(ScalarValue::Utf8(Some("hello".to_string())))),
2378                )),
2379                vec![Precision::Exact(1)],
2380            ),
2381            (
2382                "utf8view equality",
2383                vec![Field::new("name", DataType::Utf8View, false)],
2384                vec![ColumnStatistics {
2385                    distinct_count: Precision::Inexact(50),
2386                    ..Default::default()
2387                }],
2388                Arc::new(BinaryExpr::new(
2389                    Arc::new(Column::new("name", 0)),
2390                    Operator::Eq,
2391                    Arc::new(Literal::new(ScalarValue::Utf8View(Some(
2392                        "hello".to_string(),
2393                    )))),
2394                )),
2395                vec![Precision::Exact(1)],
2396            ),
2397            (
2398                "largeutf8 equality",
2399                vec![Field::new("name", DataType::LargeUtf8, false)],
2400                vec![ColumnStatistics {
2401                    distinct_count: Precision::Inexact(50),
2402                    ..Default::default()
2403                }],
2404                Arc::new(BinaryExpr::new(
2405                    Arc::new(Column::new("name", 0)),
2406                    Operator::Eq,
2407                    Arc::new(Literal::new(ScalarValue::LargeUtf8(Some(
2408                        "hello".to_string(),
2409                    )))),
2410                )),
2411                vec![Precision::Exact(1)],
2412            ),
2413            (
2414                "utf8 reversed (literal = column)",
2415                vec![Field::new("name", DataType::Utf8, false)],
2416                vec![ColumnStatistics {
2417                    distinct_count: Precision::Inexact(50),
2418                    ..Default::default()
2419                }],
2420                Arc::new(BinaryExpr::new(
2421                    Arc::new(Literal::new(ScalarValue::Utf8(Some("hello".to_string())))),
2422                    Operator::Eq,
2423                    Arc::new(Column::new("name", 0)),
2424                )),
2425                vec![Precision::Exact(1)],
2426            ),
2427            (
2428                "OR preserves original NDV",
2429                vec![Field::new("name", DataType::Utf8, false)],
2430                vec![ColumnStatistics {
2431                    distinct_count: Precision::Inexact(50),
2432                    ..Default::default()
2433                }],
2434                Arc::new(BinaryExpr::new(
2435                    Arc::new(BinaryExpr::new(
2436                        Arc::new(Column::new("name", 0)),
2437                        Operator::Eq,
2438                        Arc::new(Literal::new(ScalarValue::Utf8(Some("a".to_string())))),
2439                    )),
2440                    Operator::Or,
2441                    Arc::new(BinaryExpr::new(
2442                        Arc::new(Column::new("name", 0)),
2443                        Operator::Eq,
2444                        Arc::new(Literal::new(ScalarValue::Utf8(Some("b".to_string())))),
2445                    )),
2446                )),
2447                vec![Precision::Inexact(50)],
2448            ),
2449            (
2450                "AND with mixed types (Utf8 + Int32)",
2451                vec![
2452                    Field::new("name", DataType::Utf8, false),
2453                    Field::new("age", DataType::Int32, false),
2454                ],
2455                vec![
2456                    ColumnStatistics {
2457                        distinct_count: Precision::Inexact(50),
2458                        ..Default::default()
2459                    },
2460                    ColumnStatistics {
2461                        distinct_count: Precision::Inexact(80),
2462                        ..Default::default()
2463                    },
2464                ],
2465                Arc::new(BinaryExpr::new(
2466                    Arc::new(BinaryExpr::new(
2467                        Arc::new(Column::new("name", 0)),
2468                        Operator::Eq,
2469                        Arc::new(Literal::new(ScalarValue::Utf8(Some(
2470                            "hello".to_string(),
2471                        )))),
2472                    )),
2473                    Operator::And,
2474                    Arc::new(BinaryExpr::new(
2475                        Arc::new(Column::new("age", 1)),
2476                        Operator::Eq,
2477                        Arc::new(Literal::new(ScalarValue::Int32(Some(42)))),
2478                    )),
2479                )),
2480                vec![Precision::Exact(1), Precision::Exact(1)],
2481            ),
2482            (
2483                "numeric equality with min/max bounds (interval analysis path)",
2484                vec![Field::new("a", DataType::Int32, false)],
2485                vec![ColumnStatistics {
2486                    min_value: Precision::Inexact(ScalarValue::Int32(Some(1))),
2487                    max_value: Precision::Inexact(ScalarValue::Int32(Some(100))),
2488                    distinct_count: Precision::Inexact(80),
2489                    ..Default::default()
2490                }],
2491                Arc::new(BinaryExpr::new(
2492                    Arc::new(Column::new("a", 0)),
2493                    Operator::Eq,
2494                    Arc::new(Literal::new(ScalarValue::Int32(Some(42)))),
2495                )),
2496                vec![Precision::Exact(1)],
2497            ),
2498            (
2499                "timestamp equality",
2500                vec![Field::new(
2501                    "ts",
2502                    DataType::Timestamp(arrow::datatypes::TimeUnit::Nanosecond, None),
2503                    false,
2504                )],
2505                vec![ColumnStatistics {
2506                    distinct_count: Precision::Inexact(500),
2507                    ..Default::default()
2508                }],
2509                Arc::new(BinaryExpr::new(
2510                    Arc::new(Column::new("ts", 0)),
2511                    Operator::Eq,
2512                    Arc::new(Literal::new(ScalarValue::TimestampNanosecond(
2513                        Some(1_609_459_200_000_000_000),
2514                        None,
2515                    ))),
2516                )),
2517                vec![Precision::Exact(1)],
2518            ),
2519            (
2520                "contradictory numeric equality (infeasible)",
2521                vec![Field::new("a", DataType::Int32, false)],
2522                vec![ColumnStatistics {
2523                    distinct_count: Precision::Inexact(50),
2524                    ..Default::default()
2525                }],
2526                Arc::new(BinaryExpr::new(
2527                    Arc::new(BinaryExpr::new(
2528                        Arc::new(Column::new("a", 0)),
2529                        Operator::Eq,
2530                        Arc::new(Literal::new(ScalarValue::Int32(Some(42)))),
2531                    )),
2532                    Operator::And,
2533                    Arc::new(BinaryExpr::new(
2534                        Arc::new(Column::new("a", 0)),
2535                        Operator::Eq,
2536                        Arc::new(Literal::new(ScalarValue::Int32(Some(99)))),
2537                    )),
2538                )),
2539                vec![Precision::Exact(0)],
2540            ),
2541            (
2542                "utf8 equality with absent input NDV",
2543                vec![Field::new("name", DataType::Utf8, false)],
2544                vec![ColumnStatistics {
2545                    distinct_count: Precision::Absent,
2546                    ..Default::default()
2547                }],
2548                Arc::new(BinaryExpr::new(
2549                    Arc::new(Column::new("name", 0)),
2550                    Operator::Eq,
2551                    Arc::new(Literal::new(ScalarValue::Utf8(Some("hello".to_string())))),
2552                )),
2553                vec![Precision::Exact(1)],
2554            ),
2555            (
2556                "contradictory utf8 equality (infeasible)",
2557                vec![Field::new("name", DataType::Utf8, false)],
2558                vec![ColumnStatistics {
2559                    distinct_count: Precision::Inexact(100),
2560                    ..Default::default()
2561                }],
2562                Arc::new(BinaryExpr::new(
2563                    Arc::new(BinaryExpr::new(
2564                        Arc::new(Column::new("name", 0)),
2565                        Operator::Eq,
2566                        Arc::new(Literal::new(ScalarValue::Utf8(Some(
2567                            "alice".to_string(),
2568                        )))),
2569                    )),
2570                    Operator::And,
2571                    Arc::new(BinaryExpr::new(
2572                        Arc::new(Column::new("name", 0)),
2573                        Operator::Eq,
2574                        Arc::new(Literal::new(ScalarValue::Utf8(Some(
2575                            "bob".to_string(),
2576                        )))),
2577                    )),
2578                )),
2579                vec![Precision::Exact(0)],
2580            ),
2581            (
2582                "redundant same-value equality combined with another column",
2583                vec![
2584                    Field::new("a", DataType::Int32, false),
2585                    Field::new("b", DataType::Int32, false),
2586                ],
2587                vec![
2588                    ColumnStatistics {
2589                        distinct_count: Precision::Inexact(80),
2590                        ..Default::default()
2591                    },
2592                    ColumnStatistics {
2593                        distinct_count: Precision::Inexact(40),
2594                        ..Default::default()
2595                    },
2596                ],
2597                Arc::new(BinaryExpr::new(
2598                    Arc::new(BinaryExpr::new(
2599                        Arc::new(BinaryExpr::new(
2600                            Arc::new(Column::new("a", 0)),
2601                            Operator::Eq,
2602                            Arc::new(Literal::new(ScalarValue::Int32(Some(1)))),
2603                        )),
2604                        Operator::And,
2605                        Arc::new(BinaryExpr::new(
2606                            Arc::new(Column::new("a", 0)),
2607                            Operator::Eq,
2608                            Arc::new(Literal::new(ScalarValue::Int32(Some(1)))),
2609                        )),
2610                    )),
2611                    Operator::And,
2612                    Arc::new(BinaryExpr::new(
2613                        Arc::new(Column::new("b", 1)),
2614                        Operator::Eq,
2615                        Arc::new(Literal::new(ScalarValue::Int32(Some(2)))),
2616                    )),
2617                )),
2618                vec![Precision::Exact(1), Precision::Exact(1)],
2619            ),
2620        ];
2621
2622        for (desc, fields, col_stats, predicate, expected_ndvs) in cases {
2623            let schema = Schema::new(fields);
2624            let input = Arc::new(StatisticsExec::new(
2625                Statistics {
2626                    num_rows: Precision::Inexact(100),
2627                    total_byte_size: Precision::Inexact(1000),
2628                    column_statistics: col_stats,
2629                },
2630                schema.clone(),
2631            ));
2632            let filter: Arc<dyn ExecutionPlan> =
2633                Arc::new(FilterExec::try_new(predicate, input)?);
2634            let statistics = filter.partition_statistics(None)?;
2635
2636            for (i, expected) in expected_ndvs.iter().enumerate() {
2637                assert_eq!(
2638                    statistics.column_statistics[i].distinct_count, *expected,
2639                    "case '{desc}': column {i} NDV mismatch"
2640                );
2641            }
2642        }
2643        Ok(())
2644    }
2645
2646    #[tokio::test]
2647    async fn test_filter_statistics_and_equality_ndv() -> Result<()> {
2648        // a: min=1, max=100, ndv=80
2649        // b: min=1, max=50, ndv=40
2650        // c: min=1, max=200, ndv=150
2651        let schema = Schema::new(vec![
2652            Field::new("a", DataType::Int32, false),
2653            Field::new("b", DataType::Int32, false),
2654            Field::new("c", DataType::Int32, false),
2655        ]);
2656        let input = Arc::new(StatisticsExec::new(
2657            Statistics {
2658                num_rows: Precision::Inexact(100),
2659                total_byte_size: Precision::Inexact(1200),
2660                column_statistics: vec![
2661                    ColumnStatistics {
2662                        min_value: Precision::Inexact(ScalarValue::Int32(Some(1))),
2663                        max_value: Precision::Inexact(ScalarValue::Int32(Some(100))),
2664                        distinct_count: Precision::Inexact(80),
2665                        ..Default::default()
2666                    },
2667                    ColumnStatistics {
2668                        min_value: Precision::Inexact(ScalarValue::Int32(Some(1))),
2669                        max_value: Precision::Inexact(ScalarValue::Int32(Some(50))),
2670                        distinct_count: Precision::Inexact(40),
2671                        ..Default::default()
2672                    },
2673                    ColumnStatistics {
2674                        min_value: Precision::Inexact(ScalarValue::Int32(Some(1))),
2675                        max_value: Precision::Inexact(ScalarValue::Int32(Some(200))),
2676                        distinct_count: Precision::Inexact(150),
2677                        ..Default::default()
2678                    },
2679                ],
2680            },
2681            schema.clone(),
2682        ));
2683
2684        // a = 42 AND b > 10 AND c = 7
2685        let predicate = Arc::new(BinaryExpr::new(
2686            Arc::new(BinaryExpr::new(
2687                Arc::new(BinaryExpr::new(
2688                    Arc::new(Column::new("a", 0)),
2689                    Operator::Eq,
2690                    Arc::new(Literal::new(ScalarValue::Int32(Some(42)))),
2691                )),
2692                Operator::And,
2693                Arc::new(BinaryExpr::new(
2694                    Arc::new(Column::new("b", 1)),
2695                    Operator::Gt,
2696                    Arc::new(Literal::new(ScalarValue::Int32(Some(10)))),
2697                )),
2698            )),
2699            Operator::And,
2700            Arc::new(BinaryExpr::new(
2701                Arc::new(Column::new("c", 2)),
2702                Operator::Eq,
2703                Arc::new(Literal::new(ScalarValue::Int32(Some(7)))),
2704            )),
2705        ));
2706        let filter: Arc<dyn ExecutionPlan> =
2707            Arc::new(FilterExec::try_new(predicate, input)?);
2708        let statistics = filter.partition_statistics(None)?;
2709        // a = 42 collapses to single value
2710        assert_eq!(
2711            statistics.column_statistics[0].distinct_count,
2712            Precision::Exact(1)
2713        );
2714        // b > 10 narrows to [11, 50] but doesn't collapse to a single value.
2715        // The combined selectivity of a=42 (1/80) and c=7 (1/150) on 100 rows
2716        // computes num_rows = 1, so NDV is capped at the row count: min(40, 1) = 1.
2717        assert_eq!(
2718            statistics.column_statistics[1].distinct_count,
2719            Precision::Inexact(1)
2720        );
2721        // c = 7 collapses to single value
2722        assert_eq!(
2723            statistics.column_statistics[2].distinct_count,
2724            Precision::Exact(1)
2725        );
2726        Ok(())
2727    }
2728
2729    #[tokio::test]
2730    async fn test_filter_statistics_equality_absent_bounds_ndv() -> Result<()> {
2731        // a: ndv=80, no min/max
2732        let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]);
2733        let input = Arc::new(StatisticsExec::new(
2734            Statistics {
2735                num_rows: Precision::Inexact(100),
2736                total_byte_size: Precision::Inexact(400),
2737                column_statistics: vec![ColumnStatistics {
2738                    distinct_count: Precision::Inexact(80),
2739                    ..Default::default()
2740                }],
2741            },
2742            schema.clone(),
2743        ));
2744
2745        // a = 42: even without known bounds, interval analysis resolves
2746        // the equality to [42, 42], so NDV is correctly set to Exact(1)
2747        let predicate = Arc::new(BinaryExpr::new(
2748            Arc::new(Column::new("a", 0)),
2749            Operator::Eq,
2750            Arc::new(Literal::new(ScalarValue::Int32(Some(42)))),
2751        ));
2752        let filter: Arc<dyn ExecutionPlan> =
2753            Arc::new(FilterExec::try_new(predicate, input)?);
2754        let statistics = filter.partition_statistics(None)?;
2755        assert_eq!(
2756            statistics.column_statistics[0].distinct_count,
2757            Precision::Exact(1)
2758        );
2759        Ok(())
2760    }
2761
2762    #[tokio::test]
2763    async fn test_filter_statistics_equality_int8_ndv() -> Result<()> {
2764        // a: min=-100, max=100, ndv=50
2765        let schema = Schema::new(vec![Field::new("a", DataType::Int8, false)]);
2766        let input = Arc::new(StatisticsExec::new(
2767            Statistics {
2768                num_rows: Precision::Inexact(100),
2769                total_byte_size: Precision::Inexact(100),
2770                column_statistics: vec![ColumnStatistics {
2771                    min_value: Precision::Inexact(ScalarValue::Int8(Some(-100))),
2772                    max_value: Precision::Inexact(ScalarValue::Int8(Some(100))),
2773                    distinct_count: Precision::Inexact(50),
2774                    ..Default::default()
2775                }],
2776            },
2777            schema.clone(),
2778        ));
2779
2780        let predicate = Arc::new(BinaryExpr::new(
2781            Arc::new(Column::new("a", 0)),
2782            Operator::Eq,
2783            Arc::new(Literal::new(ScalarValue::Int8(Some(42)))),
2784        ));
2785        let filter: Arc<dyn ExecutionPlan> =
2786            Arc::new(FilterExec::try_new(predicate, input)?);
2787        let statistics = filter.partition_statistics(None)?;
2788        assert_eq!(
2789            statistics.column_statistics[0].distinct_count,
2790            Precision::Exact(1)
2791        );
2792        Ok(())
2793    }
2794
2795    #[tokio::test]
2796    async fn test_filter_statistics_equality_int64_ndv() -> Result<()> {
2797        // a: min=0, max=1_000_000, ndv=100_000
2798        let schema = Schema::new(vec![Field::new("a", DataType::Int64, false)]);
2799        let input = Arc::new(StatisticsExec::new(
2800            Statistics {
2801                num_rows: Precision::Inexact(100_000),
2802                total_byte_size: Precision::Inexact(800_000),
2803                column_statistics: vec![ColumnStatistics {
2804                    min_value: Precision::Inexact(ScalarValue::Int64(Some(0))),
2805                    max_value: Precision::Inexact(ScalarValue::Int64(Some(1_000_000))),
2806                    distinct_count: Precision::Inexact(100_000),
2807                    ..Default::default()
2808                }],
2809            },
2810            schema.clone(),
2811        ));
2812
2813        let predicate = Arc::new(BinaryExpr::new(
2814            Arc::new(Column::new("a", 0)),
2815            Operator::Eq,
2816            Arc::new(Literal::new(ScalarValue::Int64(Some(42)))),
2817        ));
2818        let filter: Arc<dyn ExecutionPlan> =
2819            Arc::new(FilterExec::try_new(predicate, input)?);
2820        let statistics = filter.partition_statistics(None)?;
2821        assert_eq!(
2822            statistics.column_statistics[0].distinct_count,
2823            Precision::Exact(1)
2824        );
2825        Ok(())
2826    }
2827
2828    #[tokio::test]
2829    async fn test_filter_statistics_equality_float32_ndv() -> Result<()> {
2830        // a: min=0.0, max=100.0, ndv=50
2831        let schema = Schema::new(vec![Field::new("a", DataType::Float32, false)]);
2832        let input = Arc::new(StatisticsExec::new(
2833            Statistics {
2834                num_rows: Precision::Inexact(100),
2835                total_byte_size: Precision::Inexact(400),
2836                column_statistics: vec![ColumnStatistics {
2837                    min_value: Precision::Inexact(ScalarValue::Float32(Some(0.0))),
2838                    max_value: Precision::Inexact(ScalarValue::Float32(Some(100.0))),
2839                    distinct_count: Precision::Inexact(50),
2840                    ..Default::default()
2841                }],
2842            },
2843            schema.clone(),
2844        ));
2845
2846        let predicate = Arc::new(BinaryExpr::new(
2847            Arc::new(Column::new("a", 0)),
2848            Operator::Eq,
2849            Arc::new(Literal::new(ScalarValue::Float32(Some(42.5)))),
2850        ));
2851        let filter: Arc<dyn ExecutionPlan> =
2852            Arc::new(FilterExec::try_new(predicate, input)?);
2853        let statistics = filter.partition_statistics(None)?;
2854        assert_eq!(
2855            statistics.column_statistics[0].distinct_count,
2856            Precision::Exact(1)
2857        );
2858        Ok(())
2859    }
2860
2861    #[tokio::test]
2862    async fn test_filter_statistics_equality_reversed_ndv() -> Result<()> {
2863        // a: min=1, max=100, ndv=80
2864        let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]);
2865        let input = Arc::new(StatisticsExec::new(
2866            Statistics {
2867                num_rows: Precision::Inexact(100),
2868                total_byte_size: Precision::Inexact(400),
2869                column_statistics: vec![ColumnStatistics {
2870                    min_value: Precision::Inexact(ScalarValue::Int32(Some(1))),
2871                    max_value: Precision::Inexact(ScalarValue::Int32(Some(100))),
2872                    distinct_count: Precision::Inexact(80),
2873                    ..Default::default()
2874                }],
2875            },
2876            schema.clone(),
2877        ));
2878
2879        // 42 = a (literal on the left)
2880        let predicate = Arc::new(BinaryExpr::new(
2881            Arc::new(Literal::new(ScalarValue::Int32(Some(42)))),
2882            Operator::Eq,
2883            Arc::new(Column::new("a", 0)),
2884        ));
2885        let filter: Arc<dyn ExecutionPlan> =
2886            Arc::new(FilterExec::try_new(predicate, input)?);
2887        let statistics = filter.partition_statistics(None)?;
2888        assert_eq!(
2889            statistics.column_statistics[0].distinct_count,
2890            Precision::Exact(1)
2891        );
2892        Ok(())
2893    }
2894
2895    #[tokio::test]
2896    async fn test_filter_statistics_equality_timestamp_ndv() -> Result<()> {
2897        // ts: min=1_000_000_000, max=2_000_000_000, ndv=500
2898        let schema = Schema::new(vec![Field::new(
2899            "ts",
2900            DataType::Timestamp(arrow::datatypes::TimeUnit::Nanosecond, None),
2901            false,
2902        )]);
2903        let input = Arc::new(StatisticsExec::new(
2904            Statistics {
2905                num_rows: Precision::Inexact(1000),
2906                total_byte_size: Precision::Inexact(8000),
2907                column_statistics: vec![ColumnStatistics {
2908                    min_value: Precision::Inexact(ScalarValue::TimestampNanosecond(
2909                        Some(1_000_000_000),
2910                        None,
2911                    )),
2912                    max_value: Precision::Inexact(ScalarValue::TimestampNanosecond(
2913                        Some(2_000_000_000),
2914                        None,
2915                    )),
2916                    distinct_count: Precision::Inexact(500),
2917                    ..Default::default()
2918                }],
2919            },
2920            schema.clone(),
2921        ));
2922
2923        let predicate = Arc::new(BinaryExpr::new(
2924            Arc::new(Column::new("ts", 0)),
2925            Operator::Eq,
2926            Arc::new(Literal::new(ScalarValue::TimestampNanosecond(
2927                Some(1_500_000_000),
2928                None,
2929            ))),
2930        ));
2931        let filter: Arc<dyn ExecutionPlan> =
2932            Arc::new(FilterExec::try_new(predicate, input)?);
2933        let statistics = filter.partition_statistics(None)?;
2934        assert_eq!(
2935            statistics.column_statistics[0].distinct_count,
2936            Precision::Exact(1)
2937        );
2938        Ok(())
2939    }
2940
2941    #[test]
2942    fn test_collect_equality_columns() {
2943        use std::collections::HashSet;
2944        // (description, predicate, expected_column_indices, expected_infeasible)
2945        #[expect(clippy::type_complexity)]
2946        let cases: Vec<(&str, Arc<dyn PhysicalExpr>, Vec<usize>, bool)> = vec![
2947            (
2948                "simple col = literal",
2949                Arc::new(BinaryExpr::new(
2950                    Arc::new(Column::new("a", 0)),
2951                    Operator::Eq,
2952                    Arc::new(Literal::new(ScalarValue::Int32(Some(42)))),
2953                )),
2954                vec![0],
2955                false,
2956            ),
2957            (
2958                "reversed literal = col",
2959                Arc::new(BinaryExpr::new(
2960                    Arc::new(Literal::new(ScalarValue::Int32(Some(42)))),
2961                    Operator::Eq,
2962                    Arc::new(Column::new("a", 0)),
2963                )),
2964                vec![0],
2965                false,
2966            ),
2967            (
2968                "AND with two equalities",
2969                Arc::new(BinaryExpr::new(
2970                    Arc::new(BinaryExpr::new(
2971                        Arc::new(Column::new("a", 0)),
2972                        Operator::Eq,
2973                        Arc::new(Literal::new(ScalarValue::Int32(Some(42)))),
2974                    )),
2975                    Operator::And,
2976                    Arc::new(BinaryExpr::new(
2977                        Arc::new(Column::new("b", 1)),
2978                        Operator::Eq,
2979                        Arc::new(Literal::new(ScalarValue::Utf8(Some(
2980                            "hello".to_string(),
2981                        )))),
2982                    )),
2983                )),
2984                vec![0, 1],
2985                false,
2986            ),
2987            (
2988                "OR produces empty set",
2989                Arc::new(BinaryExpr::new(
2990                    Arc::new(BinaryExpr::new(
2991                        Arc::new(Column::new("a", 0)),
2992                        Operator::Eq,
2993                        Arc::new(Literal::new(ScalarValue::Int32(Some(42)))),
2994                    )),
2995                    Operator::Or,
2996                    Arc::new(BinaryExpr::new(
2997                        Arc::new(Column::new("a", 0)),
2998                        Operator::Eq,
2999                        Arc::new(Literal::new(ScalarValue::Int32(Some(99)))),
3000                    )),
3001                )),
3002                vec![],
3003                false,
3004            ),
3005            (
3006                "greater-than produces empty set",
3007                Arc::new(BinaryExpr::new(
3008                    Arc::new(Column::new("a", 0)),
3009                    Operator::Gt,
3010                    Arc::new(Literal::new(ScalarValue::Int32(Some(42)))),
3011                )),
3012                vec![],
3013                false,
3014            ),
3015            (
3016                "col = col produces empty set",
3017                Arc::new(BinaryExpr::new(
3018                    Arc::new(Column::new("a", 0)),
3019                    Operator::Eq,
3020                    Arc::new(Column::new("b", 1)),
3021                )),
3022                vec![],
3023                false,
3024            ),
3025            (
3026                "nested AND with three equalities",
3027                Arc::new(BinaryExpr::new(
3028                    Arc::new(BinaryExpr::new(
3029                        Arc::new(BinaryExpr::new(
3030                            Arc::new(Column::new("a", 0)),
3031                            Operator::Eq,
3032                            Arc::new(Literal::new(ScalarValue::Int32(Some(1)))),
3033                        )),
3034                        Operator::And,
3035                        Arc::new(BinaryExpr::new(
3036                            Arc::new(Column::new("b", 1)),
3037                            Operator::Eq,
3038                            Arc::new(Literal::new(ScalarValue::Int32(Some(2)))),
3039                        )),
3040                    )),
3041                    Operator::And,
3042                    Arc::new(BinaryExpr::new(
3043                        Arc::new(Column::new("c", 2)),
3044                        Operator::Eq,
3045                        Arc::new(Literal::new(ScalarValue::Int32(Some(3)))),
3046                    )),
3047                )),
3048                vec![0, 1, 2],
3049                false,
3050            ),
3051            (
3052                "AND with mixed equality and non-equality",
3053                Arc::new(BinaryExpr::new(
3054                    Arc::new(BinaryExpr::new(
3055                        Arc::new(Column::new("a", 0)),
3056                        Operator::Eq,
3057                        Arc::new(Literal::new(ScalarValue::Int32(Some(42)))),
3058                    )),
3059                    Operator::And,
3060                    Arc::new(BinaryExpr::new(
3061                        Arc::new(Column::new("b", 1)),
3062                        Operator::Gt,
3063                        Arc::new(Literal::new(ScalarValue::Int32(Some(10)))),
3064                    )),
3065                )),
3066                vec![0],
3067                false,
3068            ),
3069            (
3070                "col = NULL is excluded",
3071                Arc::new(BinaryExpr::new(
3072                    Arc::new(Column::new("a", 0)),
3073                    Operator::Eq,
3074                    Arc::new(Literal::new(ScalarValue::Int32(None))),
3075                )),
3076                vec![],
3077                false,
3078            ),
3079            (
3080                "NULL = col is excluded",
3081                Arc::new(BinaryExpr::new(
3082                    Arc::new(Literal::new(ScalarValue::Utf8(None))),
3083                    Operator::Eq,
3084                    Arc::new(Column::new("a", 0)),
3085                )),
3086                vec![],
3087                false,
3088            ),
3089            (
3090                "contradictory: same col, different literals",
3091                Arc::new(BinaryExpr::new(
3092                    Arc::new(BinaryExpr::new(
3093                        Arc::new(Column::new("a", 0)),
3094                        Operator::Eq,
3095                        Arc::new(Literal::new(ScalarValue::Utf8(Some(
3096                            "alice".to_string(),
3097                        )))),
3098                    )),
3099                    Operator::And,
3100                    Arc::new(BinaryExpr::new(
3101                        Arc::new(Column::new("a", 0)),
3102                        Operator::Eq,
3103                        Arc::new(Literal::new(ScalarValue::Utf8(Some(
3104                            "bob".to_string(),
3105                        )))),
3106                    )),
3107                )),
3108                vec![0],
3109                true,
3110            ),
3111            (
3112                "same col, same literal is not contradictory",
3113                Arc::new(BinaryExpr::new(
3114                    Arc::new(BinaryExpr::new(
3115                        Arc::new(Column::new("a", 0)),
3116                        Operator::Eq,
3117                        Arc::new(Literal::new(ScalarValue::Int32(Some(42)))),
3118                    )),
3119                    Operator::And,
3120                    Arc::new(BinaryExpr::new(
3121                        Arc::new(Column::new("a", 0)),
3122                        Operator::Eq,
3123                        Arc::new(Literal::new(ScalarValue::Int32(Some(42)))),
3124                    )),
3125                )),
3126                vec![0],
3127                false,
3128            ),
3129        ];
3130
3131        for (desc, expr, expected_cols, expected_infeasible) in cases {
3132            let (result, infeasible) = collect_equality_columns(&expr);
3133            let expected: HashSet<usize> = expected_cols.into_iter().collect();
3134            if expected_infeasible {
3135                // When infeasible, the scan is short-circuited, so we only
3136                // assert the infeasibility flag — the partial column set
3137                // contents are an implementation detail.
3138                assert!(infeasible, "case '{desc}': expected infeasible");
3139            } else {
3140                assert_eq!(result, expected, "case '{desc}': columns mismatch");
3141                assert!(!infeasible, "case '{desc}': expected feasible");
3142            }
3143        }
3144    }
3145
3146    /// Regression test: ProjectionExec on top of a FilterExec that already has
3147    /// an explicit projection must not panic when `try_swapping_with_projection`
3148    /// attempts to swap the two nodes.
3149    ///
3150    /// Before the fix, `FilterExecBuilder::from(self)` copied the old projection
3151    /// (e.g. `[0, 1, 2]`) from the FilterExec. After `.with_input` replaced the
3152    /// input with the narrower ProjectionExec (2 columns), `.build()` tried to
3153    /// validate the stale `[0, 1, 2]` projection against the 2-column schema and
3154    /// panicked with "project index 2 out of bounds, max field 2".
3155    #[test]
3156    fn test_filter_with_projection_swap_does_not_panic() -> Result<()> {
3157        use crate::projection::ProjectionExpr;
3158        use datafusion_physical_expr::expressions::col;
3159
3160        // Schema: [ts: Int64, tokens: Int64, svc: Utf8]
3161        let schema = Arc::new(Schema::new(vec![
3162            Field::new("ts", DataType::Int64, false),
3163            Field::new("tokens", DataType::Int64, false),
3164            Field::new("svc", DataType::Utf8, false),
3165        ]));
3166        let input = Arc::new(EmptyExec::new(Arc::clone(&schema)));
3167
3168        // FilterExec: ts > 0, projection=[ts@0, tokens@1, svc@2] (all 3 cols)
3169        let predicate = Arc::new(BinaryExpr::new(
3170            Arc::new(Column::new("ts", 0)),
3171            Operator::Gt,
3172            Arc::new(Literal::new(ScalarValue::Int64(Some(0)))),
3173        ));
3174        let filter = Arc::new(
3175            FilterExecBuilder::new(predicate, input)
3176                .apply_projection(Some(vec![0, 1, 2]))?
3177                .build()?,
3178        );
3179
3180        // ProjectionExec: narrows to [ts, tokens] (drops svc)
3181        let proj_exprs = vec![
3182            ProjectionExpr {
3183                expr: col("ts", &filter.schema())?,
3184                alias: "ts".to_string(),
3185            },
3186            ProjectionExpr {
3187                expr: col("tokens", &filter.schema())?,
3188                alias: "tokens".to_string(),
3189            },
3190        ];
3191        let projection = Arc::new(ProjectionExec::try_new(
3192            proj_exprs,
3193            Arc::clone(&filter) as _,
3194        )?);
3195
3196        // This must not panic
3197        let result = filter.try_swapping_with_projection(&projection)?;
3198        assert!(result.is_some(), "swap should succeed");
3199
3200        let new_plan = result.unwrap();
3201        // Output schema must still be [ts, tokens]
3202        let out_schema = new_plan.schema();
3203        assert_eq!(out_schema.fields().len(), 2);
3204        assert_eq!(out_schema.field(0).name(), "ts");
3205        assert_eq!(out_schema.field(1).name(), "tokens");
3206        Ok(())
3207    }
3208
3209    #[tokio::test]
3210    async fn test_filter_statistics_ndv_capped_at_row_count() -> Result<()> {
3211        // Table: a: min=1, max=100, distinct_count=80, 100 rows
3212        let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]);
3213        let input = Arc::new(StatisticsExec::new(
3214            Statistics {
3215                num_rows: Precision::Inexact(100),
3216                total_byte_size: Precision::Inexact(400),
3217                column_statistics: vec![ColumnStatistics {
3218                    min_value: Precision::Inexact(ScalarValue::Int32(Some(1))),
3219                    max_value: Precision::Inexact(ScalarValue::Int32(Some(100))),
3220                    distinct_count: Precision::Inexact(80),
3221                    ..Default::default()
3222                }],
3223            },
3224            schema.clone(),
3225        ));
3226
3227        // a <= 10 => ~10 rows out of 100
3228        let predicate: Arc<dyn PhysicalExpr> =
3229            binary(col("a", &schema)?, Operator::LtEq, lit(10i32), &schema)?;
3230
3231        let filter: Arc<dyn ExecutionPlan> =
3232            Arc::new(FilterExec::try_new(predicate, input)?);
3233
3234        let statistics = filter.partition_statistics(None)?;
3235        // Filter estimates ~10 rows (selectivity = 10/100)
3236        assert_eq!(statistics.num_rows, Precision::Inexact(10));
3237        // NDV should be capped at the filtered row count (10), not the original 80
3238        let ndv = &statistics.column_statistics[0].distinct_count;
3239        assert!(
3240            ndv.get_value().copied() <= Some(10),
3241            "Expected NDV <= 10 (filtered row count), got {ndv:?}"
3242        );
3243        Ok(())
3244    }
3245}