Skip to main content

datafusion_physical_plan/
projection.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! Defines the projection execution plan. A projection determines which columns or expressions
19//! are returned from a query. The SQL statement `SELECT a, b, a+b FROM t1` is an example
20//! of a projection on table `t1` where the expressions `a`, `b`, and `a+b` are the
21//! projection expressions. `SELECT` without `FROM` will only evaluate expressions.
22
23use super::expressions::Column;
24use super::metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet};
25use super::{
26    DisplayAs, ExecutionPlanProperties, PlanProperties, RecordBatchStream,
27    SendableRecordBatchStream, SortOrderPushdownResult, Statistics,
28};
29use crate::column_rewriter::PhysicalColumnRewriter;
30use crate::execution_plan::CardinalityEffect;
31use crate::filter_pushdown::{
32    ChildFilterDescription, ChildPushdownResult, FilterDescription, FilterPushdownPhase,
33    FilterPushdownPropagation, FilterRemapper, PushedDownPredicate,
34};
35use crate::joins::utils::{ColumnIndex, JoinFilter, JoinOn, JoinOnRef};
36use crate::{DisplayFormatType, ExecutionPlan, PhysicalExpr, check_if_same_properties};
37use std::any::Any;
38use std::collections::HashMap;
39use std::pin::Pin;
40use std::sync::Arc;
41use std::task::{Context, Poll};
42
43use arrow::datatypes::SchemaRef;
44use arrow::record_batch::RecordBatch;
45use datafusion_common::config::ConfigOptions;
46use datafusion_common::tree_node::{
47    Transformed, TransformedResult, TreeNode, TreeNodeRecursion,
48};
49use datafusion_common::{DataFusionError, JoinSide, Result, internal_err};
50use datafusion_execution::TaskContext;
51use datafusion_expr::ExpressionPlacement;
52use datafusion_physical_expr::equivalence::ProjectionMapping;
53use datafusion_physical_expr::projection::Projector;
54use datafusion_physical_expr::utils::collect_columns;
55use datafusion_physical_expr_common::physical_expr::{PhysicalExprRef, fmt_sql};
56use datafusion_physical_expr_common::sort_expr::{
57    LexOrdering, LexRequirement, PhysicalSortExpr,
58};
59// Re-exported from datafusion-physical-expr for backwards compatibility
60// We recommend updating your imports to use datafusion-physical-expr directly
61pub use datafusion_physical_expr::projection::{
62    ProjectionExpr, ProjectionExprs, update_expr,
63};
64
65use futures::stream::{Stream, StreamExt};
66use log::trace;
67
/// [`ExecutionPlan`] for a projection
///
/// Computes a set of scalar value expressions for each input row, producing one
/// output row for each input row. The number of rows is never changed by this
/// operator, only the set (and names) of columns.
#[derive(Debug, Clone)]
pub struct ProjectionExec {
    /// A projector specialized to apply the projection to the input schema from the child node
    /// and produce [`RecordBatch`]es with the output schema of this node.
    projector: Projector,
    /// The input plan
    input: Arc<dyn ExecutionPlan>,
    /// Execution metrics
    metrics: ExecutionPlanMetricsSet,
    /// Cache holding plan properties like equivalences, output partitioning etc.
    cache: Arc<PlanProperties>,
}
84
impl ProjectionExec {
    /// Create a projection on an input
    ///
    /// # Example:
    /// Create a `ProjectionExec` to create `SELECT a, a+b AS sum_ab FROM t1`:
    ///
    /// ```
    /// # use std::sync::Arc;
    /// # use arrow_schema::{Schema, Field, DataType};
    /// # use datafusion_expr::Operator;
    /// # use datafusion_physical_plan::ExecutionPlan;
    /// # use datafusion_physical_expr::expressions::{col, binary};
    /// # use datafusion_physical_plan::empty::EmptyExec;
    /// # use datafusion_physical_plan::projection::{ProjectionExec, ProjectionExpr};
    /// # fn schema() -> Arc<Schema> {
    /// #  Arc::new(Schema::new(vec![
    /// #   Field::new("a", DataType::Int32, false),
    /// #   Field::new("b", DataType::Int32, false),
    /// # ]))
    /// # }
    /// #
    /// # fn input() -> Arc<dyn ExecutionPlan> {
    /// #  Arc::new(EmptyExec::new(schema()))
    /// # }
    /// #
    /// # fn main() {
    /// let schema = schema();
    /// // Create PhysicalExprs
    /// let a = col("a", &schema).unwrap();
    /// let b = col("b", &schema).unwrap();
    /// let a_plus_b = binary(Arc::clone(&a), Operator::Plus, b, &schema).unwrap();
    /// // create ProjectionExec
    /// let proj = ProjectionExec::try_new(
    ///     [
    ///         ProjectionExpr {
    ///             // expr a produces the column named "a"
    ///             expr: a,
    ///             alias: "a".to_string(),
    ///         },
    ///         ProjectionExpr {
    ///             // expr: a + b produces the column named "sum_ab"
    ///             expr: a_plus_b,
    ///             alias: "sum_ab".to_string(),
    ///         },
    ///     ],
    ///     input(),
    /// )
    /// .unwrap();
    /// # }
    /// ```
    pub fn try_new<I, E>(expr: I, input: Arc<dyn ExecutionPlan>) -> Result<Self>
    where
        I: IntoIterator<Item = E>,
        E: Into<ProjectionExpr>,
    {
        let input_schema = input.schema();
        // Materialize the expressions into a shared slice and build a
        // `Projector` bound to the child's schema.
        let expr_arc = expr.into_iter().map(Into::into).collect::<Arc<_>>();
        let projection = ProjectionExprs::from_expressions(expr_arc);
        let projector = projection.make_projector(&input_schema)?;
        Self::try_from_projector(projector, input)
    }

    /// Create a `ProjectionExec` from an already-built [`Projector`], computing
    /// the plan properties (equivalences, partitioning, etc.) from `input`.
    fn try_from_projector(
        projector: Projector,
        input: Arc<dyn ExecutionPlan>,
    ) -> Result<Self> {
        // Construct a map from the input expressions to the output expression of the Projection
        let projection_mapping =
            projector.projection().projection_mapping(&input.schema())?;
        let cache = Self::compute_properties(
            &input,
            &projection_mapping,
            Arc::clone(projector.output_schema()),
        )?;
        Ok(Self {
            projector,
            input,
            metrics: ExecutionPlanMetricsSet::new(),
            cache: Arc::new(cache),
        })
    }

    /// The projection expressions stored as tuples of (expression, output column name)
    pub fn expr(&self) -> &[ProjectionExpr] {
        self.projector.projection().as_ref()
    }

    /// The projection expressions as a [`ProjectionExprs`].
    pub fn projection_expr(&self) -> &ProjectionExprs {
        self.projector.projection()
    }

    /// The input plan
    pub fn input(&self) -> &Arc<dyn ExecutionPlan> {
        &self.input
    }

    /// This function creates the cache object that stores the plan properties such as schema, equivalence properties, ordering, partitioning, etc.
    fn compute_properties(
        input: &Arc<dyn ExecutionPlan>,
        projection_mapping: &ProjectionMapping,
        schema: SchemaRef,
    ) -> Result<PlanProperties> {
        // Calculate equivalence properties:
        let input_eq_properties = input.equivalence_properties();
        let eq_properties = input_eq_properties.project(projection_mapping, schema);
        // Calculate output partitioning, which needs to respect aliases:
        let output_partitioning = input
            .output_partitioning()
            .project(projection_mapping, input_eq_properties);

        Ok(PlanProperties::new(
            eq_properties,
            output_partitioning,
            input.pipeline_behavior(),
            input.boundedness(),
        ))
    }

    /// Collect reverse alias mapping from projection expressions.
    /// The result hash map is a map from aliased Column in parent to original expr.
    ///
    /// Returns an internal error if a projection alias cannot be found in the
    /// output schema (this would indicate an inconsistent projector).
    fn collect_reverse_alias(
        &self,
    ) -> Result<datafusion_common::HashMap<Column, Arc<dyn PhysicalExpr>>> {
        let mut alias_map = datafusion_common::HashMap::new();
        for projection in self.projection_expr().iter() {
            let (aliased_index, _output_field) = self
                .projector
                .output_schema()
                .column_with_name(&projection.alias)
                .ok_or_else(|| {
                    DataFusionError::Internal(format!(
                        "Expr {} with alias {} not found in output schema",
                        projection.expr, projection.alias
                    ))
                })?;
            let aliased_col = Column::new(&projection.alias, aliased_index);
            alias_map.insert(aliased_col, Arc::clone(&projection.expr));
        }
        Ok(alias_map)
    }

