Skip to main content

datafusion_physical_plan/
projection.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! Defines the projection execution plan. A projection determines which columns or expressions
19//! are returned from a query. The SQL statement `SELECT a, b, a+b FROM t1` is an example
20//! of a projection on table `t1` where the expressions `a`, `b`, and `a+b` are the
21//! projection expressions. `SELECT` without `FROM` will only evaluate expressions.
22
23use super::expressions::Column;
24use super::metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet};
25use super::{
26    DisplayAs, ExecutionPlanProperties, PlanProperties, RecordBatchStream,
27    SendableRecordBatchStream, SortOrderPushdownResult, Statistics,
28};
29use crate::column_rewriter::PhysicalColumnRewriter;
30use crate::execution_plan::CardinalityEffect;
31use crate::filter_pushdown::{
32    ChildFilterDescription, ChildPushdownResult, FilterDescription, FilterPushdownPhase,
33    FilterPushdownPropagation, FilterRemapper, PushedDownPredicate,
34};
35use crate::joins::utils::{ColumnIndex, JoinFilter, JoinOn, JoinOnRef};
36use crate::{DisplayFormatType, ExecutionPlan, PhysicalExpr, check_if_same_properties};
37use std::collections::HashMap;
38use std::pin::Pin;
39use std::sync::Arc;
40use std::task::{Context, Poll};
41
42use arrow::datatypes::SchemaRef;
43use arrow::record_batch::RecordBatch;
44use datafusion_common::config::ConfigOptions;
45use datafusion_common::tree_node::{
46    Transformed, TransformedResult, TreeNode, TreeNodeRecursion,
47};
48use datafusion_common::{DataFusionError, JoinSide, Result, internal_err};
49use datafusion_execution::TaskContext;
50use datafusion_expr::ExpressionPlacement;
51use datafusion_physical_expr::equivalence::ProjectionMapping;
52use datafusion_physical_expr::projection::Projector;
53use datafusion_physical_expr_common::physical_expr::{PhysicalExprRef, fmt_sql};
54use datafusion_physical_expr_common::sort_expr::{
55    LexOrdering, LexRequirement, PhysicalSortExpr,
56};
57// Re-exported from datafusion-physical-expr for backwards compatibility
58// We recommend updating your imports to use datafusion-physical-expr directly
59pub use datafusion_physical_expr::projection::{
60    ProjectionExpr, ProjectionExprs, update_expr,
61};
62
63use futures::stream::{Stream, StreamExt};
64use log::trace;
65
/// [`ExecutionPlan`] for a projection
///
/// Computes a set of scalar value expressions for each input row, producing one
/// output row for each input row (its cardinality effect is `Equal`).
///
/// Cloning a `ProjectionExec` shares the input plan and the cached properties
/// (both behind `Arc`s).
#[derive(Debug, Clone)]
pub struct ProjectionExec {
    /// A projector specialized to apply the projection to the input schema from the child node
    /// and produce [`RecordBatch`]es with the output schema of this node.
    projector: Projector,
    /// The input plan
    input: Arc<dyn ExecutionPlan>,
    /// Execution metrics. A fresh, empty set is created whenever the node is
    /// rebuilt with new children.
    metrics: ExecutionPlanMetricsSet,
    /// Cache holding plan properties like equivalences, output partitioning etc.
    /// Computed once by `compute_properties` when the node is constructed.
    cache: Arc<PlanProperties>,
}
82
83impl ProjectionExec {
84    /// Create a projection on an input
85    ///
86    /// # Example:
87    /// Create a `ProjectionExec` to crate `SELECT a, a+b AS sum_ab FROM t1`:
88    ///
89    /// ```
90    /// # use std::sync::Arc;
91    /// # use arrow_schema::{Schema, Field, DataType};
92    /// # use datafusion_expr::Operator;
93    /// # use datafusion_physical_plan::ExecutionPlan;
94    /// # use datafusion_physical_expr::expressions::{col, binary};
95    /// # use datafusion_physical_plan::empty::EmptyExec;
96    /// # use datafusion_physical_plan::projection::{ProjectionExec, ProjectionExpr};
97    /// # fn schema() -> Arc<Schema> {
98    /// #  Arc::new(Schema::new(vec![
99    /// #   Field::new("a", DataType::Int32, false),
100    /// #   Field::new("b", DataType::Int32, false),
101    /// # ]))
102    /// # }
103    /// #
104    /// # fn input() -> Arc<dyn ExecutionPlan> {
105    /// #  Arc::new(EmptyExec::new(schema()))
106    /// # }
107    /// #
108    /// # fn main() {
109    /// let schema = schema();
110    /// // Create PhysicalExprs
111    /// let a = col("a", &schema).unwrap();
112    /// let b = col("b", &schema).unwrap();
113    /// let a_plus_b = binary(Arc::clone(&a), Operator::Plus, b, &schema).unwrap();
114    /// // create ProjectionExec
115    /// let proj = ProjectionExec::try_new(
116    ///     [
117    ///         ProjectionExpr {
118    ///             // expr a produces the column named "a"
119    ///             expr: a,
120    ///             alias: "a".to_string(),
121    ///         },
122    ///         ProjectionExpr {
123    ///             // expr: a + b produces the column named "sum_ab"
124    ///             expr: a_plus_b,
125    ///             alias: "sum_ab".to_string(),
126    ///         },
127    ///     ],
128    ///     input(),
129    /// )
130    /// .unwrap();
131    /// # }
132    /// ```
133    pub fn try_new<I, E>(expr: I, input: Arc<dyn ExecutionPlan>) -> Result<Self>
134    where
135        I: IntoIterator<Item = E>,
136        E: Into<ProjectionExpr>,
137    {
138        let input_schema = input.schema();
139        let expr_arc = expr.into_iter().map(Into::into).collect::<Arc<_>>();
140        let projection = ProjectionExprs::from_expressions(expr_arc);
141        let projector = projection.make_projector(&input_schema)?;
142        Self::try_from_projector(projector, input)
143    }
144
145    fn try_from_projector(
146        projector: Projector,
147        input: Arc<dyn ExecutionPlan>,
148    ) -> Result<Self> {
149        // Construct a map from the input expressions to the output expression of the Projection
150        let projection_mapping =
151            projector.projection().projection_mapping(&input.schema())?;
152        let cache = Self::compute_properties(
153            &input,
154            &projection_mapping,
155            Arc::clone(projector.output_schema()),
156        )?;
157        Ok(Self {
158            projector,
159            input,
160            metrics: ExecutionPlanMetricsSet::new(),
161            cache: Arc::new(cache),
162        })
163    }
164
165    /// The projection expressions stored as tuples of (expression, output column name)
166    pub fn expr(&self) -> &[ProjectionExpr] {
167        self.projector.projection().as_ref()
168    }
169
170    /// The projection expressions as a [`ProjectionExprs`].
171    pub fn projection_expr(&self) -> &ProjectionExprs {
172        self.projector.projection()
173    }
174
175    /// The input plan
176    pub fn input(&self) -> &Arc<dyn ExecutionPlan> {
177        &self.input
178    }
179
180    /// This function creates the cache object that stores the plan properties such as schema, equivalence properties, ordering, partitioning, etc.
181    fn compute_properties(
182        input: &Arc<dyn ExecutionPlan>,
183        projection_mapping: &ProjectionMapping,
184        schema: SchemaRef,
185    ) -> Result<PlanProperties> {
186        // Calculate equivalence properties:
187        let input_eq_properties = input.equivalence_properties();
188        let eq_properties = input_eq_properties.project(projection_mapping, schema);
189        // Calculate output partitioning, which needs to respect aliases:
190        let output_partitioning = input
191            .output_partitioning()
192            .project(projection_mapping, input_eq_properties);
193
194        Ok(PlanProperties::new(
195            eq_properties,
196            output_partitioning,
197            input.pipeline_behavior(),
198            input.boundedness(),
199        ))
200    }
201
202    /// Collect reverse alias mapping from projection expressions.
203    /// The result hash map is a map from aliased Column in parent to original expr.
204    fn collect_reverse_alias(
205        &self,
206    ) -> Result<datafusion_common::HashMap<Column, Arc<dyn PhysicalExpr>>> {
207        let mut alias_map = datafusion_common::HashMap::new();
208        for projection in self.projection_expr().iter() {
209            let (aliased_index, _output_field) = self
210                .projector
211                .output_schema()
212                .column_with_name(&projection.alias)
213                .ok_or_else(|| {
214                    DataFusionError::Internal(format!(
215                        "Expr {} with alias {} not found in output schema",
216                        projection.expr, projection.alias
217                    ))
218                })?;
219            let aliased_col = Column::new(&projection.alias, aliased_index);
220            alias_map.insert(aliased_col, Arc::clone(&projection.expr));
221        }
222        Ok(alias_map)
223    }
224
225    fn with_new_children_and_same_properties(
226        &self,
227        mut children: Vec<Arc<dyn ExecutionPlan>>,
228    ) -> Self {
229        Self {
230            input: children.swap_remove(0),
231            metrics: ExecutionPlanMetricsSet::new(),
232            ..Self::clone(self)
233        }
234    }
235}
236
237impl DisplayAs for ProjectionExec {
238    fn fmt_as(
239        &self,
240        t: DisplayFormatType,
241        f: &mut std::fmt::Formatter,
242    ) -> std::fmt::Result {
243        match t {
244            DisplayFormatType::Default | DisplayFormatType::Verbose => {
245                let expr: Vec<String> = self
246                    .projector
247                    .projection()
248                    .as_ref()
249                    .iter()
250                    .map(|proj_expr| {
251                        let e = proj_expr.expr.to_string();
252                        if e != proj_expr.alias {
253                            format!("{e} as {}", proj_expr.alias)
254                        } else {
255                            e
256                        }
257                    })
258                    .collect();
259
260                write!(f, "ProjectionExec: expr=[{}]", expr.join(", "))
261            }
262            DisplayFormatType::TreeRender => {
263                for (i, proj_expr) in self.expr().iter().enumerate() {
264                    let expr_sql = fmt_sql(proj_expr.expr.as_ref());
265                    if proj_expr.expr.to_string() == proj_expr.alias {
266                        writeln!(f, "expr{i}={expr_sql}")?;
267                    } else {
268                        writeln!(f, "{}={expr_sql}", proj_expr.alias)?;
269                    }
270                }
271
272                Ok(())
273            }
274        }
275    }
276}
277
278impl ExecutionPlan for ProjectionExec {
279    fn name(&self) -> &'static str {
280        "ProjectionExec"
281    }
282
283    /// Return a reference to Any that can be used for downcasting
284    fn properties(&self) -> &Arc<PlanProperties> {
285        &self.cache
286    }
287
288    fn maintains_input_order(&self) -> Vec<bool> {
289        // Tell optimizer this operator doesn't reorder its input
290        vec![true]
291    }
292
293    fn benefits_from_input_partitioning(&self) -> Vec<bool> {
294        let all_simple_exprs =
295            self.projector
296                .projection()
297                .as_ref()
298                .iter()
299                .all(|proj_expr| {
300                    !matches!(
301                        proj_expr.expr.placement(),
302                        ExpressionPlacement::KeepInPlace
303                    )
304                });
305        // If expressions are all either column_expr or Literal (or other cheap expressions),
306        // then all computations in this projection are reorder or rename,
307        // and projection would not benefit from the repartition, benefits_from_input_partitioning will return false.
308        vec![!all_simple_exprs]
309    }
310
311    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
312        vec![&self.input]
313    }
314
315    fn with_new_children(
316        self: Arc<Self>,
317        mut children: Vec<Arc<dyn ExecutionPlan>>,
318    ) -> Result<Arc<dyn ExecutionPlan>> {
319        check_if_same_properties!(self, children);
320        ProjectionExec::try_from_projector(
321            self.projector.clone(),
322            children.swap_remove(0),
323        )
324        .map(|p| Arc::new(p) as _)
325    }
326
327    fn execute(
328        &self,
329        partition: usize,
330        context: Arc<TaskContext>,
331    ) -> Result<SendableRecordBatchStream> {
332        trace!(
333            "Start ProjectionExec::execute for partition {} of context session_id {} and task_id {:?}",
334            partition,
335            context.session_id(),
336            context.task_id()
337        );
338
339        let projector = self.projector.with_metrics(&self.metrics, partition);
340        Ok(Box::pin(ProjectionStream::new(
341            projector,
342            self.input.execute(partition, context)?,
343            BaselineMetrics::new(&self.metrics, partition),
344        )?))
345    }
346
347    fn metrics(&self) -> Option<MetricsSet> {
348        Some(self.metrics.clone_inner())
349    }
350
351    fn partition_statistics(&self, partition: Option<usize>) -> Result<Arc<Statistics>> {
352        let input_stats =
353            Arc::unwrap_or_clone(self.input.partition_statistics(partition)?);
354        let output_schema = self.schema();
355        Ok(Arc::new(
356            self.projector
357                .projection()
358                .project_statistics(input_stats, &output_schema)?,
359        ))
360    }
361
362    fn supports_limit_pushdown(&self) -> bool {
363        true
364    }
365
366    fn cardinality_effect(&self) -> CardinalityEffect {
367        CardinalityEffect::Equal
368    }
369
370    fn try_swapping_with_projection(
371        &self,
372        projection: &ProjectionExec,
373    ) -> Result<Option<Arc<dyn ExecutionPlan>>> {
374        let maybe_unified = try_unifying_projections(projection, self)?;
375        if let Some(new_plan) = maybe_unified {
376            // To unify 3 or more sequential projections:
377            remove_unnecessary_projections(new_plan).data().map(Some)
378        } else {
379            Ok(Some(Arc::new(projection.clone())))
380        }
381    }
382
383    fn gather_filters_for_pushdown(
384        &self,
385        _phase: FilterPushdownPhase,
386        parent_filters: Vec<Arc<dyn PhysicalExpr>>,
387        _config: &ConfigOptions,
388    ) -> Result<FilterDescription> {
389        // expand alias column to original expr in parent filters
390        let invert_alias_map = self.collect_reverse_alias()?;
391        let output_schema = self.schema();
392        let remapper = FilterRemapper::new(output_schema);
393        let mut child_parent_filters = Vec::with_capacity(parent_filters.len());
394
395        for filter in parent_filters {
396            // Check that column exists in child, then reassign column indices to match child schema
397            if let Some(reassigned) = remapper.try_remap(&filter)? {
398                // rewrite filter expression using invert alias map
399                let mut rewriter = PhysicalColumnRewriter::new(&invert_alias_map);
400                let rewritten = reassigned.rewrite(&mut rewriter)?.data;
401                child_parent_filters.push(PushedDownPredicate::supported(rewritten));
402            } else {
403                child_parent_filters.push(PushedDownPredicate::unsupported(filter));
404            }
405        }
406
407        Ok(FilterDescription::new().with_child(ChildFilterDescription {
408            parent_filters: child_parent_filters,
409            self_filters: vec![],
410        }))
411    }
412
413    fn handle_child_pushdown_result(
414        &self,
415        _phase: FilterPushdownPhase,
416        child_pushdown_result: ChildPushdownResult,
417        _config: &ConfigOptions,
418    ) -> Result<FilterPushdownPropagation<Arc<dyn ExecutionPlan>>> {
419        Ok(FilterPushdownPropagation::if_all(child_pushdown_result))
420    }
421
422    fn try_pushdown_sort(
423        &self,
424        order: &[PhysicalSortExpr],
425    ) -> Result<SortOrderPushdownResult<Arc<dyn ExecutionPlan>>> {
426        let child = self.input();
427        let mut child_order = Vec::new();
428
429        // Check and transform sort expressions
430        for sort_expr in order {
431            // Recursively transform the expression
432            let mut can_pushdown = true;
433            let transformed = Arc::clone(&sort_expr.expr).transform(|expr| {
434                if let Some(col) = expr.downcast_ref::<Column>() {
435                    // Check if column index is valid.
436                    // This should always be true but fail gracefully if it's not.
437                    if col.index() >= self.expr().len() {
438                        can_pushdown = false;
439                        return Ok(Transformed::no(expr));
440                    }
441
442                    let proj_expr = &self.expr()[col.index()];
443
444                    // Check if projection expression is a simple column
445                    // We cannot push down order by clauses that depend on
446                    // projected computations as they would have nothing to reference.
447                    if let Some(child_col) = proj_expr.expr.downcast_ref::<Column>() {
448                        // Replace with the child column
449                        Ok(Transformed::yes(Arc::new(child_col.clone()) as _))
450                    } else {
451                        // Projection involves computation, cannot push down
452                        can_pushdown = false;
453                        Ok(Transformed::no(expr))
454                    }
455                } else {
456                    Ok(Transformed::no(expr))
457                }
458            })?;
459
460            if !can_pushdown {
461                return Ok(SortOrderPushdownResult::Unsupported);
462            }
463
464            child_order.push(PhysicalSortExpr {
465                expr: transformed.data,
466                options: sort_expr.options,
467            });
468        }
469
470        // Recursively push down to child node
471        match child.try_pushdown_sort(&child_order)? {
472            SortOrderPushdownResult::Exact { inner } => {
473                let new_exec = Arc::new(self.clone()).with_new_children(vec![inner])?;
474                Ok(SortOrderPushdownResult::Exact { inner: new_exec })
475            }
476            SortOrderPushdownResult::Inexact { inner } => {
477                let new_exec = Arc::new(self.clone()).with_new_children(vec![inner])?;
478                Ok(SortOrderPushdownResult::Inexact { inner: new_exec })
479            }
480            SortOrderPushdownResult::Unsupported => {
481                Ok(SortOrderPushdownResult::Unsupported)
482            }
483        }
484    }
485
486    fn with_preserve_order(
487        &self,
488        preserve_order: bool,
489    ) -> Option<Arc<dyn ExecutionPlan>> {
490        self.input
491            .with_preserve_order(preserve_order)
492            .and_then(|new_input| {
493                Arc::new(self.clone())
494                    .with_new_children(vec![new_input])
495                    .ok()
496            })
497    }
498}
499
500impl ProjectionStream {
501    /// Create a new projection stream
502    fn new(
503        projector: Projector,
504        input: SendableRecordBatchStream,
505        baseline_metrics: BaselineMetrics,
506    ) -> Result<Self> {
507        Ok(Self {
508            projector,
509            input,
510            baseline_metrics,
511        })
512    }
513
514    fn batch_project(&self, batch: &RecordBatch) -> Result<RecordBatch> {
515        // Records time on drop
516        let _timer = self.baseline_metrics.elapsed_compute().timer();
517        self.projector.project_batch(batch)
518    }
519}
520
/// Projection iterator: a stream that applies a [`Projector`] to every
/// [`RecordBatch`] yielded by its input, emitting one projected batch per
/// input batch.
struct ProjectionStream {
    /// Evaluates the projection expressions and builds the output batches.
    projector: Projector,
    /// Upstream stream of record batches to project.
    input: SendableRecordBatchStream,
    /// Tracks compute time (in `batch_project`) and poll results (in `poll_next`).
    baseline_metrics: BaselineMetrics,
}
527
528impl Stream for ProjectionStream {
529    type Item = Result<RecordBatch>;
530
531    fn poll_next(
532        mut self: Pin<&mut Self>,
533        cx: &mut Context<'_>,
534    ) -> Poll<Option<Self::Item>> {
535        let poll = self.input.poll_next_unpin(cx).map(|x| match x {
536            Some(Ok(batch)) => Some(self.batch_project(&batch)),
537            other => other,
538        });
539
540        self.baseline_metrics.record_poll(poll)
541    }
542
543    fn size_hint(&self) -> (usize, Option<usize>) {
544        // Same number of record batches
545        self.input.size_hint()
546    }
547}
548
549impl RecordBatchStream for ProjectionStream {
550    /// Get the schema
551    fn schema(&self) -> SchemaRef {
552        Arc::clone(self.projector.output_schema())
553    }
554}
555
/// Trait for execution plans that can embed a projection, avoiding a separate
/// [`ProjectionExec`] wrapper.
///
/// # Empty projections
///
/// `Some(vec![])` is a valid projection that produces zero output columns while
/// preserving the correct row count. Implementors must ensure that runtime batch
/// construction still returns batches with the right number of rows even when no
/// columns are selected (e.g. for `SELECT count(1) … JOIN …`).
pub trait EmbeddedProjection: ExecutionPlan + Sized {
    /// Returns a copy of this plan whose output is restricted to `projection`.
    ///
    /// `projection` holds column indices into this plan's output schema.
    /// `try_embed_projection` expects the new plan's output fields to follow the
    /// order of these indices. The meaning of `None` is left to implementors
    /// (presumably "no projection" — confirm per implementation).
    fn with_projection(&self, projection: Option<Vec<usize>>) -> Result<Self>;
}
568
569/// Some projection can't be pushed down left input or right input of hash join because filter or on need may need some columns that won't be used in later.
570/// By embed those projection to hash join, we can reduce the cost of build_batch_from_indices in hash join (build_batch_from_indices need to can compute::take() for each column) and avoid unnecessary output creation.
571pub fn try_embed_projection<Exec: EmbeddedProjection + 'static>(
572    projection: &ProjectionExec,
573    execution_plan: &Exec,
574) -> Result<Option<Arc<dyn ExecutionPlan>>> {
575    // If the projection has no expressions at all (e.g., ProjectionExec: expr=[]),
576    // embed an empty projection into the execution plan so it outputs zero columns.
577    // This avoids allocating throwaway null arrays for build-side columns
578    // when no output columns are actually needed (e.g., count(1) over a right join).
579    if projection.expr().is_empty() {
580        let new_execution_plan = Arc::new(execution_plan.with_projection(Some(vec![]))?);
581        return Ok(Some(new_execution_plan));
582    }
583
584    // Collect all column indices from the given projection expressions.
585    let projection_index = collect_column_indices(projection.expr());
586
587    if projection_index.is_empty() {
588        return Ok(None);
589    };
590
591    let columns_reduced = projection_index.len() < execution_plan.schema().fields().len();
592
593    let new_execution_plan =
594        Arc::new(execution_plan.with_projection(Some(projection_index.to_vec()))?);
595
596    // Build projection expressions for update_expr. Zip the projection_index with the new_execution_plan output schema fields.
597    let embed_project_exprs = projection_index
598        .iter()
599        .zip(new_execution_plan.schema().fields())
600        .map(|(index, field)| ProjectionExpr {
601            expr: Arc::new(Column::new(field.name(), *index)) as Arc<dyn PhysicalExpr>,
602            alias: field.name().to_owned(),
603        })
604        .collect::<Vec<_>>();
605
606    let mut new_projection_exprs = Vec::with_capacity(projection.expr().len());
607
608    for proj_expr in projection.expr() {
609        // update column index for projection expression since the input schema has been changed.
610        let Some(expr) =
611            update_expr(&proj_expr.expr, embed_project_exprs.as_slice(), false)?
612        else {
613            return Ok(None);
614        };
615        new_projection_exprs.push(ProjectionExpr {
616            expr,
617            alias: proj_expr.alias.clone(),
618        });
619    }
620    // Old projection may contain some alias or expression such as `a + 1` and `CAST('true' AS BOOLEAN)`, but our projection_exprs in hash join just contain column, so we need to create the new projection to keep the original projection.
621    let new_projection = Arc::new(ProjectionExec::try_new(
622        new_projection_exprs,
623        Arc::clone(&new_execution_plan) as _,
624    )?);
625    if is_projection_removable(&new_projection) {
626        // Residual is identity — embedding fully absorbed the projection.
627        Ok(Some(new_execution_plan))
628    } else if columns_reduced {
629        // Embedding reduced columns even though a residual is still needed
630        // for renames or expressions — worth keeping.
631        Ok(Some(new_projection))
632    } else {
633        // No columns eliminated and residual still needed — embedding just
634        // adds an unnecessary column reorder inside the operator.
635        Ok(None)
636    }
637}
638
/// Result of pushing a projection through a join (built by
/// `try_pushdown_through_join`): new projections for both join children, plus
/// the join filter and `ON` condition rewritten against those projected
/// children.
pub struct JoinData {
    /// Projection to place on top of the original left child.
    pub projected_left_child: ProjectionExec,
    /// Projection to place on top of the original right child.
    pub projected_right_child: ProjectionExec,
    /// The join filter rewritten for the projected children (`None` if the join had no filter).
    pub join_filter: Option<JoinFilter>,
    /// The join `ON` condition rewritten for the projected children.
    pub join_on: JoinOn,
}
645
646pub fn try_pushdown_through_join(
647    projection: &ProjectionExec,
648    join_left: &Arc<dyn ExecutionPlan>,
649    join_right: &Arc<dyn ExecutionPlan>,
650    join_on: JoinOnRef,
651    schema: &SchemaRef,
652    filter: Option<&JoinFilter>,
653) -> Result<Option<JoinData>> {
654    // Convert projected expressions to columns. We can not proceed if this is not possible.
655    let Some(projection_as_columns) = physical_to_column_exprs(projection.expr()) else {
656        return Ok(None);
657    };
658
659    let (far_right_left_col_ind, far_left_right_col_ind) =
660        join_table_borders(join_left.schema().fields().len(), &projection_as_columns);
661
662    if !join_allows_pushdown(
663        &projection_as_columns,
664        schema,
665        far_right_left_col_ind,
666        far_left_right_col_ind,
667    ) {
668        return Ok(None);
669    }
670
671    let new_filter = if let Some(filter) = filter {
672        match update_join_filter(
673            &projection_as_columns[0..=far_right_left_col_ind as _],
674            &projection_as_columns[far_left_right_col_ind as _..],
675            filter,
676            join_left.schema().fields().len(),
677        ) {
678            Some(updated_filter) => Some(updated_filter),
679            None => return Ok(None),
680        }
681    } else {
682        None
683    };
684
685    let Some(new_on) = update_join_on(
686        &projection_as_columns[0..=far_right_left_col_ind as _],
687        &projection_as_columns[far_left_right_col_ind as _..],
688        join_on,
689        join_left.schema().fields().len(),
690    ) else {
691        return Ok(None);
692    };
693
694    let (new_left, new_right) = new_join_children(
695        &projection_as_columns,
696        far_right_left_col_ind,
697        far_left_right_col_ind,
698        join_left,
699        join_right,
700    )?;
701
702    Ok(Some(JoinData {
703        projected_left_child: new_left,
704        projected_right_child: new_right,
705        join_filter: new_filter,
706        join_on: new_on,
707    }))
708}
709
710/// This function checks if `plan` is a [`ProjectionExec`], and inspects its
711/// input(s) to test whether it can push `plan` under its input(s). This function
712/// will operate on the entire tree and may ultimately remove `plan` entirely
713/// by leveraging source providers with built-in projection capabilities.
714pub fn remove_unnecessary_projections(
715    plan: Arc<dyn ExecutionPlan>,
716) -> Result<Transformed<Arc<dyn ExecutionPlan>>> {
717    let maybe_modified = if let Some(projection) = plan.downcast_ref::<ProjectionExec>() {
718        // If the projection does not cause any change on the input, we can
719        // safely remove it:
720        if is_projection_removable(projection) {
721            return Ok(Transformed::yes(Arc::clone(projection.input())));
722        }
723        // If it does, check if we can push it under its child(ren):
724        projection
725            .input()
726            .try_swapping_with_projection(projection)?
727    } else {
728        return Ok(Transformed::no(plan));
729    };
730    Ok(maybe_modified.map_or_else(|| Transformed::no(plan), Transformed::yes))
731}
732
733/// Compare the inputs and outputs of the projection. All expressions must be
734/// columns without alias, and projection does not change the order of fields.
735/// For example, if the input schema is `a, b`, `SELECT a, b` is removable,
736/// but `SELECT b, a` and `SELECT a+1, b` and `SELECT a AS c, b` are not.
737fn is_projection_removable(projection: &ProjectionExec) -> bool {
738    let exprs = projection.expr();
739    exprs.iter().enumerate().all(|(idx, proj_expr)| {
740        let Some(col) = proj_expr.expr.downcast_ref::<Column>() else {
741            return false;
742        };
743        col.name() == proj_expr.alias && col.index() == idx
744    }) && exprs.len() == projection.input().schema().fields().len()
745}
746
747/// Given the expression set of a projection, checks if the projection causes
748/// any renaming or constructs a non-`Column` physical expression.
749pub fn all_alias_free_columns(exprs: &[ProjectionExpr]) -> bool {
750    exprs.iter().all(|proj_expr| {
751        proj_expr
752            .expr
753            .downcast_ref::<Column>()
754            .map(|column| column.name() == proj_expr.alias)
755            .unwrap_or(false)
756    })
757}
758
759/// Updates a source provider's projected columns according to the given
760/// projection operator's expressions. To use this function safely, one must
761/// ensure that all expressions are `Column` expressions without aliases.
762pub fn new_projections_for_columns(
763    projection: &[ProjectionExpr],
764    source: &[usize],
765) -> Vec<usize> {
766    projection
767        .iter()
768        .filter_map(|proj_expr| {
769            proj_expr
770                .expr
771                .downcast_ref::<Column>()
772                .map(|expr| source[expr.index()])
773        })
774        .collect()
775}
776
777/// Creates a new [`ProjectionExec`] instance with the given child plan and
778/// projected expressions.
779pub fn make_with_child(
780    projection: &ProjectionExec,
781    child: &Arc<dyn ExecutionPlan>,
782) -> Result<Arc<dyn ExecutionPlan>> {
783    ProjectionExec::try_new(projection.expr().to_vec(), Arc::clone(child))
784        .map(|e| Arc::new(e) as _)
785}
786
787/// Returns `true` if all the expressions in the argument are `Column`s.
788pub fn all_columns(exprs: &[ProjectionExpr]) -> bool {
789    exprs.iter().all(|proj_expr| proj_expr.expr.is::<Column>())
790}
791
792/// Updates the given lexicographic ordering according to given projected
793/// expressions using the [`update_expr`] function.
794pub fn update_ordering(
795    ordering: LexOrdering,
796    projected_exprs: &[ProjectionExpr],
797) -> Result<Option<LexOrdering>> {
798    let mut updated_exprs = vec![];
799    for mut sort_expr in ordering.into_iter() {
800        let Some(updated_expr) = update_expr(&sort_expr.expr, projected_exprs, false)?
801        else {
802            return Ok(None);
803        };
804        sort_expr.expr = updated_expr;
805        updated_exprs.push(sort_expr);
806    }
807    Ok(LexOrdering::new(updated_exprs))
808}
809
810/// Updates the given lexicographic requirement according to given projected
811/// expressions using the [`update_expr`] function.
812pub fn update_ordering_requirement(
813    reqs: LexRequirement,
814    projected_exprs: &[ProjectionExpr],
815) -> Result<Option<LexRequirement>> {
816    let mut updated_exprs = vec![];
817    for mut sort_expr in reqs.into_iter() {
818        let Some(updated_expr) = update_expr(&sort_expr.expr, projected_exprs, false)?
819        else {
820            return Ok(None);
821        };
822        sort_expr.expr = updated_expr;
823        updated_exprs.push(sort_expr);
824    }
825    Ok(LexRequirement::new(updated_exprs))
826}
827
828/// Downcasts all the expressions in `exprs` to `Column`s. If any of the given
829/// expressions is not a `Column`, returns `None`.
830pub fn physical_to_column_exprs(
831    exprs: &[ProjectionExpr],
832) -> Option<Vec<(Column, String)>> {
833    exprs
834        .iter()
835        .map(|proj_expr| {
836            proj_expr
837                .expr
838                .downcast_ref::<Column>()
839                .map(|col| (col.clone(), proj_expr.alias.clone()))
840        })
841        .collect()
842}
843
844/// If pushing down the projection over this join's children seems possible,
845/// this function constructs the new [`ProjectionExec`]s that will come on top
846/// of the original children of the join.
847pub fn new_join_children(
848    projection_as_columns: &[(Column, String)],
849    far_right_left_col_ind: i32,
850    far_left_right_col_ind: i32,
851    left_child: &Arc<dyn ExecutionPlan>,
852    right_child: &Arc<dyn ExecutionPlan>,
853) -> Result<(ProjectionExec, ProjectionExec)> {
854    let new_left = ProjectionExec::try_new(
855        projection_as_columns[0..=far_right_left_col_ind as _]
856            .iter()
857            .map(|(col, alias)| ProjectionExpr {
858                expr: Arc::new(Column::new(col.name(), col.index())) as _,
859                alias: alias.clone(),
860            }),
861        Arc::clone(left_child),
862    )?;
863    let left_size = left_child.schema().fields().len() as i32;
864    let new_right = ProjectionExec::try_new(
865        projection_as_columns[far_left_right_col_ind as _..]
866            .iter()
867            .map(|(col, alias)| {
868                ProjectionExpr {
869                    expr: Arc::new(Column::new(
870                        col.name(),
871                        // Align projected expressions coming from the right
872                        // table with the new right child projection:
873                        (col.index() as i32 - left_size) as _,
874                    )) as _,
875                    alias: alias.clone(),
876                }
877            }),
878        Arc::clone(right_child),
879    )?;
880
881    Ok((new_left, new_right))
882}
883
884/// Checks three conditions for pushing a projection down through a join:
885/// - Projection must narrow the join output schema.
886/// - Columns coming from left/right tables must be collected at the left/right
887///   sides of the output table.
888/// - Left or right table is not lost after the projection.
889pub fn join_allows_pushdown(
890    projection_as_columns: &[(Column, String)],
891    join_schema: &SchemaRef,
892    far_right_left_col_ind: i32,
893    far_left_right_col_ind: i32,
894) -> bool {
895    // Projection must narrow the join output:
896    projection_as_columns.len() < join_schema.fields().len()
897    // Are the columns from different tables mixed?
898    && (far_right_left_col_ind + 1 == far_left_right_col_ind)
899    // Left or right table is not lost after the projection.
900    && far_right_left_col_ind >= 0
901    && far_left_right_col_ind < projection_as_columns.len() as i32
902}
903
904/// Returns the last index before encountering a column coming from the right table when traveling
905/// through the projection from left to right, and the last index before encountering a column
906/// coming from the left table when traveling through the projection from right to left.
907/// If there is no column in the projection coming from the left side, it returns (-1, ...),
908/// if there is no column in the projection coming from the right side, it returns (..., projection length).
909pub fn join_table_borders(
910    left_table_column_count: usize,
911    projection_as_columns: &[(Column, String)],
912) -> (i32, i32) {
913    let far_right_left_col_ind = projection_as_columns
914        .iter()
915        .enumerate()
916        .take_while(|(_, (projection_column, _))| {
917            projection_column.index() < left_table_column_count
918        })
919        .last()
920        .map(|(index, _)| index as i32)
921        .unwrap_or(-1);
922
923    let far_left_right_col_ind = projection_as_columns
924        .iter()
925        .enumerate()
926        .rev()
927        .take_while(|(_, (projection_column, _))| {
928            projection_column.index() >= left_table_column_count
929        })
930        .last()
931        .map(|(index, _)| index as i32)
932        .unwrap_or(projection_as_columns.len() as i32);
933
934    (far_right_left_col_ind, far_left_right_col_ind)
935}
936
937/// Tries to update the equi-join `Column`'s of a join as if the input of
938/// the join was replaced by a projection.
939pub fn update_join_on(
940    proj_left_exprs: &[(Column, String)],
941    proj_right_exprs: &[(Column, String)],
942    hash_join_on: &[(PhysicalExprRef, PhysicalExprRef)],
943    left_field_size: usize,
944) -> Option<Vec<(PhysicalExprRef, PhysicalExprRef)>> {
945    let (left_idx, right_idx): (Vec<_>, Vec<_>) = hash_join_on
946        .iter()
947        .map(|(left, right)| (left, right))
948        .unzip();
949
950    let new_left_columns = new_columns_for_join_on(&left_idx, proj_left_exprs, 0);
951    let new_right_columns =
952        new_columns_for_join_on(&right_idx, proj_right_exprs, left_field_size);
953
954    match (new_left_columns, new_right_columns) {
955        (Some(left), Some(right)) => Some(left.into_iter().zip(right).collect()),
956        _ => None,
957    }
958}
959
960/// Tries to update the column indices of a [`JoinFilter`] as if the input of
961/// the join was replaced by a projection.
962pub fn update_join_filter(
963    projection_left_exprs: &[(Column, String)],
964    projection_right_exprs: &[(Column, String)],
965    join_filter: &JoinFilter,
966    left_field_size: usize,
967) -> Option<JoinFilter> {
968    let mut new_left_indices = new_indices_for_join_filter(
969        join_filter,
970        JoinSide::Left,
971        projection_left_exprs,
972        0,
973    )
974    .into_iter();
975    let mut new_right_indices = new_indices_for_join_filter(
976        join_filter,
977        JoinSide::Right,
978        projection_right_exprs,
979        left_field_size,
980    )
981    .into_iter();
982
983    // Check if all columns match:
984    (new_right_indices.len() + new_left_indices.len()
985        == join_filter.column_indices().len())
986    .then(|| {
987        JoinFilter::new(
988            Arc::clone(join_filter.expression()),
989            join_filter
990                .column_indices()
991                .iter()
992                .map(|col_idx| ColumnIndex {
993                    index: if col_idx.side == JoinSide::Left {
994                        new_left_indices.next().unwrap()
995                    } else {
996                        new_right_indices.next().unwrap()
997                    },
998                    side: col_idx.side,
999                })
1000                .collect(),
1001            Arc::clone(join_filter.schema()),
1002        )
1003    })
1004}
1005
1006/// Unifies `projection` with its input (which is also a [`ProjectionExec`]).
1007fn try_unifying_projections(
1008    projection: &ProjectionExec,
1009    child: &ProjectionExec,
1010) -> Result<Option<Arc<dyn ExecutionPlan>>> {
1011    let mut projected_exprs = vec![];
1012    let mut column_ref_map: HashMap<Column, usize> = HashMap::new();
1013
1014    // Collect the column references usage in the outer projection.
1015    projection.expr().iter().for_each(|proj_expr| {
1016        proj_expr
1017            .expr
1018            .apply(|expr| {
1019                Ok({
1020                    if let Some(column) = expr.downcast_ref::<Column>() {
1021                        *column_ref_map.entry(column.clone()).or_default() += 1;
1022                    }
1023                    TreeNodeRecursion::Continue
1024                })
1025            })
1026            .unwrap();
1027    });
1028    // Merging these projections is not beneficial, e.g
1029    // If an expression is not trivial (KeepInPlace) and it is referred more than 1, unifies projections will be
1030    // beneficial as caching mechanism for non-trivial computations.
1031    // See discussion in: https://github.com/apache/datafusion/issues/8296
1032    if column_ref_map.iter().any(|(column, count)| {
1033        *count > 1
1034            && !child.expr()[column.index()]
1035                .expr
1036                .placement()
1037                .should_push_to_leaves()
1038    }) {
1039        return Ok(None);
1040    }
1041    for proj_expr in projection.expr() {
1042        // If there is no match in the input projection, we cannot unify these
1043        // projections. This case will arise if the projection expression contains
1044        // a `PhysicalExpr` variant `update_expr` doesn't support.
1045        let Some(expr) = update_expr(&proj_expr.expr, child.expr(), true)? else {
1046            return Ok(None);
1047        };
1048        projected_exprs.push(ProjectionExpr {
1049            expr,
1050            alias: proj_expr.alias.clone(),
1051        });
1052    }
1053    ProjectionExec::try_new(projected_exprs, Arc::clone(child.input()))
1054        .map(|e| Some(Arc::new(e) as _))
1055}
1056
1057/// Collect all column indices from the given projection expressions.
1058fn collect_column_indices(exprs: &[ProjectionExpr]) -> Vec<usize> {
1059    // Collect column indices in a deterministic order that preserves the
1060    // projection's column ordering. For simple Column expressions, we use
1061    // the column index directly. For complex expressions, we walk the
1062    // expression tree to collect column references in traversal order.
1063    // This allows the embedded projection to match the desired output
1064    // column order, avoiding a residual ProjectionExec.
1065    let mut seen = std::collections::HashSet::new();
1066    let mut indices = Vec::new();
1067    for proj_expr in exprs {
1068        if let Some(col) = proj_expr.expr.downcast_ref::<Column>() {
1069            // Simple column reference: preserve projection order.
1070            if seen.insert(col.index()) {
1071                indices.push(col.index());
1072            }
1073        } else {
1074            // Complex expression: collect all referenced columns in
1075            // expression tree traversal order (deterministic) to preserve
1076            // the natural ordering of column references.
1077            proj_expr
1078                .expr
1079                .apply(|expr| {
1080                    if let Some(col) = expr.downcast_ref::<Column>()
1081                        && seen.insert(col.index())
1082                    {
1083                        indices.push(col.index());
1084                    }
1085                    Ok(TreeNodeRecursion::Continue)
1086                })
1087                .expect("closure always returns OK");
1088        }
1089    }
1090    indices
1091}
1092
1093/// This function determines and returns a vector of indices representing the
1094/// positions of columns in `projection_exprs` that are involved in `join_filter`,
1095/// and correspond to a particular side (`join_side`) of the join operation.
1096///
1097/// Notes: Column indices in the projection expressions are based on the join schema,
1098/// whereas the join filter is based on the join child schema. `column_index_offset`
1099/// represents the offset between them.
1100fn new_indices_for_join_filter(
1101    join_filter: &JoinFilter,
1102    join_side: JoinSide,
1103    projection_exprs: &[(Column, String)],
1104    column_index_offset: usize,
1105) -> Vec<usize> {
1106    join_filter
1107        .column_indices()
1108        .iter()
1109        .filter(|col_idx| col_idx.side == join_side)
1110        .filter_map(|col_idx| {
1111            projection_exprs
1112                .iter()
1113                .position(|(col, _)| col_idx.index + column_index_offset == col.index())
1114        })
1115        .collect()
1116}
1117
1118/// This function generates a new set of columns to be used in a hash join
1119/// operation based on a set of equi-join conditions (`hash_join_on`) and a
1120/// list of projection expressions (`projection_exprs`).
1121///
1122/// Notes: Column indices in the projection expressions are based on the join schema,
1123/// whereas the join on expressions are based on the join child schema. `column_index_offset`
1124/// represents the offset between them.
1125fn new_columns_for_join_on(
1126    hash_join_on: &[&PhysicalExprRef],
1127    projection_exprs: &[(Column, String)],
1128    column_index_offset: usize,
1129) -> Option<Vec<PhysicalExprRef>> {
1130    let new_columns = hash_join_on
1131        .iter()
1132        .filter_map(|on| {
1133            // Rewrite all columns in `on`
1134            Arc::clone(*on)
1135                .transform(|expr| {
1136                    if let Some(column) = expr.downcast_ref::<Column>() {
1137                        // Find the column in the projection expressions
1138                        let new_column = projection_exprs
1139                            .iter()
1140                            .enumerate()
1141                            .find(|(_, (proj_column, _))| {
1142                                column.name() == proj_column.name()
1143                                    && column.index() + column_index_offset
1144                                        == proj_column.index()
1145                            })
1146                            .map(|(index, (_, alias))| Column::new(alias, index));
1147                        if let Some(new_column) = new_column {
1148                            Ok(Transformed::yes(Arc::new(new_column)))
1149                        } else {
1150                            // If the column is not found in the projection expressions,
1151                            // it means that the column is not projected. In this case,
1152                            // we cannot push the projection down.
1153                            internal_err!(
1154                                "Column {:?} not found in projection expressions",
1155                                column
1156                            )
1157                        }
1158                    } else {
1159                        Ok(Transformed::no(expr))
1160                    }
1161                })
1162                .data()
1163                .ok()
1164        })
1165        .collect::<Vec<_>>();
1166    (new_columns.len() == hash_join_on.len()).then_some(new_columns)
1167}
1168
1169#[cfg(test)]
1170mod tests {
1171    use super::*;
1172
1173    use crate::common::collect;
1174
1175    use crate::filter_pushdown::PushedDown;
1176    use crate::test;
1177    use crate::test::exec::StatisticsExec;
1178
1179    use arrow::datatypes::{DataType, Field, Schema};
1180    use datafusion_common::ScalarValue;
1181    use datafusion_common::stats::{ColumnStatistics, Precision, Statistics};
1182
1183    use datafusion_expr::Operator;
1184    use datafusion_physical_expr::expressions::{
1185        BinaryExpr, Column, DynamicFilterPhysicalExpr, Literal, binary, col, lit,
1186    };
1187
1188    #[test]
1189    fn test_collect_column_indices() -> Result<()> {
1190        let expr = Arc::new(BinaryExpr::new(
1191            Arc::new(Column::new("b", 7)),
1192            Operator::Minus,
1193            Arc::new(BinaryExpr::new(
1194                Arc::new(Literal::new(ScalarValue::Int32(Some(1)))),
1195                Operator::Plus,
1196                Arc::new(Column::new("a", 1)),
1197            )),
1198        ));
1199        let column_indices = collect_column_indices(&[ProjectionExpr {
1200            expr,
1201            alias: "b-(1+a)".to_string(),
1202        }]);
1203        // Tree traversal order: b@7 is visited before a@1
1204        assert_eq!(column_indices, vec![7, 1]);
1205        Ok(())
1206    }
1207
1208    #[test]
1209    fn test_join_table_borders() -> Result<()> {
1210        let projections = vec![
1211            (Column::new("b", 1), "b".to_owned()),
1212            (Column::new("c", 2), "c".to_owned()),
1213            (Column::new("e", 4), "e".to_owned()),
1214            (Column::new("d", 3), "d".to_owned()),
1215            (Column::new("c", 2), "c".to_owned()),
1216            (Column::new("f", 5), "f".to_owned()),
1217            (Column::new("h", 7), "h".to_owned()),
1218            (Column::new("g", 6), "g".to_owned()),
1219        ];
1220        let left_table_column_count = 5;
1221        assert_eq!(
1222            join_table_borders(left_table_column_count, &projections),
1223            (4, 5)
1224        );
1225
1226        let left_table_column_count = 8;
1227        assert_eq!(
1228            join_table_borders(left_table_column_count, &projections),
1229            (7, 8)
1230        );
1231
1232        let left_table_column_count = 1;
1233        assert_eq!(
1234            join_table_borders(left_table_column_count, &projections),
1235            (-1, 0)
1236        );
1237
1238        let projections = vec![
1239            (Column::new("a", 0), "a".to_owned()),
1240            (Column::new("b", 1), "b".to_owned()),
1241            (Column::new("d", 3), "d".to_owned()),
1242            (Column::new("g", 6), "g".to_owned()),
1243            (Column::new("e", 4), "e".to_owned()),
1244            (Column::new("f", 5), "f".to_owned()),
1245            (Column::new("e", 4), "e".to_owned()),
1246            (Column::new("h", 7), "h".to_owned()),
1247        ];
1248        let left_table_column_count = 5;
1249        assert_eq!(
1250            join_table_borders(left_table_column_count, &projections),
1251            (2, 7)
1252        );
1253
1254        let left_table_column_count = 7;
1255        assert_eq!(
1256            join_table_borders(left_table_column_count, &projections),
1257            (6, 7)
1258        );
1259
1260        Ok(())
1261    }
1262
1263    #[tokio::test]
1264    async fn project_no_column() -> Result<()> {
1265        let task_ctx = Arc::new(TaskContext::default());
1266
1267        let exec = test::scan_partitioned(1);
1268        let expected = collect(exec.execute(0, Arc::clone(&task_ctx))?).await?;
1269
1270        let projection = ProjectionExec::try_new(vec![] as Vec<ProjectionExpr>, exec)?;
1271        let stream = projection.execute(0, Arc::clone(&task_ctx))?;
1272        let output = collect(stream).await?;
1273        assert_eq!(output.len(), expected.len());
1274
1275        Ok(())
1276    }
1277
1278    #[tokio::test]
1279    async fn project_old_syntax() {
1280        let exec = test::scan_partitioned(1);
1281        let schema = exec.schema();
1282        let expr = col("i", &schema).unwrap();
1283        ProjectionExec::try_new(
1284            vec![
1285                // use From impl of ProjectionExpr to create ProjectionExpr
1286                // to test old syntax
1287                (expr, "c".to_string()),
1288            ],
1289            exec,
1290        )
1291        // expect this to succeed
1292        .unwrap();
1293    }
1294
1295    #[test]
1296    fn test_projection_statistics_uses_input_schema() {
1297        let input_schema = Schema::new(vec![
1298            Field::new("a", DataType::Int32, false),
1299            Field::new("b", DataType::Int32, false),
1300            Field::new("c", DataType::Int32, false),
1301            Field::new("d", DataType::Int32, false),
1302            Field::new("e", DataType::Int32, false),
1303            Field::new("f", DataType::Int32, false),
1304        ]);
1305
1306        let input_statistics = Statistics {
1307            num_rows: Precision::Exact(10),
1308            column_statistics: vec![
1309                ColumnStatistics {
1310                    min_value: Precision::Exact(ScalarValue::Int32(Some(1))),
1311                    max_value: Precision::Exact(ScalarValue::Int32(Some(100))),
1312                    ..Default::default()
1313                },
1314                ColumnStatistics {
1315                    min_value: Precision::Exact(ScalarValue::Int32(Some(5))),
1316                    max_value: Precision::Exact(ScalarValue::Int32(Some(50))),
1317                    ..Default::default()
1318                },
1319                ColumnStatistics {
1320                    min_value: Precision::Exact(ScalarValue::Int32(Some(10))),
1321                    max_value: Precision::Exact(ScalarValue::Int32(Some(40))),
1322                    ..Default::default()
1323                },
1324                ColumnStatistics {
1325                    min_value: Precision::Exact(ScalarValue::Int32(Some(20))),
1326                    max_value: Precision::Exact(ScalarValue::Int32(Some(30))),
1327                    ..Default::default()
1328                },
1329                ColumnStatistics {
1330                    min_value: Precision::Exact(ScalarValue::Int32(Some(21))),
1331                    max_value: Precision::Exact(ScalarValue::Int32(Some(29))),
1332                    ..Default::default()
1333                },
1334                ColumnStatistics {
1335                    min_value: Precision::Exact(ScalarValue::Int32(Some(24))),
1336                    max_value: Precision::Exact(ScalarValue::Int32(Some(26))),
1337                    ..Default::default()
1338                },
1339            ],
1340            ..Default::default()
1341        };
1342
1343        let input = Arc::new(StatisticsExec::new(input_statistics, input_schema));
1344
1345        // Create projection expressions that reference columns from the input schema and the length
1346        // of output schema columns < input schema columns and hence if we use the last few columns
1347        // from the input schema in the expressions here, bounds_check would fail on them if output
1348        // schema is supplied to the partitions_statistics method.
1349        let exprs: Vec<ProjectionExpr> = vec![
1350            ProjectionExpr {
1351                expr: Arc::new(Column::new("c", 2)) as Arc<dyn PhysicalExpr>,
1352                alias: "c_renamed".to_string(),
1353            },
1354            ProjectionExpr {
1355                expr: Arc::new(BinaryExpr::new(
1356                    Arc::new(Column::new("e", 4)),
1357                    Operator::Plus,
1358                    Arc::new(Column::new("f", 5)),
1359                )) as Arc<dyn PhysicalExpr>,
1360                alias: "e_plus_f".to_string(),
1361            },
1362        ];
1363
1364        let projection = ProjectionExec::try_new(exprs, input).unwrap();
1365
1366        let stats = projection.partition_statistics(None).unwrap();
1367
1368        assert_eq!(stats.num_rows, Precision::Exact(10));
1369        assert_eq!(
1370            stats.column_statistics.len(),
1371            2,
1372            "Expected 2 columns in projection statistics"
1373        );
1374        assert!(stats.total_byte_size.is_exact().unwrap_or(false));
1375    }
1376
1377    #[test]
1378    fn test_filter_pushdown_with_alias() -> Result<()> {
1379        let input_schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]);
1380        let input = Arc::new(StatisticsExec::new(
1381            Statistics::new_unknown(&input_schema),
1382            input_schema.clone(),
1383        ));
1384
1385        // project "a" as "b"
1386        let projection = ProjectionExec::try_new(
1387            vec![ProjectionExpr {
1388                expr: Arc::new(Column::new("a", 0)),
1389                alias: "b".to_string(),
1390            }],
1391            input,
1392        )?;
1393
1394        // filter "b > 5"
1395        let filter = Arc::new(BinaryExpr::new(
1396            Arc::new(Column::new("b", 0)),
1397            Operator::Gt,
1398            Arc::new(Literal::new(ScalarValue::Int32(Some(5)))),
1399        )) as Arc<dyn PhysicalExpr>;
1400
1401        let description = projection.gather_filters_for_pushdown(
1402            FilterPushdownPhase::Post,
1403            vec![filter],
1404            &ConfigOptions::default(),
1405        )?;
1406
1407        // Should be converted to "a > 5"
1408        // "a" is index 0 in input
1409        let expected_filter = Arc::new(BinaryExpr::new(
1410            Arc::new(Column::new("a", 0)),
1411            Operator::Gt,
1412            Arc::new(Literal::new(ScalarValue::Int32(Some(5)))),
1413        )) as Arc<dyn PhysicalExpr>;
1414
1415        assert_eq!(description.self_filters(), vec![vec![]]);
1416        let pushed_filters = &description.parent_filters()[0];
1417        assert_eq!(
1418            format!("{}", pushed_filters[0].predicate),
1419            format!("{}", expected_filter)
1420        );
1421        // Verify the predicate was actually pushed down
1422        assert!(matches!(pushed_filters[0].discriminant, PushedDown::Yes));
1423
1424        Ok(())
1425    }
1426
1427    #[test]
1428    fn test_filter_pushdown_with_multiple_aliases() -> Result<()> {
1429        let input_schema = Schema::new(vec![
1430            Field::new("a", DataType::Int32, false),
1431            Field::new("b", DataType::Int32, false),
1432        ]);
1433        let input = Arc::new(StatisticsExec::new(
1434            Statistics {
1435                column_statistics: vec![Default::default(); input_schema.fields().len()],
1436                ..Default::default()
1437            },
1438            input_schema.clone(),
1439        ));
1440
1441        // project "a" as "x", "b" as "y"
1442        let projection = ProjectionExec::try_new(
1443            vec![
1444                ProjectionExpr {
1445                    expr: Arc::new(Column::new("a", 0)),
1446                    alias: "x".to_string(),
1447                },
1448                ProjectionExpr {
1449                    expr: Arc::new(Column::new("b", 1)),
1450                    alias: "y".to_string(),
1451                },
1452            ],
1453            input,
1454        )?;
1455
1456        // filter "x > 5"
1457        let filter1 = Arc::new(BinaryExpr::new(
1458            Arc::new(Column::new("x", 0)),
1459            Operator::Gt,
1460            Arc::new(Literal::new(ScalarValue::Int32(Some(5)))),
1461        )) as Arc<dyn PhysicalExpr>;
1462
1463        // filter "y < 10"
1464        let filter2 = Arc::new(BinaryExpr::new(
1465            Arc::new(Column::new("y", 1)),
1466            Operator::Lt,
1467            Arc::new(Literal::new(ScalarValue::Int32(Some(10)))),
1468        )) as Arc<dyn PhysicalExpr>;
1469
1470        let description = projection.gather_filters_for_pushdown(
1471            FilterPushdownPhase::Post,
1472            vec![filter1, filter2],
1473            &ConfigOptions::default(),
1474        )?;
1475
1476        // Should be converted to "a > 5" and "b < 10"
1477        let expected_filter1 = Arc::new(BinaryExpr::new(
1478            Arc::new(Column::new("a", 0)),
1479            Operator::Gt,
1480            Arc::new(Literal::new(ScalarValue::Int32(Some(5)))),
1481        )) as Arc<dyn PhysicalExpr>;
1482
1483        let expected_filter2 = Arc::new(BinaryExpr::new(
1484            Arc::new(Column::new("b", 1)),
1485            Operator::Lt,
1486            Arc::new(Literal::new(ScalarValue::Int32(Some(10)))),
1487        )) as Arc<dyn PhysicalExpr>;
1488
1489        let pushed_filters = &description.parent_filters()[0];
1490        assert_eq!(pushed_filters.len(), 2);
1491        // Note: The order of filters is preserved
1492        assert_eq!(
1493            format!("{}", pushed_filters[0].predicate),
1494            format!("{}", expected_filter1)
1495        );
1496        assert_eq!(
1497            format!("{}", pushed_filters[1].predicate),
1498            format!("{}", expected_filter2)
1499        );
1500        // Verify the predicates were actually pushed down
1501        assert!(matches!(pushed_filters[0].discriminant, PushedDown::Yes));
1502        assert!(matches!(pushed_filters[1].discriminant, PushedDown::Yes));
1503
1504        Ok(())
1505    }
1506
1507    #[test]
1508    fn test_filter_pushdown_with_swapped_aliases() -> Result<()> {
1509        let input_schema = Schema::new(vec![
1510            Field::new("a", DataType::Int32, false),
1511            Field::new("b", DataType::Int32, false),
1512        ]);
1513        let input = Arc::new(StatisticsExec::new(
1514            Statistics {
1515                column_statistics: vec![Default::default(); input_schema.fields().len()],
1516                ..Default::default()
1517            },
1518            input_schema.clone(),
1519        ));
1520
1521        // project "a" as "b", "b" as "a"
1522        let projection = ProjectionExec::try_new(
1523            vec![
1524                ProjectionExpr {
1525                    expr: Arc::new(Column::new("a", 0)),
1526                    alias: "b".to_string(),
1527                },
1528                ProjectionExpr {
1529                    expr: Arc::new(Column::new("b", 1)),
1530                    alias: "a".to_string(),
1531                },
1532            ],
1533            input,
1534        )?;
1535
1536        // filter "b > 5" (output column 0, which is "a" in input)
1537        let filter1 = Arc::new(BinaryExpr::new(
1538            Arc::new(Column::new("b", 0)),
1539            Operator::Gt,
1540            Arc::new(Literal::new(ScalarValue::Int32(Some(5)))),
1541        )) as Arc<dyn PhysicalExpr>;
1542
1543        // filter "a < 10" (output column 1, which is "b" in input)
1544        let filter2 = Arc::new(BinaryExpr::new(
1545            Arc::new(Column::new("a", 1)),
1546            Operator::Lt,
1547            Arc::new(Literal::new(ScalarValue::Int32(Some(10)))),
1548        )) as Arc<dyn PhysicalExpr>;
1549
1550        let description = projection.gather_filters_for_pushdown(
1551            FilterPushdownPhase::Post,
1552            vec![filter1, filter2],
1553            &ConfigOptions::default(),
1554        )?;
1555
1556        let pushed_filters = &description.parent_filters()[0];
1557        assert_eq!(pushed_filters.len(), 2);
1558
1559        // "b" (output index 0) -> "a" (input index 0)
1560        let expected_filter1 = "a@0 > 5";
1561        // "a" (output index 1) -> "b" (input index 1)
1562        let expected_filter2 = "b@1 < 10";
1563
1564        assert_eq!(format!("{}", pushed_filters[0].predicate), expected_filter1);
1565        assert_eq!(format!("{}", pushed_filters[1].predicate), expected_filter2);
1566        // Verify the predicates were actually pushed down
1567        assert!(matches!(pushed_filters[0].discriminant, PushedDown::Yes));
1568        assert!(matches!(pushed_filters[1].discriminant, PushedDown::Yes));
1569
1570        Ok(())
1571    }
1572
1573    #[test]
1574    fn test_filter_pushdown_with_mixed_columns() -> Result<()> {
1575        let input_schema = Schema::new(vec![
1576            Field::new("a", DataType::Int32, false),
1577            Field::new("b", DataType::Int32, false),
1578        ]);
1579        let input = Arc::new(StatisticsExec::new(
1580            Statistics {
1581                column_statistics: vec![Default::default(); input_schema.fields().len()],
1582                ..Default::default()
1583            },
1584            input_schema.clone(),
1585        ));
1586
1587        // project "a" as "x", "b" as "b" (pass through)
1588        let projection = ProjectionExec::try_new(
1589            vec![
1590                ProjectionExpr {
1591                    expr: Arc::new(Column::new("a", 0)),
1592                    alias: "x".to_string(),
1593                },
1594                ProjectionExpr {
1595                    expr: Arc::new(Column::new("b", 1)),
1596                    alias: "b".to_string(),
1597                },
1598            ],
1599            input,
1600        )?;
1601
1602        // filter "x > 5"
1603        let filter1 = Arc::new(BinaryExpr::new(
1604            Arc::new(Column::new("x", 0)),
1605            Operator::Gt,
1606            Arc::new(Literal::new(ScalarValue::Int32(Some(5)))),
1607        )) as Arc<dyn PhysicalExpr>;
1608
1609        // filter "b < 10" (using output index 1 which corresponds to 'b')
1610        let filter2 = Arc::new(BinaryExpr::new(
1611            Arc::new(Column::new("b", 1)),
1612            Operator::Lt,
1613            Arc::new(Literal::new(ScalarValue::Int32(Some(10)))),
1614        )) as Arc<dyn PhysicalExpr>;
1615
1616        let description = projection.gather_filters_for_pushdown(
1617            FilterPushdownPhase::Post,
1618            vec![filter1, filter2],
1619            &ConfigOptions::default(),
1620        )?;
1621
1622        let pushed_filters = &description.parent_filters()[0];
1623        assert_eq!(pushed_filters.len(), 2);
1624        // "x" -> "a" (index 0)
1625        let expected_filter1 = "a@0 > 5";
1626        // "b" -> "b" (index 1)
1627        let expected_filter2 = "b@1 < 10";
1628
1629        assert_eq!(format!("{}", pushed_filters[0].predicate), expected_filter1);
1630        assert_eq!(format!("{}", pushed_filters[1].predicate), expected_filter2);
1631        // Verify the predicates were actually pushed down
1632        assert!(matches!(pushed_filters[0].discriminant, PushedDown::Yes));
1633        assert!(matches!(pushed_filters[1].discriminant, PushedDown::Yes));
1634
1635        Ok(())
1636    }
1637
1638    #[test]
1639    fn test_filter_pushdown_with_complex_expression() -> Result<()> {
1640        let input_schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]);
1641        let input = Arc::new(StatisticsExec::new(
1642            Statistics {
1643                column_statistics: vec![Default::default(); input_schema.fields().len()],
1644                ..Default::default()
1645            },
1646            input_schema.clone(),
1647        ));
1648
1649        // project "a + 1" as "z"
1650        let projection = ProjectionExec::try_new(
1651            vec![ProjectionExpr {
1652                expr: Arc::new(BinaryExpr::new(
1653                    Arc::new(Column::new("a", 0)),
1654                    Operator::Plus,
1655                    Arc::new(Literal::new(ScalarValue::Int32(Some(1)))),
1656                )),
1657                alias: "z".to_string(),
1658            }],
1659            input,
1660        )?;
1661
1662        // filter "z > 10"
1663        let filter = Arc::new(BinaryExpr::new(
1664            Arc::new(Column::new("z", 0)),
1665            Operator::Gt,
1666            Arc::new(Literal::new(ScalarValue::Int32(Some(10)))),
1667        )) as Arc<dyn PhysicalExpr>;
1668
1669        let description = projection.gather_filters_for_pushdown(
1670            FilterPushdownPhase::Post,
1671            vec![filter],
1672            &ConfigOptions::default(),
1673        )?;
1674
1675        // expand to `a + 1 > 10`
1676        let pushed_filters = &description.parent_filters()[0];
1677        assert!(matches!(pushed_filters[0].discriminant, PushedDown::Yes));
1678        assert_eq!(format!("{}", pushed_filters[0].predicate), "a@0 + 1 > 10");
1679
1680        Ok(())
1681    }
1682
1683    #[test]
1684    fn test_filter_pushdown_with_unknown_column() -> Result<()> {
1685        let input_schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]);
1686        let input = Arc::new(StatisticsExec::new(
1687            Statistics {
1688                column_statistics: vec![Default::default(); input_schema.fields().len()],
1689                ..Default::default()
1690            },
1691            input_schema.clone(),
1692        ));
1693
1694        // project "a" as "a"
1695        let projection = ProjectionExec::try_new(
1696            vec![ProjectionExpr {
1697                expr: Arc::new(Column::new("a", 0)),
1698                alias: "a".to_string(),
1699            }],
1700            input,
1701        )?;
1702
1703        // filter "unknown_col > 5" - using a column name that doesn't exist in projection output
1704        // Column constructor: name, index. Index 1 doesn't exist.
1705        let filter = Arc::new(BinaryExpr::new(
1706            Arc::new(Column::new("unknown_col", 1)),
1707            Operator::Gt,
1708            Arc::new(Literal::new(ScalarValue::Int32(Some(5)))),
1709        )) as Arc<dyn PhysicalExpr>;
1710
1711        let description = projection.gather_filters_for_pushdown(
1712            FilterPushdownPhase::Post,
1713            vec![filter],
1714            &ConfigOptions::default(),
1715        )?;
1716
1717        let pushed_filters = &description.parent_filters()[0];
1718        assert!(matches!(pushed_filters[0].discriminant, PushedDown::No));
1719        // The column shouldn't be found in the alias map, so it remains unchanged with its index
1720        assert_eq!(
1721            format!("{}", pushed_filters[0].predicate),
1722            "unknown_col@1 > 5"
1723        );
1724
1725        Ok(())
1726    }
1727
1728    /// Basic test for `DynamicFilterPhysicalExpr` can correctly update its child expression
1729    /// i.e. starting with lit(true) and after update it becomes `a > 5`
1730    /// with projection [b - 1 as a], the pushed down filter should be `b - 1 > 5`
1731    #[test]
1732    fn test_basic_dyn_filter_projection_pushdown_update_child() -> Result<()> {
1733        let input_schema =
1734            Arc::new(Schema::new(vec![Field::new("b", DataType::Int32, false)]));
1735
1736        let input = Arc::new(StatisticsExec::new(
1737            Statistics {
1738                column_statistics: vec![Default::default(); input_schema.fields().len()],
1739                ..Default::default()
1740            },
1741            input_schema.as_ref().clone(),
1742        ));
1743
1744        // project "b" - 1 as "a"
1745        let projection = ProjectionExec::try_new(
1746            vec![ProjectionExpr {
1747                expr: binary(
1748                    Arc::new(Column::new("b", 0)),
1749                    Operator::Minus,
1750                    lit(1),
1751                    &input_schema,
1752                )
1753                .unwrap(),
1754                alias: "a".to_string(),
1755            }],
1756            input,
1757        )?;
1758
1759        // simulate projection's parent create a dynamic filter on "a"
1760        let projected_schema = projection.schema();
1761        let col_a = col("a", &projected_schema)?;
1762        let dynamic_filter = Arc::new(DynamicFilterPhysicalExpr::new(
1763            vec![Arc::clone(&col_a)],
1764            lit(true),
1765        ));
1766        // Initial state should be lit(true)
1767        let current = dynamic_filter.current()?;
1768        assert_eq!(format!("{current}"), "true");
1769
1770        let dyn_phy_expr: Arc<dyn PhysicalExpr> = Arc::clone(&dynamic_filter) as _;
1771
1772        let description = projection.gather_filters_for_pushdown(
1773            FilterPushdownPhase::Post,
1774            vec![dyn_phy_expr],
1775            &ConfigOptions::default(),
1776        )?;
1777
1778        let pushed_filters = &description.parent_filters()[0][0];
1779
1780        // Check currently pushed_filters is lit(true)
1781        assert_eq!(
1782            format!("{}", pushed_filters.predicate),
1783            "DynamicFilter [ empty ]"
1784        );
1785
1786        // Update to a > 5 (after projection, b is now called a)
1787        let new_expr =
1788            Arc::new(BinaryExpr::new(Arc::clone(&col_a), Operator::Gt, lit(5i32)));
1789        dynamic_filter.update(new_expr)?;
1790
1791        // Now it should be a > 5
1792        let current = dynamic_filter.current()?;
1793        assert_eq!(format!("{current}"), "a@0 > 5");
1794
1795        // Check currently pushed_filters is b - 1 > 5 (because b - 1 is projected as a)
1796        assert_eq!(
1797            format!("{}", pushed_filters.predicate),
1798            "DynamicFilter [ b@0 - 1 > 5 ]"
1799        );
1800
1801        Ok(())
1802    }
1803}