datafusion_physical_plan/
projection.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! Defines the projection execution plan. A projection determines which columns or expressions
19//! are returned from a query. The SQL statement `SELECT a, b, a+b FROM t1` is an example
20//! of a projection on table `t1` where the expressions `a`, `b`, and `a+b` are the
21//! projection expressions. `SELECT` without `FROM` will only evaluate expressions.
22
23use super::expressions::{Column, Literal};
24use super::metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet};
25use super::{
26    DisplayAs, ExecutionPlanProperties, PlanProperties, RecordBatchStream,
27    SendableRecordBatchStream, SortOrderPushdownResult, Statistics,
28};
29use crate::execution_plan::CardinalityEffect;
30use crate::filter_pushdown::{
31    ChildPushdownResult, FilterDescription, FilterPushdownPhase,
32    FilterPushdownPropagation,
33};
34use crate::joins::utils::{ColumnIndex, JoinFilter, JoinOn, JoinOnRef};
35use crate::{DisplayFormatType, ExecutionPlan, PhysicalExpr};
36use std::any::Any;
37use std::collections::HashMap;
38use std::pin::Pin;
39use std::sync::Arc;
40use std::task::{Context, Poll};
41
42use arrow::datatypes::SchemaRef;
43use arrow::record_batch::RecordBatch;
44use datafusion_common::config::ConfigOptions;
45use datafusion_common::tree_node::{
46    Transformed, TransformedResult, TreeNode, TreeNodeRecursion,
47};
48use datafusion_common::{JoinSide, Result, internal_err};
49use datafusion_execution::TaskContext;
50use datafusion_physical_expr::equivalence::ProjectionMapping;
51use datafusion_physical_expr::projection::Projector;
52use datafusion_physical_expr::utils::collect_columns;
53use datafusion_physical_expr_common::physical_expr::{PhysicalExprRef, fmt_sql};
54use datafusion_physical_expr_common::sort_expr::{
55    LexOrdering, LexRequirement, PhysicalSortExpr,
56};
57// Re-exported from datafusion-physical-expr for backwards compatibility
58// We recommend updating your imports to use datafusion-physical-expr directly
59pub use datafusion_physical_expr::projection::{
60    ProjectionExpr, ProjectionExprs, update_expr,
61};
62
63use futures::stream::{Stream, StreamExt};
64use log::trace;
65
/// [`ExecutionPlan`] for a projection
///
/// Computes a set of scalar value expressions for each input row, producing one
/// output row for each input row.
///
/// The projection expressions are not stored directly on this struct; they are
/// owned by the embedded [`Projector`] and exposed through
/// [`ProjectionExec::expr`] and [`ProjectionExec::projection_expr`].
#[derive(Debug, Clone)]
pub struct ProjectionExec {
    /// A projector specialized to apply the projection to the input schema from the child node
    /// and produce [`RecordBatch`]es with the output schema of this node.
    projector: Projector,
    /// The input plan
    input: Arc<dyn ExecutionPlan>,
    /// Execution metrics
    metrics: ExecutionPlanMetricsSet,
    /// Cache holding plan properties like equivalences, output partitioning etc.
    /// Computed once when the node is constructed.
    cache: PlanProperties,
}
82
83impl ProjectionExec {
84    /// Create a projection on an input
85    ///
86    /// # Example:
87    /// Create a `ProjectionExec` to crate `SELECT a, a+b AS sum_ab FROM t1`:
88    ///
89    /// ```
90    /// # use std::sync::Arc;
91    /// # use arrow_schema::{Schema, Field, DataType};
92    /// # use datafusion_expr::Operator;
93    /// # use datafusion_physical_plan::ExecutionPlan;
94    /// # use datafusion_physical_expr::expressions::{col, binary};
95    /// # use datafusion_physical_plan::empty::EmptyExec;
96    /// # use datafusion_physical_plan::projection::{ProjectionExec, ProjectionExpr};
97    /// # fn schema() -> Arc<Schema> {
98    /// #  Arc::new(Schema::new(vec![
99    /// #   Field::new("a", DataType::Int32, false),
100    /// #   Field::new("b", DataType::Int32, false),
101    /// # ]))
102    /// # }
103    /// #
104    /// # fn input() -> Arc<dyn ExecutionPlan> {
105    /// #  Arc::new(EmptyExec::new(schema()))
106    /// # }
107    /// #
108    /// # fn main() {
109    /// let schema = schema();
110    /// // Create PhysicalExprs
111    /// let a = col("a", &schema).unwrap();
112    /// let b = col("b", &schema).unwrap();
113    /// let a_plus_b = binary(Arc::clone(&a), Operator::Plus, b, &schema).unwrap();
114    /// // create ProjectionExec
115    /// let proj = ProjectionExec::try_new(
116    ///     [
117    ///         ProjectionExpr {
118    ///             // expr a produces the column named "a"
119    ///             expr: a,
120    ///             alias: "a".to_string(),
121    ///         },
122    ///         ProjectionExpr {
123    ///             // expr: a + b produces the column named "sum_ab"
124    ///             expr: a_plus_b,
125    ///             alias: "sum_ab".to_string(),
126    ///         },
127    ///     ],
128    ///     input(),
129    /// )
130    /// .unwrap();
131    /// # }
132    /// ```
133    pub fn try_new<I, E>(expr: I, input: Arc<dyn ExecutionPlan>) -> Result<Self>
134    where
135        I: IntoIterator<Item = E>,
136        E: Into<ProjectionExpr>,
137    {
138        let input_schema = input.schema();
139        // convert argument to Vec<ProjectionExpr>
140        let expr_vec = expr.into_iter().map(Into::into).collect::<Vec<_>>();
141        let projection = ProjectionExprs::new(expr_vec);
142        let projector = projection.make_projector(&input_schema)?;
143
144        // Construct a map from the input expressions to the output expression of the Projection
145        let projection_mapping = projection.projection_mapping(&input_schema)?;
146        let cache = Self::compute_properties(
147            &input,
148            &projection_mapping,
149            Arc::clone(projector.output_schema()),
150        )?;
151        Ok(Self {
152            projector,
153            input,
154            metrics: ExecutionPlanMetricsSet::new(),
155            cache,
156        })
157    }
158
159    /// The projection expressions stored as tuples of (expression, output column name)
160    pub fn expr(&self) -> &[ProjectionExpr] {
161        self.projector.projection().as_ref()
162    }
163
164    /// The projection expressions as a [`ProjectionExprs`].
165    pub fn projection_expr(&self) -> &ProjectionExprs {
166        self.projector.projection()
167    }
168
169    /// The input plan
170    pub fn input(&self) -> &Arc<dyn ExecutionPlan> {
171        &self.input
172    }
173
174    /// This function creates the cache object that stores the plan properties such as schema, equivalence properties, ordering, partitioning, etc.
175    fn compute_properties(
176        input: &Arc<dyn ExecutionPlan>,
177        projection_mapping: &ProjectionMapping,
178        schema: SchemaRef,
179    ) -> Result<PlanProperties> {
180        // Calculate equivalence properties:
181        let input_eq_properties = input.equivalence_properties();
182        let eq_properties = input_eq_properties.project(projection_mapping, schema);
183        // Calculate output partitioning, which needs to respect aliases:
184        let output_partitioning = input
185            .output_partitioning()
186            .project(projection_mapping, input_eq_properties);
187
188        Ok(PlanProperties::new(
189            eq_properties,
190            output_partitioning,
191            input.pipeline_behavior(),
192            input.boundedness(),
193        ))
194    }
195}
196
197impl DisplayAs for ProjectionExec {
198    fn fmt_as(
199        &self,
200        t: DisplayFormatType,
201        f: &mut std::fmt::Formatter,
202    ) -> std::fmt::Result {
203        match t {
204            DisplayFormatType::Default | DisplayFormatType::Verbose => {
205                let expr: Vec<String> = self
206                    .projector
207                    .projection()
208                    .as_ref()
209                    .iter()
210                    .map(|proj_expr| {
211                        let e = proj_expr.expr.to_string();
212                        if e != proj_expr.alias {
213                            format!("{e} as {}", proj_expr.alias)
214                        } else {
215                            e
216                        }
217                    })
218                    .collect();
219
220                write!(f, "ProjectionExec: expr=[{}]", expr.join(", "))
221            }
222            DisplayFormatType::TreeRender => {
223                for (i, proj_expr) in self.expr().iter().enumerate() {
224                    let expr_sql = fmt_sql(proj_expr.expr.as_ref());
225                    if proj_expr.expr.to_string() == proj_expr.alias {
226                        writeln!(f, "expr{i}={expr_sql}")?;
227                    } else {
228                        writeln!(f, "{}={expr_sql}", proj_expr.alias)?;
229                    }
230                }
231
232                Ok(())
233            }
234        }
235    }
236}
237
238impl ExecutionPlan for ProjectionExec {
239    fn name(&self) -> &'static str {
240        "ProjectionExec"
241    }
242
243    /// Return a reference to Any that can be used for downcasting
244    fn as_any(&self) -> &dyn Any {
245        self
246    }
247
248    fn properties(&self) -> &PlanProperties {
249        &self.cache
250    }
251
252    fn maintains_input_order(&self) -> Vec<bool> {
253        // Tell optimizer this operator doesn't reorder its input
254        vec![true]
255    }
256
257    fn benefits_from_input_partitioning(&self) -> Vec<bool> {
258        let all_simple_exprs =
259            self.projector
260                .projection()
261                .as_ref()
262                .iter()
263                .all(|proj_expr| {
264                    proj_expr.expr.as_any().is::<Column>()
265                        || proj_expr.expr.as_any().is::<Literal>()
266                });
267        // If expressions are all either column_expr or Literal, then all computations in this projection are reorder or rename,
268        // and projection would not benefit from the repartition, benefits_from_input_partitioning will return false.
269        vec![!all_simple_exprs]
270    }
271
272    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
273        vec![&self.input]
274    }
275
276    fn with_new_children(
277        self: Arc<Self>,
278        mut children: Vec<Arc<dyn ExecutionPlan>>,
279    ) -> Result<Arc<dyn ExecutionPlan>> {
280        ProjectionExec::try_new(
281            self.projector.projection().clone(),
282            children.swap_remove(0),
283        )
284        .map(|p| Arc::new(p) as _)
285    }
286
287    fn execute(
288        &self,
289        partition: usize,
290        context: Arc<TaskContext>,
291    ) -> Result<SendableRecordBatchStream> {
292        trace!(
293            "Start ProjectionExec::execute for partition {} of context session_id {} and task_id {:?}",
294            partition,
295            context.session_id(),
296            context.task_id()
297        );
298
299        let projector = self.projector.with_metrics(&self.metrics, partition);
300        Ok(Box::pin(ProjectionStream::new(
301            projector,
302            self.input.execute(partition, context)?,
303            BaselineMetrics::new(&self.metrics, partition),
304        )?))
305    }
306
307    fn metrics(&self) -> Option<MetricsSet> {
308        Some(self.metrics.clone_inner())
309    }
310
311    fn statistics(&self) -> Result<Statistics> {
312        self.partition_statistics(None)
313    }
314
315    fn partition_statistics(&self, partition: Option<usize>) -> Result<Statistics> {
316        let input_stats = self.input.partition_statistics(partition)?;
317        let output_schema = self.schema();
318        self.projector
319            .projection()
320            .project_statistics(input_stats, &output_schema)
321    }
322
323    fn supports_limit_pushdown(&self) -> bool {
324        true
325    }
326
327    fn cardinality_effect(&self) -> CardinalityEffect {
328        CardinalityEffect::Equal
329    }
330
331    fn try_swapping_with_projection(
332        &self,
333        projection: &ProjectionExec,
334    ) -> Result<Option<Arc<dyn ExecutionPlan>>> {
335        let maybe_unified = try_unifying_projections(projection, self)?;
336        if let Some(new_plan) = maybe_unified {
337            // To unify 3 or more sequential projections:
338            remove_unnecessary_projections(new_plan).data().map(Some)
339        } else {
340            Ok(Some(Arc::new(projection.clone())))
341        }
342    }
343
344    fn gather_filters_for_pushdown(
345        &self,
346        _phase: FilterPushdownPhase,
347        parent_filters: Vec<Arc<dyn PhysicalExpr>>,
348        _config: &ConfigOptions,
349    ) -> Result<FilterDescription> {
350        // TODO: In future, we can try to handle inverting aliases here.
351        // For the time being, we pass through untransformed filters, so filters on aliases are not handled.
352        // https://github.com/apache/datafusion/issues/17246
353        FilterDescription::from_children(parent_filters, &self.children())
354    }
355
356    fn handle_child_pushdown_result(
357        &self,
358        _phase: FilterPushdownPhase,
359        child_pushdown_result: ChildPushdownResult,
360        _config: &ConfigOptions,
361    ) -> Result<FilterPushdownPropagation<Arc<dyn ExecutionPlan>>> {
362        Ok(FilterPushdownPropagation::if_all(child_pushdown_result))
363    }
364
365    fn try_pushdown_sort(
366        &self,
367        order: &[PhysicalSortExpr],
368    ) -> Result<SortOrderPushdownResult<Arc<dyn ExecutionPlan>>> {
369        let child = self.input();
370        let mut child_order = Vec::new();
371
372        // Check and transform sort expressions
373        for sort_expr in order {
374            // Recursively transform the expression
375            let mut can_pushdown = true;
376            let transformed = Arc::clone(&sort_expr.expr).transform(|expr| {
377                if let Some(col) = expr.as_any().downcast_ref::<Column>() {
378                    // Check if column index is valid.
379                    // This should always be true but fail gracefully if it's not.
380                    if col.index() >= self.expr().len() {
381                        can_pushdown = false;
382                        return Ok(Transformed::no(expr));
383                    }
384
385                    let proj_expr = &self.expr()[col.index()];
386
387                    // Check if projection expression is a simple column
388                    // We cannot push down order by clauses that depend on
389                    // projected computations as they would have nothing to reference.
390                    if let Some(child_col) =
391                        proj_expr.expr.as_any().downcast_ref::<Column>()
392                    {
393                        // Replace with the child column
394                        Ok(Transformed::yes(Arc::new(child_col.clone()) as _))
395                    } else {
396                        // Projection involves computation, cannot push down
397                        can_pushdown = false;
398                        Ok(Transformed::no(expr))
399                    }
400                } else {
401                    Ok(Transformed::no(expr))
402                }
403            })?;
404
405            if !can_pushdown {
406                return Ok(SortOrderPushdownResult::Unsupported);
407            }
408
409            child_order.push(PhysicalSortExpr {
410                expr: transformed.data,
411                options: sort_expr.options,
412            });
413        }
414
415        // Recursively push down to child node
416        match child.try_pushdown_sort(&child_order)? {
417            SortOrderPushdownResult::Exact { inner } => {
418                let new_exec = Arc::new(self.clone()).with_new_children(vec![inner])?;
419                Ok(SortOrderPushdownResult::Exact { inner: new_exec })
420            }
421            SortOrderPushdownResult::Inexact { inner } => {
422                let new_exec = Arc::new(self.clone()).with_new_children(vec![inner])?;
423                Ok(SortOrderPushdownResult::Inexact { inner: new_exec })
424            }
425            SortOrderPushdownResult::Unsupported => {
426                Ok(SortOrderPushdownResult::Unsupported)
427            }
428        }
429    }
430}
431
432impl ProjectionStream {
433    /// Create a new projection stream
434    fn new(
435        projector: Projector,
436        input: SendableRecordBatchStream,
437        baseline_metrics: BaselineMetrics,
438    ) -> Result<Self> {
439        Ok(Self {
440            projector,
441            input,
442            baseline_metrics,
443        })
444    }
445
446    fn batch_project(&self, batch: &RecordBatch) -> Result<RecordBatch> {
447        // Records time on drop
448        let _timer = self.baseline_metrics.elapsed_compute().timer();
449        self.projector.project_batch(batch)
450    }
451}
452
/// Stream of projected [`RecordBatch`]es: holds the input stream together with
/// the [`Projector`] used to transform its batches.
struct ProjectionStream {
    /// Evaluates the projection expressions against a batch.
    projector: Projector,
    /// The stream of batches produced by the input plan.
    input: SendableRecordBatchStream,
    /// Baseline metrics for this stream.
    baseline_metrics: BaselineMetrics,
}
459
460impl Stream for ProjectionStream {
461    type Item = Result<RecordBatch>;
462
463    fn poll_next(
464        mut self: Pin<&mut Self>,
465        cx: &mut Context<'_>,
466    ) -> Poll<Option<Self::Item>> {
467        let poll = self.input.poll_next_unpin(cx).map(|x| match x {
468            Some(Ok(batch)) => Some(self.batch_project(&batch)),
469            other => other,
470        });
471
472        self.baseline_metrics.record_poll(poll)
473    }
474
475    fn size_hint(&self) -> (usize, Option<usize>) {
476        // Same number of record batches
477        self.input.size_hint()
478    }
479}
480
481impl RecordBatchStream for ProjectionStream {
482    /// Get the schema
483    fn schema(&self) -> SchemaRef {
484        Arc::clone(self.projector.output_schema())
485    }
486}
487
/// An [`ExecutionPlan`] that can carry a column projection itself, so that a
/// separate [`ProjectionExec`] on top of it may become unnecessary (see
/// [`try_embed_projection`]).
pub trait EmbeddedProjection: ExecutionPlan + Sized {
    /// Returns a copy of this plan with `projection` embedded.
    ///
    /// [`try_embed_projection`] passes `Some(indices)`, where the indices are
    /// taken from the columns the parent projection references in this plan's
    /// output. NOTE(review): `None` presumably means "no embedded projection";
    /// confirm against the implementors.
    fn with_projection(&self, projection: Option<Vec<usize>>) -> Result<Self>;
}
491
492/// Some projection can't be pushed down left input or right input of hash join because filter or on need may need some columns that won't be used in later.
493/// By embed those projection to hash join, we can reduce the cost of build_batch_from_indices in hash join (build_batch_from_indices need to can compute::take() for each column) and avoid unnecessary output creation.
494pub fn try_embed_projection<Exec: EmbeddedProjection + 'static>(
495    projection: &ProjectionExec,
496    execution_plan: &Exec,
497) -> Result<Option<Arc<dyn ExecutionPlan>>> {
498    // Collect all column indices from the given projection expressions.
499    let projection_index = collect_column_indices(projection.expr());
500
501    if projection_index.is_empty() {
502        return Ok(None);
503    };
504
505    // If the projection indices is the same as the input columns, we don't need to embed the projection to hash join.
506    // Check the projection_index is 0..n-1 and the length of projection_index is the same as the length of execution_plan schema fields.
507    if projection_index.len() == projection_index.last().unwrap() + 1
508        && projection_index.len() == execution_plan.schema().fields().len()
509    {
510        return Ok(None);
511    }
512
513    let new_execution_plan =
514        Arc::new(execution_plan.with_projection(Some(projection_index.to_vec()))?);
515
516    // Build projection expressions for update_expr. Zip the projection_index with the new_execution_plan output schema fields.
517    let embed_project_exprs = projection_index
518        .iter()
519        .zip(new_execution_plan.schema().fields())
520        .map(|(index, field)| ProjectionExpr {
521            expr: Arc::new(Column::new(field.name(), *index)) as Arc<dyn PhysicalExpr>,
522            alias: field.name().to_owned(),
523        })
524        .collect::<Vec<_>>();
525
526    let mut new_projection_exprs = Vec::with_capacity(projection.expr().len());
527
528    for proj_expr in projection.expr() {
529        // update column index for projection expression since the input schema has been changed.
530        let Some(expr) =
531            update_expr(&proj_expr.expr, embed_project_exprs.as_slice(), false)?
532        else {
533            return Ok(None);
534        };
535        new_projection_exprs.push(ProjectionExpr {
536            expr,
537            alias: proj_expr.alias.clone(),
538        });
539    }
540    // Old projection may contain some alias or expression such as `a + 1` and `CAST('true' AS BOOLEAN)`, but our projection_exprs in hash join just contain column, so we need to create the new projection to keep the original projection.
541    let new_projection = Arc::new(ProjectionExec::try_new(
542        new_projection_exprs,
543        Arc::clone(&new_execution_plan) as _,
544    )?);
545    if is_projection_removable(&new_projection) {
546        Ok(Some(new_execution_plan))
547    } else {
548        Ok(Some(new_projection))
549    }
550}
551
/// The pieces needed to rebuild a join after a projection has been pushed
/// below it, as produced by [`try_pushdown_through_join`].
pub struct JoinData {
    /// Projection placed on top of the join's original left child.
    pub projected_left_child: ProjectionExec,
    /// Projection placed on top of the join's original right child.
    pub projected_right_child: ProjectionExec,
    /// The join filter rewritten for the projected children, if the join has one.
    pub join_filter: Option<JoinFilter>,
    /// The equi-join conditions rewritten for the projected children.
    pub join_on: JoinOn,
}
558
559pub fn try_pushdown_through_join(
560    projection: &ProjectionExec,
561    join_left: &Arc<dyn ExecutionPlan>,
562    join_right: &Arc<dyn ExecutionPlan>,
563    join_on: JoinOnRef,
564    schema: &SchemaRef,
565    filter: Option<&JoinFilter>,
566) -> Result<Option<JoinData>> {
567    // Convert projected expressions to columns. We can not proceed if this is not possible.
568    let Some(projection_as_columns) = physical_to_column_exprs(projection.expr()) else {
569        return Ok(None);
570    };
571
572    let (far_right_left_col_ind, far_left_right_col_ind) =
573        join_table_borders(join_left.schema().fields().len(), &projection_as_columns);
574
575    if !join_allows_pushdown(
576        &projection_as_columns,
577        schema,
578        far_right_left_col_ind,
579        far_left_right_col_ind,
580    ) {
581        return Ok(None);
582    }
583
584    let new_filter = if let Some(filter) = filter {
585        match update_join_filter(
586            &projection_as_columns[0..=far_right_left_col_ind as _],
587            &projection_as_columns[far_left_right_col_ind as _..],
588            filter,
589            join_left.schema().fields().len(),
590        ) {
591            Some(updated_filter) => Some(updated_filter),
592            None => return Ok(None),
593        }
594    } else {
595        None
596    };
597
598    let Some(new_on) = update_join_on(
599        &projection_as_columns[0..=far_right_left_col_ind as _],
600        &projection_as_columns[far_left_right_col_ind as _..],
601        join_on,
602        join_left.schema().fields().len(),
603    ) else {
604        return Ok(None);
605    };
606
607    let (new_left, new_right) = new_join_children(
608        &projection_as_columns,
609        far_right_left_col_ind,
610        far_left_right_col_ind,
611        join_left,
612        join_right,
613    )?;
614
615    Ok(Some(JoinData {
616        projected_left_child: new_left,
617        projected_right_child: new_right,
618        join_filter: new_filter,
619        join_on: new_on,
620    }))
621}
622
623/// This function checks if `plan` is a [`ProjectionExec`], and inspects its
624/// input(s) to test whether it can push `plan` under its input(s). This function
625/// will operate on the entire tree and may ultimately remove `plan` entirely
626/// by leveraging source providers with built-in projection capabilities.
627pub fn remove_unnecessary_projections(
628    plan: Arc<dyn ExecutionPlan>,
629) -> Result<Transformed<Arc<dyn ExecutionPlan>>> {
630    let maybe_modified =
631        if let Some(projection) = plan.as_any().downcast_ref::<ProjectionExec>() {
632            // If the projection does not cause any change on the input, we can
633            // safely remove it:
634            if is_projection_removable(projection) {
635                return Ok(Transformed::yes(Arc::clone(projection.input())));
636            }
637            // If it does, check if we can push it under its child(ren):
638            projection
639                .input()
640                .try_swapping_with_projection(projection)?
641        } else {
642            return Ok(Transformed::no(plan));
643        };
644    Ok(maybe_modified.map_or_else(|| Transformed::no(plan), Transformed::yes))
645}
646
647/// Compare the inputs and outputs of the projection. All expressions must be
648/// columns without alias, and projection does not change the order of fields.
649/// For example, if the input schema is `a, b`, `SELECT a, b` is removable,
650/// but `SELECT b, a` and `SELECT a+1, b` and `SELECT a AS c, b` are not.
651fn is_projection_removable(projection: &ProjectionExec) -> bool {
652    let exprs = projection.expr();
653    exprs.iter().enumerate().all(|(idx, proj_expr)| {
654        let Some(col) = proj_expr.expr.as_any().downcast_ref::<Column>() else {
655            return false;
656        };
657        col.name() == proj_expr.alias && col.index() == idx
658    }) && exprs.len() == projection.input().schema().fields().len()
659}
660
661/// Given the expression set of a projection, checks if the projection causes
662/// any renaming or constructs a non-`Column` physical expression.
663pub fn all_alias_free_columns(exprs: &[ProjectionExpr]) -> bool {
664    exprs.iter().all(|proj_expr| {
665        proj_expr
666            .expr
667            .as_any()
668            .downcast_ref::<Column>()
669            .map(|column| column.name() == proj_expr.alias)
670            .unwrap_or(false)
671    })
672}
673
674/// Updates a source provider's projected columns according to the given
675/// projection operator's expressions. To use this function safely, one must
676/// ensure that all expressions are `Column` expressions without aliases.
677pub fn new_projections_for_columns(
678    projection: &[ProjectionExpr],
679    source: &[usize],
680) -> Vec<usize> {
681    projection
682        .iter()
683        .filter_map(|proj_expr| {
684            proj_expr
685                .expr
686                .as_any()
687                .downcast_ref::<Column>()
688                .map(|expr| source[expr.index()])
689        })
690        .collect()
691}
692
693/// Creates a new [`ProjectionExec`] instance with the given child plan and
694/// projected expressions.
695pub fn make_with_child(
696    projection: &ProjectionExec,
697    child: &Arc<dyn ExecutionPlan>,
698) -> Result<Arc<dyn ExecutionPlan>> {
699    ProjectionExec::try_new(projection.expr().to_vec(), Arc::clone(child))
700        .map(|e| Arc::new(e) as _)
701}
702
703/// Returns `true` if all the expressions in the argument are `Column`s.
704pub fn all_columns(exprs: &[ProjectionExpr]) -> bool {
705    exprs
706        .iter()
707        .all(|proj_expr| proj_expr.expr.as_any().is::<Column>())
708}
709
710/// Updates the given lexicographic ordering according to given projected
711/// expressions using the [`update_expr`] function.
712pub fn update_ordering(
713    ordering: LexOrdering,
714    projected_exprs: &[ProjectionExpr],
715) -> Result<Option<LexOrdering>> {
716    let mut updated_exprs = vec![];
717    for mut sort_expr in ordering.into_iter() {
718        let Some(updated_expr) = update_expr(&sort_expr.expr, projected_exprs, false)?
719        else {
720            return Ok(None);
721        };
722        sort_expr.expr = updated_expr;
723        updated_exprs.push(sort_expr);
724    }
725    Ok(LexOrdering::new(updated_exprs))
726}
727
728/// Updates the given lexicographic requirement according to given projected
729/// expressions using the [`update_expr`] function.
730pub fn update_ordering_requirement(
731    reqs: LexRequirement,
732    projected_exprs: &[ProjectionExpr],
733) -> Result<Option<LexRequirement>> {
734    let mut updated_exprs = vec![];
735    for mut sort_expr in reqs.into_iter() {
736        let Some(updated_expr) = update_expr(&sort_expr.expr, projected_exprs, false)?
737        else {
738            return Ok(None);
739        };
740        sort_expr.expr = updated_expr;
741        updated_exprs.push(sort_expr);
742    }
743    Ok(LexRequirement::new(updated_exprs))
744}
745
746/// Downcasts all the expressions in `exprs` to `Column`s. If any of the given
747/// expressions is not a `Column`, returns `None`.
748pub fn physical_to_column_exprs(
749    exprs: &[ProjectionExpr],
750) -> Option<Vec<(Column, String)>> {
751    exprs
752        .iter()
753        .map(|proj_expr| {
754            proj_expr
755                .expr
756                .as_any()
757                .downcast_ref::<Column>()
758                .map(|col| (col.clone(), proj_expr.alias.clone()))
759        })
760        .collect()
761}
762
763/// If pushing down the projection over this join's children seems possible,
764/// this function constructs the new [`ProjectionExec`]s that will come on top
765/// of the original children of the join.
766pub fn new_join_children(
767    projection_as_columns: &[(Column, String)],
768    far_right_left_col_ind: i32,
769    far_left_right_col_ind: i32,
770    left_child: &Arc<dyn ExecutionPlan>,
771    right_child: &Arc<dyn ExecutionPlan>,
772) -> Result<(ProjectionExec, ProjectionExec)> {
773    let new_left = ProjectionExec::try_new(
774        projection_as_columns[0..=far_right_left_col_ind as _]
775            .iter()
776            .map(|(col, alias)| ProjectionExpr {
777                expr: Arc::new(Column::new(col.name(), col.index())) as _,
778                alias: alias.clone(),
779            }),
780        Arc::clone(left_child),
781    )?;
782    let left_size = left_child.schema().fields().len() as i32;
783    let new_right = ProjectionExec::try_new(
784        projection_as_columns[far_left_right_col_ind as _..]
785            .iter()
786            .map(|(col, alias)| {
787                ProjectionExpr {
788                    expr: Arc::new(Column::new(
789                        col.name(),
790                        // Align projected expressions coming from the right
791                        // table with the new right child projection:
792                        (col.index() as i32 - left_size) as _,
793                    )) as _,
794                    alias: alias.clone(),
795                }
796            }),
797        Arc::clone(right_child),
798    )?;
799
800    Ok((new_left, new_right))
801}
802
803/// Checks three conditions for pushing a projection down through a join:
804/// - Projection must narrow the join output schema.
805/// - Columns coming from left/right tables must be collected at the left/right
806///   sides of the output table.
807/// - Left or right table is not lost after the projection.
808pub fn join_allows_pushdown(
809    projection_as_columns: &[(Column, String)],
810    join_schema: &SchemaRef,
811    far_right_left_col_ind: i32,
812    far_left_right_col_ind: i32,
813) -> bool {
814    // Projection must narrow the join output:
815    projection_as_columns.len() < join_schema.fields().len()
816    // Are the columns from different tables mixed?
817    && (far_right_left_col_ind + 1 == far_left_right_col_ind)
818    // Left or right table is not lost after the projection.
819    && far_right_left_col_ind >= 0
820    && far_left_right_col_ind < projection_as_columns.len() as i32
821}
822
823/// Returns the last index before encountering a column coming from the right table when traveling
824/// through the projection from left to right, and the last index before encountering a column
825/// coming from the left table when traveling through the projection from right to left.
826/// If there is no column in the projection coming from the left side, it returns (-1, ...),
827/// if there is no column in the projection coming from the right side, it returns (..., projection length).
828pub fn join_table_borders(
829    left_table_column_count: usize,
830    projection_as_columns: &[(Column, String)],
831) -> (i32, i32) {
832    let far_right_left_col_ind = projection_as_columns
833        .iter()
834        .enumerate()
835        .take_while(|(_, (projection_column, _))| {
836            projection_column.index() < left_table_column_count
837        })
838        .last()
839        .map(|(index, _)| index as i32)
840        .unwrap_or(-1);
841
842    let far_left_right_col_ind = projection_as_columns
843        .iter()
844        .enumerate()
845        .rev()
846        .take_while(|(_, (projection_column, _))| {
847            projection_column.index() >= left_table_column_count
848        })
849        .last()
850        .map(|(index, _)| index as i32)
851        .unwrap_or(projection_as_columns.len() as i32);
852
853    (far_right_left_col_ind, far_left_right_col_ind)
854}
855
856/// Tries to update the equi-join `Column`'s of a join as if the input of
857/// the join was replaced by a projection.
858pub fn update_join_on(
859    proj_left_exprs: &[(Column, String)],
860    proj_right_exprs: &[(Column, String)],
861    hash_join_on: &[(PhysicalExprRef, PhysicalExprRef)],
862    left_field_size: usize,
863) -> Option<Vec<(PhysicalExprRef, PhysicalExprRef)>> {
864    let (left_idx, right_idx): (Vec<_>, Vec<_>) = hash_join_on
865        .iter()
866        .map(|(left, right)| (left, right))
867        .unzip();
868
869    let new_left_columns = new_columns_for_join_on(&left_idx, proj_left_exprs, 0);
870    let new_right_columns =
871        new_columns_for_join_on(&right_idx, proj_right_exprs, left_field_size);
872
873    match (new_left_columns, new_right_columns) {
874        (Some(left), Some(right)) => Some(left.into_iter().zip(right).collect()),
875        _ => None,
876    }
877}
878
879/// Tries to update the column indices of a [`JoinFilter`] as if the input of
880/// the join was replaced by a projection.
881pub fn update_join_filter(
882    projection_left_exprs: &[(Column, String)],
883    projection_right_exprs: &[(Column, String)],
884    join_filter: &JoinFilter,
885    left_field_size: usize,
886) -> Option<JoinFilter> {
887    let mut new_left_indices = new_indices_for_join_filter(
888        join_filter,
889        JoinSide::Left,
890        projection_left_exprs,
891        0,
892    )
893    .into_iter();
894    let mut new_right_indices = new_indices_for_join_filter(
895        join_filter,
896        JoinSide::Right,
897        projection_right_exprs,
898        left_field_size,
899    )
900    .into_iter();
901
902    // Check if all columns match:
903    (new_right_indices.len() + new_left_indices.len()
904        == join_filter.column_indices().len())
905    .then(|| {
906        JoinFilter::new(
907            Arc::clone(join_filter.expression()),
908            join_filter
909                .column_indices()
910                .iter()
911                .map(|col_idx| ColumnIndex {
912                    index: if col_idx.side == JoinSide::Left {
913                        new_left_indices.next().unwrap()
914                    } else {
915                        new_right_indices.next().unwrap()
916                    },
917                    side: col_idx.side,
918                })
919                .collect(),
920            Arc::clone(join_filter.schema()),
921        )
922    })
923}
924
925/// Unifies `projection` with its input (which is also a [`ProjectionExec`]).
926fn try_unifying_projections(
927    projection: &ProjectionExec,
928    child: &ProjectionExec,
929) -> Result<Option<Arc<dyn ExecutionPlan>>> {
930    let mut projected_exprs = vec![];
931    let mut column_ref_map: HashMap<Column, usize> = HashMap::new();
932
933    // Collect the column references usage in the outer projection.
934    projection.expr().iter().for_each(|proj_expr| {
935        proj_expr
936            .expr
937            .apply(|expr| {
938                Ok({
939                    if let Some(column) = expr.as_any().downcast_ref::<Column>() {
940                        *column_ref_map.entry(column.clone()).or_default() += 1;
941                    }
942                    TreeNodeRecursion::Continue
943                })
944            })
945            .unwrap();
946    });
947    // Merging these projections is not beneficial, e.g
948    // If an expression is not trivial and it is referred more than 1, unifies projections will be
949    // beneficial as caching mechanism for non-trivial computations.
950    // See discussion in: https://github.com/apache/datafusion/issues/8296
951    if column_ref_map.iter().any(|(column, count)| {
952        *count > 1 && !is_expr_trivial(&Arc::clone(&child.expr()[column.index()].expr))
953    }) {
954        return Ok(None);
955    }
956    for proj_expr in projection.expr() {
957        // If there is no match in the input projection, we cannot unify these
958        // projections. This case will arise if the projection expression contains
959        // a `PhysicalExpr` variant `update_expr` doesn't support.
960        let Some(expr) = update_expr(&proj_expr.expr, child.expr(), true)? else {
961            return Ok(None);
962        };
963        projected_exprs.push(ProjectionExpr {
964            expr,
965            alias: proj_expr.alias.clone(),
966        });
967    }
968    ProjectionExec::try_new(projected_exprs, Arc::clone(child.input()))
969        .map(|e| Some(Arc::new(e) as _))
970}
971
972/// Collect all column indices from the given projection expressions.
973fn collect_column_indices(exprs: &[ProjectionExpr]) -> Vec<usize> {
974    // Collect indices and remove duplicates.
975    let mut indices = exprs
976        .iter()
977        .flat_map(|proj_expr| collect_columns(&proj_expr.expr))
978        .map(|x| x.index())
979        .collect::<std::collections::HashSet<_>>()
980        .into_iter()
981        .collect::<Vec<_>>();
982    indices.sort();
983    indices
984}
985
986/// This function determines and returns a vector of indices representing the
987/// positions of columns in `projection_exprs` that are involved in `join_filter`,
988/// and correspond to a particular side (`join_side`) of the join operation.
989///
990/// Notes: Column indices in the projection expressions are based on the join schema,
991/// whereas the join filter is based on the join child schema. `column_index_offset`
992/// represents the offset between them.
993fn new_indices_for_join_filter(
994    join_filter: &JoinFilter,
995    join_side: JoinSide,
996    projection_exprs: &[(Column, String)],
997    column_index_offset: usize,
998) -> Vec<usize> {
999    join_filter
1000        .column_indices()
1001        .iter()
1002        .filter(|col_idx| col_idx.side == join_side)
1003        .filter_map(|col_idx| {
1004            projection_exprs
1005                .iter()
1006                .position(|(col, _)| col_idx.index + column_index_offset == col.index())
1007        })
1008        .collect()
1009}
1010
1011/// This function generates a new set of columns to be used in a hash join
1012/// operation based on a set of equi-join conditions (`hash_join_on`) and a
1013/// list of projection expressions (`projection_exprs`).
1014///
1015/// Notes: Column indices in the projection expressions are based on the join schema,
1016/// whereas the join on expressions are based on the join child schema. `column_index_offset`
1017/// represents the offset between them.
1018fn new_columns_for_join_on(
1019    hash_join_on: &[&PhysicalExprRef],
1020    projection_exprs: &[(Column, String)],
1021    column_index_offset: usize,
1022) -> Option<Vec<PhysicalExprRef>> {
1023    let new_columns = hash_join_on
1024        .iter()
1025        .filter_map(|on| {
1026            // Rewrite all columns in `on`
1027            Arc::clone(*on)
1028                .transform(|expr| {
1029                    if let Some(column) = expr.as_any().downcast_ref::<Column>() {
1030                        // Find the column in the projection expressions
1031                        let new_column = projection_exprs
1032                            .iter()
1033                            .enumerate()
1034                            .find(|(_, (proj_column, _))| {
1035                                column.name() == proj_column.name()
1036                                    && column.index() + column_index_offset
1037                                        == proj_column.index()
1038                            })
1039                            .map(|(index, (_, alias))| Column::new(alias, index));
1040                        if let Some(new_column) = new_column {
1041                            Ok(Transformed::yes(Arc::new(new_column)))
1042                        } else {
1043                            // If the column is not found in the projection expressions,
1044                            // it means that the column is not projected. In this case,
1045                            // we cannot push the projection down.
1046                            internal_err!(
1047                                "Column {:?} not found in projection expressions",
1048                                column
1049                            )
1050                        }
1051                    } else {
1052                        Ok(Transformed::no(expr))
1053                    }
1054                })
1055                .data()
1056                .ok()
1057        })
1058        .collect::<Vec<_>>();
1059    (new_columns.len() == hash_join_on.len()).then_some(new_columns)
1060}
1061
1062/// Checks if the given expression is trivial.
1063/// An expression is considered trivial if it is either a `Column` or a `Literal`.
1064fn is_expr_trivial(expr: &Arc<dyn PhysicalExpr>) -> bool {
1065    expr.as_any().downcast_ref::<Column>().is_some()
1066        || expr.as_any().downcast_ref::<Literal>().is_some()
1067}
1068
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::Arc;

    use crate::common::collect;

    use crate::test;
    use crate::test::exec::StatisticsExec;

    use arrow::datatypes::{DataType, Field, Schema};
    use datafusion_common::ScalarValue;
    use datafusion_common::stats::{ColumnStatistics, Precision, Statistics};

    use datafusion_expr::Operator;
    use datafusion_physical_expr::expressions::{BinaryExpr, Column, Literal, col};

    /// Column indices nested inside an expression tree come back sorted.
    #[test]
    fn test_collect_column_indices() -> Result<()> {
        // Builds `b@7 - (1 + a@1)`.
        let one_plus_a = BinaryExpr::new(
            Arc::new(Literal::new(ScalarValue::Int32(Some(1)))),
            Operator::Plus,
            Arc::new(Column::new("a", 1)),
        );
        let b_minus_rest = BinaryExpr::new(
            Arc::new(Column::new("b", 7)),
            Operator::Minus,
            Arc::new(one_plus_a),
        );
        let projection = [ProjectionExpr {
            expr: Arc::new(b_minus_rest),
            alias: "b-(1+a)".to_string(),
        }];

        assert_eq!(collect_column_indices(&projection), vec![1, 7]);
        Ok(())
    }

    /// Borders are reported for several left-table widths over two projections.
    #[test]
    fn test_join_table_borders() -> Result<()> {
        /// Builds a projection where every column is aliased to its own name.
        fn projection_of(columns: &[(&str, usize)]) -> Vec<(Column, String)> {
            columns
                .iter()
                .map(|&(name, index)| (Column::new(name, index), name.to_owned()))
                .collect()
        }

        let projections = projection_of(&[
            ("b", 1),
            ("c", 2),
            ("e", 4),
            ("d", 3),
            ("c", 2),
            ("f", 5),
            ("h", 7),
            ("g", 6),
        ]);
        for (left_table_column_count, expected) in [(5, (4, 5)), (8, (7, 8)), (1, (-1, 0))]
        {
            assert_eq!(
                join_table_borders(left_table_column_count, &projections),
                expected
            );
        }

        let projections = projection_of(&[
            ("a", 0),
            ("b", 1),
            ("d", 3),
            ("g", 6),
            ("e", 4),
            ("f", 5),
            ("e", 4),
            ("h", 7),
        ]);
        for (left_table_column_count, expected) in [(5, (2, 7)), (7, (6, 7))] {
            assert_eq!(
                join_table_borders(left_table_column_count, &projections),
                expected
            );
        }

        Ok(())
    }

    /// A projection without expressions yields as many batches as its input.
    #[tokio::test]
    async fn project_no_column() -> Result<()> {
        let task_ctx = Arc::new(TaskContext::default());
        let exec = test::scan_partitioned(1);

        let input_stream = exec.execute(0, Arc::clone(&task_ctx))?;
        let expected = collect(input_stream).await?;

        let projection = ProjectionExec::try_new(Vec::<ProjectionExpr>::new(), exec)?;
        let projected_stream = projection.execute(0, Arc::clone(&task_ctx))?;
        let output = collect(projected_stream).await?;

        assert_eq!(output.len(), expected.len());
        Ok(())
    }

    /// `(expr, alias)` tuples are still accepted by `ProjectionExec::try_new`.
    #[tokio::test]
    async fn project_old_syntax() {
        let exec = test::scan_partitioned(1);
        let schema = exec.schema();
        let expr = col("i", &schema).unwrap();

        // Rely on the `From` impl of `ProjectionExpr` to exercise the old
        // tuple-based syntax.
        let old_style_exprs = vec![(expr, "c".to_string())];

        // Construction is expected to succeed.
        ProjectionExec::try_new(old_style_exprs, exec).unwrap();
    }

    /// Statistics are computed for a projection that references input columns
    /// located beyond the width of the projection's own output schema.
    #[test]
    fn test_projection_statistics_uses_input_schema() {
        let input_schema = Schema::new(
            ["a", "b", "c", "d", "e", "f"]
                .into_iter()
                .map(|name| Field::new(name, DataType::Int32, false))
                .collect::<Vec<_>>(),
        );

        // Column statistics carrying only exact min/max bounds.
        let bounded = |min: i32, max: i32| ColumnStatistics {
            min_value: Precision::Exact(ScalarValue::Int32(Some(min))),
            max_value: Precision::Exact(ScalarValue::Int32(Some(max))),
            ..Default::default()
        };
        let input_statistics = Statistics {
            num_rows: Precision::Exact(10),
            column_statistics: [(1, 100), (5, 50), (10, 40), (20, 30), (21, 29), (24, 26)]
                .into_iter()
                .map(|(min, max)| bounded(min, max))
                .collect(),
            ..Default::default()
        };

        let input = Arc::new(StatisticsExec::new(input_statistics, input_schema));

        // The output schema (2 columns) is narrower than the input schema
        // (6 columns), and the expressions reference the last input columns.
        // A bounds check against the output schema would therefore fail on
        // them if the output schema were supplied to the statistics
        // computation instead of the input schema.
        let renamed_c = ProjectionExpr {
            expr: Arc::new(Column::new("c", 2)) as Arc<dyn PhysicalExpr>,
            alias: "c_renamed".to_string(),
        };
        let e_plus_f = ProjectionExpr {
            expr: Arc::new(BinaryExpr::new(
                Arc::new(Column::new("e", 4)),
                Operator::Plus,
                Arc::new(Column::new("f", 5)),
            )) as Arc<dyn PhysicalExpr>,
            alias: "e_plus_f".to_string(),
        };
        let exprs: Vec<ProjectionExpr> = vec![renamed_c, e_plus_f];

        let projection = ProjectionExec::try_new(exprs, input).unwrap();
        let stats = projection.partition_statistics(None).unwrap();

        assert_eq!(stats.num_rows, Precision::Exact(10));
        assert_eq!(
            stats.column_statistics.len(),
            2,
            "Expected 2 columns in projection statistics"
        );
        assert!(stats.total_byte_size.is_exact().unwrap_or(false));
    }
}