    /// Clone this node with a new child while reusing the cached plan
    /// properties. Callers must guarantee the new child has the same
    /// properties as the old one; metrics are reset for the new node.
    fn with_new_children_and_same_properties(
        &self,
        mut children: Vec<Arc<dyn ExecutionPlan>>,
    ) -> Self {
        Self {
            input: children.swap_remove(0),
            metrics: ExecutionPlanMetricsSet::new(),
            ..Self::clone(self)
        }
    }
}
238
239impl DisplayAs for ProjectionExec {
240    fn fmt_as(
241        &self,
242        t: DisplayFormatType,
243        f: &mut std::fmt::Formatter,
244    ) -> std::fmt::Result {
245        match t {
246            DisplayFormatType::Default | DisplayFormatType::Verbose => {
247                let expr: Vec<String> = self
248                    .projector
249                    .projection()
250                    .as_ref()
251                    .iter()
252                    .map(|proj_expr| {
253                        let e = proj_expr.expr.to_string();
254                        if e != proj_expr.alias {
255                            format!("{e} as {}", proj_expr.alias)
256                        } else {
257                            e
258                        }
259                    })
260                    .collect();
261
262                write!(f, "ProjectionExec: expr=[{}]", expr.join(", "))
263            }
264            DisplayFormatType::TreeRender => {
265                for (i, proj_expr) in self.expr().iter().enumerate() {
266                    let expr_sql = fmt_sql(proj_expr.expr.as_ref());
267                    if proj_expr.expr.to_string() == proj_expr.alias {
268                        writeln!(f, "expr{i}={expr_sql}")?;
269                    } else {
270                        writeln!(f, "{}={expr_sql}", proj_expr.alias)?;
271                    }
272                }
273
274                Ok(())
275            }
276        }
277    }
278}
279
impl ExecutionPlan for ProjectionExec {
    fn name(&self) -> &'static str {
        "ProjectionExec"
    }

    /// Return a reference to Any that can be used for downcasting
    fn as_any(&self) -> &dyn Any {
        self
    }

    fn properties(&self) -> &Arc<PlanProperties> {
        &self.cache
    }

    fn maintains_input_order(&self) -> Vec<bool> {
        // Tell optimizer this operator doesn't reorder its input
        vec![true]
    }

    fn benefits_from_input_partitioning(&self) -> Vec<bool> {
        // An expression is considered "simple" here when its placement is not
        // `KeepInPlace` — NOTE(review): confirm the `ExpressionPlacement`
        // semantics; the intent per the comment below is "cheap expressions
        // such as columns/literals".
        let all_simple_exprs =
            self.projector
                .projection()
                .as_ref()
                .iter()
                .all(|proj_expr| {
                    !matches!(
                        proj_expr.expr.placement(),
                        ExpressionPlacement::KeepInPlace
                    )
                });
        // If expressions are all either column_expr or Literal (or other cheap expressions),
        // then all computations in this projection are reorder or rename,
        // and projection would not benefit from the repartition, benefits_from_input_partitioning will return false.
        vec![!all_simple_exprs]
    }

    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
        vec![&self.input]
    }

    fn with_new_children(
        self: Arc<Self>,
        mut children: Vec<Arc<dyn ExecutionPlan>>,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        // Fast path: if the new child has identical properties, reuse this
        // node's cached properties instead of recomputing them.
        check_if_same_properties!(self, children);
        ProjectionExec::try_from_projector(
            self.projector.clone(),
            children.swap_remove(0),
        )
        .map(|p| Arc::new(p) as _)
    }

    fn execute(
        &self,
        partition: usize,
        context: Arc<TaskContext>,
    ) -> Result<SendableRecordBatchStream> {
        trace!(
            "Start ProjectionExec::execute for partition {} of context session_id {} and task_id {:?}",
            partition,
            context.session_id(),
            context.task_id()
        );

        // Attach per-partition metrics to the projector, then wrap the
        // child's stream so each batch is projected as it is polled.
        let projector = self.projector.with_metrics(&self.metrics, partition);
        Ok(Box::pin(ProjectionStream::new(
            projector,
            self.input.execute(partition, context)?,
            BaselineMetrics::new(&self.metrics, partition),
        )?))
    }

    fn metrics(&self) -> Option<MetricsSet> {
        Some(self.metrics.clone_inner())
    }

    fn partition_statistics(&self, partition: Option<usize>) -> Result<Statistics> {
        // Project the child's statistics through the projection expressions
        // so column statistics line up with the output schema.
        let input_stats = self.input.partition_statistics(partition)?;
        let output_schema = self.schema();
        self.projector
            .projection()
            .project_statistics(input_stats, &output_schema)
    }

    fn supports_limit_pushdown(&self) -> bool {
        true
    }

    fn cardinality_effect(&self) -> CardinalityEffect {
        // A projection emits exactly one output row per input row.
        CardinalityEffect::Equal
    }

    fn try_swapping_with_projection(
        &self,
        projection: &ProjectionExec,
    ) -> Result<Option<Arc<dyn ExecutionPlan>>> {
        // Try to merge the parent projection with this one into a single node.
        let maybe_unified = try_unifying_projections(projection, self)?;
        if let Some(new_plan) = maybe_unified {
            // To unify 3 or more sequential projections:
            remove_unnecessary_projections(new_plan).data().map(Some)
        } else {
            Ok(Some(Arc::new(projection.clone())))
        }
    }

    fn gather_filters_for_pushdown(
        &self,
        _phase: FilterPushdownPhase,
        parent_filters: Vec<Arc<dyn PhysicalExpr>>,
        _config: &ConfigOptions,
    ) -> Result<FilterDescription> {
        // expand alias column to original expr in parent filters
        let invert_alias_map = self.collect_reverse_alias()?;
        let output_schema = self.schema();
        let remapper = FilterRemapper::new(output_schema);
        let mut child_parent_filters = Vec::with_capacity(parent_filters.len());

        for filter in parent_filters {
            // Check that column exists in child, then reassign column indices to match child schema
            if let Some(reassigned) = remapper.try_remap(&filter)? {
                // rewrite filter expression using invert alias map
                let mut rewriter = PhysicalColumnRewriter::new(&invert_alias_map);
                let rewritten = reassigned.rewrite(&mut rewriter)?.data;
                child_parent_filters.push(PushedDownPredicate::supported(rewritten));
            } else {
                // Filter references a column this projection does not expose;
                // it cannot be pushed below this node.
                child_parent_filters.push(PushedDownPredicate::unsupported(filter));
            }
        }

        Ok(FilterDescription::new().with_child(ChildFilterDescription {
            parent_filters: child_parent_filters,
            self_filters: vec![],
        }))
    }

    fn handle_child_pushdown_result(
        &self,
        _phase: FilterPushdownPhase,
        child_pushdown_result: ChildPushdownResult,
        _config: &ConfigOptions,
    ) -> Result<FilterPushdownPropagation<Arc<dyn ExecutionPlan>>> {
        // A filter is only reported as handled if all children handled it.
        Ok(FilterPushdownPropagation::if_all(child_pushdown_result))
    }

    fn try_pushdown_sort(
        &self,
        order: &[PhysicalSortExpr],
    ) -> Result<SortOrderPushdownResult<Arc<dyn ExecutionPlan>>> {
        let child = self.input();
        let mut child_order = Vec::new();

        // Check and transform sort expressions
        for sort_expr in order {
            // Recursively transform the expression
            let mut can_pushdown = true;
            let transformed = Arc::clone(&sort_expr.expr).transform(|expr| {
                if let Some(col) = expr.as_any().downcast_ref::<Column>() {
                    // Check if column index is valid.
                    // This should always be true but fail gracefully if it's not.
                    if col.index() >= self.expr().len() {
                        can_pushdown = false;
                        return Ok(Transformed::no(expr));
                    }

                    let proj_expr = &self.expr()[col.index()];

                    // Check if projection expression is a simple column
                    // We cannot push down order by clauses that depend on
                    // projected computations as they would have nothing to reference.
                    if let Some(child_col) =
                        proj_expr.expr.as_any().downcast_ref::<Column>()
                    {
                        // Replace with the child column
                        Ok(Transformed::yes(Arc::new(child_col.clone()) as _))
                    } else {
                        // Projection involves computation, cannot push down
                        can_pushdown = false;
                        Ok(Transformed::no(expr))
                    }
                } else {
                    Ok(Transformed::no(expr))
                }
            })?;

            if !can_pushdown {
                return Ok(SortOrderPushdownResult::Unsupported);
            }

            child_order.push(PhysicalSortExpr {
                expr: transformed.data,
                options: sort_expr.options,
            });
        }

        // Recursively push down to child node
        match child.try_pushdown_sort(&child_order)? {
            SortOrderPushdownResult::Exact { inner } => {
                let new_exec = Arc::new(self.clone()).with_new_children(vec![inner])?;
                Ok(SortOrderPushdownResult::Exact { inner: new_exec })
            }
            SortOrderPushdownResult::Inexact { inner } => {
                let new_exec = Arc::new(self.clone()).with_new_children(vec![inner])?;
                Ok(SortOrderPushdownResult::Inexact { inner: new_exec })
            }
            SortOrderPushdownResult::Unsupported => {
                Ok(SortOrderPushdownResult::Unsupported)
            }
        }
    }

    fn with_preserve_order(
        &self,
        preserve_order: bool,
    ) -> Option<Arc<dyn ExecutionPlan>> {
        // Delegate to the child; if it can preserve order, rebuild this
        // projection on top of the order-preserving child.
        self.input
            .with_preserve_order(preserve_order)
            .and_then(|new_input| {
                Arc::new(self.clone())
                    .with_new_children(vec![new_input])
                    .ok()
            })
    }
}
504
505impl ProjectionStream {
506    /// Create a new projection stream
507    fn new(
508        projector: Projector,
509        input: SendableRecordBatchStream,
510        baseline_metrics: BaselineMetrics,
511    ) -> Result<Self> {
512        Ok(Self {
513            projector,
514            input,
515            baseline_metrics,
516        })
517    }
518
519    fn batch_project(&self, batch: &RecordBatch) -> Result<RecordBatch> {
520        // Records time on drop
521        let _timer = self.baseline_metrics.elapsed_compute().timer();
522        self.projector.project_batch(batch)
523    }
524}
525
/// Projection iterator
struct ProjectionStream {
    /// Applies the projection expressions to each input batch.
    projector: Projector,
    /// Upstream stream supplying the input record batches.
    input: SendableRecordBatchStream,
    /// Tracks elapsed compute time and output rows for this partition.
    baseline_metrics: BaselineMetrics,
}
532
533impl Stream for ProjectionStream {
534    type Item = Result<RecordBatch>;
535
536    fn poll_next(
537        mut self: Pin<&mut Self>,
538        cx: &mut Context<'_>,
539    ) -> Poll<Option<Self::Item>> {
540        let poll = self.input.poll_next_unpin(cx).map(|x| match x {
541            Some(Ok(batch)) => Some(self.batch_project(&batch)),
542            other => other,
543        });
544
545        self.baseline_metrics.record_poll(poll)
546    }
547
548    fn size_hint(&self) -> (usize, Option<usize>) {
549        // Same number of record batches
550        self.input.size_hint()
551    }
552}
553
impl RecordBatchStream for ProjectionStream {
    /// Get the schema
    fn schema(&self) -> SchemaRef {
        // The stream produces batches with the projector's output schema,
        // not the input schema.
        Arc::clone(self.projector.output_schema())
    }
}
560
/// Trait for execution plans that can embed a projection, avoiding a separate
/// [`ProjectionExec`] wrapper.
///
/// # Empty projections
///
/// `Some(vec![])` is a valid projection that produces zero output columns while
/// preserving the correct row count. Implementors must ensure that runtime batch
/// construction still returns batches with the right number of rows even when no
/// columns are selected (e.g. for `SELECT count(1) … JOIN …`).
pub trait EmbeddedProjection: ExecutionPlan + Sized {
    /// Return a copy of this plan restricted to the given input column
    /// indices; `None` means "project all columns".
    fn with_projection(&self, projection: Option<Vec<usize>>) -> Result<Self>;
}
573
/// Some projections cannot be pushed down to the left or right input of a hash
/// join because the join filter or `on` clause may need columns that are not
/// used afterwards. By embedding such a projection into the hash join, we
/// reduce the cost of `build_batch_from_indices` in the hash join (which must
/// call `compute::take()` for each column) and avoid unnecessary output
/// creation.
///
/// Returns `Ok(None)` when embedding is not possible or not beneficial.
pub fn try_embed_projection<Exec: EmbeddedProjection + 'static>(
    projection: &ProjectionExec,
    execution_plan: &Exec,
) -> Result<Option<Arc<dyn ExecutionPlan>>> {
    // If the projection has no expressions at all (e.g., ProjectionExec: expr=[]),
    // embed an empty projection into the execution plan so it outputs zero columns.
    // This avoids allocating throwaway null arrays for build-side columns
    // when no output columns are actually needed (e.g., count(1) over a right join).
    if projection.expr().is_empty() {
        let new_execution_plan = Arc::new(execution_plan.with_projection(Some(vec![]))?);
        return Ok(Some(new_execution_plan));
    }

    // Collect all column indices from the given projection expressions.
    let projection_index = collect_column_indices(projection.expr());

    if projection_index.is_empty() {
        return Ok(None);
    };

    // If the projection indices is the same as the input columns, we don't need to embed the projection to hash join.
    // Check the projection_index is 0..n-1 and the length of projection_index is the same as the length of execution_plan schema fields.
    if projection_index.len() == projection_index.last().unwrap() + 1
        && projection_index.len() == execution_plan.schema().fields().len()
    {
        return Ok(None);
    }

    let new_execution_plan =
        Arc::new(execution_plan.with_projection(Some(projection_index.to_vec()))?);

    // Build projection expressions for update_expr. Zip the projection_index with the new_execution_plan output schema fields.
    let embed_project_exprs = projection_index
        .iter()
        .zip(new_execution_plan.schema().fields())
        .map(|(index, field)| ProjectionExpr {
            expr: Arc::new(Column::new(field.name(), *index)) as Arc<dyn PhysicalExpr>,
            alias: field.name().to_owned(),
        })
        .collect::<Vec<_>>();

    let mut new_projection_exprs = Vec::with_capacity(projection.expr().len());

    for proj_expr in projection.expr() {
        // update column index for projection expression since the input schema has been changed.
        let Some(expr) =
            update_expr(&proj_expr.expr, embed_project_exprs.as_slice(), false)?
        else {
            // If any expression cannot be rewritten, abandon the embedding.
            return Ok(None);
        };
        new_projection_exprs.push(ProjectionExpr {
            expr,
            alias: proj_expr.alias.clone(),
        });
    }
    // Old projection may contain some alias or expression such as `a + 1` and `CAST('true' AS BOOLEAN)`, but our projection_exprs in hash join just contain column, so we need to create the new projection to keep the original projection.
    let new_projection = Arc::new(ProjectionExec::try_new(
        new_projection_exprs,
        Arc::clone(&new_execution_plan) as _,
    )?);
    if is_projection_removable(&new_projection) {
        Ok(Some(new_execution_plan))
    } else {
        Ok(Some(new_projection))
    }
}
642
/// Result of pushing a projection through a join: the projected children plus
/// the join filter and `on` conditions rewritten against the new child schemas.
pub struct JoinData {
    /// Projection applied on top of the join's left child.
    pub projected_left_child: ProjectionExec,
    /// Projection applied on top of the join's right child.
    pub projected_right_child: ProjectionExec,
    /// Join filter rewritten for the projected children, if one existed.
    pub join_filter: Option<JoinFilter>,
    /// Equi-join `on` pairs rewritten for the projected children.
    pub join_on: JoinOn,
}
649
/// Attempt to push `projection` below a join by splitting it into per-child
/// projections and rewriting the join's filter and `on` conditions.
///
/// Returns `Ok(None)` when pushdown is not possible (e.g. the projection
/// contains non-column expressions, or the column layout does not allow a
/// clean left/right split).
pub fn try_pushdown_through_join(
    projection: &ProjectionExec,
    join_left: &Arc<dyn ExecutionPlan>,
    join_right: &Arc<dyn ExecutionPlan>,
    join_on: JoinOnRef,
    schema: &SchemaRef,
    filter: Option<&JoinFilter>,
) -> Result<Option<JoinData>> {
    // Convert projected expressions to columns. We can not proceed if this is not possible.
    let Some(projection_as_columns) = physical_to_column_exprs(projection.expr()) else {
        return Ok(None);
    };

    // Find the boundary between columns referencing the left vs. right child.
    let (far_right_left_col_ind, far_left_right_col_ind) =
        join_table_borders(join_left.schema().fields().len(), &projection_as_columns);

    if !join_allows_pushdown(
        &projection_as_columns,
        schema,
        far_right_left_col_ind,
        far_left_right_col_ind,
    ) {
        return Ok(None);
    }

    // Rewrite the join filter (if any) against the projected child columns.
    let new_filter = if let Some(filter) = filter {
        match update_join_filter(
            &projection_as_columns[0..=far_right_left_col_ind as _],
            &projection_as_columns[far_left_right_col_ind as _..],
            filter,
            join_left.schema().fields().len(),
        ) {
            Some(updated_filter) => Some(updated_filter),
            None => return Ok(None),
        }
    } else {
        None
    };

    // Rewrite the equi-join `on` conditions against the projected columns.
    let Some(new_on) = update_join_on(
        &projection_as_columns[0..=far_right_left_col_ind as _],
        &projection_as_columns[far_left_right_col_ind as _..],
        join_on,
        join_left.schema().fields().len(),
    ) else {
        return Ok(None);
    };

    // Build the per-child projections that replace the original one.
    let (new_left, new_right) = new_join_children(
        &projection_as_columns,
        far_right_left_col_ind,
        far_left_right_col_ind,
        join_left,
        join_right,
    )?;

    Ok(Some(JoinData {
        projected_left_child: new_left,
        projected_right_child: new_right,
        join_filter: new_filter,
        join_on: new_on,
    }))
}
713
714/// This function checks if `plan` is a [`ProjectionExec`], and inspects its
715/// input(s) to test whether it can push `plan` under its input(s). This function
716/// will operate on the entire tree and may ultimately remove `plan` entirely
717/// by leveraging source providers with built-in projection capabilities.
718pub fn remove_unnecessary_projections(
719    plan: Arc<dyn ExecutionPlan>,
720) -> Result<Transformed<Arc<dyn ExecutionPlan>>> {
721    let maybe_modified =
722        if let Some(projection) = plan.as_any().downcast_ref::<ProjectionExec>() {
723            // If the projection does not cause any change on the input, we can
724            // safely remove it:
725            if is_projection_removable(projection) {
726                return Ok(Transformed::yes(Arc::clone(projection.input())));
727            }
728            // If it does, check if we can push it under its child(ren):
729            projection
730                .input()
731                .try_swapping_with_projection(projection)?
732        } else {
733            return Ok(Transformed::no(plan));
734        };
735    Ok(maybe_modified.map_or_else(|| Transformed::no(plan), Transformed::yes))
736}
737
738/// Compare the inputs and outputs of the projection. All expressions must be
739/// columns without alias, and projection does not change the order of fields.
740/// For example, if the input schema is `a, b`, `SELECT a, b` is removable,
741/// but `SELECT b, a` and `SELECT a+1, b` and `SELECT a AS c, b` are not.
742fn is_projection_removable(projection: &ProjectionExec) -> bool {
743    let exprs = projection.expr();
744    exprs.iter().enumerate().all(|(idx, proj_expr)| {
745        let Some(col) = proj_expr.expr.as_any().downcast_ref::<Column>() else {
746            return false;
747        };
748        col.name() == proj_expr.alias && col.index() == idx
749    }) && exprs.len() == projection.input().schema().fields().len()
750}
751
752/// Given the expression set of a projection, checks if the projection causes
753/// any renaming or constructs a non-`Column` physical expression.
754pub fn all_alias_free_columns(exprs: &[ProjectionExpr]) -> bool {
755    exprs.iter().all(|proj_expr| {
756        proj_expr
757            .expr
758            .as_any()
759            .downcast_ref::<Column>()
760            .map(|column| column.name() == proj_expr.alias)
761            .unwrap_or(false)
762    })
763}
764
765/// Updates a source provider's projected columns according to the given
766/// projection operator's expressions. To use this function safely, one must
767/// ensure that all expressions are `Column` expressions without aliases.
768pub fn new_projections_for_columns(
769    projection: &[ProjectionExpr],
770    source: &[usize],
771) -> Vec<usize> {
772    projection
773        .iter()
774        .filter_map(|proj_expr| {
775            proj_expr
776                .expr
777                .as_any()
778                .downcast_ref::<Column>()
779                .map(|expr| source[expr.index()])
780        })
781        .collect()
782}
783
784/// Creates a new [`ProjectionExec`] instance with the given child plan and
785/// projected expressions.
786pub fn make_with_child(
787    projection: &ProjectionExec,
788    child: &Arc<dyn ExecutionPlan>,
789) -> Result<Arc<dyn ExecutionPlan>> {
790    ProjectionExec::try_new(projection.expr().to_vec(), Arc::clone(child))
791        .map(|e| Arc::new(e) as _)
792}
793
794/// Returns `true` if all the expressions in the argument are `Column`s.
795pub fn all_columns(exprs: &[ProjectionExpr]) -> bool {
796    exprs
797        .iter()
798        .all(|proj_expr| proj_expr.expr.as_any().is::<Column>())
799}
800
801/// Updates the given lexicographic ordering according to given projected
802/// expressions using the [`update_expr`] function.
803pub fn update_ordering(
804    ordering: LexOrdering,
805    projected_exprs: &[ProjectionExpr],
806) -> Result<Option<LexOrdering>> {
807    let mut updated_exprs = vec![];
808    for mut sort_expr in ordering.into_iter() {
809        let Some(updated_expr) = update_expr(&sort_expr.expr, projected_exprs, false)?
810        else {
811            return Ok(None);
812        };
813        sort_expr.expr = updated_expr;
814        updated_exprs.push(sort_expr);
815    }
816    Ok(LexOrdering::new(updated_exprs))
817}
818
819/// Updates the given lexicographic requirement according to given projected
820/// expressions using the [`update_expr`] function.
821pub fn update_ordering_requirement(
822    reqs: LexRequirement,
823    projected_exprs: &[ProjectionExpr],
824) -> Result<Option<LexRequirement>> {
825    let mut updated_exprs = vec![];
826    for mut sort_expr in reqs.into_iter() {
827        let Some(updated_expr) = update_expr(&sort_expr.expr, projected_exprs, false)?
828        else {
829            return Ok(None);
830        };
831        sort_expr.expr = updated_expr;
832        updated_exprs.push(sort_expr);
833    }
834    Ok(LexRequirement::new(updated_exprs))
835}
836
837/// Downcasts all the expressions in `exprs` to `Column`s. If any of the given
838/// expressions is not a `Column`, returns `None`.
839pub fn physical_to_column_exprs(
840    exprs: &[ProjectionExpr],
841) -> Option<Vec<(Column, String)>> {
842    exprs
843        .iter()
844        .map(|proj_expr| {
845            proj_expr
846                .expr
847                .as_any()
848                .downcast_ref::<Column>()
849                .map(|col| (col.clone(), proj_expr.alias.clone()))
850        })
851        .collect()
852}
853
854/// If pushing down the projection over this join's children seems possible,
855/// this function constructs the new [`ProjectionExec`]s that will come on top
856/// of the original children of the join.
857pub fn new_join_children(
858    projection_as_columns: &[(Column, String)],
859    far_right_left_col_ind: i32,
860    far_left_right_col_ind: i32,
861    left_child: &Arc<dyn ExecutionPlan>,
862    right_child: &Arc<dyn ExecutionPlan>,
863) -> Result<(ProjectionExec, ProjectionExec)> {
864    let new_left = ProjectionExec::try_new(
865        projection_as_columns[0..=far_right_left_col_ind as _]
866            .iter()
867            .map(|(col, alias)| ProjectionExpr {
868                expr: Arc::new(Column::new(col.name(), col.index())) as _,
869                alias: alias.clone(),
870            }),
871        Arc::clone(left_child),
872    )?;
873    let left_size = left_child.schema().fields().len() as i32;
874    let new_right = ProjectionExec::try_new(
875        projection_as_columns[far_left_right_col_ind as _..]
876            .iter()
877            .map(|(col, alias)| {
878                ProjectionExpr {
879                    expr: Arc::new(Column::new(
880                        col.name(),
881                        // Align projected expressions coming from the right
882                        // table with the new right child projection:
883                        (col.index() as i32 - left_size) as _,
884                    )) as _,
885                    alias: alias.clone(),
886                }
887            }),
888        Arc::clone(right_child),
889    )?;
890
891    Ok((new_left, new_right))
892}
893
894/// Checks three conditions for pushing a projection down through a join:
895/// - Projection must narrow the join output schema.
896/// - Columns coming from left/right tables must be collected at the left/right
897///   sides of the output table.
898/// - Left or right table is not lost after the projection.
899pub fn join_allows_pushdown(
900    projection_as_columns: &[(Column, String)],
901    join_schema: &SchemaRef,
902    far_right_left_col_ind: i32,
903    far_left_right_col_ind: i32,
904) -> bool {
905    // Projection must narrow the join output:
906    projection_as_columns.len() < join_schema.fields().len()
907    // Are the columns from different tables mixed?
908    && (far_right_left_col_ind + 1 == far_left_right_col_ind)
909    // Left or right table is not lost after the projection.
910    && far_right_left_col_ind >= 0
911    && far_left_right_col_ind < projection_as_columns.len() as i32
912}
913
914/// Returns the last index before encountering a column coming from the right table when traveling
915/// through the projection from left to right, and the last index before encountering a column
916/// coming from the left table when traveling through the projection from right to left.
917/// If there is no column in the projection coming from the left side, it returns (-1, ...),
918/// if there is no column in the projection coming from the right side, it returns (..., projection length).
919pub fn join_table_borders(
920    left_table_column_count: usize,
921    projection_as_columns: &[(Column, String)],
922) -> (i32, i32) {
923    let far_right_left_col_ind = projection_as_columns
924        .iter()
925        .enumerate()
926        .take_while(|(_, (projection_column, _))| {
927            projection_column.index() < left_table_column_count
928        })
929        .last()
930        .map(|(index, _)| index as i32)
931        .unwrap_or(-1);
932
933    let far_left_right_col_ind = projection_as_columns
934        .iter()
935        .enumerate()
936        .rev()
937        .take_while(|(_, (projection_column, _))| {
938            projection_column.index() >= left_table_column_count
939        })
940        .last()
941        .map(|(index, _)| index as i32)
942        .unwrap_or(projection_as_columns.len() as i32);
943
944    (far_right_left_col_ind, far_left_right_col_ind)
945}
946
947/// Tries to update the equi-join `Column`'s of a join as if the input of
948/// the join was replaced by a projection.
949pub fn update_join_on(
950    proj_left_exprs: &[(Column, String)],
951    proj_right_exprs: &[(Column, String)],
952    hash_join_on: &[(PhysicalExprRef, PhysicalExprRef)],
953    left_field_size: usize,
954) -> Option<Vec<(PhysicalExprRef, PhysicalExprRef)>> {
955    let (left_idx, right_idx): (Vec<_>, Vec<_>) = hash_join_on
956        .iter()
957        .map(|(left, right)| (left, right))
958        .unzip();
959
960    let new_left_columns = new_columns_for_join_on(&left_idx, proj_left_exprs, 0);
961    let new_right_columns =
962        new_columns_for_join_on(&right_idx, proj_right_exprs, left_field_size);
963
964    match (new_left_columns, new_right_columns) {
965        (Some(left), Some(right)) => Some(left.into_iter().zip(right).collect()),
966        _ => None,
967    }
968}
969
/// Tries to update the column indices of a [`JoinFilter`] as if the input of
/// the join was replaced by a projection.
///
/// Returns `None` when some column referenced by the filter is not present
/// in the corresponding side's projection, i.e. the projection would drop a
/// column the filter needs.
pub fn update_join_filter(
    projection_left_exprs: &[(Column, String)],
    projection_right_exprs: &[(Column, String)],
    join_filter: &JoinFilter,
    left_field_size: usize,
) -> Option<JoinFilter> {
    // Both index vectors below are produced by scanning
    // `join_filter.column_indices()` in order (filtered per side), so
    // consuming them with `next()` while traversing `column_indices()` again
    // in the same order pairs each old index with its remapped value.
    let mut new_left_indices = new_indices_for_join_filter(
        join_filter,
        JoinSide::Left,
        projection_left_exprs,
        0,
    )
    .into_iter();
    let mut new_right_indices = new_indices_for_join_filter(
        join_filter,
        JoinSide::Right,
        projection_right_exprs,
        left_field_size,
    )
    .into_iter();

    // Check if all columns match: the rewrite is only valid when every
    // filter column was found in its projection (otherwise some index was
    // silently dropped and the counts differ).
    (new_right_indices.len() + new_left_indices.len()
        == join_filter.column_indices().len())
    .then(|| {
        JoinFilter::new(
            Arc::clone(join_filter.expression()),
            join_filter
                .column_indices()
                .iter()
                .map(|col_idx| ColumnIndex {
                    // `unwrap` is safe here: the length check above guarantees
                    // exactly one remapped index per original column index.
                    index: if col_idx.side == JoinSide::Left {
                        new_left_indices.next().unwrap()
                    } else {
                        new_right_indices.next().unwrap()
                    },
                    side: col_idx.side,
                })
                .collect(),
            Arc::clone(join_filter.schema()),
        )
    })
}
1015
/// Unifies `projection` with its input (which is also a [`ProjectionExec`]).
///
/// Returns `Ok(None)` when merging is either not beneficial (it would
/// duplicate a non-trivial computation) or not possible (some expression
/// cannot be rewritten in terms of the child's input).
fn try_unifying_projections(
    projection: &ProjectionExec,
    child: &ProjectionExec,
) -> Result<Option<Arc<dyn ExecutionPlan>>> {
    let mut projected_exprs = vec![];
    // Maps each column of the child projection's output to the number of
    // times the outer projection references it.
    let mut column_ref_map: HashMap<Column, usize> = HashMap::new();

    // Collect the column references usage in the outer projection.
    projection.expr().iter().for_each(|proj_expr| {
        proj_expr
            .expr
            .apply(|expr| {
                Ok({
                    if let Some(column) = expr.as_any().downcast_ref::<Column>() {
                        *column_ref_map.entry(column.clone()).or_default() += 1;
                    }
                    TreeNodeRecursion::Continue
                })
            })
            // The closure above always returns `Ok`, so `apply` cannot fail.
            .unwrap();
    });
    // Do not merge when a child expression is non-trivial (should not be
    // pushed to the leaves) AND is referenced more than once by the outer
    // projection: keeping the child projection then acts as a cache for the
    // expensive computation, which merging would duplicate.
    // See discussion in: https://github.com/apache/datafusion/issues/8296
    if column_ref_map.iter().any(|(column, count)| {
        *count > 1
            && !child.expr()[column.index()]
                .expr
                .placement()
                .should_push_to_leaves()
    }) {
        return Ok(None);
    }
    for proj_expr in projection.expr() {
        // If there is no match in the input projection, we cannot unify these
        // projections. This case will arise if the projection expression contains
        // a `PhysicalExpr` variant `update_expr` doesn't support.
        let Some(expr) = update_expr(&proj_expr.expr, child.expr(), true)? else {
            return Ok(None);
        };
        projected_exprs.push(ProjectionExpr {
            expr,
            alias: proj_expr.alias.clone(),
        });
    }
    // Build the single fused projection directly on the child's input:
    ProjectionExec::try_new(projected_exprs, Arc::clone(child.input()))
        .map(|e| Some(Arc::new(e) as _))
}
1066
1067/// Collect all column indices from the given projection expressions.
1068fn collect_column_indices(exprs: &[ProjectionExpr]) -> Vec<usize> {
1069    // Collect indices and remove duplicates.
1070    let mut indices = exprs
1071        .iter()
1072        .flat_map(|proj_expr| collect_columns(&proj_expr.expr))
1073        .map(|x| x.index())
1074        .collect::<std::collections::HashSet<_>>()
1075        .into_iter()
1076        .collect::<Vec<_>>();
1077    indices.sort();
1078    indices
1079}
1080
1081/// This function determines and returns a vector of indices representing the
1082/// positions of columns in `projection_exprs` that are involved in `join_filter`,
1083/// and correspond to a particular side (`join_side`) of the join operation.
1084///
1085/// Notes: Column indices in the projection expressions are based on the join schema,
1086/// whereas the join filter is based on the join child schema. `column_index_offset`
1087/// represents the offset between them.
1088fn new_indices_for_join_filter(
1089    join_filter: &JoinFilter,
1090    join_side: JoinSide,
1091    projection_exprs: &[(Column, String)],
1092    column_index_offset: usize,
1093) -> Vec<usize> {
1094    join_filter
1095        .column_indices()
1096        .iter()
1097        .filter(|col_idx| col_idx.side == join_side)
1098        .filter_map(|col_idx| {
1099            projection_exprs
1100                .iter()
1101                .position(|(col, _)| col_idx.index + column_index_offset == col.index())
1102        })
1103        .collect()
1104}
1105
/// This function generates a new set of columns to be used in a hash join
/// operation based on a set of equi-join conditions (`hash_join_on`) and a
/// list of projection expressions (`projection_exprs`).
///
/// Returns `None` when any `on` expression references a column that the
/// projection does not expose.
///
/// Notes: Column indices in the projection expressions are based on the join schema,
/// whereas the join on expressions are based on the join child schema. `column_index_offset`
/// represents the offset between them.
fn new_columns_for_join_on(
    hash_join_on: &[&PhysicalExprRef],
    projection_exprs: &[(Column, String)],
    column_index_offset: usize,
) -> Option<Vec<PhysicalExprRef>> {
    let new_columns = hash_join_on
        .iter()
        .filter_map(|on| {
            // Rewrite all columns in `on`
            Arc::clone(*on)
                .transform(|expr| {
                    if let Some(column) = expr.as_any().downcast_ref::<Column>() {
                        // Find the column in the projection expressions
                        let new_column = projection_exprs
                            .iter()
                            .enumerate()
                            .find(|(_, (proj_column, _))| {
                                column.name() == proj_column.name()
                                    && column.index() + column_index_offset
                                        == proj_column.index()
                            })
                            // Rewrite against the projection output: the new
                            // name is the alias and the new index is the
                            // column's position within the projection.
                            .map(|(index, (_, alias))| Column::new(alias, index));
                        if let Some(new_column) = new_column {
                            Ok(Transformed::yes(Arc::new(new_column)))
                        } else {
                            // If the column is not found in the projection expressions,
                            // it means that the column is not projected. In this case,
                            // we cannot push the projection down.
                            internal_err!(
                                "Column {:?} not found in projection expressions",
                                column
                            )
                        }
                    } else {
                        Ok(Transformed::no(expr))
                    }
                })
                .data()
                // `.ok()` converts the error above into dropping this `on`
                // expression from `new_columns`...
                .ok()
        })
        .collect::<Vec<_>>();
    // ...and the length check here turns any such drop into a `None` result.
    (new_columns.len() == hash_join_on.len()).then_some(new_columns)
}
1156
1157#[cfg(test)]
1158mod tests {
1159    use super::*;
1160    use std::sync::Arc;
1161
1162    use crate::common::collect;
1163
1164    use crate::filter_pushdown::PushedDown;
1165    use crate::test;
1166    use crate::test::exec::StatisticsExec;
1167
1168    use arrow::datatypes::{DataType, Field, Schema};
1169    use datafusion_common::ScalarValue;
1170    use datafusion_common::stats::{ColumnStatistics, Precision, Statistics};
1171
1172    use datafusion_expr::Operator;
1173    use datafusion_physical_expr::expressions::{
1174        BinaryExpr, Column, DynamicFilterPhysicalExpr, Literal, binary, col, lit,
1175    };
1176
    // Expression is `b@7 - (1 + a@1)`: the referenced column indices must
    // come back deduplicated and sorted ascending.
    #[test]
    fn test_collect_column_indices() -> Result<()> {
        let expr = Arc::new(BinaryExpr::new(
            Arc::new(Column::new("b", 7)),
            Operator::Minus,
            Arc::new(BinaryExpr::new(
                Arc::new(Literal::new(ScalarValue::Int32(Some(1)))),
                Operator::Plus,
                Arc::new(Column::new("a", 1)),
            )),
        ));
        let column_indices = collect_column_indices(&[ProjectionExpr {
            expr,
            alias: "b-(1+a)".to_string(),
        }]);
        assert_eq!(column_indices, vec![1, 7]);
        Ok(())
    }
1195
    // Exercises `join_table_borders` with projections whose column indices
    // form (or break) contiguous left/right runs, for several assumed left
    // table widths.
    #[test]
    fn test_join_table_borders() -> Result<()> {
        let projections = vec![
            (Column::new("b", 1), "b".to_owned()),
            (Column::new("c", 2), "c".to_owned()),
            (Column::new("e", 4), "e".to_owned()),
            (Column::new("d", 3), "d".to_owned()),
            (Column::new("c", 2), "c".to_owned()),
            (Column::new("f", 5), "f".to_owned()),
            (Column::new("h", 7), "h".to_owned()),
            (Column::new("g", 6), "g".to_owned()),
        ];
        // Indices 0..=4 are < 5 (left run), indices 5..=7 are >= 5 (right run):
        let left_table_column_count = 5;
        assert_eq!(
            join_table_borders(left_table_column_count, &projections),
            (4, 5)
        );

        // All indices < 8: everything is left-table, right border is the length:
        let left_table_column_count = 8;
        assert_eq!(
            join_table_borders(left_table_column_count, &projections),
            (7, 8)
        );

        // No leading index < 1: no left run at all, hence -1:
        let left_table_column_count = 1;
        assert_eq!(
            join_table_borders(left_table_column_count, &projections),
            (-1, 0)
        );

        let projections = vec![
            (Column::new("a", 0), "a".to_owned()),
            (Column::new("b", 1), "b".to_owned()),
            (Column::new("d", 3), "d".to_owned()),
            (Column::new("g", 6), "g".to_owned()),
            (Column::new("e", 4), "e".to_owned()),
            (Column::new("f", 5), "f".to_owned()),
            (Column::new("e", 4), "e".to_owned()),
            (Column::new("h", 7), "h".to_owned()),
        ];
        // Left run ends at position 2 (index 3); `e@4` at position 6 breaks
        // the trailing right run, so it only covers position 7:
        let left_table_column_count = 5;
        assert_eq!(
            join_table_borders(left_table_column_count, &projections),
            (2, 7)
        );

        let left_table_column_count = 7;
        assert_eq!(
            join_table_borders(left_table_column_count, &projections),
            (6, 7)
        );

        Ok(())
    }
1250
    // A projection with an empty expression list must still produce one
    // (zero-column) output batch per input batch.
    #[tokio::test]
    async fn project_no_column() -> Result<()> {
        let task_ctx = Arc::new(TaskContext::default());

        let exec = test::scan_partitioned(1);
        let expected = collect(exec.execute(0, Arc::clone(&task_ctx))?).await?;

        let projection = ProjectionExec::try_new(vec![] as Vec<ProjectionExpr>, exec)?;
        let stream = projection.execute(0, Arc::clone(&task_ctx))?;
        let output = collect(stream).await?;
        // Batch count is preserved even though no columns are projected:
        assert_eq!(output.len(), expected.len());

        Ok(())
    }
1265
    // Verifies that the legacy `(expr, alias)` tuple syntax still converts
    // into a `ProjectionExpr` via its `From` impl.
    #[tokio::test]
    async fn project_old_syntax() {
        let exec = test::scan_partitioned(1);
        let schema = exec.schema();
        let expr = col("i", &schema).unwrap();
        ProjectionExec::try_new(
            vec![
                // use From impl of ProjectionExpr to create ProjectionExpr
                // to test old syntax
                (expr, "c".to_string()),
            ],
            exec,
        )
        // expect this to succeed
        .unwrap();
    }
1282
    // Regression-style test: statistics of a projection must be evaluated
    // against the INPUT schema (6 columns), not the narrower output schema
    // (2 columns) — otherwise expressions referencing the trailing input
    // columns would fail bounds checks.
    #[test]
    fn test_projection_statistics_uses_input_schema() {
        let input_schema = Schema::new(vec![
            Field::new("a", DataType::Int32, false),
            Field::new("b", DataType::Int32, false),
            Field::new("c", DataType::Int32, false),
            Field::new("d", DataType::Int32, false),
            Field::new("e", DataType::Int32, false),
            Field::new("f", DataType::Int32, false),
        ]);

        // Exact min/max statistics for every input column:
        let input_statistics = Statistics {
            num_rows: Precision::Exact(10),
            column_statistics: vec![
                ColumnStatistics {
                    min_value: Precision::Exact(ScalarValue::Int32(Some(1))),
                    max_value: Precision::Exact(ScalarValue::Int32(Some(100))),
                    ..Default::default()
                },
                ColumnStatistics {
                    min_value: Precision::Exact(ScalarValue::Int32(Some(5))),
                    max_value: Precision::Exact(ScalarValue::Int32(Some(50))),
                    ..Default::default()
                },
                ColumnStatistics {
                    min_value: Precision::Exact(ScalarValue::Int32(Some(10))),
                    max_value: Precision::Exact(ScalarValue::Int32(Some(40))),
                    ..Default::default()
                },
                ColumnStatistics {
                    min_value: Precision::Exact(ScalarValue::Int32(Some(20))),
                    max_value: Precision::Exact(ScalarValue::Int32(Some(30))),
                    ..Default::default()
                },
                ColumnStatistics {
                    min_value: Precision::Exact(ScalarValue::Int32(Some(21))),
                    max_value: Precision::Exact(ScalarValue::Int32(Some(29))),
                    ..Default::default()
                },
                ColumnStatistics {
                    min_value: Precision::Exact(ScalarValue::Int32(Some(24))),
                    max_value: Precision::Exact(ScalarValue::Int32(Some(26))),
                    ..Default::default()
                },
            ],
            ..Default::default()
        };

        let input = Arc::new(StatisticsExec::new(input_statistics, input_schema));

        // Create projection expressions that reference columns from the input schema and the length
        // of output schema columns < input schema columns and hence if we use the last few columns
        // from the input schema in the expressions here, bounds_check would fail on them if output
        // schema is supplied to the partitions_statistics method.
        let exprs: Vec<ProjectionExpr> = vec![
            ProjectionExpr {
                expr: Arc::new(Column::new("c", 2)) as Arc<dyn PhysicalExpr>,
                alias: "c_renamed".to_string(),
            },
            ProjectionExpr {
                expr: Arc::new(BinaryExpr::new(
                    Arc::new(Column::new("e", 4)),
                    Operator::Plus,
                    Arc::new(Column::new("f", 5)),
                )) as Arc<dyn PhysicalExpr>,
                alias: "e_plus_f".to_string(),
            },
        ];

        let projection = ProjectionExec::try_new(exprs, input).unwrap();

        let stats = projection.partition_statistics(None).unwrap();

        // Row count passes through the projection unchanged:
        assert_eq!(stats.num_rows, Precision::Exact(10));
        assert_eq!(
            stats.column_statistics.len(),
            2,
            "Expected 2 columns in projection statistics"
        );
        assert!(stats.total_byte_size.is_exact().unwrap_or(false));
    }
1364
    // A filter written against an output alias ("b > 5") must be rewritten in
    // terms of the underlying input column ("a > 5") before being pushed down.
    #[test]
    fn test_filter_pushdown_with_alias() -> Result<()> {
        let input_schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]);
        let input = Arc::new(StatisticsExec::new(
            Statistics::new_unknown(&input_schema),
            input_schema.clone(),
        ));

        // project "a" as "b"
        let projection = ProjectionExec::try_new(
            vec![ProjectionExpr {
                expr: Arc::new(Column::new("a", 0)),
                alias: "b".to_string(),
            }],
            input,
        )?;

        // filter "b > 5"
        let filter = Arc::new(BinaryExpr::new(
            Arc::new(Column::new("b", 0)),
            Operator::Gt,
            Arc::new(Literal::new(ScalarValue::Int32(Some(5)))),
        )) as Arc<dyn PhysicalExpr>;

        let description = projection.gather_filters_for_pushdown(
            FilterPushdownPhase::Post,
            vec![filter],
            &ConfigOptions::default(),
        )?;

        // Should be converted to "a > 5"
        // "a" is index 0 in input
        let expected_filter = Arc::new(BinaryExpr::new(
            Arc::new(Column::new("a", 0)),
            Operator::Gt,
            Arc::new(Literal::new(ScalarValue::Int32(Some(5)))),
        )) as Arc<dyn PhysicalExpr>;

        // The projection itself contributes no filters of its own:
        assert_eq!(description.self_filters(), vec![vec![]]);
        let pushed_filters = &description.parent_filters()[0];
        assert_eq!(
            format!("{}", pushed_filters[0].predicate),
            format!("{}", expected_filter)
        );
        // Verify the predicate was actually pushed down
        assert!(matches!(pushed_filters[0].discriminant, PushedDown::Yes));

        Ok(())
    }
