datafusion_optimizer/optimize_projections/
mod.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! [`OptimizeProjections`] identifies and eliminates unused columns
19
20mod required_indices;
21
22use crate::optimizer::ApplyOrder;
23use crate::{OptimizerConfig, OptimizerRule};
24use std::collections::HashSet;
25use std::sync::Arc;
26
27use datafusion_common::{
28    get_required_group_by_exprs_indices, internal_datafusion_err, internal_err, Column,
29    HashMap, JoinType, Result,
30};
31use datafusion_expr::expr::Alias;
32use datafusion_expr::Unnest;
33use datafusion_expr::{
34    logical_plan::LogicalPlan, projection_schema, Aggregate, Distinct, Expr, Projection,
35    TableScan, Window,
36};
37
38use crate::optimize_projections::required_indices::RequiredIndices;
39use crate::utils::NamePreserver;
40use datafusion_common::tree_node::{
41    Transformed, TreeNode, TreeNodeContainer, TreeNodeRecursion,
42};
43
/// Logical optimizer rule that prunes columns which are never needed.
///
/// Starting from the root of a [`LogicalPlan`], the rule pushes the set of
/// column indices each parent actually consumes down to its children and then:
/// - drops columns that neither reach the final output nor feed into any
///   computation along the way,
/// - inserts narrowing projections below operators that benefit from smaller
///   inputs (e.g. joins, filters, sorts, aggregates and window functions), and
/// - removes or merges [`LogicalPlan::Projection`] nodes that have become
///   redundant.
///
/// Pruning columns early reduces the amount of data that later operators have
/// to process.
#[derive(Default, Debug)]
pub struct OptimizeProjections {}

