Skip to main content

datafusion_optimizer/optimize_projections/
mod.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! [`OptimizeProjections`] identifies and eliminates unused columns
19
20mod required_indices;
21
22use crate::optimizer::ApplyOrder;
23use crate::{OptimizerConfig, OptimizerRule};
24use std::sync::Arc;
25
26use datafusion_common::{
27    Column, DFSchema, HashMap, JoinType, Result, assert_eq_or_internal_err,
28    get_required_group_by_exprs_indices, internal_datafusion_err, internal_err,
29};
30use datafusion_expr::expr::Alias;
31use datafusion_expr::{
32    Aggregate, Distinct, EmptyRelation, Expr, Projection, TableScan, Unnest, Window,
33    logical_plan::LogicalPlan,
34};
35
36use crate::optimize_projections::required_indices::RequiredIndices;
37use crate::utils::NamePreserver;
38use datafusion_common::tree_node::{
39    Transformed, TreeNode, TreeNodeContainer, TreeNodeRecursion,
40};
41
/// Optimizer rule that prunes columns nobody needs from the intermediate
/// schemas of a [`LogicalPlan`].
///
/// The rule analyzes the input logical plan, determines the column indices
/// each node has to receive from its inputs, and then:
/// - drops columns that neither appear at the output nor take part in any
///   computation step,
/// - adds projections that shrink the input of operators which benefit from a
///   smaller memory footprint, and
/// - removes [`LogicalPlan::Projection`]s that have become unnecessary.
///
/// Eliminating columns that downstream operations never use can improve query
/// performance and reduce unnecessary data processing.
///
/// ## Schema, Field Properties, and Metadata Handling
///
/// In most optimization scenarios the rule keeps schema and field metadata
/// intact.
///
/// **Schema-level metadata, by plan type**:
/// - **Window and Aggregate plans**: schema metadata is preserved.
/// - **Projection plans**: schema metadata is preserved as described by
///   [`projection_schema`](datafusion_expr::logical_plan::projection_schema).
/// - **Other logical plans**: schema metadata is preserved unless
///   [`LogicalPlan::recompute_schema`] is called on a plan type that drops
///   metadata.
///
/// **Field-level properties and metadata**: a field that survives in the
/// optimized plan keeps its individual properties, as determined by
/// [`exprlist_to_fields`](datafusion_expr::utils::exprlist_to_fields) and
/// [`ExprSchemable::to_field`](datafusion_expr::expr_schema::ExprSchemable::to_field).
///
/// **Field precedence**: when the same field is referenced several times, a
/// single occurrence is kept and the duplicates are removed (refer to
/// `RequiredIndices::compact()`); the properties and metadata of the retained
/// occurrence are the ones that are preserved.
#[derive(Debug, Default)]
pub struct OptimizeProjections {}
77
78impl OptimizeProjections {
79    #[expect(missing_docs)]
80    pub fn new() -> Self {
81        Self {}
82    }
83}
84
85impl OptimizerRule for OptimizeProjections {
86    fn name(&self) -> &str {
87        "optimize_projections"
88    }
89
90    fn apply_order(&self) -> Option<ApplyOrder> {
91        None
92    }
93
94    fn supports_rewrite(&self) -> bool {
95        true
96    }
97
98    fn rewrite(
99        &self,
100        plan: LogicalPlan,
101        config: &dyn OptimizerConfig,
102    ) -> Result<Transformed<LogicalPlan>> {
103        // All output fields are necessary:
104        let indices = RequiredIndices::new_for_all_exprs(&plan);
105        optimize_projections(plan, config, indices)
106    }
107}
108
109/// Removes unnecessary columns (e.g. columns that do not appear in the output
110/// schema and/or are not used during any computation step such as expression
111/// evaluation) from the logical plan and its inputs.
112///
113/// # Parameters
114///
115/// - `plan`: A reference to the input `LogicalPlan` to optimize.
116/// - `config`: A reference to the optimizer configuration.
117/// - `indices`: A slice of column indices that represent the necessary column
118///   indices for downstream (parent) plan nodes.
119///
120/// # Returns
121///
122/// A `Result` object with the following semantics:
123///
124/// - `Ok(Some(LogicalPlan))`: An optimized `LogicalPlan` without unnecessary
125///   columns.
126/// - `Ok(None)`: Signal that the given logical plan did not require any change.
127/// - `Err(error)`: An error occurred during the optimization process.
128#[cfg_attr(feature = "recursive_protection", recursive::recursive)]
129fn optimize_projections(
130    plan: LogicalPlan,
131    config: &dyn OptimizerConfig,
132    indices: RequiredIndices,
133) -> Result<Transformed<LogicalPlan>> {
134    // Recursively rewrite any nodes that may be able to avoid computation given
135    // their parents' required indices.
136    match plan {
137        LogicalPlan::Projection(proj) => {
138            return merge_consecutive_projections(proj)?
139                .transform_data(|proj| {
140                    rewrite_projection_given_requirements(proj, config, &indices)
141                })?
142                .transform_data(|plan| optimize_subqueries(plan, config));
143        }
144        LogicalPlan::Aggregate(aggregate) => {
145            // Split parent requirements to GROUP BY and aggregate sections:
146            let n_group_exprs = aggregate.group_expr_len()?;
147            // Offset aggregate indices so that they point to valid indices at
148            // `aggregate.aggr_expr`:
149            let (group_by_reqs, aggregate_reqs) = indices.split_off(n_group_exprs);
150
151            // Get absolutely necessary GROUP BY fields.
152            //
153            // When the input has no functional dependencies, we can
154            // short-circuit this analysis.
155            let new_group_bys = if aggregate
156                .input
157                .schema()
158                .functional_dependencies()
159                .is_empty()
160            {
161                aggregate.group_expr
162            } else {
163                let group_by_expr_existing = aggregate
164                    .group_expr
165                    .iter()
166                    .map(|group_by_expr| group_by_expr.schema_name().to_string())
167                    .collect::<Vec<_>>();
168
169                if let Some(simplest_groupby_indices) =
170                    get_required_group_by_exprs_indices(
171                        aggregate.input.schema(),
172                        &group_by_expr_existing,
173                    )
174                {
175                    // Some of the fields in the GROUP BY may be required by
176                    // the parent even if these fields are unnecessary in
177                    // terms of functional dependency.
178                    group_by_reqs
179                        .append(&simplest_groupby_indices)
180                        .get_at_indices(&aggregate.group_expr)
181                } else {
182                    aggregate.group_expr
183                }
184            };
185
186            // Only use the absolutely necessary aggregate expressions required
187            // by the parent:
188            let new_aggr_expr = aggregate_reqs.get_at_indices(&aggregate.aggr_expr);
189
190            if new_group_bys.is_empty() && new_aggr_expr.is_empty() {
191                // Global aggregation with no aggregate functions always produces 1 row and no columns.
192                return Ok(Transformed::yes(LogicalPlan::EmptyRelation(
193                    EmptyRelation {
194                        produce_one_row: true,
195                        schema: Arc::new(DFSchema::empty()),
196                    },
197                )));
198            }
199
200            let all_exprs_iter = new_group_bys.iter().chain(new_aggr_expr.iter());
201            let schema = aggregate.input.schema();
202            let necessary_indices =
203                RequiredIndices::new().with_exprs(schema, all_exprs_iter);
204            let necessary_exprs = necessary_indices.get_required_exprs(schema);
205
206            return optimize_projections(
207                Arc::unwrap_or_clone(aggregate.input),
208                config,
209                necessary_indices,
210            )?
211            .transform_data(|aggregate_input| {
212                // Simplify the input of the aggregation by adding a projection so
213                // that its input only contains absolutely necessary columns for
214                // the aggregate expressions. Note that necessary_indices refer to
215                // fields in `aggregate.input.schema()`.
216                add_projection_on_top_if_helpful(aggregate_input, necessary_exprs)
217            })?
218            .map_data(|aggregate_input| {
219                // Create a new aggregate plan with the updated input and only the
220                // absolutely necessary fields:
221                Aggregate::try_new(
222                    Arc::new(aggregate_input),
223                    new_group_bys,
224                    new_aggr_expr,
225                )
226                .map(LogicalPlan::Aggregate)
227            })?
228            .transform_data(|plan| optimize_subqueries(plan, config));
229        }
230        LogicalPlan::Window(window) => {
231            let input_schema = Arc::clone(window.input.schema());
232            // Split parent requirements to child and window expression sections:
233            let n_input_fields = input_schema.fields().len();
234            // Offset window expression indices so that they point to valid
235            // indices at `window.window_expr`:
236            let (child_reqs, window_reqs) = indices.split_off(n_input_fields);
237
238            // Only use window expressions that are absolutely necessary according
239            // to parent requirements:
240            let new_window_expr = window_reqs.get_at_indices(&window.window_expr);
241
242            // Get all the required column indices at the input, either by the
243            // parent or window expression requirements.
244            let required_indices = child_reqs.with_exprs(&input_schema, &new_window_expr);
245
246            return optimize_projections(
247                Arc::unwrap_or_clone(window.input),
248                config,
249                required_indices.clone(),
250            )?
251            .transform_data(|window_child| {
252                if new_window_expr.is_empty() {
253                    // When no window expression is necessary, use the input directly:
254                    Ok(Transformed::no(window_child))
255                } else {
256                    // Calculate required expressions at the input of the window.
257                    // Please note that we use `input_schema`, because `required_indices`
258                    // refers to that schema
259                    let required_exprs =
260                        required_indices.get_required_exprs(&input_schema);
261                    let window_child =
262                        add_projection_on_top_if_helpful(window_child, required_exprs)?
263                            .data;
264                    Window::try_new(new_window_expr, Arc::new(window_child))
265                        .map(LogicalPlan::Window)
266                        .map(Transformed::yes)
267                }
268            })?
269            .transform_data(|plan| optimize_subqueries(plan, config));
270        }
271        LogicalPlan::TableScan(table_scan) => {
272            let TableScan {
273                table_name,
274                source,
275                projection,
276                filters,
277                fetch,
278                projected_schema: _,
279            } = table_scan;
280
281            // Get indices referred to in the original (schema with all fields)
282            // given projected indices.
283            let projection = match &projection {
284                Some(projection) => indices.into_mapped_indices(|idx| projection[idx]),
285                None => indices.into_inner(),
286            };
287            let new_scan =
288                TableScan::try_new(table_name, source, Some(projection), filters, fetch)?;
289
290            return Transformed::yes(LogicalPlan::TableScan(new_scan))
291                .transform_data(|plan| optimize_subqueries(plan, config));
292        }
293        // Other node types are handled below
294        _ => {}
295    };
296
297    // For other plan node types, calculate indices for columns they use and
298    // try to rewrite their children
299    let mut child_required_indices: Vec<RequiredIndices> = match &plan {
300        LogicalPlan::Sort(_)
301        | LogicalPlan::Filter(_)
302        | LogicalPlan::Repartition(_)
303        | LogicalPlan::Union(_)
304        | LogicalPlan::SubqueryAlias(_)
305        | LogicalPlan::Distinct(Distinct::On(_)) => {
306            // Pass index requirements from the parent as well as column indices
307            // that appear in this plan's expressions to its child. All these
308            // operators benefit from "small" inputs, so the projection_beneficial
309            // flag is `true`.
310            plan.inputs()
311                .into_iter()
312                .map(|input| {
313                    indices
314                        .clone()
315                        .with_projection_beneficial()
316                        .with_plan_exprs(&plan, input.schema())
317                })
318                .collect::<Result<_>>()?
319        }
320        LogicalPlan::Limit(_) => {
321            // Pass index requirements from the parent as well as column indices
322            // that appear in this plan's expressions to its child. These operators
323            // do not benefit from "small" inputs, so the projection_beneficial
324            // flag is `false`.
325            plan.inputs()
326                .into_iter()
327                .map(|input| indices.clone().with_plan_exprs(&plan, input.schema()))
328                .collect::<Result<_>>()?
329        }
330        LogicalPlan::Copy(_)
331        | LogicalPlan::Ddl(_)
332        | LogicalPlan::Dml(_)
333        | LogicalPlan::Explain(_)
334        | LogicalPlan::Analyze(_)
335        | LogicalPlan::Subquery(_)
336        | LogicalPlan::Statement(_)
337        | LogicalPlan::Distinct(Distinct::All(_)) => {
338            // These plans require all their fields, and their children should
339            // be treated as final plans -- otherwise, we may have schema a
340            // mismatch.
341            // TODO: For some subquery variants (e.g. a subquery arising from an
342            //       EXISTS expression), we may not need to require all indices.
343            plan.inputs()
344                .into_iter()
345                .map(RequiredIndices::new_for_all_exprs)
346                .collect()
347        }
348        LogicalPlan::Extension(extension) => {
349            if let Some(necessary_children_indices) =
350                extension.node.necessary_children_exprs(indices.indices())
351            {
352                let children = extension.node.inputs();
353                assert_eq_or_internal_err!(
354                    children.len(),
355                    necessary_children_indices.len(),
356                    "Inconsistent length between children and necessary children indices. \
357                Make sure `.necessary_children_exprs` implementation of the \
358                `UserDefinedLogicalNode` is consistent with actual children length \
359                for the node."
360                );
361                children
362                    .into_iter()
363                    .zip(necessary_children_indices)
364                    .map(|(child, necessary_indices)| {
365                        RequiredIndices::new_from_indices(necessary_indices)
366                            .with_plan_exprs(&plan, child.schema())
367                    })
368                    .collect::<Result<Vec<_>>>()?
369            } else {
370                // Requirements from parent cannot be routed down to user defined logical plan safely
371                // Assume it requires all input exprs here
372                plan.inputs()
373                    .into_iter()
374                    .map(RequiredIndices::new_for_all_exprs)
375                    .collect()
376            }
377        }
378        LogicalPlan::EmptyRelation(_)
379        | LogicalPlan::Values(_)
380        | LogicalPlan::DescribeTable(_) => {
381            // These operators have no inputs, so stop the optimization process.
382            return Ok(Transformed::no(plan));
383        }
384        LogicalPlan::RecursiveQuery(recursive) => {
385            // Only allow subqueries that reference the current CTE; nested subqueries are not yet
386            // supported for projection pushdown for simplicity.
387            // TODO: be able to do projection pushdown on recursive CTEs with subqueries
388            if plan_contains_other_subqueries(
389                recursive.static_term.as_ref(),
390                &recursive.name,
391            ) || plan_contains_other_subqueries(
392                recursive.recursive_term.as_ref(),
393                &recursive.name,
394            ) {
395                return Ok(Transformed::no(plan));
396            }
397
398            plan.inputs()
399                .into_iter()
400                .map(|input| {
401                    indices
402                        .clone()
403                        .with_projection_beneficial()
404                        .with_plan_exprs(&plan, input.schema())
405                })
406                .collect::<Result<Vec<_>>>()?
407        }
408        LogicalPlan::Join(join) => {
409            let left_len = join.left.schema().fields().len();
410            let right_len = join.right.schema().fields().len();
411            let (left_req_indices, right_req_indices) =
412                split_join_requirements(left_len, right_len, indices, &join.join_type);
413            let left_indices =
414                left_req_indices.with_plan_exprs(&plan, join.left.schema())?;
415            let right_indices =
416                right_req_indices.with_plan_exprs(&plan, join.right.schema())?;
417            // Joins benefit from "small" input tables (lower memory usage).
418            // Therefore, each child benefits from projection:
419            vec![
420                left_indices.with_projection_beneficial(),
421                right_indices.with_projection_beneficial(),
422            ]
423        }
424        // these nodes are explicitly rewritten in the match statement above
425        LogicalPlan::Projection(_)
426        | LogicalPlan::Aggregate(_)
427        | LogicalPlan::Window(_)
428        | LogicalPlan::TableScan(_) => {
429            return internal_err!(
430                "OptimizeProjection: should have handled in the match statement above"
431            );
432        }
433        LogicalPlan::Unnest(Unnest {
434            input,
435            dependency_indices,
436            ..
437        }) => {
438            // at least provide the indices for the exec-columns as a starting point
439            let required_indices =
440                RequiredIndices::new().with_plan_exprs(&plan, input.schema())?;
441
442            // Add additional required indices from the parent
443            let mut additional_necessary_child_indices = Vec::new();
444            indices.indices().iter().for_each(|idx| {
445                if let Some(index) = dependency_indices.get(*idx) {
446                    additional_necessary_child_indices.push(*index);
447                }
448            });
449            vec![required_indices.append(&additional_necessary_child_indices)]
450        }
451    };
452
453    // Required indices are currently ordered (child0, child1, ...)
454    // but the loop pops off the last element, so we need to reverse the order
455    child_required_indices.reverse();
456    assert_eq_or_internal_err!(
457        child_required_indices.len(),
458        plan.inputs().len(),
459        "OptimizeProjection: child_required_indices length mismatch with plan inputs"
460    );
461
462    // Rewrite children of the plan
463    let transformed_plan = plan.map_children(|child| {
464        let required_indices = child_required_indices.pop().ok_or_else(|| {
465            internal_datafusion_err!(
466                "Unexpected number of required_indices in OptimizeProjections rule"
467            )
468        })?;
469
470        let projection_beneficial = required_indices.projection_beneficial();
471        let project_exprs = required_indices.get_required_exprs(child.schema());
472
473        optimize_projections(child, config, required_indices)?.transform_data(
474            |new_input| {
475                if projection_beneficial {
476                    add_projection_on_top_if_helpful(new_input, project_exprs)
477                } else {
478                    Ok(Transformed::no(new_input))
479                }
480            },
481        )
482    })?;
483
484    let transformed_plan =
485        transformed_plan.transform_data(|plan| optimize_subqueries(plan, config))?;
486
487    // If any of the children are transformed, we need to potentially update the plan's schema
488    if transformed_plan.transformed {
489        transformed_plan.map_data(|plan| plan.recompute_schema())
490    } else {
491        Ok(transformed_plan)
492    }
493}
494
495/// Optimizes uncorrelated subquery plans embedded in expressions of the given
496/// plan node (e.g., `Expr::ScalarSubquery`). `map_children` only visits direct
497/// plan inputs, so subqueries must be handled separately.
498fn optimize_subqueries(
499    plan: LogicalPlan,
500    config: &dyn OptimizerConfig,
501) -> Result<Transformed<LogicalPlan>> {
502    plan.map_uncorrelated_subqueries(|subquery_plan| {
503        let indices = RequiredIndices::new_for_all_exprs(&subquery_plan);
504        optimize_projections(subquery_plan, config, indices)
505    })
506}
507
508/// Given a projection `proj`, this function attempts to merge it with a previous
509/// projection if it exists and if merging is beneficial. Merging is considered
510/// beneficial when expressions in the current projection are non-trivial and
511/// appear more than once in its input fields. This can act as a caching mechanism
512/// for non-trivial computations.
513///
514/// ## Metadata Handling During Projection Merging
515///
516/// **Alias metadata preservation**: When merging projections, alias metadata from both
517/// the current and previous projections is carefully preserved. The presence of metadata
518/// precludes alias trimming.
519///
520/// **Schema, Fields, and metadata**: If a projection is rewritten, the schema and metadata
521/// are preserved. Individual field properties and metadata flows through expression rewriting
522/// and are preserved when fields are referenced in the merged projection.
523/// Refer to [`projection_schema`](datafusion_expr::logical_plan::projection_schema)
524/// for more details.
525///
526/// # Parameters
527///
528/// * `proj` - A reference to the `Projection` to be merged.
529///
530/// # Returns
531///
532/// A `Result` object with the following semantics:
533///
534/// - `Ok(Some(Projection))`: Merge was beneficial and successful. Contains the
535///   merged projection.
536/// - `Ok(None)`: Signals that merge is not beneficial (and has not taken place).
537/// - `Err(error)`: An error occurred during the function call.
538fn merge_consecutive_projections(proj: Projection) -> Result<Transformed<Projection>> {
539    let Projection {
540        expr,
541        input,
542        schema,
543        ..
544    } = proj;
545    let LogicalPlan::Projection(prev_projection) = input.as_ref() else {
546        return Projection::try_new_with_schema(expr, input, schema).map(Transformed::no);
547    };
548
549    // A fast path: if the previous projection is same as the current projection
550    // we can directly remove the current projection and return child projection.
551    if prev_projection.expr == expr {
552        return Projection::try_new_with_schema(
553            expr,
554            Arc::clone(&prev_projection.input),
555            schema,
556        )
557        .map(Transformed::yes);
558    }
559
560    // Count usages (referrals) of each projection expression in its input fields:
561    let mut column_referral_map = HashMap::<&Column, usize>::new();
562    expr.iter()
563        .for_each(|expr| expr.add_column_ref_counts(&mut column_referral_map));
564
565    // If an expression is non-trivial (KeepInPlace) and appears more than once, do not merge
566    // them as consecutive projections will benefit from a compute-once approach.
567    // For details, see: https://github.com/apache/datafusion/issues/8296
568    if column_referral_map.into_iter().any(|(col, usage)| {
569        usage > 1
570            && !prev_projection.expr[prev_projection.schema.index_of_column(col).unwrap()]
571                .placement()
572                .should_push_to_leaves()
573    }) {
574        // no change
575        return Projection::try_new_with_schema(expr, input, schema).map(Transformed::no);
576    }
577
578    let LogicalPlan::Projection(prev_projection) = Arc::unwrap_or_clone(input) else {
579        // We know it is a `LogicalPlan::Projection` from check above
580        unreachable!();
581    };
582
583    // Try to rewrite the expressions in the current projection using the
584    // previous projection as input:
585    let name_preserver = NamePreserver::new_for_projection();
586    let mut original_names = vec![];
587    let new_exprs = expr.map_elements(|expr| {
588        original_names.push(name_preserver.save(&expr));
589
590        // do not rewrite top level Aliases (rewriter will remove all aliases within exprs)
591        match expr {
592            Expr::Alias(Alias {
593                expr,
594                relation,
595                name,
596                metadata,
597            }) => rewrite_expr(*expr, &prev_projection).map(|result| {
598                result.update_data(|expr| {
599                    // After substitution, the inner expression may now have the
600                    // same schema_name as the alias (e.g. when an extraction
601                    // alias like `__extracted_1 AS f(x)` is resolved back to
602                    // `f(x)`). Wrapping in a redundant self-alias causes a
603                    // cosmetic `f(x) AS f(x)` due to Display vs schema_name
604                    // formatting differences. Drop the alias when it matches.
605                    if metadata.is_none() && expr.schema_name().to_string() == name {
606                        expr
607                    } else {
608                        Expr::Alias(Alias {
609                            expr: Box::new(expr),
610                            relation,
611                            name,
612                            metadata,
613                        })
614                    }
615                })
616            }),
617            e => rewrite_expr(e, &prev_projection),
618        }
619    })?;
620
621    // if the expressions could be rewritten, create a new projection with the
622    // new expressions
623    if new_exprs.transformed {
624        // Add any needed aliases back to the expressions
625        let new_exprs = new_exprs
626            .data
627            .into_iter()
628            .zip(original_names)
629            .map(|(expr, original_name)| original_name.restore(expr))
630            .collect::<Vec<_>>();
631        Projection::try_new(new_exprs, prev_projection.input).map(Transformed::yes)
632    } else {
633        // not rewritten, so put the projection back together
634        let input = Arc::new(LogicalPlan::Projection(prev_projection));
635        Projection::try_new_with_schema(new_exprs.data, input, schema)
636            .map(Transformed::no)
637    }
638}
639
640/// Rewrites a projection expression using the projection before it (i.e. its input)
641/// This is a subroutine to the `merge_consecutive_projections` function.
642///
643/// # Parameters
644///
645/// * `expr` - A reference to the expression to rewrite.
646/// * `input` - A reference to the input of the projection expression (itself
647///   a projection).
648///
649/// # Returns
650///
651/// A `Result` object with the following semantics:
652///
653/// - `Ok(Some(Expr))`: Rewrite was successful. Contains the rewritten result.
654/// - `Ok(None)`: Signals that `expr` can not be rewritten.
655/// - `Err(error)`: An error occurred during the function call.
656///
657/// # Notes
658/// This rewrite also removes any unnecessary layers of aliasing. "Unnecessary" is
659/// defined as not contributing new information, such as metadata.
660///
661/// Without trimming, we can end up with unnecessary indirections inside expressions
662/// during projection merges.
663///
664/// Consider:
665///
666/// ```text
667/// Projection(a1 + b1 as sum1)
668/// --Projection(a as a1, b as b1)
669/// ----Source(a, b)
670/// ```
671///
672/// After merge, we want to produce:
673///
674/// ```text
675/// Projection(a + b as sum1)
676/// --Source(a, b)
677/// ```
678///
679/// Without trimming, we would end up with:
680///
681/// ```text
682/// Projection((a as a1 + b as b1) as sum1)
683/// --Source(a, b)
684/// ```
685fn rewrite_expr(expr: Expr, input: &Projection) -> Result<Transformed<Expr>> {
686    expr.transform_up(|expr| {
687        match expr {
688            //  remove any intermediate aliases if they do not carry metadata
689            Expr::Alias(alias) => {
690                match alias
691                    .metadata
692                    .as_ref()
693                    .map(|h| h.is_empty())
694                    .unwrap_or(true)
695                {
696                    true => Ok(Transformed::yes(*alias.expr)),
697                    false => Ok(Transformed::no(Expr::Alias(alias))),
698                }
699            }
700            Expr::Column(col) => {
701                // Find index of column:
702                let idx = input.schema.index_of_column(&col)?;
703                // get the corresponding unaliased input expression
704                //
705                // For example:
706                // * the input projection is [`a + b` as c, `d + e` as f]
707                // * the current column is an expression "f"
708                //
709                // return the expression `d + e` (not `d + e` as f)
710                let input_expr = input.expr[idx].clone().unalias_nested().data;
711                Ok(Transformed::yes(input_expr))
712            }
713            // Unsupported type for consecutive projection merge analysis.
714            _ => Ok(Transformed::no(expr)),
715        }
716    })
717}
718
719/// Splits requirement indices for a join into left and right children based on
720/// the join type.
721///
722/// This function takes the length of the left child, a slice of requirement
723/// indices, and the type of join (e.g. `INNER`, `LEFT`, `RIGHT`) as arguments.
724/// Depending on the join type, it divides the requirement indices into those
725/// that apply to the left child and those that apply to the right child.
726///
727/// - For `INNER`, `LEFT`, `RIGHT`, `FULL`, `LEFTMARK`, and `RIGHTMARK` joins,
728///   the requirements are split between left and right children. The right
729///   child indices are adjusted to point to valid positions within the right
730///   child by subtracting the length of the left child.
731///
732/// - For `LEFT ANTI`, `LEFT SEMI`, `RIGHT SEMI` and `RIGHT ANTI` joins, all
733///   requirements are re-routed to either the left child or the right child
734///   directly, depending on the join type.
735///
736/// # Parameters
737///
738/// * `left_len` - The length of the left child.
739/// * `right_len` - The length of the right child.
740/// * `indices` - A slice of requirement indices.
741/// * `join_type` - The type of join (e.g. `INNER`, `LEFT`, `RIGHT`).
742///
743/// # Returns
744///
745/// A tuple containing two vectors of `usize` indices: The first vector represents
746/// the requirements for the left child, and the second vector represents the
747/// requirements for the right child. The indices are appropriately split and
748/// adjusted based on the join type.
749fn split_join_requirements(
750    left_len: usize,
751    right_len: usize,
752    indices: RequiredIndices,
753    join_type: &JoinType,
754) -> (RequiredIndices, RequiredIndices) {
755    match join_type {
756        // In these cases requirements are split between left/right children:
757        JoinType::Inner | JoinType::Left | JoinType::Right | JoinType::Full => {
758            // Decrease right side indices by `left_len` so that they point to valid
759            // positions within the right child:
760            indices.split_off(left_len)
761        }
762        JoinType::LeftMark => {
763            // LeftMark output: [left_cols(0..left_len), mark]
764            // The mark column is synthetic (produced by the join itself),
765            // so discard it and route only to the left child.
766            let (left_indices, _mark) = indices.split_off(left_len);
767            (left_indices, RequiredIndices::new())
768        }
769        JoinType::RightMark => {
770            // Same as LeftMark, but for the right child.
771            let (right_indices, _mark) = indices.split_off(right_len);
772            (RequiredIndices::new(), right_indices)
773        }
774        // All requirements can be re-routed to left child directly.
775        JoinType::LeftAnti | JoinType::LeftSemi => (indices, RequiredIndices::new()),
776        // All requirements can be re-routed to right side directly.
777        // No need to change index, join schema is right child schema.
778        JoinType::RightSemi | JoinType::RightAnti => (RequiredIndices::new(), indices),
779    }
780}
781
782/// Adds a projection on top of a logical plan if doing so reduces the number
783/// of columns for the parent operator.
784///
785/// This function takes a `LogicalPlan` and a list of projection expressions.
786/// If the projection is beneficial (it reduces the number of columns in the
787/// plan) a new `LogicalPlan` with the projection is created and returned, along
788/// with a `true` flag. If the projection doesn't reduce the number of columns,
789/// the original plan is returned with a `false` flag.
790///
791/// # Parameters
792///
793/// * `plan` - The input `LogicalPlan` to potentially add a projection to.
794/// * `project_exprs` - A list of expressions for the projection.
795///
796/// # Returns
797///
798/// A `Transformed` indicating if a projection was added
799fn add_projection_on_top_if_helpful(
800    plan: LogicalPlan,
801    project_exprs: Vec<Expr>,
802) -> Result<Transformed<LogicalPlan>> {
803    // Make sure projection decreases the number of columns, otherwise it is unnecessary.
804    if project_exprs.len() >= plan.schema().fields().len() {
805        Ok(Transformed::no(plan))
806    } else {
807        Projection::try_new(project_exprs, Arc::new(plan))
808            .map(LogicalPlan::Projection)
809            .map(Transformed::yes)
810    }
811}
812
813/// Rewrite the given projection according to the fields required by its
814/// ancestors.
815///
816/// # Parameters
817///
818/// * `proj` - A reference to the original projection to rewrite.
819/// * `config` - A reference to the optimizer configuration.
820/// * `indices` - A slice of indices representing the columns required by the
821///   ancestors of the given projection.
822///
823/// # Returns
824///
825/// A `Result` object with the following semantics:
826///
827/// - `Ok(Some(LogicalPlan))`: Contains the rewritten projection
828/// - `Ok(None)`: No rewrite necessary.
829/// - `Err(error)`: An error occurred during the function call.
830fn rewrite_projection_given_requirements(
831    proj: Projection,
832    config: &dyn OptimizerConfig,
833    indices: &RequiredIndices,
834) -> Result<Transformed<LogicalPlan>> {
835    let Projection { expr, input, .. } = proj;
836
837    let exprs_used = indices.get_at_indices(&expr);
838
839    let required_indices =
840        RequiredIndices::new().with_exprs(input.schema(), exprs_used.iter());
841
842    // rewrite the children projection, and if they are changed rewrite the
843    // projection down
844    optimize_projections(Arc::unwrap_or_clone(input), config, required_indices)?
845        .transform_data(|input| {
846            if is_projection_unnecessary(&input, &exprs_used)? {
847                Ok(Transformed::yes(input))
848            } else {
849                Projection::try_new(exprs_used, Arc::new(input))
850                    .map(LogicalPlan::Projection)
851                    .map(Transformed::yes)
852            }
853        })
854}
855
856/// Projection is unnecessary, when
857/// - input schema of the projection, output schema of the projection are same, and
858/// - all projection expressions are either Column or Literal
859pub fn is_projection_unnecessary(
860    input: &LogicalPlan,
861    proj_exprs: &[Expr],
862) -> Result<bool> {
863    // First check if the number of expressions is equal to the number of fields in the input schema.
864    if proj_exprs.len() != input.schema().fields().len() {
865        return Ok(false);
866    }
867    Ok(input.schema().iter().zip(proj_exprs.iter()).all(
868        |((field_relation, field_name), expr)| {
869            // Check if the expression is a column and if it matches the field name
870            if let Expr::Column(col) = expr {
871                col.relation.as_ref() == field_relation && col.name.eq(field_name.name())
872            } else {
873                false
874            }
875        },
876    ))
877}
878
879/// Returns true if the plan subtree contains any subqueries that are not the
880/// CTE reference itself. This treats any non-CTE [`LogicalPlan::SubqueryAlias`]
881/// node (including aliased relations) as a blocker, along with expression-level
882/// subqueries like scalar, EXISTS, or IN. These cases prevent projection
883/// pushdown for now because we cannot safely reason about their column usage.
884fn plan_contains_other_subqueries(plan: &LogicalPlan, cte_name: &str) -> bool {
885    if let LogicalPlan::SubqueryAlias(alias) = plan
886        && alias.alias.table() != cte_name
887        && !subquery_alias_targets_recursive_cte(alias.input.as_ref(), cte_name)
888    {
889        return true;
890    }
891
892    let mut found = false;
893    plan.apply_expressions(|expr| {
894        if expr_contains_subquery(expr) {
895            found = true;
896            Ok(TreeNodeRecursion::Stop)
897        } else {
898            Ok(TreeNodeRecursion::Continue)
899        }
900    })
901    .expect("expression traversal never fails");
902    if found {
903        return true;
904    }
905
906    plan.inputs()
907        .into_iter()
908        .any(|child| plan_contains_other_subqueries(child, cte_name))
909}
910
911fn expr_contains_subquery(expr: &Expr) -> bool {
912    expr.exists(|e| match e {
913        Expr::ScalarSubquery(_) | Expr::Exists(_) | Expr::InSubquery(_) => Ok(true),
914        _ => Ok(false),
915    })
916    // Safe unwrap since we are doing a simple boolean check
917    .unwrap()
918}
919
920fn subquery_alias_targets_recursive_cte(plan: &LogicalPlan, cte_name: &str) -> bool {
921    match plan {
922        LogicalPlan::TableScan(scan) => scan.table_name.table() == cte_name,
923        LogicalPlan::SubqueryAlias(alias) => {
924            subquery_alias_targets_recursive_cte(alias.input.as_ref(), cte_name)
925        }
926        _ => {
927            let inputs = plan.inputs();
928            if inputs.len() == 1 {
929                subquery_alias_targets_recursive_cte(inputs[0], cte_name)
930            } else {
931                false
932            }
933        }
934    }
935}
936
937#[cfg(test)]
938mod tests {
939    use std::cmp::Ordering;
940    use std::collections::HashMap;
941    use std::fmt::Formatter;
942    use std::ops::Add;
943    use std::sync::Arc;
944    use std::vec;
945
946    use crate::optimize_projections::OptimizeProjections;
947    use crate::optimizer::Optimizer;
948    use crate::test::{
949        assert_fields_eq, scan_empty, test_table_scan, test_table_scan_fields,
950        test_table_scan_with_name,
951    };
952    use crate::{OptimizerContext, OptimizerRule};
953    use arrow::datatypes::{DataType, Field, Schema};
954    use datafusion_common::{
955        Column, DFSchema, DFSchemaRef, JoinType, Result, TableReference,
956    };
957    use datafusion_expr::ExprFunctionExt;
958    use datafusion_expr::{
959        BinaryExpr, Expr, Extension, Like, LogicalPlan, Operator, Projection,
960        UserDefinedLogicalNodeCore, WindowFunctionDefinition, binary_expr,
961        build_join_schema,
962        builder::table_scan_with_filters,
963        col,
964        expr::{self, Cast},
965        lit,
966        logical_plan::{builder::LogicalPlanBuilder, table_scan},
967        not, try_cast, when,
968    };
969    use insta::assert_snapshot;
970
971    use crate::assert_optimized_plan_eq_snapshot;
972    use datafusion_functions_aggregate::count::count_udaf;
973    use datafusion_functions_aggregate::expr_fn::{count, max, min};
974    use datafusion_functions_aggregate::min_max::max_udaf;
975
976    macro_rules! assert_optimized_plan_equal {
977        (
978            $plan:expr,
979            @ $expected:literal $(,)?
980        ) => {{
981            let optimizer_ctx = OptimizerContext::new().with_max_passes(1);
982            let rules: Vec<Arc<dyn crate::OptimizerRule + Send + Sync>> = vec![Arc::new(OptimizeProjections::new())];
983            assert_optimized_plan_eq_snapshot!(
984                optimizer_ctx,
985                rules,
986                $plan,
987                @ $expected,
988            )
989        }};
990    }
991
992    #[derive(Debug, Hash, PartialEq, Eq)]
993    struct NoOpUserDefined {
994        exprs: Vec<Expr>,
995        schema: DFSchemaRef,
996        input: Arc<LogicalPlan>,
997    }
998
999    impl NoOpUserDefined {
1000        fn new(schema: DFSchemaRef, input: Arc<LogicalPlan>) -> Self {
1001            Self {
1002                exprs: vec![],
1003                schema,
1004                input,
1005            }
1006        }
1007
1008        fn with_exprs(mut self, exprs: Vec<Expr>) -> Self {
1009            self.exprs = exprs;
1010            self
1011        }
1012    }
1013
1014    // Manual implementation needed because of `schema` field. Comparison excludes this field.
1015    impl PartialOrd for NoOpUserDefined {
1016        fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
1017            match self.exprs.partial_cmp(&other.exprs) {
1018                Some(Ordering::Equal) => self.input.partial_cmp(&other.input),
1019                cmp => cmp,
1020            }
1021            // TODO (https://github.com/apache/datafusion/issues/17477) avoid recomparing all fields
1022            .filter(|cmp| *cmp != Ordering::Equal || self == other)
1023        }
1024    }
1025
1026    impl UserDefinedLogicalNodeCore for NoOpUserDefined {
1027        fn name(&self) -> &str {
1028            "NoOpUserDefined"
1029        }
1030
1031        fn inputs(&self) -> Vec<&LogicalPlan> {
1032            vec![&self.input]
1033        }
1034
1035        fn schema(&self) -> &DFSchemaRef {
1036            &self.schema
1037        }
1038
1039        fn expressions(&self) -> Vec<Expr> {
1040            self.exprs.clone()
1041        }
1042
1043        fn fmt_for_explain(&self, f: &mut Formatter) -> std::fmt::Result {
1044            write!(f, "NoOpUserDefined")
1045        }
1046
1047        fn with_exprs_and_inputs(
1048            &self,
1049            exprs: Vec<Expr>,
1050            mut inputs: Vec<LogicalPlan>,
1051        ) -> Result<Self> {
1052            Ok(Self {
1053                exprs,
1054                input: Arc::new(inputs.swap_remove(0)),
1055                schema: Arc::clone(&self.schema),
1056            })
1057        }
1058
1059        fn necessary_children_exprs(
1060            &self,
1061            output_columns: &[usize],
1062        ) -> Option<Vec<Vec<usize>>> {
1063            // Since schema is same. Output columns requires their corresponding version in the input columns.
1064            Some(vec![output_columns.to_vec()])
1065        }
1066
1067        fn supports_limit_pushdown(&self) -> bool {
1068            false // Disallow limit push-down by default
1069        }
1070    }
1071
1072    #[derive(Debug, Hash, PartialEq, Eq)]
1073    struct UserDefinedCrossJoin {
1074        exprs: Vec<Expr>,
1075        schema: DFSchemaRef,
1076        left_child: Arc<LogicalPlan>,
1077        right_child: Arc<LogicalPlan>,
1078    }
1079
1080    impl UserDefinedCrossJoin {
1081        fn new(left_child: Arc<LogicalPlan>, right_child: Arc<LogicalPlan>) -> Self {
1082            let left_schema = left_child.schema();
1083            let right_schema = right_child.schema();
1084            let schema = Arc::new(
1085                build_join_schema(left_schema, right_schema, &JoinType::Inner).unwrap(),
1086            );
1087            Self {
1088                exprs: vec![],
1089                schema,
1090                left_child,
1091                right_child,
1092            }
1093        }
1094    }
1095
1096    // Manual implementation needed because of `schema` field. Comparison excludes this field.
1097    impl PartialOrd for UserDefinedCrossJoin {
1098        fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
1099            match self.exprs.partial_cmp(&other.exprs) {
1100                Some(Ordering::Equal) => {
1101                    match self.left_child.partial_cmp(&other.left_child) {
1102                        Some(Ordering::Equal) => {
1103                            self.right_child.partial_cmp(&other.right_child)
1104                        }
1105                        cmp => cmp,
1106                    }
1107                }
1108                cmp => cmp,
1109            }
1110            // TODO (https://github.com/apache/datafusion/issues/17477) avoid recomparing all fields
1111            .filter(|cmp| *cmp != Ordering::Equal || self == other)
1112        }
1113    }
1114
1115    impl UserDefinedLogicalNodeCore for UserDefinedCrossJoin {
1116        fn name(&self) -> &str {
1117            "UserDefinedCrossJoin"
1118        }
1119
1120        fn inputs(&self) -> Vec<&LogicalPlan> {
1121            vec![&self.left_child, &self.right_child]
1122        }
1123
1124        fn schema(&self) -> &DFSchemaRef {
1125            &self.schema
1126        }
1127
1128        fn expressions(&self) -> Vec<Expr> {
1129            self.exprs.clone()
1130        }
1131
1132        fn fmt_for_explain(&self, f: &mut Formatter) -> std::fmt::Result {
1133            write!(f, "UserDefinedCrossJoin")
1134        }
1135
1136        fn with_exprs_and_inputs(
1137            &self,
1138            exprs: Vec<Expr>,
1139            mut inputs: Vec<LogicalPlan>,
1140        ) -> Result<Self> {
1141            assert_eq!(inputs.len(), 2);
1142            Ok(Self {
1143                exprs,
1144                left_child: Arc::new(inputs.remove(0)),
1145                right_child: Arc::new(inputs.remove(0)),
1146                schema: Arc::clone(&self.schema),
1147            })
1148        }
1149
1150        fn necessary_children_exprs(
1151            &self,
1152            output_columns: &[usize],
1153        ) -> Option<Vec<Vec<usize>>> {
1154            let left_child_len = self.left_child.schema().fields().len();
1155            let mut left_reqs = vec![];
1156            let mut right_reqs = vec![];
1157            for &out_idx in output_columns {
1158                if out_idx < left_child_len {
1159                    left_reqs.push(out_idx);
1160                } else {
1161                    // Output indices further than the left_child_len
1162                    // comes from right children
1163                    right_reqs.push(out_idx - left_child_len)
1164                }
1165            }
1166            Some(vec![left_reqs, right_reqs])
1167        }
1168
1169        fn supports_limit_pushdown(&self) -> bool {
1170            false // Disallow limit push-down by default
1171        }
1172    }
1173
1174    /// A user-defined node that does NOT implement `necessary_children_exprs`,
1175    /// so the optimizer cannot determine which columns are required from its
1176    /// children and must assume all columns are needed.
1177    #[derive(Debug, Hash, PartialEq, Eq)]
1178    struct OpaqueRequirementsUserDefined {
1179        input: Arc<LogicalPlan>,
1180        schema: DFSchemaRef,
1181    }
1182
1183    // Manual implementation needed because of `schema` field. Comparison excludes this field.
1184    impl PartialOrd for OpaqueRequirementsUserDefined {
1185        fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
1186            self.input
1187                .partial_cmp(&other.input)
1188                .filter(|cmp| *cmp != Ordering::Equal || self == other)
1189        }
1190    }
1191
1192    impl UserDefinedLogicalNodeCore for OpaqueRequirementsUserDefined {
1193        fn name(&self) -> &str {
1194            "OpaqueRequirementsUserDefined"
1195        }
1196
1197        fn inputs(&self) -> Vec<&LogicalPlan> {
1198            vec![&self.input]
1199        }
1200
1201        fn schema(&self) -> &DFSchemaRef {
1202            &self.schema
1203        }
1204
1205        fn expressions(&self) -> Vec<Expr> {
1206            vec![]
1207        }
1208
1209        fn with_exprs_and_inputs(
1210            &self,
1211            _exprs: Vec<Expr>,
1212            mut inputs: Vec<LogicalPlan>,
1213        ) -> Result<Self> {
1214            Ok(Self {
1215                input: Arc::new(inputs.swap_remove(0)),
1216                schema: Arc::clone(&self.schema),
1217            })
1218        }
1219
1220        fn fmt_for_explain(&self, f: &mut Formatter) -> std::fmt::Result {
1221            write!(f, "OpaqueRequirementsUserDefined")
1222        }
1223    }
1224
1225    #[test]
1226    fn merge_two_projection() -> Result<()> {
1227        let table_scan = test_table_scan()?;
1228        let plan = LogicalPlanBuilder::from(table_scan)
1229            .project(vec![col("a")])?
1230            .project(vec![binary_expr(lit(1), Operator::Plus, col("a"))])?
1231            .build()?;
1232
1233        assert_optimized_plan_equal!(
1234            plan,
1235            @r"
1236        Projection: Int32(1) + test.a
1237          TableScan: test projection=[a]
1238        "
1239        )
1240    }
1241
1242    #[test]
1243    fn merge_three_projection() -> Result<()> {
1244        let table_scan = test_table_scan()?;
1245        let plan = LogicalPlanBuilder::from(table_scan)
1246            .project(vec![col("a"), col("b")])?
1247            .project(vec![col("a")])?
1248            .project(vec![binary_expr(lit(1), Operator::Plus, col("a"))])?
1249            .build()?;
1250
1251        assert_optimized_plan_equal!(
1252            plan,
1253            @r"
1254        Projection: Int32(1) + test.a
1255          TableScan: test projection=[a]
1256        "
1257        )
1258    }
1259
1260    #[test]
1261    fn merge_alias() -> Result<()> {
1262        let table_scan = test_table_scan()?;
1263        let plan = LogicalPlanBuilder::from(table_scan)
1264            .project(vec![col("a")])?
1265            .project(vec![col("a").alias("alias")])?
1266            .build()?;
1267
1268        assert_optimized_plan_equal!(
1269            plan,
1270            @r"
1271        Projection: test.a AS alias
1272          TableScan: test projection=[a]
1273        "
1274        )
1275    }
1276
1277    #[test]
1278    fn merge_nested_alias() -> Result<()> {
1279        let table_scan = test_table_scan()?;
1280        let plan = LogicalPlanBuilder::from(table_scan)
1281            .project(vec![col("a").alias("alias1").alias("alias2")])?
1282            .project(vec![col("alias2").alias("alias")])?
1283            .build()?;
1284
1285        assert_optimized_plan_equal!(
1286            plan,
1287            @r"
1288        Projection: test.a AS alias
1289          TableScan: test projection=[a]
1290        "
1291        )
1292    }
1293
1294    #[test]
1295    fn test_nested_count() -> Result<()> {
1296        let schema = Schema::new(vec![Field::new("foo", DataType::Int32, false)]);
1297
1298        let groups: Vec<Expr> = vec![];
1299
1300        let plan = table_scan(TableReference::none(), &schema, None)
1301            .unwrap()
1302            .aggregate(groups.clone(), vec![count(lit(1))])
1303            .unwrap()
1304            .aggregate(groups, vec![count(lit(1))])
1305            .unwrap()
1306            .build()
1307            .unwrap();
1308
1309        assert_optimized_plan_equal!(
1310            plan,
1311            @r"
1312        Aggregate: groupBy=[[]], aggr=[[count(Int32(1))]]
1313          EmptyRelation: rows=1
1314        "
1315        )
1316    }
1317
1318    #[test]
1319    fn test_neg_push_down() -> Result<()> {
1320        let table_scan = test_table_scan()?;
1321        let plan = LogicalPlanBuilder::from(table_scan)
1322            .project(vec![-col("a")])?
1323            .build()?;
1324
1325        assert_optimized_plan_equal!(
1326            plan,
1327            @r"
1328        Projection: (- test.a)
1329          TableScan: test projection=[a]
1330        "
1331        )
1332    }
1333
1334    #[test]
1335    fn test_is_null() -> Result<()> {
1336        let table_scan = test_table_scan()?;
1337        let plan = LogicalPlanBuilder::from(table_scan)
1338            .project(vec![col("a").is_null()])?
1339            .build()?;
1340
1341        assert_optimized_plan_equal!(
1342            plan,
1343            @r"
1344        Projection: test.a IS NULL
1345          TableScan: test projection=[a]
1346        "
1347        )
1348    }
1349
1350    #[test]
1351    fn test_is_not_null() -> Result<()> {
1352        let table_scan = test_table_scan()?;
1353        let plan = LogicalPlanBuilder::from(table_scan)
1354            .project(vec![col("a").is_not_null()])?
1355            .build()?;
1356
1357        assert_optimized_plan_equal!(
1358            plan,
1359            @r"
1360        Projection: test.a IS NOT NULL
1361          TableScan: test projection=[a]
1362        "
1363        )
1364    }
1365
1366    #[test]
1367    fn test_is_true() -> Result<()> {
1368        let table_scan = test_table_scan()?;
1369        let plan = LogicalPlanBuilder::from(table_scan)
1370            .project(vec![col("a").is_true()])?
1371            .build()?;
1372
1373        assert_optimized_plan_equal!(
1374            plan,
1375            @r"
1376        Projection: test.a IS TRUE
1377          TableScan: test projection=[a]
1378        "
1379        )
1380    }
1381
1382    #[test]
1383    fn test_is_not_true() -> Result<()> {
1384        let table_scan = test_table_scan()?;
1385        let plan = LogicalPlanBuilder::from(table_scan)
1386            .project(vec![col("a").is_not_true()])?
1387            .build()?;
1388
1389        assert_optimized_plan_equal!(
1390            plan,
1391            @r"
1392        Projection: test.a IS NOT TRUE
1393          TableScan: test projection=[a]
1394        "
1395        )
1396    }
1397
1398    #[test]
1399    fn test_is_false() -> Result<()> {
1400        let table_scan = test_table_scan()?;
1401        let plan = LogicalPlanBuilder::from(table_scan)
1402            .project(vec![col("a").is_false()])?
1403            .build()?;
1404
1405        assert_optimized_plan_equal!(
1406            plan,
1407            @r"
1408        Projection: test.a IS FALSE
1409          TableScan: test projection=[a]
1410        "
1411        )
1412    }
1413
1414    #[test]
1415    fn test_is_not_false() -> Result<()> {
1416        let table_scan = test_table_scan()?;
1417        let plan = LogicalPlanBuilder::from(table_scan)
1418            .project(vec![col("a").is_not_false()])?
1419            .build()?;
1420
1421        assert_optimized_plan_equal!(
1422            plan,
1423            @r"
1424        Projection: test.a IS NOT FALSE
1425          TableScan: test projection=[a]
1426        "
1427        )
1428    }
1429
1430    #[test]
1431    fn test_is_unknown() -> Result<()> {
1432        let table_scan = test_table_scan()?;
1433        let plan = LogicalPlanBuilder::from(table_scan)
1434            .project(vec![col("a").is_unknown()])?
1435            .build()?;
1436
1437        assert_optimized_plan_equal!(
1438            plan,
1439            @r"
1440        Projection: test.a IS UNKNOWN
1441          TableScan: test projection=[a]
1442        "
1443        )
1444    }
1445
1446    #[test]
1447    fn test_is_not_unknown() -> Result<()> {
1448        let table_scan = test_table_scan()?;
1449        let plan = LogicalPlanBuilder::from(table_scan)
1450            .project(vec![col("a").is_not_unknown()])?
1451            .build()?;
1452
1453        assert_optimized_plan_equal!(
1454            plan,
1455            @r"
1456        Projection: test.a IS NOT UNKNOWN
1457          TableScan: test projection=[a]
1458        "
1459        )
1460    }
1461
1462    #[test]
1463    fn test_not() -> Result<()> {
1464        let table_scan = test_table_scan()?;
1465        let plan = LogicalPlanBuilder::from(table_scan)
1466            .project(vec![not(col("a"))])?
1467            .build()?;
1468
1469        assert_optimized_plan_equal!(
1470            plan,
1471            @r"
1472        Projection: NOT test.a
1473          TableScan: test projection=[a]
1474        "
1475        )
1476    }
1477
1478    #[test]
1479    fn test_try_cast() -> Result<()> {
1480        let table_scan = test_table_scan()?;
1481        let plan = LogicalPlanBuilder::from(table_scan)
1482            .project(vec![try_cast(col("a"), DataType::Float64)])?
1483            .build()?;
1484
1485        assert_optimized_plan_equal!(
1486            plan,
1487            @r"
1488        Projection: TRY_CAST(test.a AS Float64)
1489          TableScan: test projection=[a]
1490        "
1491        )
1492    }
1493
1494    #[test]
1495    fn test_similar_to() -> Result<()> {
1496        let table_scan = test_table_scan()?;
1497        let expr = Box::new(col("a"));
1498        let pattern = Box::new(lit("[0-9]"));
1499        let similar_to_expr =
1500            Expr::SimilarTo(Like::new(false, expr, pattern, None, false));
1501        let plan = LogicalPlanBuilder::from(table_scan)
1502            .project(vec![similar_to_expr])?
1503            .build()?;
1504
1505        assert_optimized_plan_equal!(
1506            plan,
1507            @r#"
1508        Projection: test.a SIMILAR TO Utf8("[0-9]")
1509          TableScan: test projection=[a]
1510        "#
1511        )
1512    }
1513
1514    #[test]
1515    fn test_between() -> Result<()> {
1516        let table_scan = test_table_scan()?;
1517        let plan = LogicalPlanBuilder::from(table_scan)
1518            .project(vec![col("a").between(lit(1), lit(3))])?
1519            .build()?;
1520
1521        assert_optimized_plan_equal!(
1522            plan,
1523            @r"
1524        Projection: test.a BETWEEN Int32(1) AND Int32(3)
1525          TableScan: test projection=[a]
1526        "
1527        )
1528    }
1529
1530    // Test Case expression
1531    #[test]
1532    fn test_case_merged() -> Result<()> {
1533        let table_scan = test_table_scan()?;
1534        let plan = LogicalPlanBuilder::from(table_scan)
1535            .project(vec![col("a"), lit(0).alias("d")])?
1536            .project(vec![
1537                col("a"),
1538                when(col("a").eq(lit(1)), lit(10))
1539                    .otherwise(col("d"))?
1540                    .alias("d"),
1541            ])?
1542            .build()?;
1543
1544        assert_optimized_plan_equal!(
1545            plan,
1546            @r"
1547        Projection: test.a, CASE WHEN test.a = Int32(1) THEN Int32(10) ELSE Int32(0) END AS d
1548          TableScan: test projection=[a]
1549        "
1550        )
1551    }
1552
1553    // Test outer projection isn't discarded despite the same schema as inner
1554    // https://github.com/apache/datafusion/issues/8942
1555    #[test]
1556    fn test_derived_column() -> Result<()> {
1557        let table_scan = test_table_scan()?;
1558        let plan = LogicalPlanBuilder::from(table_scan)
1559            .project(vec![col("a").add(lit(1)).alias("a"), lit(0).alias("d")])?
1560            .project(vec![
1561                col("a"),
1562                when(col("a").eq(lit(1)), lit(10))
1563                    .otherwise(col("d"))?
1564                    .alias("d"),
1565            ])?
1566            .build()?;
1567
1568        assert_optimized_plan_equal!(
1569            plan,
1570            @r"
1571        Projection: a, CASE WHEN a = Int32(1) THEN Int32(10) ELSE d END AS d
1572          Projection: test.a + Int32(1) AS a, Int32(0) AS d
1573            TableScan: test projection=[a]
1574        "
1575        )
1576    }
1577
1578    // Since only column `a` is referred at the output. Scan should only contain projection=[a].
1579    // User defined node should be able to propagate necessary expressions by its parent to its child.
1580    #[test]
1581    fn test_user_defined_logical_plan_node() -> Result<()> {
1582        let table_scan = test_table_scan()?;
1583        let custom_plan = LogicalPlan::Extension(Extension {
1584            node: Arc::new(NoOpUserDefined::new(
1585                Arc::clone(table_scan.schema()),
1586                Arc::new(table_scan.clone()),
1587            )),
1588        });
1589        let plan = LogicalPlanBuilder::from(custom_plan)
1590            .project(vec![col("a"), lit(0).alias("d")])?
1591            .build()?;
1592
1593        assert_optimized_plan_equal!(
1594            plan,
1595            @r"
1596        Projection: test.a, Int32(0) AS d
1597          NoOpUserDefined
1598            TableScan: test projection=[a]
1599        "
1600        )
1601    }
1602
1603    // Only column `a` is referred at the output. However, User defined node itself uses column `b`
1604    // during its operation. Hence, scan should contain projection=[a, b].
1605    // User defined node should be able to propagate necessary expressions by its parent, as well as its own
1606    // required expressions.
1607    #[test]
1608    fn test_user_defined_logical_plan_node2() -> Result<()> {
1609        let table_scan = test_table_scan()?;
1610        let exprs = vec![Expr::Column(Column::from_qualified_name("b"))];
1611        let custom_plan = LogicalPlan::Extension(Extension {
1612            node: Arc::new(
1613                NoOpUserDefined::new(
1614                    Arc::clone(table_scan.schema()),
1615                    Arc::new(table_scan.clone()),
1616                )
1617                .with_exprs(exprs),
1618            ),
1619        });
1620        let plan = LogicalPlanBuilder::from(custom_plan)
1621            .project(vec![col("a"), lit(0).alias("d")])?
1622            .build()?;
1623
1624        assert_optimized_plan_equal!(
1625            plan,
1626            @r"
1627        Projection: test.a, Int32(0) AS d
1628          NoOpUserDefined
1629            TableScan: test projection=[a, b]
1630        "
1631        )
1632    }
1633
1634    // Only column `a` is referred at the output. However, User defined node itself uses expression `b+c`
1635    // during its operation. Hence, scan should contain projection=[a, b, c].
1636    // User defined node should be able to propagate necessary expressions by its parent, as well as its own
1637    // required expressions. Expressions doesn't have to be just column. Requirements from complex expressions
1638    // should be propagated also.
1639    #[test]
1640    fn test_user_defined_logical_plan_node3() -> Result<()> {
1641        let table_scan = test_table_scan()?;
1642        let left_expr = Expr::Column(Column::from_qualified_name("b"));
1643        let right_expr = Expr::Column(Column::from_qualified_name("c"));
1644        let binary_expr = Expr::BinaryExpr(BinaryExpr::new(
1645            Box::new(left_expr),
1646            Operator::Plus,
1647            Box::new(right_expr),
1648        ));
1649        let exprs = vec![binary_expr];
1650        let custom_plan = LogicalPlan::Extension(Extension {
1651            node: Arc::new(
1652                NoOpUserDefined::new(
1653                    Arc::clone(table_scan.schema()),
1654                    Arc::new(table_scan.clone()),
1655                )
1656                .with_exprs(exprs),
1657            ),
1658        });
1659        let plan = LogicalPlanBuilder::from(custom_plan)
1660            .project(vec![col("a"), lit(0).alias("d")])?
1661            .build()?;
1662
1663        assert_optimized_plan_equal!(
1664            plan,
1665            @r"
1666        Projection: test.a, Int32(0) AS d
1667          NoOpUserDefined
1668            TableScan: test projection=[a, b, c]
1669        "
1670        )
1671    }
1672
1673    // Columns `l.a`, `l.c`, `r.a` is referred at the output.
1674    // User defined node should be able to propagate necessary expressions by its parent, to its children.
1675    // Even if it has multiple children.
1676    // left child should have `projection=[a, c]`, and right side should have `projection=[a]`.
1677    #[test]
1678    fn test_user_defined_logical_plan_node4() -> Result<()> {
1679        let left_table = test_table_scan_with_name("l")?;
1680        let right_table = test_table_scan_with_name("r")?;
1681        let custom_plan = LogicalPlan::Extension(Extension {
1682            node: Arc::new(UserDefinedCrossJoin::new(
1683                Arc::new(left_table),
1684                Arc::new(right_table),
1685            )),
1686        });
1687        let plan = LogicalPlanBuilder::from(custom_plan)
1688            .project(vec![col("l.a"), col("l.c"), col("r.a"), lit(0).alias("d")])?
1689            .build()?;
1690
1691        assert_optimized_plan_equal!(
1692            plan,
1693            @r"
1694        Projection: l.a, l.c, r.a, Int32(0) AS d
1695          UserDefinedCrossJoin
1696            TableScan: l projection=[a, c]
1697            TableScan: r projection=[a]
1698        "
1699        )
1700    }
1701
1702    #[test]
1703    fn aggregate_no_group_by() -> Result<()> {
1704        let table_scan = test_table_scan()?;
1705
1706        let plan = LogicalPlanBuilder::from(table_scan)
1707            .aggregate(Vec::<Expr>::new(), vec![max(col("b"))])?
1708            .build()?;
1709
1710        assert_optimized_plan_equal!(
1711            plan,
1712            @r"
1713        Aggregate: groupBy=[[]], aggr=[[max(test.b)]]
1714          TableScan: test projection=[b]
1715        "
1716        )
1717    }
1718
1719    #[test]
1720    fn aggregate_group_by() -> Result<()> {
1721        let table_scan = test_table_scan()?;
1722
1723        let plan = LogicalPlanBuilder::from(table_scan)
1724            .aggregate(vec![col("c")], vec![max(col("b"))])?
1725            .build()?;
1726
1727        assert_optimized_plan_equal!(
1728            plan,
1729            @r"
1730        Aggregate: groupBy=[[test.c]], aggr=[[max(test.b)]]
1731          TableScan: test projection=[b, c]
1732        "
1733        )
1734    }
1735
1736    #[test]
1737    fn aggregate_group_by_with_table_alias() -> Result<()> {
1738        let table_scan = test_table_scan()?;
1739
1740        let plan = LogicalPlanBuilder::from(table_scan)
1741            .alias("a")?
1742            .aggregate(vec![col("c")], vec![max(col("b"))])?
1743            .build()?;
1744
1745        assert_optimized_plan_equal!(
1746            plan,
1747            @r"
1748        Aggregate: groupBy=[[a.c]], aggr=[[max(a.b)]]
1749          SubqueryAlias: a
1750            TableScan: test projection=[b, c]
1751        "
1752        )
1753    }
1754
1755    #[test]
1756    fn aggregate_no_group_by_with_filter() -> Result<()> {
1757        let table_scan = test_table_scan()?;
1758
1759        let plan = LogicalPlanBuilder::from(table_scan)
1760            .filter(col("c").gt(lit(1)))?
1761            .aggregate(Vec::<Expr>::new(), vec![max(col("b"))])?
1762            .build()?;
1763
1764        assert_optimized_plan_equal!(
1765            plan,
1766            @r"
1767        Aggregate: groupBy=[[]], aggr=[[max(test.b)]]
1768          Projection: test.b
1769            Filter: test.c > Int32(1)
1770              TableScan: test projection=[b, c]
1771        "
1772        )
1773    }
1774
1775    #[test]
1776    fn aggregate_with_periods() -> Result<()> {
1777        let schema = Schema::new(vec![Field::new("tag.one", DataType::Utf8, false)]);
1778
1779        // Build a plan that looks as follows (note "tag.one" is a column named
1780        // "tag.one", not a column named "one" in a table named "tag"):
1781        //
1782        // Projection: tag.one
1783        //   Aggregate: groupBy=[], aggr=[max("tag.one") AS "tag.one"]
1784        //    TableScan
1785        let plan = table_scan(Some("m4"), &schema, None)?
1786            .aggregate(
1787                Vec::<Expr>::new(),
1788                vec![max(col(Column::new_unqualified("tag.one"))).alias("tag.one")],
1789            )?
1790            .project([col(Column::new_unqualified("tag.one"))])?
1791            .build()?;
1792
1793        assert_optimized_plan_equal!(
1794            plan,
1795            @r"
1796        Aggregate: groupBy=[[]], aggr=[[max(m4.tag.one) AS tag.one]]
1797          TableScan: m4 projection=[tag.one]
1798        "
1799        )
1800    }
1801
1802    #[test]
1803    fn redundant_project() -> Result<()> {
1804        let table_scan = test_table_scan()?;
1805
1806        let plan = LogicalPlanBuilder::from(table_scan)
1807            .project(vec![col("a"), col("b"), col("c")])?
1808            .project(vec![col("a"), col("c"), col("b")])?
1809            .build()?;
1810        assert_optimized_plan_equal!(
1811            plan,
1812            @r"
1813        Projection: test.a, test.c, test.b
1814          TableScan: test projection=[a, b, c]
1815        "
1816        )
1817    }
1818
1819    #[test]
1820    fn reorder_scan() -> Result<()> {
1821        let schema = Schema::new(test_table_scan_fields());
1822
1823        let plan = table_scan(Some("test"), &schema, Some(vec![1, 0, 2]))?.build()?;
1824        assert_optimized_plan_equal!(
1825            plan,
1826            @"TableScan: test projection=[b, a, c]"
1827        )
1828    }
1829
1830    #[test]
1831    fn reorder_scan_projection() -> Result<()> {
1832        let schema = Schema::new(test_table_scan_fields());
1833
1834        let plan = table_scan(Some("test"), &schema, Some(vec![1, 0, 2]))?
1835            .project(vec![col("a"), col("b")])?
1836            .build()?;
1837        assert_optimized_plan_equal!(
1838            plan,
1839            @r"
1840        Projection: test.a, test.b
1841          TableScan: test projection=[b, a]
1842        "
1843        )
1844    }
1845
1846    #[test]
1847    fn reorder_projection() -> Result<()> {
1848        let table_scan = test_table_scan()?;
1849
1850        let plan = LogicalPlanBuilder::from(table_scan)
1851            .project(vec![col("c"), col("b"), col("a")])?
1852            .build()?;
1853        assert_optimized_plan_equal!(
1854            plan,
1855            @r"
1856        Projection: test.c, test.b, test.a
1857          TableScan: test projection=[a, b, c]
1858        "
1859        )
1860    }
1861
1862    #[test]
1863    fn noncontinuous_redundant_projection() -> Result<()> {
1864        let table_scan = test_table_scan()?;
1865
1866        let plan = LogicalPlanBuilder::from(table_scan)
1867            .project(vec![col("c"), col("b"), col("a")])?
1868            .filter(col("c").gt(lit(1)))?
1869            .project(vec![col("c"), col("a"), col("b")])?
1870            .filter(col("b").gt(lit(1)))?
1871            .filter(col("a").gt(lit(1)))?
1872            .project(vec![col("a"), col("c"), col("b")])?
1873            .build()?;
1874        assert_optimized_plan_equal!(
1875            plan,
1876            @r"
1877        Projection: test.a, test.c, test.b
1878          Filter: test.a > Int32(1)
1879            Filter: test.b > Int32(1)
1880              Projection: test.c, test.a, test.b
1881                Filter: test.c > Int32(1)
1882                  Projection: test.c, test.b, test.a
1883                    TableScan: test projection=[a, b, c]
1884        "
1885        )
1886    }
1887
1888    #[test]
1889    fn join_schema_trim_full_join_column_projection() -> Result<()> {
1890        let table_scan = test_table_scan()?;
1891
1892        let schema = Schema::new(vec![Field::new("c1", DataType::UInt32, false)]);
1893        let table2_scan = scan_empty(Some("test2"), &schema, None)?.build()?;
1894
1895        let plan = LogicalPlanBuilder::from(table_scan)
1896            .join(table2_scan, JoinType::Left, (vec!["a"], vec!["c1"]), None)?
1897            .project(vec![col("a"), col("b"), col("c1")])?
1898            .build()?;
1899
1900        let optimized_plan = optimize(plan)?;
1901
1902        // make sure projections are pushed down to both table scans
1903        assert_snapshot!(
1904            optimized_plan.clone(),
1905            @r"
1906        Left Join: test.a = test2.c1
1907          TableScan: test projection=[a, b]
1908          TableScan: test2 projection=[c1]
1909        "
1910        );
1911
1912        // make sure schema for join node include both join columns
1913        let optimized_join = optimized_plan;
1914        assert_eq!(
1915            **optimized_join.schema(),
1916            DFSchema::new_with_metadata(
1917                vec![
1918                    (
1919                        Some("test".into()),
1920                        Arc::new(Field::new("a", DataType::UInt32, false))
1921                    ),
1922                    (
1923                        Some("test".into()),
1924                        Arc::new(Field::new("b", DataType::UInt32, false))
1925                    ),
1926                    (
1927                        Some("test2".into()),
1928                        Arc::new(Field::new("c1", DataType::UInt32, true))
1929                    ),
1930                ],
1931                HashMap::new()
1932            )?,
1933        );
1934
1935        Ok(())
1936    }
1937
1938    #[test]
1939    fn join_schema_trim_partial_join_column_projection() -> Result<()> {
1940        // test join column push down without explicit column projections
1941
1942        let table_scan = test_table_scan()?;
1943
1944        let schema = Schema::new(vec![Field::new("c1", DataType::UInt32, false)]);
1945        let table2_scan = scan_empty(Some("test2"), &schema, None)?.build()?;
1946
1947        let plan = LogicalPlanBuilder::from(table_scan)
1948            .join(table2_scan, JoinType::Left, (vec!["a"], vec!["c1"]), None)?
1949            // projecting joined column `a` should push the right side column `c1` projection as
1950            // well into test2 table even though `c1` is not referenced in projection.
1951            .project(vec![col("a"), col("b")])?
1952            .build()?;
1953
1954        let optimized_plan = optimize(plan)?;
1955
1956        // make sure projections are pushed down to both table scans
1957        assert_snapshot!(
1958            optimized_plan.clone(),
1959            @r"
1960        Projection: test.a, test.b
1961          Left Join: test.a = test2.c1
1962            TableScan: test projection=[a, b]
1963            TableScan: test2 projection=[c1]
1964        "
1965        );
1966
1967        // make sure schema for join node include both join columns
1968        let optimized_join = optimized_plan.inputs()[0];
1969        assert_eq!(
1970            **optimized_join.schema(),
1971            DFSchema::new_with_metadata(
1972                vec![
1973                    (
1974                        Some("test".into()),
1975                        Arc::new(Field::new("a", DataType::UInt32, false))
1976                    ),
1977                    (
1978                        Some("test".into()),
1979                        Arc::new(Field::new("b", DataType::UInt32, false))
1980                    ),
1981                    (
1982                        Some("test2".into()),
1983                        Arc::new(Field::new("c1", DataType::UInt32, true))
1984                    ),
1985                ],
1986                HashMap::new()
1987            )?,
1988        );
1989
1990        Ok(())
1991    }
1992
1993    #[test]
1994    fn join_schema_trim_using_join() -> Result<()> {
1995        // shared join columns from using join should be pushed to both sides
1996
1997        let table_scan = test_table_scan()?;
1998
1999        let schema = Schema::new(vec![Field::new("a", DataType::UInt32, false)]);
2000        let table2_scan = scan_empty(Some("test2"), &schema, None)?.build()?;
2001
2002        let plan = LogicalPlanBuilder::from(table_scan)
2003            .join_using(table2_scan, JoinType::Left, vec!["a".into()])?
2004            .project(vec![col("a"), col("b")])?
2005            .build()?;
2006
2007        let optimized_plan = optimize(plan)?;
2008
2009        // make sure projections are pushed down to table scan
2010        assert_snapshot!(
2011            optimized_plan.clone(),
2012            @r"
2013        Projection: test.a, test.b
2014          Left Join: Using test.a = test2.a
2015            TableScan: test projection=[a, b]
2016            TableScan: test2 projection=[a]
2017        "
2018        );
2019
2020        // make sure schema for join node include both join columns
2021        let optimized_join = optimized_plan.inputs()[0];
2022        assert_eq!(
2023            **optimized_join.schema(),
2024            DFSchema::new_with_metadata(
2025                vec![
2026                    (
2027                        Some("test".into()),
2028                        Arc::new(Field::new("a", DataType::UInt32, false))
2029                    ),
2030                    (
2031                        Some("test".into()),
2032                        Arc::new(Field::new("b", DataType::UInt32, false))
2033                    ),
2034                    (
2035                        Some("test2".into()),
2036                        Arc::new(Field::new("a", DataType::UInt32, true))
2037                    ),
2038                ],
2039                HashMap::new()
2040            )?,
2041        );
2042
2043        Ok(())
2044    }
2045
2046    #[test]
2047    fn cast() -> Result<()> {
2048        let table_scan = test_table_scan()?;
2049
2050        let plan = LogicalPlanBuilder::from(table_scan)
2051            .project(vec![Expr::Cast(Cast::new(
2052                Box::new(col("c")),
2053                DataType::Float64,
2054            ))])?
2055            .build()?;
2056
2057        assert_optimized_plan_equal!(
2058            plan,
2059            @r"
2060        Projection: CAST(test.c AS Float64)
2061          TableScan: test projection=[c]
2062        "
2063        )
2064    }
2065
2066    #[test]
2067    fn table_scan_projected_schema() -> Result<()> {
2068        let table_scan = test_table_scan()?;
2069        let plan = LogicalPlanBuilder::from(test_table_scan()?)
2070            .project(vec![col("a"), col("b")])?
2071            .build()?;
2072
2073        assert_eq!(3, table_scan.schema().fields().len());
2074        assert_fields_eq(&table_scan, vec!["a", "b", "c"]);
2075        assert_fields_eq(&plan, vec!["a", "b"]);
2076
2077        assert_optimized_plan_equal!(
2078            plan,
2079            @"TableScan: test projection=[a, b]"
2080        )
2081    }
2082
2083    #[test]
2084    fn table_scan_projected_schema_non_qualified_relation() -> Result<()> {
2085        let table_scan = test_table_scan()?;
2086        let input_schema = table_scan.schema();
2087        assert_eq!(3, input_schema.fields().len());
2088        assert_fields_eq(&table_scan, vec!["a", "b", "c"]);
2089
2090        // Build the LogicalPlan directly (don't use PlanBuilder), so
2091        // that the Column references are unqualified (e.g. their
2092        // relation is `None`). PlanBuilder resolves the expressions
2093        let expr = vec![col("test.a"), col("test.b")];
2094        let plan =
2095            LogicalPlan::Projection(Projection::try_new(expr, Arc::new(table_scan))?);
2096
2097        assert_fields_eq(&plan, vec!["a", "b"]);
2098
2099        assert_optimized_plan_equal!(
2100            plan,
2101            @"TableScan: test projection=[a, b]"
2102        )
2103    }
2104
2105    #[test]
2106    fn table_limit() -> Result<()> {
2107        let table_scan = test_table_scan()?;
2108        assert_eq!(3, table_scan.schema().fields().len());
2109        assert_fields_eq(&table_scan, vec!["a", "b", "c"]);
2110
2111        let plan = LogicalPlanBuilder::from(table_scan)
2112            .project(vec![col("c"), col("a")])?
2113            .limit(0, Some(5))?
2114            .build()?;
2115
2116        assert_fields_eq(&plan, vec!["c", "a"]);
2117
2118        assert_optimized_plan_equal!(
2119            plan,
2120            @r"
2121        Limit: skip=0, fetch=5
2122          Projection: test.c, test.a
2123            TableScan: test projection=[a, c]
2124        "
2125        )
2126    }
2127
2128    #[test]
2129    fn table_scan_without_projection() -> Result<()> {
2130        let table_scan = test_table_scan()?;
2131        let plan = LogicalPlanBuilder::from(table_scan).build()?;
2132        // should expand projection to all columns without projection
2133        assert_optimized_plan_equal!(
2134            plan,
2135            @"TableScan: test projection=[a, b, c]"
2136        )
2137    }
2138
2139    #[test]
2140    fn table_scan_with_literal_projection() -> Result<()> {
2141        let table_scan = test_table_scan()?;
2142        let plan = LogicalPlanBuilder::from(table_scan)
2143            .project(vec![lit(1_i64), lit(2_i64)])?
2144            .build()?;
2145        assert_optimized_plan_equal!(
2146            plan,
2147            @r"
2148        Projection: Int64(1), Int64(2)
2149          TableScan: test projection=[]
2150        "
2151        )
2152    }
2153
2154    /// tests that it removes unused columns in projections
2155    #[test]
2156    fn table_unused_column() -> Result<()> {
2157        let table_scan = test_table_scan()?;
2158        assert_eq!(3, table_scan.schema().fields().len());
2159        assert_fields_eq(&table_scan, vec!["a", "b", "c"]);
2160
2161        // we never use "b" in the first projection => remove it
2162        let plan = LogicalPlanBuilder::from(table_scan)
2163            .project(vec![col("c"), col("a"), col("b")])?
2164            .filter(col("c").gt(lit(1)))?
2165            .aggregate(vec![col("c")], vec![max(col("a"))])?
2166            .build()?;
2167
2168        assert_fields_eq(&plan, vec!["c", "max(test.a)"]);
2169
2170        let plan = optimize(plan).expect("failed to optimize plan");
2171        assert_optimized_plan_equal!(
2172            plan,
2173            @r"
2174        Aggregate: groupBy=[[test.c]], aggr=[[max(test.a)]]
2175          Filter: test.c > Int32(1)
2176            Projection: test.c, test.a
2177              TableScan: test projection=[a, c]
2178        "
2179        )
2180    }
2181
2182    /// tests that it removes un-needed projections
2183    #[test]
2184    fn table_unused_projection() -> Result<()> {
2185        let table_scan = test_table_scan()?;
2186        assert_eq!(3, table_scan.schema().fields().len());
2187        assert_fields_eq(&table_scan, vec!["a", "b", "c"]);
2188
2189        // there is no need for the first projection
2190        let plan = LogicalPlanBuilder::from(table_scan)
2191            .project(vec![col("b")])?
2192            .project(vec![lit(1).alias("a")])?
2193            .build()?;
2194
2195        assert_fields_eq(&plan, vec!["a"]);
2196
2197        assert_optimized_plan_equal!(
2198            plan,
2199            @r"
2200        Projection: Int32(1) AS a
2201          TableScan: test projection=[]
2202        "
2203        )
2204    }
2205
2206    #[test]
2207    fn table_full_filter_pushdown() -> Result<()> {
2208        let schema = Schema::new(test_table_scan_fields());
2209
2210        let table_scan = table_scan_with_filters(
2211            Some("test"),
2212            &schema,
2213            None,
2214            vec![col("b").eq(lit(1))],
2215        )?
2216        .build()?;
2217        assert_eq!(3, table_scan.schema().fields().len());
2218        assert_fields_eq(&table_scan, vec!["a", "b", "c"]);
2219
2220        // there is no need for the first projection
2221        let plan = LogicalPlanBuilder::from(table_scan)
2222            .project(vec![col("b")])?
2223            .project(vec![lit(1).alias("a")])?
2224            .build()?;
2225
2226        assert_fields_eq(&plan, vec!["a"]);
2227
2228        assert_optimized_plan_equal!(
2229            plan,
2230            @r"
2231        Projection: Int32(1) AS a
2232          TableScan: test projection=[], full_filters=[b = Int32(1)]
2233        "
2234        )
2235    }
2236
2237    /// tests that optimizing twice yields same plan
2238    #[test]
2239    fn test_double_optimization() -> Result<()> {
2240        let table_scan = test_table_scan()?;
2241
2242        let plan = LogicalPlanBuilder::from(table_scan)
2243            .project(vec![col("b")])?
2244            .project(vec![lit(1).alias("a")])?
2245            .build()?;
2246
2247        let optimized_plan1 = optimize(plan).expect("failed to optimize plan");
2248        let optimized_plan2 =
2249            optimize(optimized_plan1.clone()).expect("failed to optimize plan");
2250
2251        let formatted_plan1 = format!("{optimized_plan1:?}");
2252        let formatted_plan2 = format!("{optimized_plan2:?}");
2253        assert_eq!(formatted_plan1, formatted_plan2);
2254        Ok(())
2255    }
2256
2257    #[test]
2258    fn test_continue_processing_through_extension() -> Result<()> {
2259        let table_scan = test_table_scan()?;
2260        let plan = LogicalPlanBuilder::from(table_scan.clone())
2261            .project(vec![col("a")])?
2262            .project(vec![col("a")])?
2263            .build()?;
2264        let plan = LogicalPlan::Extension(Extension {
2265            node: Arc::new(OpaqueRequirementsUserDefined {
2266                input: Arc::new(plan),
2267                schema: Arc::clone(table_scan.schema()),
2268            }),
2269        });
2270        let plan = optimize(plan).expect("failed to optimize plan");
2271        assert_optimized_plan_equal!(
2272            plan,
2273            @r"
2274        OpaqueRequirementsUserDefined
2275          TableScan: test projection=[a]
2276        "
2277        )
2278    }
2279
2280    /// tests that it removes an aggregate is never used downstream
2281    #[test]
2282    fn table_unused_aggregate() -> Result<()> {
2283        let table_scan = test_table_scan()?;
2284        assert_eq!(3, table_scan.schema().fields().len());
2285        assert_fields_eq(&table_scan, vec!["a", "b", "c"]);
2286
2287        // we never use "min(b)" => remove it
2288        let plan = LogicalPlanBuilder::from(table_scan)
2289            .aggregate(vec![col("a"), col("c")], vec![max(col("b")), min(col("b"))])?
2290            .filter(col("c").gt(lit(1)))?
2291            .project(vec![col("c"), col("a"), col("max(test.b)")])?
2292            .build()?;
2293
2294        assert_fields_eq(&plan, vec!["c", "a", "max(test.b)"]);
2295
2296        assert_optimized_plan_equal!(
2297            plan,
2298            @r"
2299        Projection: test.c, test.a, max(test.b)
2300          Filter: test.c > Int32(1)
2301            Aggregate: groupBy=[[test.a, test.c]], aggr=[[max(test.b)]]
2302              TableScan: test projection=[a, b, c]
2303        "
2304        )
2305    }
2306
2307    #[test]
2308    fn aggregate_filter_pushdown() -> Result<()> {
2309        let table_scan = test_table_scan()?;
2310        let aggr_with_filter = count_udaf()
2311            .call(vec![col("b")])
2312            .filter(col("c").gt(lit(42)))
2313            .build()?;
2314        let plan = LogicalPlanBuilder::from(table_scan)
2315            .aggregate(
2316                vec![col("a")],
2317                vec![count(col("b")), aggr_with_filter.alias("count2")],
2318            )?
2319            .build()?;
2320
2321        assert_optimized_plan_equal!(
2322            plan,
2323            @r"
2324        Aggregate: groupBy=[[test.a]], aggr=[[count(test.b), count(test.b) FILTER (WHERE test.c > Int32(42)) AS count2]]
2325          TableScan: test projection=[a, b, c]
2326        "
2327        )
2328    }
2329
2330    #[test]
2331    fn pushdown_through_distinct() -> Result<()> {
2332        let table_scan = test_table_scan()?;
2333
2334        let plan = LogicalPlanBuilder::from(table_scan)
2335            .project(vec![col("a"), col("b")])?
2336            .distinct()?
2337            .project(vec![col("a")])?
2338            .build()?;
2339
2340        assert_optimized_plan_equal!(
2341            plan,
2342            @r"
2343        Projection: test.a
2344          Distinct:
2345            TableScan: test projection=[a, b]
2346        "
2347        )
2348    }
2349
2350    #[test]
2351    fn test_window() -> Result<()> {
2352        let table_scan = test_table_scan()?;
2353
2354        let max1 = Expr::from(expr::WindowFunction::new(
2355            WindowFunctionDefinition::AggregateUDF(max_udaf()),
2356            vec![col("test.a")],
2357        ))
2358        .partition_by(vec![col("test.b")])
2359        .build()
2360        .unwrap();
2361
2362        let max2 = Expr::from(expr::WindowFunction::new(
2363            WindowFunctionDefinition::AggregateUDF(max_udaf()),
2364            vec![col("test.b")],
2365        ));
2366        let col1 = col(max1.schema_name().to_string());
2367        let col2 = col(max2.schema_name().to_string());
2368
2369        let plan = LogicalPlanBuilder::from(table_scan)
2370            .window(vec![max1])?
2371            .window(vec![max2])?
2372            .project(vec![col1, col2])?
2373            .build()?;
2374
2375        assert_optimized_plan_equal!(
2376            plan,
2377            @r"
2378        Projection: max(test.a) PARTITION BY [test.b] ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING, max(test.b) ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING
2379          WindowAggr: windowExpr=[[max(test.b) ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING]]
2380            Projection: test.b, max(test.a) PARTITION BY [test.b] ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING
2381              WindowAggr: windowExpr=[[max(test.a) PARTITION BY [test.b] ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING]]
2382                TableScan: test projection=[a, b]
2383        "
2384        )
2385    }
2386
2387    // Regression test for https://github.com/apache/datafusion/issues/20083
2388    // Optimizer must not fail when LeftMark joins from EXISTS OR EXISTS
2389    // feed into a Left join.
2390    #[test]
2391    fn optimize_projections_exists_or_exists_with_outer_join() -> Result<()> {
2392        use datafusion_expr::utils::disjunction;
2393        use datafusion_expr::{exists, out_ref_col};
2394
2395        let table_a = test_table_scan_with_name("a")?;
2396        let table_b = test_table_scan_with_name("b")?;
2397
2398        let sq_a = Arc::new(
2399            LogicalPlanBuilder::from(test_table_scan_with_name("sq_a")?)
2400                .filter(col("sq_a.a").eq(out_ref_col(DataType::UInt32, "a.a")))?
2401                .project(vec![lit(1)])?
2402                .build()?,
2403        );
2404
2405        let sq_b = Arc::new(
2406            LogicalPlanBuilder::from(test_table_scan_with_name("sq_b")?)
2407                .filter(col("sq_b.b").eq(out_ref_col(DataType::UInt32, "a.b")))?
2408                .project(vec![lit(1)])?
2409                .build()?,
2410        );
2411
2412        let plan = LogicalPlanBuilder::from(table_a)
2413            .filter(disjunction(vec![exists(sq_a), exists(sq_b)]).unwrap())?
2414            .join(table_b, JoinType::Left, (vec!["a"], vec!["a"]), None)?
2415            .build()?;
2416
2417        let optimizer = Optimizer::new();
2418        let config = OptimizerContext::new();
2419        optimizer.optimize(plan, &config, observe)?;
2420
2421        Ok(())
2422    }
2423
2424    #[test]
2425    fn optimize_projections_left_mark_join_with_projection() -> Result<()> {
2426        let table_a = test_table_scan_with_name("a")?;
2427        let table_b = test_table_scan_with_name("b")?;
2428        let table_c = test_table_scan_with_name("c")?;
2429
2430        let plan = LogicalPlanBuilder::from(table_a)
2431            .join(table_b, JoinType::LeftMark, (vec!["a"], vec!["a"]), None)?
2432            .project(vec![col("a.a"), col("a.b"), col("a.c")])?
2433            .join(table_c, JoinType::Left, (vec!["a"], vec!["a"]), None)?
2434            .build()?;
2435
2436        assert_optimized_plan_equal!(
2437            plan,
2438            @r"
2439        Left Join: a.a = c.a
2440          Projection: a.a, a.b, a.c
2441            LeftMark Join: a.a = b.a
2442              TableScan: a projection=[a, b, c]
2443              TableScan: b projection=[a]
2444          TableScan: c projection=[a, b, c]
2445        "
2446        )
2447    }
2448
2449    fn observe(_plan: &LogicalPlan, _rule: &dyn OptimizerRule) {}
2450
2451    fn optimize(plan: LogicalPlan) -> Result<LogicalPlan> {
2452        let optimizer = Optimizer::with_rules(vec![Arc::new(OptimizeProjections::new())]);
2453        let optimized_plan =
2454            optimizer.optimize(plan, &OptimizerContext::new(), observe)?;
2455        Ok(optimized_plan)
2456    }
2457}