1414
    // Two filters over two differently-aliased columns must each be rewritten
    // to the corresponding input column, preserving filter order.
    #[test]
    fn test_filter_pushdown_with_multiple_aliases() -> Result<()> {
        let input_schema = Schema::new(vec![
            Field::new("a", DataType::Int32, false),
            Field::new("b", DataType::Int32, false),
        ]);
        let input = Arc::new(StatisticsExec::new(
            Statistics {
                column_statistics: vec![Default::default(); input_schema.fields().len()],
                ..Default::default()
            },
            input_schema.clone(),
        ));

        // project "a" as "x", "b" as "y"
        let projection = ProjectionExec::try_new(
            vec![
                ProjectionExpr {
                    expr: Arc::new(Column::new("a", 0)),
                    alias: "x".to_string(),
                },
                ProjectionExpr {
                    expr: Arc::new(Column::new("b", 1)),
                    alias: "y".to_string(),
                },
            ],
            input,
        )?;

        // filter "x > 5"
        let filter1 = Arc::new(BinaryExpr::new(
            Arc::new(Column::new("x", 0)),
            Operator::Gt,
            Arc::new(Literal::new(ScalarValue::Int32(Some(5)))),
        )) as Arc<dyn PhysicalExpr>;

        // filter "y < 10"
        let filter2 = Arc::new(BinaryExpr::new(
            Arc::new(Column::new("y", 1)),
            Operator::Lt,
            Arc::new(Literal::new(ScalarValue::Int32(Some(10)))),
        )) as Arc<dyn PhysicalExpr>;

        let description = projection.gather_filters_for_pushdown(
            FilterPushdownPhase::Post,
            vec![filter1, filter2],
            &ConfigOptions::default(),
        )?;

        // Should be converted to "a > 5" and "b < 10"
        let expected_filter1 = Arc::new(BinaryExpr::new(
            Arc::new(Column::new("a", 0)),
            Operator::Gt,
            Arc::new(Literal::new(ScalarValue::Int32(Some(5)))),
        )) as Arc<dyn PhysicalExpr>;

        let expected_filter2 = Arc::new(BinaryExpr::new(
            Arc::new(Column::new("b", 1)),
            Operator::Lt,
            Arc::new(Literal::new(ScalarValue::Int32(Some(10)))),
        )) as Arc<dyn PhysicalExpr>;

        let pushed_filters = &description.parent_filters()[0];
        assert_eq!(pushed_filters.len(), 2);
        // Note: The order of filters is preserved
        assert_eq!(
            format!("{}", pushed_filters[0].predicate),
            format!("{}", expected_filter1)
        );
        assert_eq!(
            format!("{}", pushed_filters[1].predicate),
            format!("{}", expected_filter2)
        );
        // Verify the predicates were actually pushed down
        assert!(matches!(pushed_filters[0].discriminant, PushedDown::Yes));
        assert!(matches!(pushed_filters[1].discriminant, PushedDown::Yes));

        Ok(())
    }