impl OptimizeProjections {
    /// Creates a new instance of the [`OptimizeProjections`] rule.
    pub fn new() -> Self {
        OptimizeProjections::default()
    }
}
68
69impl OptimizerRule for OptimizeProjections {
70    fn name(&self) -> &str {
71        "optimize_projections"
72    }
73
74    fn apply_order(&self) -> Option<ApplyOrder> {
75        None
76    }
77
78    fn supports_rewrite(&self) -> bool {
79        true
80    }
81
82    fn rewrite(
83        &self,
84        plan: LogicalPlan,
85        config: &dyn OptimizerConfig,
86    ) -> Result<Transformed<LogicalPlan>> {
87        // All output fields are necessary:
88        let indices = RequiredIndices::new_for_all_exprs(&plan);
89        optimize_projections(plan, config, indices)
90    }
91}
92
93/// Removes unnecessary columns (e.g. columns that do not appear in the output
94/// schema and/or are not used during any computation step such as expression
95/// evaluation) from the logical plan and its inputs.
96///
97/// # Parameters
98///
99/// - `plan`: A reference to the input `LogicalPlan` to optimize.
100/// - `config`: A reference to the optimizer configuration.
101/// - `indices`: A slice of column indices that represent the necessary column
102///   indices for downstream (parent) plan nodes.
103///
104/// # Returns
105///
106/// A `Result` object with the following semantics:
107///
108/// - `Ok(Some(LogicalPlan))`: An optimized `LogicalPlan` without unnecessary
109///   columns.
110/// - `Ok(None)`: Signal that the given logical plan did not require any change.
111/// - `Err(error)`: An error occurred during the optimization process.
112#[cfg_attr(feature = "recursive_protection", recursive::recursive)]
113fn optimize_projections(
114    plan: LogicalPlan,
115    config: &dyn OptimizerConfig,
116    indices: RequiredIndices,
117) -> Result<Transformed<LogicalPlan>> {
118    // Recursively rewrite any nodes that may be able to avoid computation given
119    // their parents' required indices.
120    match plan {
121        LogicalPlan::Projection(proj) => {
122            return merge_consecutive_projections(proj)?.transform_data(|proj| {
123                rewrite_projection_given_requirements(proj, config, &indices)
124            })
125        }
126        LogicalPlan::Aggregate(aggregate) => {
127            // Split parent requirements to GROUP BY and aggregate sections:
128            let n_group_exprs = aggregate.group_expr_len()?;
129            // Offset aggregate indices so that they point to valid indices at
130            // `aggregate.aggr_expr`:
131            let (group_by_reqs, aggregate_reqs) = indices.split_off(n_group_exprs);
132
133            // Get absolutely necessary GROUP BY fields:
134            let group_by_expr_existing = aggregate
135                .group_expr
136                .iter()
137                .map(|group_by_expr| group_by_expr.schema_name().to_string())
138                .collect::<Vec<_>>();
139
140            let new_group_bys = if let Some(simplest_groupby_indices) =
141                get_required_group_by_exprs_indices(
142                    aggregate.input.schema(),
143                    &group_by_expr_existing,
144                ) {
145                // Some of the fields in the GROUP BY may be required by the
146                // parent even if these fields are unnecessary in terms of
147                // functional dependency.
148                group_by_reqs
149                    .append(&simplest_groupby_indices)
150                    .get_at_indices(&aggregate.group_expr)
151            } else {
152                aggregate.group_expr
153            };
154
155            // Only use the absolutely necessary aggregate expressions required
156            // by the parent:
157            let mut new_aggr_expr = aggregate_reqs.get_at_indices(&aggregate.aggr_expr);
158
159            // Aggregations always need at least one aggregate expression.
160            // With a nested count, we don't require any column as input, but
161            // still need to create a correct aggregate, which may be optimized
162            // out later. As an example, consider the following query:
163            //
164            // SELECT count(*) FROM (SELECT count(*) FROM [...])
165            //
166            // which always returns 1.
167            if new_aggr_expr.is_empty()
168                && new_group_bys.is_empty()
169                && !aggregate.aggr_expr.is_empty()
170            {
171                // take the old, first aggregate expression
172                new_aggr_expr = aggregate.aggr_expr;
173                new_aggr_expr.resize_with(1, || unreachable!());
174            }
175
176            let all_exprs_iter = new_group_bys.iter().chain(new_aggr_expr.iter());
177            let schema = aggregate.input.schema();
178            let necessary_indices =
179                RequiredIndices::new().with_exprs(schema, all_exprs_iter);
180            let necessary_exprs = necessary_indices.get_required_exprs(schema);
181
182            return optimize_projections(
183                Arc::unwrap_or_clone(aggregate.input),
184                config,
185                necessary_indices,
186            )?
187            .transform_data(|aggregate_input| {
188                // Simplify the input of the aggregation by adding a projection so
189                // that its input only contains absolutely necessary columns for
190                // the aggregate expressions. Note that necessary_indices refer to
191                // fields in `aggregate.input.schema()`.
192                add_projection_on_top_if_helpful(aggregate_input, necessary_exprs)
193            })?
194            .map_data(|aggregate_input| {
195                // Create a new aggregate plan with the updated input and only the
196                // absolutely necessary fields:
197                Aggregate::try_new(
198                    Arc::new(aggregate_input),
199                    new_group_bys,
200                    new_aggr_expr,
201                )
202                .map(LogicalPlan::Aggregate)
203            });
204        }
205        LogicalPlan::Window(window) => {
206            let input_schema = Arc::clone(window.input.schema());
207            // Split parent requirements to child and window expression sections:
208            let n_input_fields = input_schema.fields().len();
209            // Offset window expression indices so that they point to valid
210            // indices at `window.window_expr`:
211            let (child_reqs, window_reqs) = indices.split_off(n_input_fields);
212
213            // Only use window expressions that are absolutely necessary according
214            // to parent requirements:
215            let new_window_expr = window_reqs.get_at_indices(&window.window_expr);
216
217            // Get all the required column indices at the input, either by the
218            // parent or window expression requirements.
219            let required_indices = child_reqs.with_exprs(&input_schema, &new_window_expr);
220
221            return optimize_projections(
222                Arc::unwrap_or_clone(window.input),
223                config,
224                required_indices.clone(),
225            )?
226            .transform_data(|window_child| {
227                if new_window_expr.is_empty() {
228                    // When no window expression is necessary, use the input directly:
229                    Ok(Transformed::no(window_child))
230                } else {
231                    // Calculate required expressions at the input of the window.
232                    // Please note that we use `input_schema`, because `required_indices`
233                    // refers to that schema
234                    let required_exprs =
235                        required_indices.get_required_exprs(&input_schema);
236                    let window_child =
237                        add_projection_on_top_if_helpful(window_child, required_exprs)?
238                            .data;
239                    Window::try_new(new_window_expr, Arc::new(window_child))
240                        .map(LogicalPlan::Window)
241                        .map(Transformed::yes)
242                }
243            });
244        }
245        LogicalPlan::TableScan(table_scan) => {
246            let TableScan {
247                table_name,
248                source,
249                projection,
250                filters,
251                fetch,
252                projected_schema: _,
253            } = table_scan;
254
255            // Get indices referred to in the original (schema with all fields)
256            // given projected indices.
257            let projection = match &projection {
258                Some(projection) => indices.into_mapped_indices(|idx| projection[idx]),
259                None => indices.into_inner(),
260            };
261            return TableScan::try_new(
262                table_name,
263                source,
264                Some(projection),
265                filters,
266                fetch,
267            )
268            .map(LogicalPlan::TableScan)
269            .map(Transformed::yes);
270        }
271        // Other node types are handled below
272        _ => {}
273    };
274
275    // For other plan node types, calculate indices for columns they use and
276    // try to rewrite their children
277    let mut child_required_indices: Vec<RequiredIndices> = match &plan {
278        LogicalPlan::Sort(_)
279        | LogicalPlan::Filter(_)
280        | LogicalPlan::Repartition(_)
281        | LogicalPlan::Union(_)
282        | LogicalPlan::SubqueryAlias(_)
283        | LogicalPlan::Distinct(Distinct::On(_)) => {
284            // Pass index requirements from the parent as well as column indices
285            // that appear in this plan's expressions to its child. All these
286            // operators benefit from "small" inputs, so the projection_beneficial
287            // flag is `true`.
288            plan.inputs()
289                .into_iter()
290                .map(|input| {
291                    indices
292                        .clone()
293                        .with_projection_beneficial()
294                        .with_plan_exprs(&plan, input.schema())
295                })
296                .collect::<Result<_>>()?
297        }
298        LogicalPlan::Limit(_) => {
299            // Pass index requirements from the parent as well as column indices
300            // that appear in this plan's expressions to its child. These operators
301            // do not benefit from "small" inputs, so the projection_beneficial
302            // flag is `false`.
303            plan.inputs()
304                .into_iter()
305                .map(|input| indices.clone().with_plan_exprs(&plan, input.schema()))
306                .collect::<Result<_>>()?
307        }
308        LogicalPlan::Copy(_)
309        | LogicalPlan::Ddl(_)
310        | LogicalPlan::Dml(_)
311        | LogicalPlan::Explain(_)
312        | LogicalPlan::Analyze(_)
313        | LogicalPlan::Subquery(_)
314        | LogicalPlan::Statement(_)
315        | LogicalPlan::Distinct(Distinct::All(_)) => {
316            // These plans require all their fields, and their children should
317            // be treated as final plans -- otherwise, we may have schema a
318            // mismatch.
319            // TODO: For some subquery variants (e.g. a subquery arising from an
320            //       EXISTS expression), we may not need to require all indices.
321            plan.inputs()
322                .into_iter()
323                .map(RequiredIndices::new_for_all_exprs)
324                .collect()
325        }
326        LogicalPlan::Extension(extension) => {
327            let Some(necessary_children_indices) =
328                extension.node.necessary_children_exprs(indices.indices())
329            else {
330                // Requirements from parent cannot be routed down to user defined logical plan safely
331                return Ok(Transformed::no(plan));
332            };
333            let children = extension.node.inputs();
334            if children.len() != necessary_children_indices.len() {
335                return internal_err!("Inconsistent length between children and necessary children indices. \
336                Make sure `.necessary_children_exprs` implementation of the `UserDefinedLogicalNode` is \
337                consistent with actual children length for the node.");
338            }
339            children
340                .into_iter()
341                .zip(necessary_children_indices)
342                .map(|(child, necessary_indices)| {
343                    RequiredIndices::new_from_indices(necessary_indices)
344                        .with_plan_exprs(&plan, child.schema())
345                })
346                .collect::<Result<Vec<_>>>()?
347        }
348        LogicalPlan::EmptyRelation(_)
349        | LogicalPlan::RecursiveQuery(_)
350        | LogicalPlan::Values(_)
351        | LogicalPlan::DescribeTable(_) => {
352            // These operators have no inputs, so stop the optimization process.
353            return Ok(Transformed::no(plan));
354        }
355        LogicalPlan::Join(join) => {
356            let left_len = join.left.schema().fields().len();
357            let (left_req_indices, right_req_indices) =
358                split_join_requirements(left_len, indices, &join.join_type);
359            let left_indices =
360                left_req_indices.with_plan_exprs(&plan, join.left.schema())?;
361            let right_indices =
362                right_req_indices.with_plan_exprs(&plan, join.right.schema())?;
363            // Joins benefit from "small" input tables (lower memory usage).
364            // Therefore, each child benefits from projection:
365            vec![
366                left_indices.with_projection_beneficial(),
367                right_indices.with_projection_beneficial(),
368            ]
369        }
370        // these nodes are explicitly rewritten in the match statement above
371        LogicalPlan::Projection(_)
372        | LogicalPlan::Aggregate(_)
373        | LogicalPlan::Window(_)
374        | LogicalPlan::TableScan(_) => {
375            return internal_err!(
376                "OptimizeProjection: should have handled in the match statement above"
377            );
378        }
379        LogicalPlan::Unnest(Unnest {
380            dependency_indices, ..
381        }) => {
382            vec![RequiredIndices::new_from_indices(
383                dependency_indices.clone(),
384            )]
385        }
386    };
387
388    // Required indices are currently ordered (child0, child1, ...)
389    // but the loop pops off the last element, so we need to reverse the order
390    child_required_indices.reverse();
391    if child_required_indices.len() != plan.inputs().len() {
392        return internal_err!(
393            "OptimizeProjection: child_required_indices length mismatch with plan inputs"
394        );
395    }
396
397    // Rewrite children of the plan
398    let transformed_plan = plan.map_children(|child| {
399        let required_indices = child_required_indices.pop().ok_or_else(|| {
400            internal_datafusion_err!(
401                "Unexpected number of required_indices in OptimizeProjections rule"
402            )
403        })?;
404
405        let projection_beneficial = required_indices.projection_beneficial();
406        let project_exprs = required_indices.get_required_exprs(child.schema());
407
408        optimize_projections(child, config, required_indices)?.transform_data(
409            |new_input| {
410                if projection_beneficial {
411                    add_projection_on_top_if_helpful(new_input, project_exprs)
412                } else {
413                    Ok(Transformed::no(new_input))
414                }
415            },
416        )
417    })?;
418
419    // If any of the children are transformed, we need to potentially update the plan's schema
420    if transformed_plan.transformed {
421        transformed_plan.map_data(|plan| plan.recompute_schema())
422    } else {
423        Ok(transformed_plan)
424    }
425}
426
427/// Merges consecutive projections.
428///
429/// Given a projection `proj`, this function attempts to merge it with a previous
430/// projection if it exists and if merging is beneficial. Merging is considered
431/// beneficial when expressions in the current projection are non-trivial and
432/// appear more than once in its input fields. This can act as a caching mechanism
433/// for non-trivial computations.
434///
435/// # Parameters
436///
437/// * `proj` - A reference to the `Projection` to be merged.
438///
439/// # Returns
440///
441/// A `Result` object with the following semantics:
442///
443/// - `Ok(Some(Projection))`: Merge was beneficial and successful. Contains the
444///   merged projection.
445/// - `Ok(None)`: Signals that merge is not beneficial (and has not taken place).
446/// - `Err(error)`: An error occurred during the function call.
447fn merge_consecutive_projections(proj: Projection) -> Result<Transformed<Projection>> {
448    let Projection {
449        expr,
450        input,
451        schema,
452        ..
453    } = proj;
454    let LogicalPlan::Projection(prev_projection) = input.as_ref() else {
455        return Projection::try_new_with_schema(expr, input, schema).map(Transformed::no);
456    };
457
458    // Count usages (referrals) of each projection expression in its input fields:
459    let mut column_referral_map = HashMap::<&Column, usize>::new();
460    expr.iter()
461        .for_each(|expr| expr.add_column_ref_counts(&mut column_referral_map));
462
463    // If an expression is non-trivial and appears more than once, do not merge
464    // them as consecutive projections will benefit from a compute-once approach.
465    // For details, see: https://github.com/apache/datafusion/issues/8296
466    if column_referral_map.into_iter().any(|(col, usage)| {
467        usage > 1
468            && !is_expr_trivial(
469                &prev_projection.expr
470                    [prev_projection.schema.index_of_column(col).unwrap()],
471            )
472    }) {
473        // no change
474        return Projection::try_new_with_schema(expr, input, schema).map(Transformed::no);
475    }
476
477    let LogicalPlan::Projection(prev_projection) = Arc::unwrap_or_clone(input) else {
478        // We know it is a `LogicalPlan::Projection` from check above
479        unreachable!();
480    };
481
482    // Try to rewrite the expressions in the current projection using the
483    // previous projection as input:
484    let name_preserver = NamePreserver::new_for_projection();
485    let mut original_names = vec![];
486    let new_exprs = expr.map_elements(|expr| {
487        original_names.push(name_preserver.save(&expr));
488
489        // do not rewrite top level Aliases (rewriter will remove all aliases within exprs)
490        match expr {
491            Expr::Alias(Alias {
492                expr,
493                relation,
494                name,
495            }) => rewrite_expr(*expr, &prev_projection).map(|result| {
496                result.update_data(|expr| Expr::Alias(Alias::new(expr, relation, name)))
497            }),
498            e => rewrite_expr(e, &prev_projection),
499        }
500    })?;
501
502    // if the expressions could be rewritten, create a new projection with the
503    // new expressions
504    if new_exprs.transformed {
505        // Add any needed aliases back to the expressions
506        let new_exprs = new_exprs
507            .data
508            .into_iter()
509            .zip(original_names)
510            .map(|(expr, original_name)| original_name.restore(expr))
511            .collect::<Vec<_>>();
512        Projection::try_new(new_exprs, prev_projection.input).map(Transformed::yes)
513    } else {
514        // not rewritten, so put the projection back together
515        let input = Arc::new(LogicalPlan::Projection(prev_projection));
516        Projection::try_new_with_schema(new_exprs.data, input, schema)
517            .map(Transformed::no)
518    }
519}
520
521// Check whether `expr` is trivial; i.e. it doesn't imply any computation.
522fn is_expr_trivial(expr: &Expr) -> bool {
523    matches!(expr, Expr::Column(_) | Expr::Literal(_))
524}
525
526/// Rewrites a projection expression using the projection before it (i.e. its input)
527/// This is a subroutine to the `merge_consecutive_projections` function.
528///
529/// # Parameters
530///
531/// * `expr` - A reference to the expression to rewrite.
532/// * `input` - A reference to the input of the projection expression (itself
533///   a projection).
534///
535/// # Returns
536///
537/// A `Result` object with the following semantics:
538///
539/// - `Ok(Some(Expr))`: Rewrite was successful. Contains the rewritten result.
540/// - `Ok(None)`: Signals that `expr` can not be rewritten.
541/// - `Err(error)`: An error occurred during the function call.
542///
543/// # Notes
544/// This rewrite also removes any unnecessary layers of aliasing.
545///
546/// Without trimming, we can end up with unnecessary indirections inside expressions
547/// during projection merges.
548///
549/// Consider:
550///
551/// ```text
552/// Projection(a1 + b1 as sum1)
553/// --Projection(a as a1, b as b1)
554/// ----Source(a, b)
555/// ```
556///
557/// After merge, we want to produce:
558///
559/// ```text
560/// Projection(a + b as sum1)
561/// --Source(a, b)
562/// ```
563///
564/// Without trimming, we would end up with:
565///
566/// ```text
567/// Projection((a as a1 + b as b1) as sum1)
568/// --Source(a, b)
569/// ```
570fn rewrite_expr(expr: Expr, input: &Projection) -> Result<Transformed<Expr>> {
571    expr.transform_up(|expr| {
572        match expr {
573            //  remove any intermediate aliases
574            Expr::Alias(alias) => Ok(Transformed::yes(*alias.expr)),
575            Expr::Column(col) => {
576                // Find index of column:
577                let idx = input.schema.index_of_column(&col)?;
578                // get the corresponding unaliased input expression
579                //
580                // For example:
581                // * the input projection is [`a + b` as c, `d + e` as f]
582                // * the current column is an expression "f"
583                //
584                // return the expression `d + e` (not `d + e` as f)
585                let input_expr = input.expr[idx].clone().unalias_nested().data;
586                Ok(Transformed::yes(input_expr))
587            }
588            // Unsupported type for consecutive projection merge analysis.
589            _ => Ok(Transformed::no(expr)),
590        }
591    })
592}
593
594/// Accumulates outer-referenced columns by the
595/// given expression, `expr`.
596///
597/// # Parameters
598///
599/// * `expr` - The expression to analyze for outer-referenced columns.
600/// * `columns` - A mutable reference to a `HashSet<Column>` where detected
601///   columns are collected.
602fn outer_columns<'a>(expr: &'a Expr, columns: &mut HashSet<&'a Column>) {
603    // inspect_expr_pre doesn't handle subquery references, so find them explicitly
604    expr.apply(|expr| {
605        match expr {
606            Expr::OuterReferenceColumn(_, col) => {
607                columns.insert(col);
608            }
609            Expr::ScalarSubquery(subquery) => {
610                outer_columns_helper_multi(&subquery.outer_ref_columns, columns);
611            }
612            Expr::Exists(exists) => {
613                outer_columns_helper_multi(&exists.subquery.outer_ref_columns, columns);
614            }
615            Expr::InSubquery(insubquery) => {
616                outer_columns_helper_multi(
617                    &insubquery.subquery.outer_ref_columns,
618                    columns,
619                );
620            }
621            _ => {}
622        };
623        Ok(TreeNodeRecursion::Continue)
624    })
625    // unwrap: closure above never returns Err, so can not be Err here
626    .unwrap();
627}
628
629/// A recursive subroutine that accumulates outer-referenced columns by the
630/// given expressions (`exprs`).
631///
632/// # Parameters
633///
634/// * `exprs` - The expressions to analyze for outer-referenced columns.
635/// * `columns` - A mutable reference to a `HashSet<Column>` where detected
636///   columns are collected.
637fn outer_columns_helper_multi<'a, 'b>(
638    exprs: impl IntoIterator<Item = &'a Expr>,
639    columns: &'b mut HashSet<&'a Column>,
640) {
641    exprs.into_iter().for_each(|e| outer_columns(e, columns));
642}
643
644/// Splits requirement indices for a join into left and right children based on
645/// the join type.
646///
647/// This function takes the length of the left child, a slice of requirement
648/// indices, and the type of join (e.g. `INNER`, `LEFT`, `RIGHT`) as arguments.
649/// Depending on the join type, it divides the requirement indices into those
650/// that apply to the left child and those that apply to the right child.
651///
652/// - For `INNER`, `LEFT`, `RIGHT` and `FULL` joins, the requirements are split
653///   between left and right children. The right child indices are adjusted to
654///   point to valid positions within the right child by subtracting the length
655///   of the left child.
656///
657/// - For `LEFT ANTI`, `LEFT SEMI`, `RIGHT SEMI` and `RIGHT ANTI` joins, all
658///   requirements are re-routed to either the left child or the right child
659///   directly, depending on the join type.
660///
661/// # Parameters
662///
663/// * `left_len` - The length of the left child.
664/// * `indices` - A slice of requirement indices.
665/// * `join_type` - The type of join (e.g. `INNER`, `LEFT`, `RIGHT`).
666///
667/// # Returns
668///
669/// A tuple containing two vectors of `usize` indices: The first vector represents
670/// the requirements for the left child, and the second vector represents the
671/// requirements for the right child. The indices are appropriately split and
672/// adjusted based on the join type.
673fn split_join_requirements(
674    left_len: usize,
675    indices: RequiredIndices,
676    join_type: &JoinType,
677) -> (RequiredIndices, RequiredIndices) {
678    match join_type {
679        // In these cases requirements are split between left/right children:
680        JoinType::Inner
681        | JoinType::Left
682        | JoinType::Right
683        | JoinType::Full
684        | JoinType::LeftMark => {
685            // Decrease right side indices by `left_len` so that they point to valid
686            // positions within the right child:
687            indices.split_off(left_len)
688        }
689        // All requirements can be re-routed to left child directly.
690        JoinType::LeftAnti | JoinType::LeftSemi => (indices, RequiredIndices::new()),
691        // All requirements can be re-routed to right side directly.
692        // No need to change index, join schema is right child schema.
693        JoinType::RightSemi | JoinType::RightAnti => (RequiredIndices::new(), indices),
694    }
695}
696
697/// Adds a projection on top of a logical plan if doing so reduces the number
698/// of columns for the parent operator.
699///
700/// This function takes a `LogicalPlan` and a list of projection expressions.
701/// If the projection is beneficial (it reduces the number of columns in the
702/// plan) a new `LogicalPlan` with the projection is created and returned, along
703/// with a `true` flag. If the projection doesn't reduce the number of columns,
704/// the original plan is returned with a `false` flag.
705///
706/// # Parameters
707///
708/// * `plan` - The input `LogicalPlan` to potentially add a projection to.
709/// * `project_exprs` - A list of expressions for the projection.
710///
711/// # Returns
712///
713/// A `Transformed` indicating if a projection was added
714fn add_projection_on_top_if_helpful(
715    plan: LogicalPlan,
716    project_exprs: Vec<Expr>,
717) -> Result<Transformed<LogicalPlan>> {
718    // Make sure projection decreases the number of columns, otherwise it is unnecessary.
719    if project_exprs.len() >= plan.schema().fields().len() {
720        Ok(Transformed::no(plan))
721    } else {
722        Projection::try_new(project_exprs, Arc::new(plan))
723            .map(LogicalPlan::Projection)
724            .map(Transformed::yes)
725    }
726}
727
728/// Rewrite the given projection according to the fields required by its
729/// ancestors.
730///
731/// # Parameters
732///
733/// * `proj` - A reference to the original projection to rewrite.
734/// * `config` - A reference to the optimizer configuration.
735/// * `indices` - A slice of indices representing the columns required by the
736///   ancestors of the given projection.
737///
738/// # Returns
739///
740/// A `Result` object with the following semantics:
741///
742/// - `Ok(Some(LogicalPlan))`: Contains the rewritten projection
743/// - `Ok(None)`: No rewrite necessary.
744/// - `Err(error)`: An error occurred during the function call.
745fn rewrite_projection_given_requirements(
746    proj: Projection,
747    config: &dyn OptimizerConfig,
748    indices: &RequiredIndices,
749) -> Result<Transformed<LogicalPlan>> {
750    let Projection { expr, input, .. } = proj;
751
752    let exprs_used = indices.get_at_indices(&expr);
753
754    let required_indices =
755        RequiredIndices::new().with_exprs(input.schema(), exprs_used.iter());
756
757    // rewrite the children projection, and if they are changed rewrite the
758    // projection down
759    optimize_projections(Arc::unwrap_or_clone(input), config, required_indices)?
760        .transform_data(|input| {
761            if is_projection_unnecessary(&input, &exprs_used)? {
762                Ok(Transformed::yes(input))
763            } else {
764                Projection::try_new(exprs_used, Arc::new(input))
765                    .map(LogicalPlan::Projection)
766                    .map(Transformed::yes)
767            }
768        })
769}
770
771/// Projection is unnecessary, when
772/// - input schema of the projection, output schema of the projection are same, and
773/// - all projection expressions are either Column or Literal
774fn is_projection_unnecessary(input: &LogicalPlan, proj_exprs: &[Expr]) -> Result<bool> {
775    let proj_schema = projection_schema(input, proj_exprs)?;
776    Ok(&proj_schema == input.schema() && proj_exprs.iter().all(is_expr_trivial))
777}
778
779#[cfg(test)]
780mod tests {
781    use std::cmp::Ordering;
782    use std::collections::HashMap;
783    use std::fmt::Formatter;
784    use std::ops::Add;
785    use std::sync::Arc;
786    use std::vec;
787
788    use crate::optimize_projections::OptimizeProjections;
789    use crate::optimizer::Optimizer;
790    use crate::test::{
791        assert_fields_eq, assert_optimized_plan_eq, scan_empty, test_table_scan,
792        test_table_scan_fields, test_table_scan_with_name,
793    };
794    use crate::{OptimizerContext, OptimizerRule};
795    use arrow::datatypes::{DataType, Field, Schema};
796    use datafusion_common::{
797        Column, DFSchema, DFSchemaRef, JoinType, Result, TableReference,
798    };
799    use datafusion_expr::ExprFunctionExt;
800    use datafusion_expr::{
801        binary_expr, build_join_schema,
802        builder::table_scan_with_filters,
803        col,
804        expr::{self, Cast},
805        lit,
806        logical_plan::{builder::LogicalPlanBuilder, table_scan},
807        not, try_cast, when, BinaryExpr, Expr, Extension, Like, LogicalPlan, Operator,
808        Projection, UserDefinedLogicalNodeCore, WindowFunctionDefinition,
809    };
810
811    use datafusion_functions_aggregate::count::count_udaf;
812    use datafusion_functions_aggregate::expr_fn::{count, max, min};
813    use datafusion_functions_aggregate::min_max::max_udaf;
814
815    fn assert_optimized_plan_equal(plan: LogicalPlan, expected: &str) -> Result<()> {
816        assert_optimized_plan_eq(Arc::new(OptimizeProjections::new()), plan, expected)
817    }
818
819    #[derive(Debug, Hash, PartialEq, Eq)]
820    struct NoOpUserDefined {
821        exprs: Vec<Expr>,
822        schema: DFSchemaRef,
823        input: Arc<LogicalPlan>,
824    }
825
826    impl NoOpUserDefined {
827        fn new(schema: DFSchemaRef, input: Arc<LogicalPlan>) -> Self {
828            Self {
829                exprs: vec![],
830                schema,
831                input,
832            }
833        }
834
835        fn with_exprs(mut self, exprs: Vec<Expr>) -> Self {
836            self.exprs = exprs;
837            self
838        }
839    }
840
841    // Manual implementation needed because of `schema` field. Comparison excludes this field.
842    impl PartialOrd for NoOpUserDefined {
843        fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
844            match self.exprs.partial_cmp(&other.exprs) {
845                Some(Ordering::Equal) => self.input.partial_cmp(&other.input),
846                cmp => cmp,
847            }
848        }
849    }
850
851    impl UserDefinedLogicalNodeCore for NoOpUserDefined {
852        fn name(&self) -> &str {
853            "NoOpUserDefined"
854        }
855
856        fn inputs(&self) -> Vec<&LogicalPlan> {
857            vec![&self.input]
858        }
859
860        fn schema(&self) -> &DFSchemaRef {
861            &self.schema
862        }
863
864        fn expressions(&self) -> Vec<Expr> {
865            self.exprs.clone()
866        }
867
868        fn fmt_for_explain(&self, f: &mut Formatter) -> std::fmt::Result {
869            write!(f, "NoOpUserDefined")
870        }
871
872        fn with_exprs_and_inputs(
873            &self,
874            exprs: Vec<Expr>,
875            mut inputs: Vec<LogicalPlan>,
876        ) -> Result<Self> {
877            Ok(Self {
878                exprs,
879                input: Arc::new(inputs.swap_remove(0)),
880                schema: Arc::clone(&self.schema),
881            })
882        }
883
884        fn necessary_children_exprs(
885            &self,
886            output_columns: &[usize],
887        ) -> Option<Vec<Vec<usize>>> {
888            // Since schema is same. Output columns requires their corresponding version in the input columns.
889            Some(vec![output_columns.to_vec()])
890        }
891
892        fn supports_limit_pushdown(&self) -> bool {
893            false // Disallow limit push-down by default
894        }
895    }
896
897    #[derive(Debug, Hash, PartialEq, Eq)]
898    struct UserDefinedCrossJoin {
899        exprs: Vec<Expr>,
900        schema: DFSchemaRef,
901        left_child: Arc<LogicalPlan>,
902        right_child: Arc<LogicalPlan>,
903    }
904
905    impl UserDefinedCrossJoin {
906        fn new(left_child: Arc<LogicalPlan>, right_child: Arc<LogicalPlan>) -> Self {
907            let left_schema = left_child.schema();
908            let right_schema = right_child.schema();
909            let schema = Arc::new(
910                build_join_schema(left_schema, right_schema, &JoinType::Inner).unwrap(),
911            );
912            Self {
913                exprs: vec![],
914                schema,
915                left_child,
916                right_child,
917            }
918        }
919    }
920
921    // Manual implementation needed because of `schema` field. Comparison excludes this field.
922    impl PartialOrd for UserDefinedCrossJoin {
923        fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
924            match self.exprs.partial_cmp(&other.exprs) {
925                Some(Ordering::Equal) => {
926                    match self.left_child.partial_cmp(&other.left_child) {
927                        Some(Ordering::Equal) => {
928                            self.right_child.partial_cmp(&other.right_child)
929                        }
930                        cmp => cmp,
931                    }
932                }
933                cmp => cmp,
934            }
935        }
936    }
937
938    impl UserDefinedLogicalNodeCore for UserDefinedCrossJoin {
939        fn name(&self) -> &str {
940            "UserDefinedCrossJoin"
941        }
942
943        fn inputs(&self) -> Vec<&LogicalPlan> {
944            vec![&self.left_child, &self.right_child]
945        }
946
947        fn schema(&self) -> &DFSchemaRef {
948            &self.schema
949        }
950
951        fn expressions(&self) -> Vec<Expr> {
952            self.exprs.clone()
953        }
954
955        fn fmt_for_explain(&self, f: &mut Formatter) -> std::fmt::Result {
956            write!(f, "UserDefinedCrossJoin")
957        }
958
959        fn with_exprs_and_inputs(
960            &self,
961            exprs: Vec<Expr>,
962            mut inputs: Vec<LogicalPlan>,
963        ) -> Result<Self> {
964            assert_eq!(inputs.len(), 2);
965            Ok(Self {
966                exprs,
967                left_child: Arc::new(inputs.remove(0)),
968                right_child: Arc::new(inputs.remove(0)),
969                schema: Arc::clone(&self.schema),
970            })
971        }
972
973        fn necessary_children_exprs(
974            &self,
975            output_columns: &[usize],
976        ) -> Option<Vec<Vec<usize>>> {
977            let left_child_len = self.left_child.schema().fields().len();
978            let mut left_reqs = vec![];
979            let mut right_reqs = vec![];
980            for &out_idx in output_columns {
981                if out_idx < left_child_len {
982                    left_reqs.push(out_idx);
983                } else {
984                    // Output indices further than the left_child_len
985                    // comes from right children
986                    right_reqs.push(out_idx - left_child_len)
987                }
988            }
989            Some(vec![left_reqs, right_reqs])
990        }
991
992        fn supports_limit_pushdown(&self) -> bool {
993            false // Disallow limit push-down by default
994        }
995    }
996
997    #[test]
998    fn merge_two_projection() -> Result<()> {
999        let table_scan = test_table_scan()?;
1000        let plan = LogicalPlanBuilder::from(table_scan)
1001            .project(vec![col("a")])?
1002            .project(vec![binary_expr(lit(1), Operator::Plus, col("a"))])?
1003            .build()?;
1004
1005        let expected = "Projection: Int32(1) + test.a\
1006        \n  TableScan: test projection=[a]";
1007        assert_optimized_plan_equal(plan, expected)
1008    }
1009
1010    #[test]
1011    fn merge_three_projection() -> Result<()> {
1012        let table_scan = test_table_scan()?;
1013        let plan = LogicalPlanBuilder::from(table_scan)
1014            .project(vec![col("a"), col("b")])?
1015            .project(vec![col("a")])?
1016            .project(vec![binary_expr(lit(1), Operator::Plus, col("a"))])?
1017            .build()?;
1018
1019        let expected = "Projection: Int32(1) + test.a\
1020        \n  TableScan: test projection=[a]";
1021        assert_optimized_plan_equal(plan, expected)
1022    }
1023
1024    #[test]
1025    fn merge_alias() -> Result<()> {
1026        let table_scan = test_table_scan()?;
1027        let plan = LogicalPlanBuilder::from(table_scan)
1028            .project(vec![col("a")])?
1029            .project(vec![col("a").alias("alias")])?
1030            .build()?;
1031
1032        let expected = "Projection: test.a AS alias\
1033        \n  TableScan: test projection=[a]";
1034        assert_optimized_plan_equal(plan, expected)
1035    }
1036
1037    #[test]
1038    fn merge_nested_alias() -> Result<()> {
1039        let table_scan = test_table_scan()?;
1040        let plan = LogicalPlanBuilder::from(table_scan)
1041            .project(vec![col("a").alias("alias1").alias("alias2")])?
1042            .project(vec![col("alias2").alias("alias")])?
1043            .build()?;
1044
1045        let expected = "Projection: test.a AS alias\
1046        \n  TableScan: test projection=[a]";
1047        assert_optimized_plan_equal(plan, expected)
1048    }
1049
1050    #[test]
1051    fn test_nested_count() -> Result<()> {
1052        let schema = Schema::new(vec![Field::new("foo", DataType::Int32, false)]);
1053
1054        let groups: Vec<Expr> = vec![];
1055
1056        let plan = table_scan(TableReference::none(), &schema, None)
1057            .unwrap()
1058            .aggregate(groups.clone(), vec![count(lit(1))])
1059            .unwrap()
1060            .aggregate(groups, vec![count(lit(1))])
1061            .unwrap()
1062            .build()
1063            .unwrap();
1064
1065        let expected = "Aggregate: groupBy=[[]], aggr=[[count(Int32(1))]]\
1066        \n  Projection: \
1067        \n    Aggregate: groupBy=[[]], aggr=[[count(Int32(1))]]\
1068        \n      TableScan: ?table? projection=[]";
1069        assert_optimized_plan_equal(plan, expected)
1070    }
1071
1072    #[test]
1073    fn test_neg_push_down() -> Result<()> {
1074        let table_scan = test_table_scan()?;
1075        let plan = LogicalPlanBuilder::from(table_scan)
1076            .project(vec![-col("a")])?
1077            .build()?;
1078
1079        let expected = "Projection: (- test.a)\
1080        \n  TableScan: test projection=[a]";
1081        assert_optimized_plan_equal(plan, expected)
1082    }
1083
1084    #[test]
1085    fn test_is_null() -> Result<()> {
1086        let table_scan = test_table_scan()?;
1087        let plan = LogicalPlanBuilder::from(table_scan)
1088            .project(vec![col("a").is_null()])?
1089            .build()?;
1090
1091        let expected = "Projection: test.a IS NULL\
1092        \n  TableScan: test projection=[a]";
1093        assert_optimized_plan_equal(plan, expected)
1094    }
1095
1096    #[test]
1097    fn test_is_not_null() -> Result<()> {
1098        let table_scan = test_table_scan()?;
1099        let plan = LogicalPlanBuilder::from(table_scan)
1100            .project(vec![col("a").is_not_null()])?
1101            .build()?;
1102
1103        let expected = "Projection: test.a IS NOT NULL\
1104        \n  TableScan: test projection=[a]";
1105        assert_optimized_plan_equal(plan, expected)
1106    }
1107
1108    #[test]
1109    fn test_is_true() -> Result<()> {
1110        let table_scan = test_table_scan()?;
1111        let plan = LogicalPlanBuilder::from(table_scan)
1112            .project(vec![col("a").is_true()])?
1113            .build()?;
1114
1115        let expected = "Projection: test.a IS TRUE\
1116        \n  TableScan: test projection=[a]";
1117        assert_optimized_plan_equal(plan, expected)
1118    }
1119
1120    #[test]
1121    fn test_is_not_true() -> Result<()> {
1122        let table_scan = test_table_scan()?;
1123        let plan = LogicalPlanBuilder::from(table_scan)
1124            .project(vec![col("a").is_not_true()])?
1125            .build()?;
1126
1127        let expected = "Projection: test.a IS NOT TRUE\
1128        \n  TableScan: test projection=[a]";
1129        assert_optimized_plan_equal(plan, expected)
1130    }
1131
1132    #[test]
1133    fn test_is_false() -> Result<()> {
1134        let table_scan = test_table_scan()?;
1135        let plan = LogicalPlanBuilder::from(table_scan)
1136            .project(vec![col("a").is_false()])?
1137            .build()?;
1138
1139        let expected = "Projection: test.a IS FALSE\
1140        \n  TableScan: test projection=[a]";
1141        assert_optimized_plan_equal(plan, expected)
1142    }
1143
1144    #[test]
1145    fn test_is_not_false() -> Result<()> {
1146        let table_scan = test_table_scan()?;
1147        let plan = LogicalPlanBuilder::from(table_scan)
1148            .project(vec![col("a").is_not_false()])?
1149            .build()?;
1150
1151        let expected = "Projection: test.a IS NOT FALSE\
1152        \n  TableScan: test projection=[a]";
1153        assert_optimized_plan_equal(plan, expected)
1154    }
1155
1156    #[test]
1157    fn test_is_unknown() -> Result<()> {
1158        let table_scan = test_table_scan()?;
1159        let plan = LogicalPlanBuilder::from(table_scan)
1160            .project(vec![col("a").is_unknown()])?
1161            .build()?;
1162
1163        let expected = "Projection: test.a IS UNKNOWN\
1164        \n  TableScan: test projection=[a]";
1165        assert_optimized_plan_equal(plan, expected)
1166    }
1167
1168    #[test]
1169    fn test_is_not_unknown() -> Result<()> {
1170        let table_scan = test_table_scan()?;
1171        let plan = LogicalPlanBuilder::from(table_scan)
1172            .project(vec![col("a").is_not_unknown()])?
1173            .build()?;
1174
1175        let expected = "Projection: test.a IS NOT UNKNOWN\
1176        \n  TableScan: test projection=[a]";
1177        assert_optimized_plan_equal(plan, expected)
1178    }
1179
1180    #[test]
1181    fn test_not() -> Result<()> {
1182        let table_scan = test_table_scan()?;
1183        let plan = LogicalPlanBuilder::from(table_scan)
1184            .project(vec![not(col("a"))])?
1185            .build()?;
1186
1187        let expected = "Projection: NOT test.a\
1188        \n  TableScan: test projection=[a]";
1189        assert_optimized_plan_equal(plan, expected)
1190    }
1191
1192    #[test]
1193    fn test_try_cast() -> Result<()> {
1194        let table_scan = test_table_scan()?;
1195        let plan = LogicalPlanBuilder::from(table_scan)
1196            .project(vec![try_cast(col("a"), DataType::Float64)])?
1197            .build()?;
1198
1199        let expected = "Projection: TRY_CAST(test.a AS Float64)\
1200        \n  TableScan: test projection=[a]";
1201        assert_optimized_plan_equal(plan, expected)
1202    }
1203
1204    #[test]
1205    fn test_similar_to() -> Result<()> {
1206        let table_scan = test_table_scan()?;
1207        let expr = Box::new(col("a"));
1208        let pattern = Box::new(lit("[0-9]"));
1209        let similar_to_expr =
1210            Expr::SimilarTo(Like::new(false, expr, pattern, None, false));
1211        let plan = LogicalPlanBuilder::from(table_scan)
1212            .project(vec![similar_to_expr])?
1213            .build()?;
1214
1215        let expected = "Projection: test.a SIMILAR TO Utf8(\"[0-9]\")\
1216        \n  TableScan: test projection=[a]";
1217        assert_optimized_plan_equal(plan, expected)
1218    }
1219
1220    #[test]
1221    fn test_between() -> Result<()> {
1222        let table_scan = test_table_scan()?;
1223        let plan = LogicalPlanBuilder::from(table_scan)
1224            .project(vec![col("a").between(lit(1), lit(3))])?
1225            .build()?;
1226
1227        let expected = "Projection: test.a BETWEEN Int32(1) AND Int32(3)\
1228        \n  TableScan: test projection=[a]";
1229        assert_optimized_plan_equal(plan, expected)
1230    }
1231
1232    // Test Case expression
1233    #[test]
1234    fn test_case_merged() -> Result<()> {
1235        let table_scan = test_table_scan()?;
1236        let plan = LogicalPlanBuilder::from(table_scan)
1237            .project(vec![col("a"), lit(0).alias("d")])?
1238            .project(vec![
1239                col("a"),
1240                when(col("a").eq(lit(1)), lit(10))
1241                    .otherwise(col("d"))?
1242                    .alias("d"),
1243            ])?
1244            .build()?;
1245
1246        let expected = "Projection: test.a, CASE WHEN test.a = Int32(1) THEN Int32(10) ELSE Int32(0) END AS d\
1247        \n  TableScan: test projection=[a]";
1248        assert_optimized_plan_equal(plan, expected)
1249    }
1250
1251    // Test outer projection isn't discarded despite the same schema as inner
1252    // https://github.com/apache/datafusion/issues/8942
1253    #[test]
1254    fn test_derived_column() -> Result<()> {
1255        let table_scan = test_table_scan()?;
1256        let plan = LogicalPlanBuilder::from(table_scan)
1257            .project(vec![col("a").add(lit(1)).alias("a"), lit(0).alias("d")])?
1258            .project(vec![
1259                col("a"),
1260                when(col("a").eq(lit(1)), lit(10))
1261                    .otherwise(col("d"))?
1262                    .alias("d"),
1263            ])?
1264            .build()?;
1265
1266        let expected =
1267            "Projection: a, CASE WHEN a = Int32(1) THEN Int32(10) ELSE d END AS d\
1268        \n  Projection: test.a + Int32(1) AS a, Int32(0) AS d\
1269        \n    TableScan: test projection=[a]";
1270        assert_optimized_plan_equal(plan, expected)
1271    }
1272
1273    // Since only column `a` is referred at the output. Scan should only contain projection=[a].
1274    // User defined node should be able to propagate necessary expressions by its parent to its child.
1275    #[test]
1276    fn test_user_defined_logical_plan_node() -> Result<()> {
1277        let table_scan = test_table_scan()?;
1278        let custom_plan = LogicalPlan::Extension(Extension {
1279            node: Arc::new(NoOpUserDefined::new(
1280                Arc::clone(table_scan.schema()),
1281                Arc::new(table_scan.clone()),
1282            )),
1283        });
1284        let plan = LogicalPlanBuilder::from(custom_plan)
1285            .project(vec![col("a"), lit(0).alias("d")])?
1286            .build()?;
1287
1288        let expected = "Projection: test.a, Int32(0) AS d\
1289        \n  NoOpUserDefined\
1290        \n    TableScan: test projection=[a]";
1291        assert_optimized_plan_equal(plan, expected)
1292    }
1293
1294    // Only column `a` is referred at the output. However, User defined node itself uses column `b`
1295    // during its operation. Hence, scan should contain projection=[a, b].
1296    // User defined node should be able to propagate necessary expressions by its parent, as well as its own
1297    // required expressions.
1298    #[test]
1299    fn test_user_defined_logical_plan_node2() -> Result<()> {
1300        let table_scan = test_table_scan()?;
1301        let exprs = vec![Expr::Column(Column::from_qualified_name("b"))];
1302        let custom_plan = LogicalPlan::Extension(Extension {
1303            node: Arc::new(
1304                NoOpUserDefined::new(
1305                    Arc::clone(table_scan.schema()),
1306                    Arc::new(table_scan.clone()),
1307                )
1308                .with_exprs(exprs),
1309            ),
1310        });
1311        let plan = LogicalPlanBuilder::from(custom_plan)
1312            .project(vec![col("a"), lit(0).alias("d")])?
1313            .build()?;
1314
1315        let expected = "Projection: test.a, Int32(0) AS d\
1316        \n  NoOpUserDefined\
1317        \n    TableScan: test projection=[a, b]";
1318        assert_optimized_plan_equal(plan, expected)
1319    }
1320
1321    // Only column `a` is referred at the output. However, User defined node itself uses expression `b+c`
1322    // during its operation. Hence, scan should contain projection=[a, b, c].
1323    // User defined node should be able to propagate necessary expressions by its parent, as well as its own
1324    // required expressions. Expressions doesn't have to be just column. Requirements from complex expressions
1325    // should be propagated also.
1326    #[test]
1327    fn test_user_defined_logical_plan_node3() -> Result<()> {
1328        let table_scan = test_table_scan()?;
1329        let left_expr = Expr::Column(Column::from_qualified_name("b"));
1330        let right_expr = Expr::Column(Column::from_qualified_name("c"));
1331        let binary_expr = Expr::BinaryExpr(BinaryExpr::new(
1332            Box::new(left_expr),
1333            Operator::Plus,
1334            Box::new(right_expr),
1335        ));
1336        let exprs = vec![binary_expr];
1337        let custom_plan = LogicalPlan::Extension(Extension {
1338            node: Arc::new(
1339                NoOpUserDefined::new(
1340                    Arc::clone(table_scan.schema()),
1341                    Arc::new(table_scan.clone()),
1342                )
1343                .with_exprs(exprs),
1344            ),
1345        });
1346        let plan = LogicalPlanBuilder::from(custom_plan)
1347            .project(vec![col("a"), lit(0).alias("d")])?
1348            .build()?;
1349
1350        let expected = "Projection: test.a, Int32(0) AS d\
1351        \n  NoOpUserDefined\
1352        \n    TableScan: test projection=[a, b, c]";
1353        assert_optimized_plan_equal(plan, expected)
1354    }
1355
1356    // Columns `l.a`, `l.c`, `r.a` is referred at the output.
1357    // User defined node should be able to propagate necessary expressions by its parent, to its children.
1358    // Even if it has multiple children.
1359    // left child should have `projection=[a, c]`, and right side should have `projection=[a]`.
1360    #[test]
1361    fn test_user_defined_logical_plan_node4() -> Result<()> {
1362        let left_table = test_table_scan_with_name("l")?;
1363        let right_table = test_table_scan_with_name("r")?;
1364        let custom_plan = LogicalPlan::Extension(Extension {
1365            node: Arc::new(UserDefinedCrossJoin::new(
1366                Arc::new(left_table),
1367                Arc::new(right_table),
1368            )),
1369        });
1370        let plan = LogicalPlanBuilder::from(custom_plan)
1371            .project(vec![col("l.a"), col("l.c"), col("r.a"), lit(0).alias("d")])?
1372            .build()?;
1373
1374        let expected = "Projection: l.a, l.c, r.a, Int32(0) AS d\
1375        \n  UserDefinedCrossJoin\
1376        \n    TableScan: l projection=[a, c]\
1377        \n    TableScan: r projection=[a]";
1378        assert_optimized_plan_equal(plan, expected)
1379    }
1380
1381    #[test]
1382    fn aggregate_no_group_by() -> Result<()> {
1383        let table_scan = test_table_scan()?;
1384
1385        let plan = LogicalPlanBuilder::from(table_scan)
1386            .aggregate(Vec::<Expr>::new(), vec![max(col("b"))])?
1387            .build()?;
1388
1389        let expected = "Aggregate: groupBy=[[]], aggr=[[max(test.b)]]\
1390        \n  TableScan: test projection=[b]";
1391
1392        assert_optimized_plan_equal(plan, expected)
1393    }
1394
1395    #[test]
1396    fn aggregate_group_by() -> Result<()> {
1397        let table_scan = test_table_scan()?;
1398
1399        let plan = LogicalPlanBuilder::from(table_scan)
1400            .aggregate(vec![col("c")], vec![max(col("b"))])?
1401            .build()?;
1402
1403        let expected = "Aggregate: groupBy=[[test.c]], aggr=[[max(test.b)]]\
1404        \n  TableScan: test projection=[b, c]";
1405
1406        assert_optimized_plan_equal(plan, expected)
1407    }
1408
1409    #[test]
1410    fn aggregate_group_by_with_table_alias() -> Result<()> {
1411        let table_scan = test_table_scan()?;
1412
1413        let plan = LogicalPlanBuilder::from(table_scan)
1414            .alias("a")?
1415            .aggregate(vec![col("c")], vec![max(col("b"))])?
1416            .build()?;
1417
1418        let expected = "Aggregate: groupBy=[[a.c]], aggr=[[max(a.b)]]\
1419        \n  SubqueryAlias: a\
1420        \n    TableScan: test projection=[b, c]";
1421
1422        assert_optimized_plan_equal(plan, expected)
1423    }
1424
1425    #[test]
1426    fn aggregate_no_group_by_with_filter() -> Result<()> {
1427        let table_scan = test_table_scan()?;
1428
1429        let plan = LogicalPlanBuilder::from(table_scan)
1430            .filter(col("c").gt(lit(1)))?
1431            .aggregate(Vec::<Expr>::new(), vec![max(col("b"))])?
1432            .build()?;
1433
1434        let expected = "Aggregate: groupBy=[[]], aggr=[[max(test.b)]]\
1435        \n  Projection: test.b\
1436        \n    Filter: test.c > Int32(1)\
1437        \n      TableScan: test projection=[b, c]";
1438
1439        assert_optimized_plan_equal(plan, expected)
1440    }
1441
1442    #[test]
1443    fn aggregate_with_periods() -> Result<()> {
1444        let schema = Schema::new(vec![Field::new("tag.one", DataType::Utf8, false)]);
1445
1446        // Build a plan that looks as follows (note "tag.one" is a column named
1447        // "tag.one", not a column named "one" in a table named "tag"):
1448        //
1449        // Projection: tag.one
1450        //   Aggregate: groupBy=[], aggr=[max("tag.one") AS "tag.one"]
1451        //    TableScan
1452        let plan = table_scan(Some("m4"), &schema, None)?
1453            .aggregate(
1454                Vec::<Expr>::new(),
1455                vec![max(col(Column::new_unqualified("tag.one"))).alias("tag.one")],
1456            )?
1457            .project([col(Column::new_unqualified("tag.one"))])?
1458            .build()?;
1459
1460        let expected = "\
1461        Aggregate: groupBy=[[]], aggr=[[max(m4.tag.one) AS tag.one]]\
1462        \n  TableScan: m4 projection=[tag.one]";
1463
1464        assert_optimized_plan_equal(plan, expected)
1465    }
1466
1467    #[test]
1468    fn redundant_project() -> Result<()> {
1469        let table_scan = test_table_scan()?;
1470
1471        let plan = LogicalPlanBuilder::from(table_scan)
1472            .project(vec![col("a"), col("b"), col("c")])?
1473            .project(vec![col("a"), col("c"), col("b")])?
1474            .build()?;
1475        let expected = "Projection: test.a, test.c, test.b\
1476        \n  TableScan: test projection=[a, b, c]";
1477
1478        assert_optimized_plan_equal(plan, expected)
1479    }
1480
1481    #[test]
1482    fn reorder_scan() -> Result<()> {
1483        let schema = Schema::new(test_table_scan_fields());
1484
1485        let plan = table_scan(Some("test"), &schema, Some(vec![1, 0, 2]))?.build()?;
1486        let expected = "TableScan: test projection=[b, a, c]";
1487
1488        assert_optimized_plan_equal(plan, expected)
1489    }
1490
1491    #[test]
1492    fn reorder_scan_projection() -> Result<()> {
1493        let schema = Schema::new(test_table_scan_fields());
1494
1495        let plan = table_scan(Some("test"), &schema, Some(vec![1, 0, 2]))?
1496            .project(vec![col("a"), col("b")])?
1497            .build()?;
1498        let expected = "Projection: test.a, test.b\
1499        \n  TableScan: test projection=[b, a]";
1500
1501        assert_optimized_plan_equal(plan, expected)
1502    }
1503
1504    #[test]
1505    fn reorder_projection() -> Result<()> {
1506        let table_scan = test_table_scan()?;
1507
1508        let plan = LogicalPlanBuilder::from(table_scan)
1509            .project(vec![col("c"), col("b"), col("a")])?
1510            .build()?;
1511        let expected = "Projection: test.c, test.b, test.a\
1512        \n  TableScan: test projection=[a, b, c]";
1513
1514        assert_optimized_plan_equal(plan, expected)
1515    }
1516
1517    #[test]
1518    fn noncontinuous_redundant_projection() -> Result<()> {
1519        let table_scan = test_table_scan()?;
1520
1521        let plan = LogicalPlanBuilder::from(table_scan)
1522            .project(vec![col("c"), col("b"), col("a")])?
1523            .filter(col("c").gt(lit(1)))?
1524            .project(vec![col("c"), col("a"), col("b")])?
1525            .filter(col("b").gt(lit(1)))?
1526            .filter(col("a").gt(lit(1)))?
1527            .project(vec![col("a"), col("c"), col("b")])?
1528            .build()?;
1529        let expected = "Projection: test.a, test.c, test.b\
1530        \n  Filter: test.a > Int32(1)\
1531        \n    Filter: test.b > Int32(1)\
1532        \n      Projection: test.c, test.a, test.b\
1533        \n        Filter: test.c > Int32(1)\
1534        \n          Projection: test.c, test.b, test.a\
1535        \n            TableScan: test projection=[a, b, c]";
1536        assert_optimized_plan_equal(plan, expected)
1537    }
1538
1539    #[test]
1540    fn join_schema_trim_full_join_column_projection() -> Result<()> {
1541        let table_scan = test_table_scan()?;
1542
1543        let schema = Schema::new(vec![Field::new("c1", DataType::UInt32, false)]);
1544        let table2_scan = scan_empty(Some("test2"), &schema, None)?.build()?;
1545
1546        let plan = LogicalPlanBuilder::from(table_scan)
1547            .join(table2_scan, JoinType::Left, (vec!["a"], vec!["c1"]), None)?
1548            .project(vec![col("a"), col("b"), col("c1")])?
1549            .build()?;
1550
1551        // make sure projections are pushed down to both table scans
1552        let expected = "Left Join: test.a = test2.c1\
1553        \n  TableScan: test projection=[a, b]\
1554        \n  TableScan: test2 projection=[c1]";
1555
1556        let optimized_plan = optimize(plan)?;
1557        let formatted_plan = format!("{optimized_plan}");
1558        assert_eq!(formatted_plan, expected);
1559
1560        // make sure schema for join node include both join columns
1561        let optimized_join = optimized_plan;
1562        assert_eq!(
1563            **optimized_join.schema(),
1564            DFSchema::new_with_metadata(
1565                vec![
1566                    (
1567                        Some("test".into()),
1568                        Arc::new(Field::new("a", DataType::UInt32, false))
1569                    ),
1570                    (
1571                        Some("test".into()),
1572                        Arc::new(Field::new("b", DataType::UInt32, false))
1573                    ),
1574                    (
1575                        Some("test2".into()),
1576                        Arc::new(Field::new("c1", DataType::UInt32, true))
1577                    ),
1578                ],
1579                HashMap::new()
1580            )?,
1581        );
1582
1583        Ok(())
1584    }
1585
1586    #[test]
1587    fn join_schema_trim_partial_join_column_projection() -> Result<()> {
1588        // test join column push down without explicit column projections
1589
1590        let table_scan = test_table_scan()?;
1591
1592        let schema = Schema::new(vec![Field::new("c1", DataType::UInt32, false)]);
1593        let table2_scan = scan_empty(Some("test2"), &schema, None)?.build()?;
1594
1595        let plan = LogicalPlanBuilder::from(table_scan)
1596            .join(table2_scan, JoinType::Left, (vec!["a"], vec!["c1"]), None)?
1597            // projecting joined column `a` should push the right side column `c1` projection as
1598            // well into test2 table even though `c1` is not referenced in projection.
1599            .project(vec![col("a"), col("b")])?
1600            .build()?;
1601
1602        // make sure projections are pushed down to both table scans
1603        let expected = "Projection: test.a, test.b\
1604        \n  Left Join: test.a = test2.c1\
1605        \n    TableScan: test projection=[a, b]\
1606        \n    TableScan: test2 projection=[c1]";
1607
1608        let optimized_plan = optimize(plan)?;
1609        let formatted_plan = format!("{optimized_plan}");
1610        assert_eq!(formatted_plan, expected);
1611
1612        // make sure schema for join node include both join columns
1613        let optimized_join = optimized_plan.inputs()[0];
1614        assert_eq!(
1615            **optimized_join.schema(),
1616            DFSchema::new_with_metadata(
1617                vec![
1618                    (
1619                        Some("test".into()),
1620                        Arc::new(Field::new("a", DataType::UInt32, false))
1621                    ),
1622                    (
1623                        Some("test".into()),
1624                        Arc::new(Field::new("b", DataType::UInt32, false))
1625                    ),
1626                    (
1627                        Some("test2".into()),
1628                        Arc::new(Field::new("c1", DataType::UInt32, true))
1629                    ),
1630                ],
1631                HashMap::new()
1632            )?,
1633        );
1634
1635        Ok(())
1636    }
1637
1638    #[test]
1639    fn join_schema_trim_using_join() -> Result<()> {
1640        // shared join columns from using join should be pushed to both sides
1641
1642        let table_scan = test_table_scan()?;
1643
1644        let schema = Schema::new(vec![Field::new("a", DataType::UInt32, false)]);
1645        let table2_scan = scan_empty(Some("test2"), &schema, None)?.build()?;
1646
1647        let plan = LogicalPlanBuilder::from(table_scan)
1648            .join_using(table2_scan, JoinType::Left, vec!["a"])?
1649            .project(vec![col("a"), col("b")])?
1650            .build()?;
1651
1652        // make sure projections are pushed down to table scan
1653        let expected = "Projection: test.a, test.b\
1654        \n  Left Join: Using test.a = test2.a\
1655        \n    TableScan: test projection=[a, b]\
1656        \n    TableScan: test2 projection=[a]";
1657
1658        let optimized_plan = optimize(plan)?;
1659        let formatted_plan = format!("{optimized_plan}");
1660        assert_eq!(formatted_plan, expected);
1661
1662        // make sure schema for join node include both join columns
1663        let optimized_join = optimized_plan.inputs()[0];
1664        assert_eq!(
1665            **optimized_join.schema(),
1666            DFSchema::new_with_metadata(
1667                vec![
1668                    (
1669                        Some("test".into()),
1670                        Arc::new(Field::new("a", DataType::UInt32, false))
1671                    ),
1672                    (
1673                        Some("test".into()),
1674                        Arc::new(Field::new("b", DataType::UInt32, false))
1675                    ),
1676                    (
1677                        Some("test2".into()),
1678                        Arc::new(Field::new("a", DataType::UInt32, true))
1679                    ),
1680                ],
1681                HashMap::new()
1682            )?,
1683        );
1684
1685        Ok(())
1686    }
1687
1688    #[test]
1689    fn cast() -> Result<()> {
1690        let table_scan = test_table_scan()?;
1691
1692        let projection = LogicalPlanBuilder::from(table_scan)
1693            .project(vec![Expr::Cast(Cast::new(
1694                Box::new(col("c")),
1695                DataType::Float64,
1696            ))])?
1697            .build()?;
1698
1699        let expected = "Projection: CAST(test.c AS Float64)\
1700        \n  TableScan: test projection=[c]";
1701
1702        assert_optimized_plan_equal(projection, expected)
1703    }
1704
1705    #[test]
1706    fn table_scan_projected_schema() -> Result<()> {
1707        let table_scan = test_table_scan()?;
1708        let plan = LogicalPlanBuilder::from(test_table_scan()?)
1709            .project(vec![col("a"), col("b")])?
1710            .build()?;
1711
1712        assert_eq!(3, table_scan.schema().fields().len());
1713        assert_fields_eq(&table_scan, vec!["a", "b", "c"]);
1714        assert_fields_eq(&plan, vec!["a", "b"]);
1715
1716        let expected = "TableScan: test projection=[a, b]";
1717
1718        assert_optimized_plan_equal(plan, expected)
1719    }
1720
1721    #[test]
1722    fn table_scan_projected_schema_non_qualified_relation() -> Result<()> {
1723        let table_scan = test_table_scan()?;
1724        let input_schema = table_scan.schema();
1725        assert_eq!(3, input_schema.fields().len());
1726        assert_fields_eq(&table_scan, vec!["a", "b", "c"]);
1727
1728        // Build the LogicalPlan directly (don't use PlanBuilder), so
1729        // that the Column references are unqualified (e.g. their
1730        // relation is `None`). PlanBuilder resolves the expressions
1731        let expr = vec![col("test.a"), col("test.b")];
1732        let plan =
1733            LogicalPlan::Projection(Projection::try_new(expr, Arc::new(table_scan))?);
1734
1735        assert_fields_eq(&plan, vec!["a", "b"]);
1736
1737        let expected = "TableScan: test projection=[a, b]";
1738
1739        assert_optimized_plan_equal(plan, expected)
1740    }
1741
1742    #[test]
1743    fn table_limit() -> Result<()> {
1744        let table_scan = test_table_scan()?;
1745        assert_eq!(3, table_scan.schema().fields().len());
1746        assert_fields_eq(&table_scan, vec!["a", "b", "c"]);
1747
1748        let plan = LogicalPlanBuilder::from(table_scan)
1749            .project(vec![col("c"), col("a")])?
1750            .limit(0, Some(5))?
1751            .build()?;
1752
1753        assert_fields_eq(&plan, vec!["c", "a"]);
1754
1755        let expected = "Limit: skip=0, fetch=5\
1756        \n  Projection: test.c, test.a\
1757        \n    TableScan: test projection=[a, c]";
1758
1759        assert_optimized_plan_equal(plan, expected)
1760    }
1761
1762    #[test]
1763    fn table_scan_without_projection() -> Result<()> {
1764        let table_scan = test_table_scan()?;
1765        let plan = LogicalPlanBuilder::from(table_scan).build()?;
1766        // should expand projection to all columns without projection
1767        let expected = "TableScan: test projection=[a, b, c]";
1768        assert_optimized_plan_equal(plan, expected)
1769    }
1770
1771    #[test]
1772    fn table_scan_with_literal_projection() -> Result<()> {
1773        let table_scan = test_table_scan()?;
1774        let plan = LogicalPlanBuilder::from(table_scan)
1775            .project(vec![lit(1_i64), lit(2_i64)])?
1776            .build()?;
1777        let expected = "Projection: Int64(1), Int64(2)\
1778                      \n  TableScan: test projection=[]";
1779        assert_optimized_plan_equal(plan, expected)
1780    }
1781
1782    /// tests that it removes unused columns in projections
1783    #[test]
1784    fn table_unused_column() -> Result<()> {
1785        let table_scan = test_table_scan()?;
1786        assert_eq!(3, table_scan.schema().fields().len());
1787        assert_fields_eq(&table_scan, vec!["a", "b", "c"]);
1788
1789        // we never use "b" in the first projection => remove it
1790        let plan = LogicalPlanBuilder::from(table_scan)
1791            .project(vec![col("c"), col("a"), col("b")])?
1792            .filter(col("c").gt(lit(1)))?
1793            .aggregate(vec![col("c")], vec![max(col("a"))])?
1794            .build()?;
1795
1796        assert_fields_eq(&plan, vec!["c", "max(test.a)"]);
1797
1798        let plan = optimize(plan).expect("failed to optimize plan");
1799        let expected = "\
1800        Aggregate: groupBy=[[test.c]], aggr=[[max(test.a)]]\
1801        \n  Filter: test.c > Int32(1)\
1802        \n    Projection: test.c, test.a\
1803        \n      TableScan: test projection=[a, c]";
1804
1805        assert_optimized_plan_equal(plan, expected)
1806    }
1807
1808    /// tests that it removes un-needed projections
1809    #[test]
1810    fn table_unused_projection() -> Result<()> {
1811        let table_scan = test_table_scan()?;
1812        assert_eq!(3, table_scan.schema().fields().len());
1813        assert_fields_eq(&table_scan, vec!["a", "b", "c"]);
1814
1815        // there is no need for the first projection
1816        let plan = LogicalPlanBuilder::from(table_scan)
1817            .project(vec![col("b")])?
1818            .project(vec![lit(1).alias("a")])?
1819            .build()?;
1820
1821        assert_fields_eq(&plan, vec!["a"]);
1822
1823        let expected = "\
1824        Projection: Int32(1) AS a\
1825        \n  TableScan: test projection=[]";
1826
1827        assert_optimized_plan_equal(plan, expected)
1828    }
1829
1830    #[test]
1831    fn table_full_filter_pushdown() -> Result<()> {
1832        let schema = Schema::new(test_table_scan_fields());
1833
1834        let table_scan = table_scan_with_filters(
1835            Some("test"),
1836            &schema,
1837            None,
1838            vec![col("b").eq(lit(1))],
1839        )?
1840        .build()?;
1841        assert_eq!(3, table_scan.schema().fields().len());
1842        assert_fields_eq(&table_scan, vec!["a", "b", "c"]);
1843
1844        // there is no need for the first projection
1845        let plan = LogicalPlanBuilder::from(table_scan)
1846            .project(vec![col("b")])?
1847            .project(vec![lit(1).alias("a")])?
1848            .build()?;
1849
1850        assert_fields_eq(&plan, vec!["a"]);
1851
1852        let expected = "\
1853        Projection: Int32(1) AS a\
1854        \n  TableScan: test projection=[], full_filters=[b = Int32(1)]";
1855
1856        assert_optimized_plan_equal(plan, expected)
1857    }
1858
1859    /// tests that optimizing twice yields same plan
1860    #[test]
1861    fn test_double_optimization() -> Result<()> {
1862        let table_scan = test_table_scan()?;
1863
1864        let plan = LogicalPlanBuilder::from(table_scan)
1865            .project(vec![col("b")])?
1866            .project(vec![lit(1).alias("a")])?
1867            .build()?;
1868
1869        let optimized_plan1 = optimize(plan).expect("failed to optimize plan");
1870        let optimized_plan2 =
1871            optimize(optimized_plan1.clone()).expect("failed to optimize plan");
1872
1873        let formatted_plan1 = format!("{optimized_plan1:?}");
1874        let formatted_plan2 = format!("{optimized_plan2:?}");
1875        assert_eq!(formatted_plan1, formatted_plan2);
1876        Ok(())
1877    }
1878
1879    /// tests that it removes an aggregate is never used downstream
1880    #[test]
1881    fn table_unused_aggregate() -> Result<()> {
1882        let table_scan = test_table_scan()?;
1883        assert_eq!(3, table_scan.schema().fields().len());
1884        assert_fields_eq(&table_scan, vec!["a", "b", "c"]);
1885
1886        // we never use "min(b)" => remove it
1887        let plan = LogicalPlanBuilder::from(table_scan)
1888            .aggregate(vec![col("a"), col("c")], vec![max(col("b")), min(col("b"))])?
1889            .filter(col("c").gt(lit(1)))?
1890            .project(vec![col("c"), col("a"), col("max(test.b)")])?
1891            .build()?;
1892
1893        assert_fields_eq(&plan, vec!["c", "a", "max(test.b)"]);
1894
1895        let expected = "Projection: test.c, test.a, max(test.b)\
1896        \n  Filter: test.c > Int32(1)\
1897        \n    Aggregate: groupBy=[[test.a, test.c]], aggr=[[max(test.b)]]\
1898        \n      TableScan: test projection=[a, b, c]";
1899
1900        assert_optimized_plan_equal(plan, expected)
1901    }
1902
1903    #[test]
1904    fn aggregate_filter_pushdown() -> Result<()> {
1905        let table_scan = test_table_scan()?;
1906        let aggr_with_filter = count_udaf()
1907            .call(vec![col("b")])
1908            .filter(col("c").gt(lit(42)))
1909            .build()?;
1910        let plan = LogicalPlanBuilder::from(table_scan)
1911            .aggregate(
1912                vec![col("a")],
1913                vec![count(col("b")), aggr_with_filter.alias("count2")],
1914            )?
1915            .build()?;
1916
1917        let expected = "Aggregate: groupBy=[[test.a]], aggr=[[count(test.b), count(test.b) FILTER (WHERE test.c > Int32(42)) AS count2]]\
1918        \n  TableScan: test projection=[a, b, c]";
1919
1920        assert_optimized_plan_equal(plan, expected)
1921    }
1922
1923    #[test]
1924    fn pushdown_through_distinct() -> Result<()> {
1925        let table_scan = test_table_scan()?;
1926
1927        let plan = LogicalPlanBuilder::from(table_scan)
1928            .project(vec![col("a"), col("b")])?
1929            .distinct()?
1930            .project(vec![col("a")])?
1931            .build()?;
1932
1933        let expected = "Projection: test.a\
1934        \n  Distinct:\
1935        \n    TableScan: test projection=[a, b]";
1936
1937        assert_optimized_plan_equal(plan, expected)
1938    }
1939
1940    #[test]
1941    fn test_window() -> Result<()> {
1942        let table_scan = test_table_scan()?;
1943
1944        let max1 = Expr::WindowFunction(expr::WindowFunction::new(
1945            WindowFunctionDefinition::AggregateUDF(max_udaf()),
1946            vec![col("test.a")],
1947        ))
1948        .partition_by(vec![col("test.b")])
1949        .build()
1950        .unwrap();
1951
1952        let max2 = Expr::WindowFunction(expr::WindowFunction::new(
1953            WindowFunctionDefinition::AggregateUDF(max_udaf()),
1954            vec![col("test.b")],
1955        ));
1956        let col1 = col(max1.schema_name().to_string());
1957        let col2 = col(max2.schema_name().to_string());
1958
1959        let plan = LogicalPlanBuilder::from(table_scan)
1960            .window(vec![max1])?
1961            .window(vec![max2])?
1962            .project(vec![col1, col2])?
1963            .build()?;
1964
1965        let expected = "Projection: max(test.a) PARTITION BY [test.b] ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING, max(test.b) ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING\
1966        \n  WindowAggr: windowExpr=[[max(test.b) ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING]]\
1967        \n    Projection: test.b, max(test.a) PARTITION BY [test.b] ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING\
1968        \n      WindowAggr: windowExpr=[[max(test.a) PARTITION BY [test.b] ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING]]\
1969        \n        TableScan: test projection=[a, b]";
1970
1971        assert_optimized_plan_equal(plan, expected)
1972    }
1973
1974    fn observe(_plan: &LogicalPlan, _rule: &dyn OptimizerRule) {}
1975
1976    fn optimize(plan: LogicalPlan) -> Result<LogicalPlan> {
1977        let optimizer = Optimizer::with_rules(vec![Arc::new(OptimizeProjections::new())]);
1978        let optimized_plan =
1979            optimizer.optimize(plan, &OptimizerContext::new(), observe)?;
1980        Ok(optimized_plan)
1981    }
1982}