1494
    // Pathological aliasing: output "b" is input "a" and vice versa. The
    // rewrite must resolve via position/alias, not via name coincidence.
    #[test]
    fn test_filter_pushdown_with_swapped_aliases() -> Result<()> {
        let input_schema = Schema::new(vec![
            Field::new("a", DataType::Int32, false),
            Field::new("b", DataType::Int32, false),
        ]);
        let input = Arc::new(StatisticsExec::new(
            Statistics {
                column_statistics: vec![Default::default(); input_schema.fields().len()],
                ..Default::default()
            },
            input_schema.clone(),
        ));

        // project "a" as "b", "b" as "a"
        let projection = ProjectionExec::try_new(
            vec![
                ProjectionExpr {
                    expr: Arc::new(Column::new("a", 0)),
                    alias: "b".to_string(),
                },
                ProjectionExpr {
                    expr: Arc::new(Column::new("b", 1)),
                    alias: "a".to_string(),
                },
            ],
            input,
        )?;

        // filter "b > 5" (output column 0, which is "a" in input)
        let filter1 = Arc::new(BinaryExpr::new(
            Arc::new(Column::new("b", 0)),
            Operator::Gt,
            Arc::new(Literal::new(ScalarValue::Int32(Some(5)))),
        )) as Arc<dyn PhysicalExpr>;

        // filter "a < 10" (output column 1, which is "b" in input)
        let filter2 = Arc::new(BinaryExpr::new(
            Arc::new(Column::new("a", 1)),
            Operator::Lt,
            Arc::new(Literal::new(ScalarValue::Int32(Some(10)))),
        )) as Arc<dyn PhysicalExpr>;

        let description = projection.gather_filters_for_pushdown(
            FilterPushdownPhase::Post,
            vec![filter1, filter2],
            &ConfigOptions::default(),
        )?;

        let pushed_filters = &description.parent_filters()[0];
        assert_eq!(pushed_filters.len(), 2);

        // "b" (output index 0) -> "a" (input index 0)
        let expected_filter1 = "a@0 > 5";
        // "a" (output index 1) -> "b" (input index 1)
        let expected_filter2 = "b@1 < 10";

        assert_eq!(format!("{}", pushed_filters[0].predicate), expected_filter1);
        assert_eq!(format!("{}", pushed_filters[1].predicate), expected_filter2);
        // Verify the predicates were actually pushed down
        assert!(matches!(pushed_filters[0].discriminant, PushedDown::Yes));
        assert!(matches!(pushed_filters[1].discriminant, PushedDown::Yes));

        Ok(())
    }
1560
    // Mix of an aliased column ("a" as "x") and a pass-through column ("b"):
    // both filters must still map back to the correct input columns.
    #[test]
    fn test_filter_pushdown_with_mixed_columns() -> Result<()> {
        let input_schema = Schema::new(vec![
            Field::new("a", DataType::Int32, false),
            Field::new("b", DataType::Int32, false),
        ]);
        let input = Arc::new(StatisticsExec::new(
            Statistics {
                column_statistics: vec![Default::default(); input_schema.fields().len()],
                ..Default::default()
            },
            input_schema.clone(),
        ));

        // project "a" as "x", "b" as "b" (pass through)
        let projection = ProjectionExec::try_new(
            vec![
                ProjectionExpr {
                    expr: Arc::new(Column::new("a", 0)),
                    alias: "x".to_string(),
                },
                ProjectionExpr {
                    expr: Arc::new(Column::new("b", 1)),
                    alias: "b".to_string(),
                },
            ],
            input,
        )?;

        // filter "x > 5"
        let filter1 = Arc::new(BinaryExpr::new(
            Arc::new(Column::new("x", 0)),
            Operator::Gt,
            Arc::new(Literal::new(ScalarValue::Int32(Some(5)))),
        )) as Arc<dyn PhysicalExpr>;

        // filter "b < 10" (using output index 1 which corresponds to 'b')
        let filter2 = Arc::new(BinaryExpr::new(
            Arc::new(Column::new("b", 1)),
            Operator::Lt,
            Arc::new(Literal::new(ScalarValue::Int32(Some(10)))),
        )) as Arc<dyn PhysicalExpr>;

        let description = projection.gather_filters_for_pushdown(
            FilterPushdownPhase::Post,
            vec![filter1, filter2],
            &ConfigOptions::default(),
        )?;

        let pushed_filters = &description.parent_filters()[0];
        assert_eq!(pushed_filters.len(), 2);
        // "x" -> "a" (index 0)
        let expected_filter1 = "a@0 > 5";
        // "b" -> "b" (index 1)
        let expected_filter2 = "b@1 < 10";

        assert_eq!(format!("{}", pushed_filters[0].predicate), expected_filter1);
        assert_eq!(format!("{}", pushed_filters[1].predicate), expected_filter2);
        // Verify the predicates were actually pushed down
        assert!(matches!(pushed_filters[0].discriminant, PushedDown::Yes));
        assert!(matches!(pushed_filters[1].discriminant, PushedDown::Yes));

        Ok(())
    }
1625
1626    #[test]
1627    fn test_filter_pushdown_with_complex_expression() -> Result<()> {
1628        let input_schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]);
1629        let input = Arc::new(StatisticsExec::new(
1630            Statistics {
1631                column_statistics: vec![Default::default(); input_schema.fields().len()],
1632                ..Default::default()
1633            },
1634            input_schema.clone(),
1635        ));
1636
1637        // project "a + 1" as "z"
1638        let projection = ProjectionExec::try_new(
1639            vec![ProjectionExpr {
1640                expr: Arc::new(BinaryExpr::new(
1641                    Arc::new(Column::new("a", 0)),
1642                    Operator::Plus,
1643                    Arc::new(Literal::new(ScalarValue::Int32(Some(1)))),
1644                )),
1645                alias: "z".to_string(),
1646            }],
1647            input,
1648        )?;
1649
1650        // filter "z > 10"
1651        let filter = Arc::new(BinaryExpr::new(
1652            Arc::new(Column::new("z", 0)),
1653            Operator::Gt,
1654            Arc::new(Literal::new(ScalarValue::Int32(Some(10)))),
1655        )) as Arc<dyn PhysicalExpr>;
1656
1657        let description = projection.gather_filters_for_pushdown(
1658            FilterPushdownPhase::Post,
1659            vec![filter],
1660            &ConfigOptions::default(),
1661        )?;
1662
1663        // expand to `a + 1 > 10`
1664        let pushed_filters = &description.parent_filters()[0];
1665        assert!(matches!(pushed_filters[0].discriminant, PushedDown::Yes));
1666        assert_eq!(format!("{}", pushed_filters[0].predicate), "a@0 + 1 > 10");
1667
1668        Ok(())
1669    }
1670
1671    #[test]
1672    fn test_filter_pushdown_with_unknown_column() -> Result<()> {
1673        let input_schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]);
1674        let input = Arc::new(StatisticsExec::new(
1675            Statistics {
1676                column_statistics: vec![Default::default(); input_schema.fields().len()],
1677                ..Default::default()
1678            },
1679            input_schema.clone(),
1680        ));
1681
1682        // project "a" as "a"
1683        let projection = ProjectionExec::try_new(
1684            vec![ProjectionExpr {
1685                expr: Arc::new(Column::new("a", 0)),
1686                alias: "a".to_string(),
1687            }],
1688            input,
1689        )?;
1690
1691        // filter "unknown_col > 5" - using a column name that doesn't exist in projection output
1692        // Column constructor: name, index. Index 1 doesn't exist.
1693        let filter = Arc::new(BinaryExpr::new(
1694            Arc::new(Column::new("unknown_col", 1)),
1695            Operator::Gt,
1696            Arc::new(Literal::new(ScalarValue::Int32(Some(5)))),
1697        )) as Arc<dyn PhysicalExpr>;
1698
1699        let description = projection.gather_filters_for_pushdown(
1700            FilterPushdownPhase::Post,
1701            vec![filter],
1702            &ConfigOptions::default(),
1703        )?;
1704
1705        let pushed_filters = &description.parent_filters()[0];
1706        assert!(matches!(pushed_filters[0].discriminant, PushedDown::No));
1707        // The column shouldn't be found in the alias map, so it remains unchanged with its index
1708        assert_eq!(
1709            format!("{}", pushed_filters[0].predicate),
1710            "unknown_col@1 > 5"
1711        );
1712
1713        Ok(())
1714    }
1715
    /// Basic test that a `DynamicFilterPhysicalExpr` pushed through a projection
    /// keeps reflecting later updates to its child expression:
    /// it starts as `lit(true)`, is updated to `a > 5`, and — because the
    /// projection is `[b - 1 AS a]` — the pushed-down filter then displays as
    /// `b - 1 > 5`, without re-running the pushdown.
    #[test]
    fn test_basic_dyn_filter_projection_pushdown_update_child() -> Result<()> {
        // Single-column input: "b" (non-nullable Int32).
        let input_schema =
            Arc::new(Schema::new(vec![Field::new("b", DataType::Int32, false)]));

        let input = Arc::new(StatisticsExec::new(
            Statistics {
                column_statistics: vec![Default::default(); input_schema.fields().len()],
                ..Default::default()
            },
            input_schema.as_ref().clone(),
        ));

        // project "b" - 1 as "a"
        let projection = ProjectionExec::try_new(
            vec![ProjectionExpr {
                expr: binary(
                    Arc::new(Column::new("b", 0)),
                    Operator::Minus,
                    lit(1),
                    &input_schema,
                )
                .unwrap(),
                alias: "a".to_string(),
            }],
            input,
        )?;

        // simulate projection's parent create a dynamic filter on "a"
        // (the filter references the projection's OUTPUT column "a")
        let projected_schema = projection.schema();
        let col_a = col("a", &projected_schema)?;
        let dynamic_filter = Arc::new(DynamicFilterPhysicalExpr::new(
            vec![Arc::clone(&col_a)],
            lit(true),
        ));
        // Initial state should be lit(true)
        let current = dynamic_filter.current()?;
        assert_eq!(format!("{current}"), "true");

        let dyn_phy_expr: Arc<dyn PhysicalExpr> = Arc::clone(&dynamic_filter) as _;

        // Push the dynamic filter through the projection once, BEFORE the
        // update below — the later assertions show the pushed copy stays
        // linked to the original filter's state.
        let description = projection.gather_filters_for_pushdown(
            FilterPushdownPhase::Post,
            vec![dyn_phy_expr],
            &ConfigOptions::default(),
        )?;

        let pushed_filters = &description.parent_filters()[0][0];

        // Check currently pushed_filters is lit(true)
        // (displayed as "empty" because no predicate has been set yet)
        assert_eq!(
            format!("{}", pushed_filters.predicate),
            "DynamicFilter [ empty ]"
        );

        // Update to a > 5 (after projection, b is now called a)
        let new_expr =
            Arc::new(BinaryExpr::new(Arc::clone(&col_a), Operator::Gt, lit(5i32)));
        dynamic_filter.update(new_expr)?;

        // Now it should be a > 5
        let current = dynamic_filter.current()?;
        assert_eq!(format!("{current}"), "a@0 > 5");

        // Check currently pushed_filters is b - 1 > 5 (because b - 1 is projected as a).
        // Note: `pushed_filters` was captured before the update, so this proves the
        // pushed copy observes the update through shared state.
        assert_eq!(
            format!("{}", pushed_filters.predicate),
            "DynamicFilter [ b@0 - 1 > 5 ]"
        );

        Ok(())
    }
1791}