datafusion_expr/logical_plan/
plan.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! Logical plan types
19
20use std::cmp::Ordering;
21use std::collections::{HashMap, HashSet};
22use std::fmt::{self, Debug, Display, Formatter};
23use std::hash::{Hash, Hasher};
24use std::str::FromStr;
25use std::sync::{Arc, LazyLock};
26
27use super::dml::CopyTo;
28use super::invariants::{
29    assert_always_invariants_at_current_node, assert_executable_invariants,
30    InvariantLevel,
31};
32use super::DdlStatement;
33use crate::builder::{change_redundant_column, unnest_with_options};
34use crate::expr::{Placeholder, Sort as SortExpr, WindowFunction, WindowFunctionParams};
35use crate::expr_rewriter::{
36    create_col_from_scalar_expr, normalize_cols, normalize_sorts, NamePreserver,
37};
38use crate::logical_plan::display::{GraphvizVisitor, IndentVisitor};
39use crate::logical_plan::extension::UserDefinedLogicalNode;
40use crate::logical_plan::{DmlStatement, Statement};
41use crate::utils::{
42    enumerate_grouping_sets, exprlist_to_fields, find_out_reference_exprs,
43    grouping_set_expr_count, grouping_set_to_exprlist, split_conjunction,
44};
45use crate::{
46    build_join_schema, expr_vec_fmt, BinaryExpr, CreateMemoryTable, CreateView, Execute,
47    Expr, ExprSchemable, LogicalPlanBuilder, Operator, Prepare,
48    TableProviderFilterPushDown, TableSource, WindowFunctionDefinition,
49};
50
51use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
52use datafusion_common::cse::{NormalizeEq, Normalizeable};
53use datafusion_common::tree_node::{
54    Transformed, TreeNode, TreeNodeContainer, TreeNodeRecursion,
55};
56use datafusion_common::{
57    aggregate_functional_dependencies, internal_err, plan_err, Column, Constraints,
58    DFSchema, DFSchemaRef, DataFusionError, Dependency, FunctionalDependence,
59    FunctionalDependencies, ParamValues, Result, ScalarValue, Spans, TableReference,
60    UnnestOptions,
61};
62use indexmap::IndexSet;
63
64// backwards compatibility
65use crate::display::PgJsonVisitor;
66pub use datafusion_common::display::{PlanType, StringifiedPlan, ToStringifiedPlan};
67pub use datafusion_common::{JoinConstraint, JoinType};
68
/// A `LogicalPlan` is a node in a tree of relational operators (such as
/// Projection or Filter).
///
/// Represents transforming an input relation (table) to an output relation
/// (table) with a potentially different schema. Plans form a dataflow tree
/// where data flows from leaves up to the root to produce the query result.
///
/// `LogicalPlan`s can be created by the SQL query planner, the DataFrame API,
/// or programmatically (for example custom query languages).
///
/// # See also:
/// * [`Expr`]: For the expressions that are evaluated by the plan
/// * [`LogicalPlanBuilder`]: For building `LogicalPlan`s
/// * [`tree_node`]: To inspect and rewrite `LogicalPlan`s
///
/// [`tree_node`]: crate::logical_plan::tree_node
///
/// # Examples
///
/// ## Creating a LogicalPlan from SQL:
///
/// See [`SessionContext::sql`](https://docs.rs/datafusion/latest/datafusion/execution/context/struct.SessionContext.html#method.sql)
///
/// ## Creating a LogicalPlan from the DataFrame API:
///
/// See [`DataFrame::logical_plan`](https://docs.rs/datafusion/latest/datafusion/dataframe/struct.DataFrame.html#method.logical_plan)
///
/// ## Creating a LogicalPlan programmatically:
///
/// See [`LogicalPlanBuilder`]
///
/// # Visiting and Rewriting `LogicalPlan`s
///
/// Using the [`tree_node`] API, you can recursively walk all nodes in a
/// `LogicalPlan`. For example, to collect every expression evaluated by the
/// nodes of a plan:
///
/// ```
/// # use std::collections::HashSet;
/// # use arrow::datatypes::{DataType, Field, Schema};
/// # use datafusion_expr::{Expr, col, lit, LogicalPlan, LogicalPlanBuilder, table_scan};
/// # use datafusion_common::tree_node::{TreeNodeRecursion, TreeNode};
/// # use datafusion_common::{Column, Result};
/// # fn employee_schema() -> Schema {
/// #    Schema::new(vec![
/// #           Field::new("name", DataType::Utf8, false),
/// #           Field::new("salary", DataType::Int32, false),
/// #       ])
/// #   }
/// // Projection(name)
/// //   Filter(salary > 1000)
/// //     TableScan(employee)
/// # fn main() -> Result<()> {
/// let plan = table_scan(Some("employee"), &employee_schema(), None)?
///  .filter(col("salary").gt(lit(1000)))?
///  .project(vec![col("name")])?
///  .build()?;
///
/// // use apply to walk the plan and collect all expressions
/// let mut expressions = HashSet::new();
/// plan.apply(|node| {
///   // collect all expressions of the current node
///   node.apply_expressions(|expr| {
///    expressions.insert(expr.clone());
///    Ok(TreeNodeRecursion::Continue) // control walk of expressions
///   })?;
///   Ok(TreeNodeRecursion::Continue) // control walk of plan nodes
/// }).unwrap();
///
/// // we found one expression in the Projection and one in the Filter
/// assert_eq!(expressions.len(), 2);
/// println!("Found expressions: {:?}", expressions);
/// // found predicate in the Filter: employee.salary > 1000
/// let salary = Expr::Column(Column::new(Some("employee"), "salary"));
/// assert!(expressions.contains(&salary.gt(lit(1000))));
/// // found projection in the Projection: employee.name
/// let name = Expr::Column(Column::new(Some("employee"), "name"));
/// assert!(expressions.contains(&name));
/// # Ok(())
/// # }
/// ```
///
/// You can also rewrite plans using the [`tree_node`] API. For example, to
/// replace the filter predicate in a plan:
///
/// ```
/// # use std::collections::HashSet;
/// # use arrow::datatypes::{DataType, Field, Schema};
/// # use datafusion_expr::{Expr, col, lit, LogicalPlan, LogicalPlanBuilder, table_scan};
/// # use datafusion_common::tree_node::{TreeNodeRecursion, TreeNode};
/// # use datafusion_common::{Column, Result};
/// # fn employee_schema() -> Schema {
/// #    Schema::new(vec![
/// #           Field::new("name", DataType::Utf8, false),
/// #           Field::new("salary", DataType::Int32, false),
/// #       ])
/// #   }
/// // Projection(name)
/// //   Filter(salary > 1000)
/// //     TableScan(employee)
/// # fn main() -> Result<()> {
/// use datafusion_common::tree_node::Transformed;
/// let plan = table_scan(Some("employee"), &employee_schema(), None)?
///  .filter(col("salary").gt(lit(1000)))?
///  .project(vec![col("name")])?
///  .build()?;
///
/// // use transform to rewrite the plan
/// let transformed_result = plan.transform(|node| {
///   // when we see the filter node
///   if let LogicalPlan::Filter(mut filter) = node {
///     // replace predicate with salary < 2000
///     filter.predicate = Expr::Column(Column::new(Some("employee"), "salary")).lt(lit(2000));
///     let new_plan = LogicalPlan::Filter(filter);
///     return Ok(Transformed::yes(new_plan)); // communicate the node was changed
///   }
///   // return the node unchanged
///   Ok(Transformed::no(node))
/// }).unwrap();
///
/// // Transformed result contains rewritten plan and information about
/// // whether the plan was changed
/// assert!(transformed_result.transformed);
/// let rewritten_plan = transformed_result.data;
///
/// // the Filter now uses the rewritten predicate
/// assert_eq!(rewritten_plan.display_indent().to_string(),
/// "Projection: employee.name\
/// \n  Filter: employee.salary < Int32(2000)\
/// \n    TableScan: employee");
/// # Ok(())
/// # }
/// ```
///
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Hash)]
pub enum LogicalPlan {
    /// Evaluates an arbitrary list of expressions (essentially a
    /// SELECT with an expression list) on its input.
    Projection(Projection),
    /// Filters rows from its input that do not match an
    /// expression (essentially a WHERE clause with a predicate
    /// expression).
    ///
    /// Semantically, `<predicate>` is evaluated for each row of the
    /// input; if the value of `<predicate>` is true, the input row is
    /// passed to the output. If the value of `<predicate>` is false
    /// (or null), the row is discarded.
    Filter(Filter),
    /// Windows its input based on a set of window specs and window
    /// functions (e.g. SUM or RANK). This is used to implement SQL
    /// window functions, and the `OVER` clause.
    ///
    /// See [`Window`] for more details
    Window(Window),
    /// Aggregates its input based on a set of grouping and aggregate
    /// expressions (e.g. SUM). This is used to implement SQL aggregates
    /// and `GROUP BY`.
    ///
    /// See [`Aggregate`] for more details
    Aggregate(Aggregate),
    /// Sorts its input according to a list of sort expressions. This
    /// is used to implement SQL `ORDER BY`
    Sort(Sort),
    /// Join two logical plans on one or more join columns.
    /// This is used to implement SQL `JOIN`
    Join(Join),
    /// Repartitions the input based on a partitioning scheme. This is
    /// used to add parallelism and is sometimes referred to as an
    /// "exchange" operator in other systems
    Repartition(Repartition),
    /// Union multiple inputs with the same schema into a single
    /// output stream. This is used to implement SQL `UNION [ALL]` and
    /// `INTERSECT [ALL]`.
    Union(Union),
    /// Produces rows from a [`TableSource`], used to implement SQL
    /// `FROM` tables or views.
    TableScan(TableScan),
    /// An empty relation: it has no input and produces either zero rows
    /// or a single row, depending on its `produce_one_row` flag. This is
    /// used to implement a SQL `SELECT` that has no `FROM` clause.
    EmptyRelation(EmptyRelation),
    /// Produces the output of running another query.  This is used to
    /// implement SQL subqueries
    Subquery(Subquery),
    /// Aliased relation provides, or changes, the name of a relation.
    SubqueryAlias(SubqueryAlias),
    /// Skip some number of rows, and then fetch some number of rows.
    Limit(Limit),
    /// A DataFusion [`Statement`] such as `SET VARIABLE` or `START TRANSACTION`
    Statement(Statement),
    /// Values expression. See
    /// [Postgres VALUES](https://www.postgresql.org/docs/current/queries-values.html)
    /// documentation for more details. This is used to implement SQL such as
    /// `VALUES (1, 2), (3, 4)`
    Values(Values),
    /// Produces a relation with string representations of
    /// various parts of the plan. This is used to implement SQL `EXPLAIN`.
    Explain(Explain),
    /// Runs the input, and prints annotated physical plan as a string
    /// with execution metric. This is used to implement SQL
    /// `EXPLAIN ANALYZE`.
    Analyze(Analyze),
    /// Extension operator defined outside of DataFusion. This is used
    /// to extend DataFusion with custom relational operations that are
    /// not built into DataFusion (see [`UserDefinedLogicalNode`]).
    Extension(Extension),
    /// Remove duplicate rows from the input. This is used to
    /// implement SQL `SELECT DISTINCT ...`.
    Distinct(Distinct),
    /// Data Manipulation Language (DML): Insert / Update / Delete
    Dml(DmlStatement),
    /// Data Definition Language (DDL): CREATE / DROP TABLES / VIEWS / SCHEMAS
    Ddl(DdlStatement),
    /// `COPY TO` for writing plan results to files
    Copy(CopyTo),
    /// Describe the schema of the table. This is used to implement the
    /// SQL `DESCRIBE` command from MySQL.
    DescribeTable(DescribeTable),
    /// Unnest a column that contains a nested list type such as an
    /// ARRAY. This is used to implement SQL `UNNEST`
    Unnest(Unnest),
    /// A recursive query made of a static term and a recursive term. This
    /// is used to implement recursive CTEs (`WITH RECURSIVE`).
    RecursiveQuery(RecursiveQuery),
}
291
292impl Default for LogicalPlan {
293    fn default() -> Self {
294        LogicalPlan::EmptyRelation(EmptyRelation {
295            produce_one_row: false,
296            schema: Arc::new(DFSchema::empty()),
297        })
298    }
299}
300
301impl<'a> TreeNodeContainer<'a, Self> for LogicalPlan {
302    fn apply_elements<F: FnMut(&'a Self) -> Result<TreeNodeRecursion>>(
303        &'a self,
304        mut f: F,
305    ) -> Result<TreeNodeRecursion> {
306        f(self)
307    }
308
309    fn map_elements<F: FnMut(Self) -> Result<Transformed<Self>>>(
310        self,
311        mut f: F,
312    ) -> Result<Transformed<Self>> {
313        f(self)
314    }
315}
316
317impl LogicalPlan {
318    /// Get a reference to the logical plan's schema
319    pub fn schema(&self) -> &DFSchemaRef {
320        match self {
321            LogicalPlan::EmptyRelation(EmptyRelation { schema, .. }) => schema,
322            LogicalPlan::Values(Values { schema, .. }) => schema,
323            LogicalPlan::TableScan(TableScan {
324                projected_schema, ..
325            }) => projected_schema,
326            LogicalPlan::Projection(Projection { schema, .. }) => schema,
327            LogicalPlan::Filter(Filter { input, .. }) => input.schema(),
328            LogicalPlan::Distinct(Distinct::All(input)) => input.schema(),
329            LogicalPlan::Distinct(Distinct::On(DistinctOn { schema, .. })) => schema,
330            LogicalPlan::Window(Window { schema, .. }) => schema,
331            LogicalPlan::Aggregate(Aggregate { schema, .. }) => schema,
332            LogicalPlan::Sort(Sort { input, .. }) => input.schema(),
333            LogicalPlan::Join(Join { schema, .. }) => schema,
334            LogicalPlan::Repartition(Repartition { input, .. }) => input.schema(),
335            LogicalPlan::Limit(Limit { input, .. }) => input.schema(),
336            LogicalPlan::Statement(statement) => statement.schema(),
337            LogicalPlan::Subquery(Subquery { subquery, .. }) => subquery.schema(),
338            LogicalPlan::SubqueryAlias(SubqueryAlias { schema, .. }) => schema,
339            LogicalPlan::Explain(explain) => &explain.schema,
340            LogicalPlan::Analyze(analyze) => &analyze.schema,
341            LogicalPlan::Extension(extension) => extension.node.schema(),
342            LogicalPlan::Union(Union { schema, .. }) => schema,
343            LogicalPlan::DescribeTable(DescribeTable { output_schema, .. }) => {
344                output_schema
345            }
346            LogicalPlan::Dml(DmlStatement { output_schema, .. }) => output_schema,
347            LogicalPlan::Copy(CopyTo { input, .. }) => input.schema(),
348            LogicalPlan::Ddl(ddl) => ddl.schema(),
349            LogicalPlan::Unnest(Unnest { schema, .. }) => schema,
350            LogicalPlan::RecursiveQuery(RecursiveQuery { static_term, .. }) => {
351                // we take the schema of the static term as the schema of the entire recursive query
352                static_term.schema()
353            }
354        }
355    }
356
357    /// Used for normalizing columns, as the fallback schemas to the main schema
358    /// of the plan.
359    pub fn fallback_normalize_schemas(&self) -> Vec<&DFSchema> {
360        match self {
361            LogicalPlan::Window(_)
362            | LogicalPlan::Projection(_)
363            | LogicalPlan::Aggregate(_)
364            | LogicalPlan::Unnest(_)
365            | LogicalPlan::Join(_) => self
366                .inputs()
367                .iter()
368                .map(|input| input.schema().as_ref())
369                .collect(),
370            _ => vec![],
371        }
372    }
373
374    /// Returns the (fixed) output schema for explain plans
375    pub fn explain_schema() -> SchemaRef {
376        SchemaRef::new(Schema::new(vec![
377            Field::new("plan_type", DataType::Utf8, false),
378            Field::new("plan", DataType::Utf8, false),
379        ]))
380    }
381
382    /// Returns the (fixed) output schema for `DESCRIBE` plans
383    pub fn describe_schema() -> Schema {
384        Schema::new(vec![
385            Field::new("column_name", DataType::Utf8, false),
386            Field::new("data_type", DataType::Utf8, false),
387            Field::new("is_nullable", DataType::Utf8, false),
388        ])
389    }
390
391    /// Returns all expressions (non-recursively) evaluated by the current
392    /// logical plan node. This does not include expressions in any children.
393    ///
394    /// Note this method `clone`s all the expressions. When possible, the
395    /// [`tree_node`] API should be used instead of this API.
396    ///
397    /// The returned expressions do not necessarily represent or even
398    /// contributed to the output schema of this node. For example,
399    /// `LogicalPlan::Filter` returns the filter expression even though the
400    /// output of a Filter has the same columns as the input.
401    ///
402    /// The expressions do contain all the columns that are used by this plan,
403    /// so if there are columns not referenced by these expressions then
404    /// DataFusion's optimizer attempts to optimize them away.
405    ///
406    /// [`tree_node`]: crate::logical_plan::tree_node
407    pub fn expressions(self: &LogicalPlan) -> Vec<Expr> {
408        let mut exprs = vec![];
409        self.apply_expressions(|e| {
410            exprs.push(e.clone());
411            Ok(TreeNodeRecursion::Continue)
412        })
413        // closure always returns OK
414        .unwrap();
415        exprs
416    }
417
418    /// Returns all the out reference(correlated) expressions (recursively) in the current
419    /// logical plan nodes and all its descendant nodes.
420    pub fn all_out_ref_exprs(self: &LogicalPlan) -> Vec<Expr> {
421        let mut exprs = vec![];
422        self.apply_expressions(|e| {
423            find_out_reference_exprs(e).into_iter().for_each(|e| {
424                if !exprs.contains(&e) {
425                    exprs.push(e)
426                }
427            });
428            Ok(TreeNodeRecursion::Continue)
429        })
430        // closure always returns OK
431        .unwrap();
432        self.inputs()
433            .into_iter()
434            .flat_map(|child| child.all_out_ref_exprs())
435            .for_each(|e| {
436                if !exprs.contains(&e) {
437                    exprs.push(e)
438                }
439            });
440        exprs
441    }
442
443    /// Returns all inputs / children of this `LogicalPlan` node.
444    ///
445    /// Note does not include inputs to inputs, or subqueries.
446    pub fn inputs(&self) -> Vec<&LogicalPlan> {
447        match self {
448            LogicalPlan::Projection(Projection { input, .. }) => vec![input],
449            LogicalPlan::Filter(Filter { input, .. }) => vec![input],
450            LogicalPlan::Repartition(Repartition { input, .. }) => vec![input],
451            LogicalPlan::Window(Window { input, .. }) => vec![input],
452            LogicalPlan::Aggregate(Aggregate { input, .. }) => vec![input],
453            LogicalPlan::Sort(Sort { input, .. }) => vec![input],
454            LogicalPlan::Join(Join { left, right, .. }) => vec![left, right],
455            LogicalPlan::Limit(Limit { input, .. }) => vec![input],
456            LogicalPlan::Subquery(Subquery { subquery, .. }) => vec![subquery],
457            LogicalPlan::SubqueryAlias(SubqueryAlias { input, .. }) => vec![input],
458            LogicalPlan::Extension(extension) => extension.node.inputs(),
459            LogicalPlan::Union(Union { inputs, .. }) => {
460                inputs.iter().map(|arc| arc.as_ref()).collect()
461            }
462            LogicalPlan::Distinct(
463                Distinct::All(input) | Distinct::On(DistinctOn { input, .. }),
464            ) => vec![input],
465            LogicalPlan::Explain(explain) => vec![&explain.plan],
466            LogicalPlan::Analyze(analyze) => vec![&analyze.input],
467            LogicalPlan::Dml(write) => vec![&write.input],
468            LogicalPlan::Copy(copy) => vec![&copy.input],
469            LogicalPlan::Ddl(ddl) => ddl.inputs(),
470            LogicalPlan::Unnest(Unnest { input, .. }) => vec![input],
471            LogicalPlan::RecursiveQuery(RecursiveQuery {
472                static_term,
473                recursive_term,
474                ..
475            }) => vec![static_term, recursive_term],
476            LogicalPlan::Statement(stmt) => stmt.inputs(),
477            // plans without inputs
478            LogicalPlan::TableScan { .. }
479            | LogicalPlan::EmptyRelation { .. }
480            | LogicalPlan::Values { .. }
481            | LogicalPlan::DescribeTable(_) => vec![],
482        }
483    }
484
485    /// returns all `Using` join columns in a logical plan
486    pub fn using_columns(&self) -> Result<Vec<HashSet<Column>>, DataFusionError> {
487        let mut using_columns: Vec<HashSet<Column>> = vec![];
488
489        self.apply_with_subqueries(|plan| {
490            if let LogicalPlan::Join(Join {
491                join_constraint: JoinConstraint::Using,
492                on,
493                ..
494            }) = plan
495            {
496                // The join keys in using-join must be columns.
497                let columns =
498                    on.iter().try_fold(HashSet::new(), |mut accumu, (l, r)| {
499                        let Some(l) = l.get_as_join_column() else {
500                            return internal_err!(
501                                "Invalid join key. Expected column, found {l:?}"
502                            );
503                        };
504                        let Some(r) = r.get_as_join_column() else {
505                            return internal_err!(
506                                "Invalid join key. Expected column, found {r:?}"
507                            );
508                        };
509                        accumu.insert(l.to_owned());
510                        accumu.insert(r.to_owned());
511                        Result::<_, DataFusionError>::Ok(accumu)
512                    })?;
513                using_columns.push(columns);
514            }
515            Ok(TreeNodeRecursion::Continue)
516        })?;
517
518        Ok(using_columns)
519    }
520
521    /// returns the first output expression of this `LogicalPlan` node.
522    pub fn head_output_expr(&self) -> Result<Option<Expr>> {
523        match self {
524            LogicalPlan::Projection(projection) => {
525                Ok(Some(projection.expr.as_slice()[0].clone()))
526            }
527            LogicalPlan::Aggregate(agg) => {
528                if agg.group_expr.is_empty() {
529                    Ok(Some(agg.aggr_expr.as_slice()[0].clone()))
530                } else {
531                    Ok(Some(agg.group_expr.as_slice()[0].clone()))
532                }
533            }
534            LogicalPlan::Distinct(Distinct::On(DistinctOn { select_expr, .. })) => {
535                Ok(Some(select_expr[0].clone()))
536            }
537            LogicalPlan::Filter(Filter { input, .. })
538            | LogicalPlan::Distinct(Distinct::All(input))
539            | LogicalPlan::Sort(Sort { input, .. })
540            | LogicalPlan::Limit(Limit { input, .. })
541            | LogicalPlan::Repartition(Repartition { input, .. })
542            | LogicalPlan::Window(Window { input, .. }) => input.head_output_expr(),
543            LogicalPlan::Join(Join {
544                left,
545                right,
546                join_type,
547                ..
548            }) => match join_type {
549                JoinType::Inner | JoinType::Left | JoinType::Right | JoinType::Full => {
550                    if left.schema().fields().is_empty() {
551                        right.head_output_expr()
552                    } else {
553                        left.head_output_expr()
554                    }
555                }
556                JoinType::LeftSemi | JoinType::LeftAnti | JoinType::LeftMark => {
557                    left.head_output_expr()
558                }
559                JoinType::RightSemi | JoinType::RightAnti => right.head_output_expr(),
560            },
561            LogicalPlan::RecursiveQuery(RecursiveQuery { static_term, .. }) => {
562                static_term.head_output_expr()
563            }
564            LogicalPlan::Union(union) => Ok(Some(Expr::Column(Column::from(
565                union.schema.qualified_field(0),
566            )))),
567            LogicalPlan::TableScan(table) => Ok(Some(Expr::Column(Column::from(
568                table.projected_schema.qualified_field(0),
569            )))),
570            LogicalPlan::SubqueryAlias(subquery_alias) => {
571                let expr_opt = subquery_alias.input.head_output_expr()?;
572                expr_opt
573                    .map(|expr| {
574                        Ok(Expr::Column(create_col_from_scalar_expr(
575                            &expr,
576                            subquery_alias.alias.to_string(),
577                        )?))
578                    })
579                    .map_or(Ok(None), |v| v.map(Some))
580            }
581            LogicalPlan::Subquery(_) => Ok(None),
582            LogicalPlan::EmptyRelation(_)
583            | LogicalPlan::Statement(_)
584            | LogicalPlan::Values(_)
585            | LogicalPlan::Explain(_)
586            | LogicalPlan::Analyze(_)
587            | LogicalPlan::Extension(_)
588            | LogicalPlan::Dml(_)
589            | LogicalPlan::Copy(_)
590            | LogicalPlan::Ddl(_)
591            | LogicalPlan::DescribeTable(_)
592            | LogicalPlan::Unnest(_) => Ok(None),
593        }
594    }
595
596    /// Recomputes schema and type information for this LogicalPlan if needed.
597    ///
598    /// Some `LogicalPlan`s may need to recompute their schema if the number or
599    /// type of expressions have been changed (for example due to type
600    /// coercion). For example [`LogicalPlan::Projection`]s schema depends on
601    /// its expressions.
602    ///
603    /// Some `LogicalPlan`s schema is unaffected by any changes to their
604    /// expressions. For example [`LogicalPlan::Filter`] schema is always the
605    /// same as its input schema.
606    ///
607    /// This is useful after modifying a plans `Expr`s (or input plans) via
608    /// methods such as [Self::map_children] and [Self::map_expressions]. Unlike
609    /// [Self::with_new_exprs], this method does not require a new set of
610    /// expressions or inputs plans.
611    ///
612    /// # Return value
613    /// Returns an error if there is some issue recomputing the schema.
614    ///
615    /// # Notes
616    ///
617    /// * Does not recursively recompute schema for input (child) plans.
618    pub fn recompute_schema(self) -> Result<Self> {
619        match self {
620            // Since expr may be different than the previous expr, schema of the projection
621            // may change. We need to use try_new method instead of try_new_with_schema method.
622            LogicalPlan::Projection(Projection {
623                expr,
624                input,
625                schema: _,
626            }) => Projection::try_new(expr, input).map(LogicalPlan::Projection),
627            LogicalPlan::Dml(_) => Ok(self),
628            LogicalPlan::Copy(_) => Ok(self),
629            LogicalPlan::Values(Values { schema, values }) => {
630                // todo it isn't clear why the schema is not recomputed here
631                Ok(LogicalPlan::Values(Values { schema, values }))
632            }
633            LogicalPlan::Filter(Filter { predicate, input }) => {
634                Filter::try_new(predicate, input).map(LogicalPlan::Filter)
635            }
636            LogicalPlan::Repartition(_) => Ok(self),
637            LogicalPlan::Window(Window {
638                input,
639                window_expr,
640                schema: _,
641            }) => Window::try_new(window_expr, input).map(LogicalPlan::Window),
642            LogicalPlan::Aggregate(Aggregate {
643                input,
644                group_expr,
645                aggr_expr,
646                schema: _,
647            }) => Aggregate::try_new(input, group_expr, aggr_expr)
648                .map(LogicalPlan::Aggregate),
649            LogicalPlan::Sort(_) => Ok(self),
650            LogicalPlan::Join(Join {
651                left,
652                right,
653                filter,
654                join_type,
655                join_constraint,
656                on,
657                schema: _,
658                null_equals_null,
659            }) => {
660                let schema =
661                    build_join_schema(left.schema(), right.schema(), &join_type)?;
662
663                let new_on: Vec<_> = on
664                    .into_iter()
665                    .map(|equi_expr| {
666                        // SimplifyExpression rule may add alias to the equi_expr.
667                        (equi_expr.0.unalias(), equi_expr.1.unalias())
668                    })
669                    .collect();
670
671                Ok(LogicalPlan::Join(Join {
672                    left,
673                    right,
674                    join_type,
675                    join_constraint,
676                    on: new_on,
677                    filter,
678                    schema: DFSchemaRef::new(schema),
679                    null_equals_null,
680                }))
681            }
682            LogicalPlan::Subquery(_) => Ok(self),
683            LogicalPlan::SubqueryAlias(SubqueryAlias {
684                input,
685                alias,
686                schema: _,
687            }) => SubqueryAlias::try_new(input, alias).map(LogicalPlan::SubqueryAlias),
688            LogicalPlan::Limit(_) => Ok(self),
689            LogicalPlan::Ddl(_) => Ok(self),
690            LogicalPlan::Extension(Extension { node }) => {
691                // todo make an API that does not require cloning
692                // This requires a copy of the extension nodes expressions and inputs
693                let expr = node.expressions();
694                let inputs: Vec<_> = node.inputs().into_iter().cloned().collect();
695                Ok(LogicalPlan::Extension(Extension {
696                    node: node.with_exprs_and_inputs(expr, inputs)?,
697                }))
698            }
699            LogicalPlan::Union(Union { inputs, schema }) => {
700                let first_input_schema = inputs[0].schema();
701                if schema.fields().len() == first_input_schema.fields().len() {
702                    // If inputs are not pruned do not change schema
703                    Ok(LogicalPlan::Union(Union { inputs, schema }))
704                } else {
705                    // A note on `Union`s constructed via `try_new_by_name`:
706                    //
707                    // At this point, the schema for each input should have
708                    // the same width. Thus, we do not need to save whether a
709                    // `Union` was created `BY NAME`, and can safely rely on the
710                    // `try_new` initializer to derive the new schema based on
711                    // column positions.
712                    Ok(LogicalPlan::Union(Union::try_new(inputs)?))
713                }
714            }
715            LogicalPlan::Distinct(distinct) => {
716                let distinct = match distinct {
717                    Distinct::All(input) => Distinct::All(input),
718                    Distinct::On(DistinctOn {
719                        on_expr,
720                        select_expr,
721                        sort_expr,
722                        input,
723                        schema: _,
724                    }) => Distinct::On(DistinctOn::try_new(
725                        on_expr,
726                        select_expr,
727                        sort_expr,
728                        input,
729                    )?),
730                };
731                Ok(LogicalPlan::Distinct(distinct))
732            }
733            LogicalPlan::RecursiveQuery(_) => Ok(self),
734            LogicalPlan::Analyze(_) => Ok(self),
735            LogicalPlan::Explain(_) => Ok(self),
736            LogicalPlan::TableScan(_) => Ok(self),
737            LogicalPlan::EmptyRelation(_) => Ok(self),
738            LogicalPlan::Statement(_) => Ok(self),
739            LogicalPlan::DescribeTable(_) => Ok(self),
740            LogicalPlan::Unnest(Unnest {
741                input,
742                exec_columns,
743                options,
744                ..
745            }) => {
746                // Update schema with unnested column type.
747                unnest_with_options(Arc::unwrap_or_clone(input), exec_columns, options)
748            }
749        }
750    }
751
752    /// Returns a new `LogicalPlan` based on `self` with inputs and
753    /// expressions replaced.
754    ///
755    /// Note this method creates an entirely new node, which requires a large
756    /// amount of clone'ing. When possible, the [`tree_node`] API should be used
757    /// instead of this API.
758    ///
759    /// The exprs correspond to the same order of expressions returned
760    /// by [`Self::expressions`]. This function is used by optimizers
761    /// to rewrite plans using the following pattern:
762    ///
763    /// [`tree_node`]: crate::logical_plan::tree_node
764    ///
765    /// ```text
766    /// let new_inputs = optimize_children(..., plan, props);
767    ///
768    /// // get the plans expressions to optimize
769    /// let exprs = plan.expressions();
770    ///
771    /// // potentially rewrite plan expressions
772    /// let rewritten_exprs = rewrite_exprs(exprs);
773    ///
774    /// // create new plan using rewritten_exprs in same position
775    /// let new_plan = plan.new_with_exprs(rewritten_exprs, new_inputs);
776    /// ```
777    pub fn with_new_exprs(
778        &self,
779        mut expr: Vec<Expr>,
780        inputs: Vec<LogicalPlan>,
781    ) -> Result<LogicalPlan> {
782        match self {
783            // Since expr may be different than the previous expr, schema of the projection
784            // may change. We need to use try_new method instead of try_new_with_schema method.
785            LogicalPlan::Projection(Projection { .. }) => {
786                let input = self.only_input(inputs)?;
787                Projection::try_new(expr, Arc::new(input)).map(LogicalPlan::Projection)
788            }
789            LogicalPlan::Dml(DmlStatement {
790                table_name,
791                target,
792                op,
793                ..
794            }) => {
795                self.assert_no_expressions(expr)?;
796                let input = self.only_input(inputs)?;
797                Ok(LogicalPlan::Dml(DmlStatement::new(
798                    table_name.clone(),
799                    Arc::clone(target),
800                    op.clone(),
801                    Arc::new(input),
802                )))
803            }
804            LogicalPlan::Copy(CopyTo {
805                input: _,
806                output_url,
807                file_type,
808                options,
809                partition_by,
810            }) => {
811                self.assert_no_expressions(expr)?;
812                let input = self.only_input(inputs)?;
813                Ok(LogicalPlan::Copy(CopyTo {
814                    input: Arc::new(input),
815                    output_url: output_url.clone(),
816                    file_type: Arc::clone(file_type),
817                    options: options.clone(),
818                    partition_by: partition_by.clone(),
819                }))
820            }
821            LogicalPlan::Values(Values { schema, .. }) => {
822                self.assert_no_inputs(inputs)?;
823                Ok(LogicalPlan::Values(Values {
824                    schema: Arc::clone(schema),
825                    values: expr
826                        .chunks_exact(schema.fields().len())
827                        .map(|s| s.to_vec())
828                        .collect(),
829                }))
830            }
831            LogicalPlan::Filter { .. } => {
832                let predicate = self.only_expr(expr)?;
833                let input = self.only_input(inputs)?;
834
835                Filter::try_new(predicate, Arc::new(input)).map(LogicalPlan::Filter)
836            }
837            LogicalPlan::Repartition(Repartition {
838                partitioning_scheme,
839                ..
840            }) => match partitioning_scheme {
841                Partitioning::RoundRobinBatch(n) => {
842                    self.assert_no_expressions(expr)?;
843                    let input = self.only_input(inputs)?;
844                    Ok(LogicalPlan::Repartition(Repartition {
845                        partitioning_scheme: Partitioning::RoundRobinBatch(*n),
846                        input: Arc::new(input),
847                    }))
848                }
849                Partitioning::Hash(_, n) => {
850                    let input = self.only_input(inputs)?;
851                    Ok(LogicalPlan::Repartition(Repartition {
852                        partitioning_scheme: Partitioning::Hash(expr, *n),
853                        input: Arc::new(input),
854                    }))
855                }
856                Partitioning::DistributeBy(_) => {
857                    let input = self.only_input(inputs)?;
858                    Ok(LogicalPlan::Repartition(Repartition {
859                        partitioning_scheme: Partitioning::DistributeBy(expr),
860                        input: Arc::new(input),
861                    }))
862                }
863            },
864            LogicalPlan::Window(Window { window_expr, .. }) => {
865                assert_eq!(window_expr.len(), expr.len());
866                let input = self.only_input(inputs)?;
867                Window::try_new(expr, Arc::new(input)).map(LogicalPlan::Window)
868            }
869            LogicalPlan::Aggregate(Aggregate { group_expr, .. }) => {
870                let input = self.only_input(inputs)?;
871                // group exprs are the first expressions
872                let agg_expr = expr.split_off(group_expr.len());
873
874                Aggregate::try_new(Arc::new(input), expr, agg_expr)
875                    .map(LogicalPlan::Aggregate)
876            }
877            LogicalPlan::Sort(Sort {
878                expr: sort_expr,
879                fetch,
880                ..
881            }) => {
882                let input = self.only_input(inputs)?;
883                Ok(LogicalPlan::Sort(Sort {
884                    expr: expr
885                        .into_iter()
886                        .zip(sort_expr.iter())
887                        .map(|(expr, sort)| sort.with_expr(expr))
888                        .collect(),
889                    input: Arc::new(input),
890                    fetch: *fetch,
891                }))
892            }
893            LogicalPlan::Join(Join {
894                join_type,
895                join_constraint,
896                on,
897                null_equals_null,
898                ..
899            }) => {
900                let (left, right) = self.only_two_inputs(inputs)?;
901                let schema = build_join_schema(left.schema(), right.schema(), join_type)?;
902
903                let equi_expr_count = on.len() * 2;
904                assert!(expr.len() >= equi_expr_count);
905
906                // Assume that the last expr, if any,
907                // is the filter_expr (non equality predicate from ON clause)
908                let filter_expr = if expr.len() > equi_expr_count {
909                    expr.pop()
910                } else {
911                    None
912                };
913
914                // The first part of expr is equi-exprs,
915                // and the struct of each equi-expr is like `left-expr = right-expr`.
916                assert_eq!(expr.len(), equi_expr_count);
917                let mut new_on = Vec::with_capacity(on.len());
918                let mut iter = expr.into_iter();
919                while let Some(left) = iter.next() {
920                    let Some(right) = iter.next() else {
921                        internal_err!("Expected a pair of expressions to construct the join on expression")?
922                    };
923
924                    // SimplifyExpression rule may add alias to the equi_expr.
925                    new_on.push((left.unalias(), right.unalias()));
926                }
927
928                Ok(LogicalPlan::Join(Join {
929                    left: Arc::new(left),
930                    right: Arc::new(right),
931                    join_type: *join_type,
932                    join_constraint: *join_constraint,
933                    on: new_on,
934                    filter: filter_expr,
935                    schema: DFSchemaRef::new(schema),
936                    null_equals_null: *null_equals_null,
937                }))
938            }
939            LogicalPlan::Subquery(Subquery {
940                outer_ref_columns,
941                spans,
942                ..
943            }) => {
944                self.assert_no_expressions(expr)?;
945                let input = self.only_input(inputs)?;
946                let subquery = LogicalPlanBuilder::from(input).build()?;
947                Ok(LogicalPlan::Subquery(Subquery {
948                    subquery: Arc::new(subquery),
949                    outer_ref_columns: outer_ref_columns.clone(),
950                    spans: spans.clone(),
951                }))
952            }
953            LogicalPlan::SubqueryAlias(SubqueryAlias { alias, .. }) => {
954                self.assert_no_expressions(expr)?;
955                let input = self.only_input(inputs)?;
956                SubqueryAlias::try_new(Arc::new(input), alias.clone())
957                    .map(LogicalPlan::SubqueryAlias)
958            }
959            LogicalPlan::Limit(Limit { skip, fetch, .. }) => {
960                let old_expr_len = skip.iter().chain(fetch.iter()).count();
961                if old_expr_len != expr.len() {
962                    return internal_err!(
963                        "Invalid number of new Limit expressions: expected {}, got {}",
964                        old_expr_len,
965                        expr.len()
966                    );
967                }
968                // `LogicalPlan::expressions()` returns in [skip, fetch] order, so we can pop from the end.
969                let new_fetch = fetch.as_ref().and_then(|_| expr.pop());
970                let new_skip = skip.as_ref().and_then(|_| expr.pop());
971                let input = self.only_input(inputs)?;
972                Ok(LogicalPlan::Limit(Limit {
973                    skip: new_skip.map(Box::new),
974                    fetch: new_fetch.map(Box::new),
975                    input: Arc::new(input),
976                }))
977            }
978            LogicalPlan::Ddl(DdlStatement::CreateMemoryTable(CreateMemoryTable {
979                name,
980                if_not_exists,
981                or_replace,
982                column_defaults,
983                temporary,
984                ..
985            })) => {
986                self.assert_no_expressions(expr)?;
987                let input = self.only_input(inputs)?;
988                Ok(LogicalPlan::Ddl(DdlStatement::CreateMemoryTable(
989                    CreateMemoryTable {
990                        input: Arc::new(input),
991                        constraints: Constraints::empty(),
992                        name: name.clone(),
993                        if_not_exists: *if_not_exists,
994                        or_replace: *or_replace,
995                        column_defaults: column_defaults.clone(),
996                        temporary: *temporary,
997                    },
998                )))
999            }
1000            LogicalPlan::Ddl(DdlStatement::CreateView(CreateView {
1001                name,
1002                or_replace,
1003                definition,
1004                temporary,
1005                ..
1006            })) => {
1007                self.assert_no_expressions(expr)?;
1008                let input = self.only_input(inputs)?;
1009                Ok(LogicalPlan::Ddl(DdlStatement::CreateView(CreateView {
1010                    input: Arc::new(input),
1011                    name: name.clone(),
1012                    or_replace: *or_replace,
1013                    temporary: *temporary,
1014                    definition: definition.clone(),
1015                })))
1016            }
1017            LogicalPlan::Extension(e) => Ok(LogicalPlan::Extension(Extension {
1018                node: e.node.with_exprs_and_inputs(expr, inputs)?,
1019            })),
1020            LogicalPlan::Union(Union { schema, .. }) => {
1021                self.assert_no_expressions(expr)?;
1022                let input_schema = inputs[0].schema();
1023                // If inputs are not pruned do not change schema.
1024                let schema = if schema.fields().len() == input_schema.fields().len() {
1025                    Arc::clone(schema)
1026                } else {
1027                    Arc::clone(input_schema)
1028                };
1029                Ok(LogicalPlan::Union(Union {
1030                    inputs: inputs.into_iter().map(Arc::new).collect(),
1031                    schema,
1032                }))
1033            }
1034            LogicalPlan::Distinct(distinct) => {
1035                let distinct = match distinct {
1036                    Distinct::All(_) => {
1037                        self.assert_no_expressions(expr)?;
1038                        let input = self.only_input(inputs)?;
1039                        Distinct::All(Arc::new(input))
1040                    }
1041                    Distinct::On(DistinctOn {
1042                        on_expr,
1043                        select_expr,
1044                        ..
1045                    }) => {
1046                        let input = self.only_input(inputs)?;
1047                        let sort_expr = expr.split_off(on_expr.len() + select_expr.len());
1048                        let select_expr = expr.split_off(on_expr.len());
1049                        assert!(sort_expr.is_empty(), "with_new_exprs for Distinct does not support sort expressions");
1050                        Distinct::On(DistinctOn::try_new(
1051                            expr,
1052                            select_expr,
1053                            None, // no sort expressions accepted
1054                            Arc::new(input),
1055                        )?)
1056                    }
1057                };
1058                Ok(LogicalPlan::Distinct(distinct))
1059            }
1060            LogicalPlan::RecursiveQuery(RecursiveQuery {
1061                name, is_distinct, ..
1062            }) => {
1063                self.assert_no_expressions(expr)?;
1064                let (static_term, recursive_term) = self.only_two_inputs(inputs)?;
1065                Ok(LogicalPlan::RecursiveQuery(RecursiveQuery {
1066                    name: name.clone(),
1067                    static_term: Arc::new(static_term),
1068                    recursive_term: Arc::new(recursive_term),
1069                    is_distinct: *is_distinct,
1070                }))
1071            }
1072            LogicalPlan::Analyze(a) => {
1073                self.assert_no_expressions(expr)?;
1074                let input = self.only_input(inputs)?;
1075                Ok(LogicalPlan::Analyze(Analyze {
1076                    verbose: a.verbose,
1077                    schema: Arc::clone(&a.schema),
1078                    input: Arc::new(input),
1079                }))
1080            }
1081            LogicalPlan::Explain(e) => {
1082                self.assert_no_expressions(expr)?;
1083                let input = self.only_input(inputs)?;
1084                Ok(LogicalPlan::Explain(Explain {
1085                    verbose: e.verbose,
1086                    plan: Arc::new(input),
1087                    explain_format: e.explain_format.clone(),
1088                    stringified_plans: e.stringified_plans.clone(),
1089                    schema: Arc::clone(&e.schema),
1090                    logical_optimization_succeeded: e.logical_optimization_succeeded,
1091                }))
1092            }
1093            LogicalPlan::Statement(Statement::Prepare(Prepare {
1094                name,
1095                data_types,
1096                ..
1097            })) => {
1098                self.assert_no_expressions(expr)?;
1099                let input = self.only_input(inputs)?;
1100                Ok(LogicalPlan::Statement(Statement::Prepare(Prepare {
1101                    name: name.clone(),
1102                    data_types: data_types.clone(),
1103                    input: Arc::new(input),
1104                })))
1105            }
1106            LogicalPlan::Statement(Statement::Execute(Execute { name, .. })) => {
1107                self.assert_no_inputs(inputs)?;
1108                Ok(LogicalPlan::Statement(Statement::Execute(Execute {
1109                    name: name.clone(),
1110                    parameters: expr,
1111                })))
1112            }
1113            LogicalPlan::TableScan(ts) => {
1114                self.assert_no_inputs(inputs)?;
1115                Ok(LogicalPlan::TableScan(TableScan {
1116                    filters: expr,
1117                    ..ts.clone()
1118                }))
1119            }
1120            LogicalPlan::EmptyRelation(_)
1121            | LogicalPlan::Ddl(_)
1122            | LogicalPlan::Statement(_)
1123            | LogicalPlan::DescribeTable(_) => {
1124                // All of these plan types have no inputs / exprs so should not be called
1125                self.assert_no_expressions(expr)?;
1126                self.assert_no_inputs(inputs)?;
1127                Ok(self.clone())
1128            }
1129            LogicalPlan::Unnest(Unnest {
1130                exec_columns: columns,
1131                options,
1132                ..
1133            }) => {
1134                self.assert_no_expressions(expr)?;
1135                let input = self.only_input(inputs)?;
1136                // Update schema with unnested column type.
1137                let new_plan =
1138                    unnest_with_options(input, columns.clone(), options.clone())?;
1139                Ok(new_plan)
1140            }
1141        }
1142    }
1143
1144    /// checks that the plan conforms to the listed invariant level, returning an Error if not
1145    pub fn check_invariants(&self, check: InvariantLevel) -> Result<()> {
1146        match check {
1147            InvariantLevel::Always => assert_always_invariants_at_current_node(self),
1148            InvariantLevel::Executable => assert_executable_invariants(self),
1149        }
1150    }
1151
1152    /// Helper for [Self::with_new_exprs] to use when no expressions are expected.
1153    #[inline]
1154    #[allow(clippy::needless_pass_by_value)] // expr is moved intentionally to ensure it's not used again
1155    fn assert_no_expressions(&self, expr: Vec<Expr>) -> Result<()> {
1156        if !expr.is_empty() {
1157            return internal_err!("{self:?} should have no exprs, got {:?}", expr);
1158        }
1159        Ok(())
1160    }
1161
1162    /// Helper for [Self::with_new_exprs] to use when no inputs are expected.
1163    #[inline]
1164    #[allow(clippy::needless_pass_by_value)] // inputs is moved intentionally to ensure it's not used again
1165    fn assert_no_inputs(&self, inputs: Vec<LogicalPlan>) -> Result<()> {
1166        if !inputs.is_empty() {
1167            return internal_err!("{self:?} should have no inputs, got: {:?}", inputs);
1168        }
1169        Ok(())
1170    }
1171
1172    /// Helper for [Self::with_new_exprs] to use when exactly one expression is expected.
1173    #[inline]
1174    fn only_expr(&self, mut expr: Vec<Expr>) -> Result<Expr> {
1175        if expr.len() != 1 {
1176            return internal_err!(
1177                "{self:?} should have exactly one expr, got {:?}",
1178                expr
1179            );
1180        }
1181        Ok(expr.remove(0))
1182    }
1183
1184    /// Helper for [Self::with_new_exprs] to use when exactly one input is expected.
1185    #[inline]
1186    fn only_input(&self, mut inputs: Vec<LogicalPlan>) -> Result<LogicalPlan> {
1187        if inputs.len() != 1 {
1188            return internal_err!(
1189                "{self:?} should have exactly one input, got {:?}",
1190                inputs
1191            );
1192        }
1193        Ok(inputs.remove(0))
1194    }
1195
1196    /// Helper for [Self::with_new_exprs] to use when exactly two inputs are expected.
1197    #[inline]
1198    fn only_two_inputs(
1199        &self,
1200        mut inputs: Vec<LogicalPlan>,
1201    ) -> Result<(LogicalPlan, LogicalPlan)> {
1202        if inputs.len() != 2 {
1203            return internal_err!(
1204                "{self:?} should have exactly two inputs, got {:?}",
1205                inputs
1206            );
1207        }
1208        let right = inputs.remove(1);
1209        let left = inputs.remove(0);
1210        Ok((left, right))
1211    }
1212
1213    /// Replaces placeholder param values (like `$1`, `$2`) in [`LogicalPlan`]
1214    /// with the specified `param_values`.
1215    ///
1216    /// [`Prepare`] statements are converted to
1217    /// their inner logical plan for execution.
1218    ///
1219    /// # Example
1220    /// ```
1221    /// # use arrow::datatypes::{Field, Schema, DataType};
1222    /// use datafusion_common::ScalarValue;
1223    /// # use datafusion_expr::{lit, col, LogicalPlanBuilder, logical_plan::table_scan, placeholder};
1224    /// # let schema = Schema::new(vec![
1225    /// #     Field::new("id", DataType::Int32, false),
1226    /// # ]);
1227    /// // Build SELECT * FROM t1 WHERE id = $1
1228    /// let plan = table_scan(Some("t1"), &schema, None).unwrap()
1229    ///     .filter(col("id").eq(placeholder("$1"))).unwrap()
1230    ///     .build().unwrap();
1231    ///
1232    /// assert_eq!(
1233    ///   "Filter: t1.id = $1\
1234    ///   \n  TableScan: t1",
1235    ///   plan.display_indent().to_string()
1236    /// );
1237    ///
1238    /// // Fill in the parameter $1 with a literal 3
1239    /// let plan = plan.with_param_values(vec![
1240    ///   ScalarValue::from(3i32) // value at index 0 --> $1
1241    /// ]).unwrap();
1242    ///
1243    /// assert_eq!(
1244    ///    "Filter: t1.id = Int32(3)\
1245    ///    \n  TableScan: t1",
1246    ///    plan.display_indent().to_string()
1247    ///  );
1248    ///
1249    /// // Note you can also used named parameters
1250    /// // Build SELECT * FROM t1 WHERE id = $my_param
1251    /// let plan = table_scan(Some("t1"), &schema, None).unwrap()
1252    ///     .filter(col("id").eq(placeholder("$my_param"))).unwrap()
1253    ///     .build().unwrap()
1254    ///     // Fill in the parameter $my_param with a literal 3
1255    ///     .with_param_values(vec![
1256    ///       ("my_param", ScalarValue::from(3i32)),
1257    ///     ]).unwrap();
1258    ///
1259    /// assert_eq!(
1260    ///    "Filter: t1.id = Int32(3)\
1261    ///    \n  TableScan: t1",
1262    ///    plan.display_indent().to_string()
1263    ///  );
1264    ///
1265    /// ```
1266    pub fn with_param_values(
1267        self,
1268        param_values: impl Into<ParamValues>,
1269    ) -> Result<LogicalPlan> {
1270        let param_values = param_values.into();
1271        let plan_with_values = self.replace_params_with_values(&param_values)?;
1272
1273        // unwrap Prepare
1274        Ok(
1275            if let LogicalPlan::Statement(Statement::Prepare(prepare_lp)) =
1276                plan_with_values
1277            {
1278                param_values.verify(&prepare_lp.data_types)?;
1279                // try and take ownership of the input if is not shared, clone otherwise
1280                Arc::unwrap_or_clone(prepare_lp.input)
1281            } else {
1282                plan_with_values
1283            },
1284        )
1285    }
1286
1287    /// Returns the maximum number of rows that this plan can output, if known.
1288    ///
1289    /// If `None`, the plan can return any number of rows.
1290    /// If `Some(n)` then the plan can return at most `n` rows but may return fewer.
1291    pub fn max_rows(self: &LogicalPlan) -> Option<usize> {
1292        match self {
1293            LogicalPlan::Projection(Projection { input, .. }) => input.max_rows(),
1294            LogicalPlan::Filter(filter) => {
1295                if filter.is_scalar() {
1296                    Some(1)
1297                } else {
1298                    filter.input.max_rows()
1299                }
1300            }
1301            LogicalPlan::Window(Window { input, .. }) => input.max_rows(),
1302            LogicalPlan::Aggregate(Aggregate {
1303                input, group_expr, ..
1304            }) => {
1305                // Empty group_expr will return Some(1)
1306                if group_expr
1307                    .iter()
1308                    .all(|expr| matches!(expr, Expr::Literal(_, _)))
1309                {
1310                    Some(1)
1311                } else {
1312                    input.max_rows()
1313                }
1314            }
1315            LogicalPlan::Sort(Sort { input, fetch, .. }) => {
1316                match (fetch, input.max_rows()) {
1317                    (Some(fetch_limit), Some(input_max)) => {
1318                        Some(input_max.min(*fetch_limit))
1319                    }
1320                    (Some(fetch_limit), None) => Some(*fetch_limit),
1321                    (None, Some(input_max)) => Some(input_max),
1322                    (None, None) => None,
1323                }
1324            }
1325            LogicalPlan::Join(Join {
1326                left,
1327                right,
1328                join_type,
1329                ..
1330            }) => match join_type {
1331                JoinType::Inner => Some(left.max_rows()? * right.max_rows()?),
1332                JoinType::Left | JoinType::Right | JoinType::Full => {
1333                    match (left.max_rows()?, right.max_rows()?, join_type) {
1334                        (0, 0, _) => Some(0),
1335                        (max_rows, 0, JoinType::Left | JoinType::Full) => Some(max_rows),
1336                        (0, max_rows, JoinType::Right | JoinType::Full) => Some(max_rows),
1337                        (left_max, right_max, _) => Some(left_max * right_max),
1338                    }
1339                }
1340                JoinType::LeftSemi | JoinType::LeftAnti | JoinType::LeftMark => {
1341                    left.max_rows()
1342                }
1343                JoinType::RightSemi | JoinType::RightAnti => right.max_rows(),
1344            },
1345            LogicalPlan::Repartition(Repartition { input, .. }) => input.max_rows(),
1346            LogicalPlan::Union(Union { inputs, .. }) => {
1347                inputs.iter().try_fold(0usize, |mut acc, plan| {
1348                    acc += plan.max_rows()?;
1349                    Some(acc)
1350                })
1351            }
1352            LogicalPlan::TableScan(TableScan { fetch, .. }) => *fetch,
1353            LogicalPlan::EmptyRelation(_) => Some(0),
1354            LogicalPlan::RecursiveQuery(_) => None,
1355            LogicalPlan::Subquery(_) => None,
1356            LogicalPlan::SubqueryAlias(SubqueryAlias { input, .. }) => input.max_rows(),
1357            LogicalPlan::Limit(limit) => match limit.get_fetch_type() {
1358                Ok(FetchType::Literal(s)) => s,
1359                _ => None,
1360            },
1361            LogicalPlan::Distinct(
1362                Distinct::All(input) | Distinct::On(DistinctOn { input, .. }),
1363            ) => input.max_rows(),
1364            LogicalPlan::Values(v) => Some(v.values.len()),
1365            LogicalPlan::Unnest(_) => None,
1366            LogicalPlan::Ddl(_)
1367            | LogicalPlan::Explain(_)
1368            | LogicalPlan::Analyze(_)
1369            | LogicalPlan::Dml(_)
1370            | LogicalPlan::Copy(_)
1371            | LogicalPlan::DescribeTable(_)
1372            | LogicalPlan::Statement(_)
1373            | LogicalPlan::Extension(_) => None,
1374        }
1375    }
1376
1377    /// If this node's expressions contains any references to an outer subquery
1378    pub fn contains_outer_reference(&self) -> bool {
1379        let mut contains = false;
1380        self.apply_expressions(|expr| {
1381            Ok(if expr.contains_outer() {
1382                contains = true;
1383                TreeNodeRecursion::Stop
1384            } else {
1385                TreeNodeRecursion::Continue
1386            })
1387        })
1388        .unwrap();
1389        contains
1390    }
1391
1392    /// Get the output expressions and their corresponding columns.
1393    ///
1394    /// The parent node may reference the output columns of the plan by expressions, such as
1395    /// projection over aggregate or window functions. This method helps to convert the
1396    /// referenced expressions into columns.
1397    ///
1398    /// See also: [`crate::utils::columnize_expr`]
1399    pub fn columnized_output_exprs(&self) -> Result<Vec<(&Expr, Column)>> {
1400        match self {
1401            LogicalPlan::Aggregate(aggregate) => Ok(aggregate
1402                .output_expressions()?
1403                .into_iter()
1404                .zip(self.schema().columns())
1405                .collect()),
1406            LogicalPlan::Window(Window {
1407                window_expr,
1408                input,
1409                schema,
1410            }) => {
1411                // The input could be another Window, so the result should also include the input's. For Example:
1412                // `EXPLAIN SELECT RANK() OVER (PARTITION BY a ORDER BY b), SUM(b) OVER (PARTITION BY a) FROM t`
1413                // Its plan is:
1414                // Projection: RANK() PARTITION BY [t.a] ORDER BY [t.b ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW, SUM(t.b) PARTITION BY [t.a] ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING
1415                //   WindowAggr: windowExpr=[[SUM(CAST(t.b AS Int64)) PARTITION BY [t.a] ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING]]
1416                //     WindowAggr: windowExpr=[[RANK() PARTITION BY [t.a] ORDER BY [t.b ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW]]/
1417                //       TableScan: t projection=[a, b]
1418                let mut output_exprs = input.columnized_output_exprs()?;
1419                let input_len = input.schema().fields().len();
1420                output_exprs.extend(
1421                    window_expr
1422                        .iter()
1423                        .zip(schema.columns().into_iter().skip(input_len)),
1424                );
1425                Ok(output_exprs)
1426            }
1427            _ => Ok(vec![]),
1428        }
1429    }
1430}
1431
1432impl LogicalPlan {
1433    /// Return a `LogicalPlan` with all placeholders (e.g $1 $2,
1434    /// ...) replaced with corresponding values provided in
1435    /// `params_values`
1436    ///
1437    /// See [`Self::with_param_values`] for examples and usage with an owned
1438    /// `ParamValues`
1439    pub fn replace_params_with_values(
1440        self,
1441        param_values: &ParamValues,
1442    ) -> Result<LogicalPlan> {
1443        self.transform_up_with_subqueries(|plan| {
1444            let schema = Arc::clone(plan.schema());
1445            let name_preserver = NamePreserver::new(&plan);
1446            plan.map_expressions(|e| {
1447                let (e, has_placeholder) = e.infer_placeholder_types(&schema)?;
1448                if !has_placeholder {
1449                    // Performance optimization:
1450                    // avoid NamePreserver copy and second pass over expression
1451                    // if no placeholders.
1452                    Ok(Transformed::no(e))
1453                } else {
1454                    let original_name = name_preserver.save(&e);
1455                    let transformed_expr = e.transform_up(|e| {
1456                        if let Expr::Placeholder(Placeholder { id, .. }) = e {
1457                            let value = param_values.get_placeholders_with_values(&id)?;
1458                            Ok(Transformed::yes(Expr::Literal(value, None)))
1459                        } else {
1460                            Ok(Transformed::no(e))
1461                        }
1462                    })?;
1463                    // Preserve name to avoid breaking column references to this expression
1464                    Ok(transformed_expr.update_data(|expr| original_name.restore(expr)))
1465                }
1466            })
1467        })
1468        .map(|res| res.data)
1469    }
1470
1471    /// Walk the logical plan, find any `Placeholder` tokens, and return a set of their names.
1472    pub fn get_parameter_names(&self) -> Result<HashSet<String>> {
1473        let mut param_names = HashSet::new();
1474        self.apply_with_subqueries(|plan| {
1475            plan.apply_expressions(|expr| {
1476                expr.apply(|expr| {
1477                    if let Expr::Placeholder(Placeholder { id, .. }) = expr {
1478                        param_names.insert(id.clone());
1479                    }
1480                    Ok(TreeNodeRecursion::Continue)
1481                })
1482            })
1483        })
1484        .map(|_| param_names)
1485    }
1486
1487    /// Walk the logical plan, find any `Placeholder` tokens, and return a map of their IDs and DataTypes
1488    pub fn get_parameter_types(
1489        &self,
1490    ) -> Result<HashMap<String, Option<DataType>>, DataFusionError> {
1491        let mut param_types: HashMap<String, Option<DataType>> = HashMap::new();
1492
1493        self.apply_with_subqueries(|plan| {
1494            plan.apply_expressions(|expr| {
1495                expr.apply(|expr| {
1496                    if let Expr::Placeholder(Placeholder { id, data_type }) = expr {
1497                        let prev = param_types.get(id);
1498                        match (prev, data_type) {
1499                            (Some(Some(prev)), Some(dt)) => {
1500                                if prev != dt {
1501                                    plan_err!("Conflicting types for {id}")?;
1502                                }
1503                            }
1504                            (_, Some(dt)) => {
1505                                param_types.insert(id.clone(), Some(dt.clone()));
1506                            }
1507                            _ => {
1508                                param_types.insert(id.clone(), None);
1509                            }
1510                        }
1511                    }
1512                    Ok(TreeNodeRecursion::Continue)
1513                })
1514            })
1515        })
1516        .map(|_| param_types)
1517    }
1518
1519    // ------------
1520    // Various implementations for printing out LogicalPlans
1521    // ------------
1522
1523    /// Return a `format`able structure that produces a single line
1524    /// per node.
1525    ///
1526    /// # Example
1527    ///
1528    /// ```text
1529    /// Projection: employee.id
1530    ///    Filter: employee.state Eq Utf8(\"CO\")\
1531    ///       CsvScan: employee projection=Some([0, 3])
1532    /// ```
1533    ///
1534    /// ```
1535    /// use arrow::datatypes::{Field, Schema, DataType};
1536    /// use datafusion_expr::{lit, col, LogicalPlanBuilder, logical_plan::table_scan};
1537    /// let schema = Schema::new(vec![
1538    ///     Field::new("id", DataType::Int32, false),
1539    /// ]);
1540    /// let plan = table_scan(Some("t1"), &schema, None).unwrap()
1541    ///     .filter(col("id").eq(lit(5))).unwrap()
1542    ///     .build().unwrap();
1543    ///
1544    /// // Format using display_indent
1545    /// let display_string = format!("{}", plan.display_indent());
1546    ///
1547    /// assert_eq!("Filter: t1.id = Int32(5)\n  TableScan: t1",
1548    ///             display_string);
1549    /// ```
1550    pub fn display_indent(&self) -> impl Display + '_ {
1551        // Boilerplate structure to wrap LogicalPlan with something
1552        // that that can be formatted
1553        struct Wrapper<'a>(&'a LogicalPlan);
1554        impl Display for Wrapper<'_> {
1555            fn fmt(&self, f: &mut Formatter) -> fmt::Result {
1556                let with_schema = false;
1557                let mut visitor = IndentVisitor::new(f, with_schema);
1558                match self.0.visit_with_subqueries(&mut visitor) {
1559                    Ok(_) => Ok(()),
1560                    Err(_) => Err(fmt::Error),
1561                }
1562            }
1563        }
1564        Wrapper(self)
1565    }
1566
1567    /// Return a `format`able structure that produces a single line
1568    /// per node that includes the output schema. For example:
1569    ///
1570    /// ```text
1571    /// Projection: employee.id [id:Int32]\
1572    ///    Filter: employee.state = Utf8(\"CO\") [id:Int32, state:Utf8]\
1573    ///      TableScan: employee projection=[0, 3] [id:Int32, state:Utf8]";
1574    /// ```
1575    ///
1576    /// ```
1577    /// use arrow::datatypes::{Field, Schema, DataType};
1578    /// use datafusion_expr::{lit, col, LogicalPlanBuilder, logical_plan::table_scan};
1579    /// let schema = Schema::new(vec![
1580    ///     Field::new("id", DataType::Int32, false),
1581    /// ]);
1582    /// let plan = table_scan(Some("t1"), &schema, None).unwrap()
1583    ///     .filter(col("id").eq(lit(5))).unwrap()
1584    ///     .build().unwrap();
1585    ///
1586    /// // Format using display_indent_schema
1587    /// let display_string = format!("{}", plan.display_indent_schema());
1588    ///
1589    /// assert_eq!("Filter: t1.id = Int32(5) [id:Int32]\
1590    ///             \n  TableScan: t1 [id:Int32]",
1591    ///             display_string);
1592    /// ```
1593    pub fn display_indent_schema(&self) -> impl Display + '_ {
1594        // Boilerplate structure to wrap LogicalPlan with something
1595        // that that can be formatted
1596        struct Wrapper<'a>(&'a LogicalPlan);
1597        impl Display for Wrapper<'_> {
1598            fn fmt(&self, f: &mut Formatter) -> fmt::Result {
1599                let with_schema = true;
1600                let mut visitor = IndentVisitor::new(f, with_schema);
1601                match self.0.visit_with_subqueries(&mut visitor) {
1602                    Ok(_) => Ok(()),
1603                    Err(_) => Err(fmt::Error),
1604                }
1605            }
1606        }
1607        Wrapper(self)
1608    }
1609
1610    /// Return a displayable structure that produces plan in postgresql JSON format.
1611    ///
1612    /// Users can use this format to visualize the plan in existing plan visualization tools, for example [dalibo](https://explain.dalibo.com/)
1613    pub fn display_pg_json(&self) -> impl Display + '_ {
1614        // Boilerplate structure to wrap LogicalPlan with something
1615        // that that can be formatted
1616        struct Wrapper<'a>(&'a LogicalPlan);
1617        impl Display for Wrapper<'_> {
1618            fn fmt(&self, f: &mut Formatter) -> fmt::Result {
1619                let mut visitor = PgJsonVisitor::new(f);
1620                visitor.with_schema(true);
1621                match self.0.visit_with_subqueries(&mut visitor) {
1622                    Ok(_) => Ok(()),
1623                    Err(_) => Err(fmt::Error),
1624                }
1625            }
1626        }
1627        Wrapper(self)
1628    }
1629
1630    /// Return a `format`able structure that produces lines meant for
1631    /// graphical display using the `DOT` language. This format can be
1632    /// visualized using software from
1633    /// [`graphviz`](https://graphviz.org/)
1634    ///
1635    /// This currently produces two graphs -- one with the basic
1636    /// structure, and one with additional details such as schema.
1637    ///
1638    /// ```
1639    /// use arrow::datatypes::{Field, Schema, DataType};
1640    /// use datafusion_expr::{lit, col, LogicalPlanBuilder, logical_plan::table_scan};
1641    /// let schema = Schema::new(vec![
1642    ///     Field::new("id", DataType::Int32, false),
1643    /// ]);
1644    /// let plan = table_scan(Some("t1"), &schema, None).unwrap()
1645    ///     .filter(col("id").eq(lit(5))).unwrap()
1646    ///     .build().unwrap();
1647    ///
1648    /// // Format using display_graphviz
1649    /// let graphviz_string = format!("{}", plan.display_graphviz());
1650    /// ```
1651    ///
1652    /// If graphviz string is saved to a file such as `/tmp/example.dot`, the following
1653    /// commands can be used to render it as a pdf:
1654    ///
1655    /// ```bash
1656    ///   dot -Tpdf < /tmp/example.dot  > /tmp/example.pdf
1657    /// ```
1658    ///
1659    pub fn display_graphviz(&self) -> impl Display + '_ {
1660        // Boilerplate structure to wrap LogicalPlan with something
1661        // that that can be formatted
1662        struct Wrapper<'a>(&'a LogicalPlan);
1663        impl Display for Wrapper<'_> {
1664            fn fmt(&self, f: &mut Formatter) -> fmt::Result {
1665                let mut visitor = GraphvizVisitor::new(f);
1666
1667                visitor.start_graph()?;
1668
1669                visitor.pre_visit_plan("LogicalPlan")?;
1670                self.0
1671                    .visit_with_subqueries(&mut visitor)
1672                    .map_err(|_| fmt::Error)?;
1673                visitor.post_visit_plan()?;
1674
1675                visitor.set_with_schema(true);
1676                visitor.pre_visit_plan("Detailed LogicalPlan")?;
1677                self.0
1678                    .visit_with_subqueries(&mut visitor)
1679                    .map_err(|_| fmt::Error)?;
1680                visitor.post_visit_plan()?;
1681
1682                visitor.end_graph()?;
1683                Ok(())
1684            }
1685        }
1686        Wrapper(self)
1687    }
1688
1689    /// Return a `format`able structure with the a human readable
1690    /// description of this LogicalPlan node per node, not including
1691    /// children. For example:
1692    ///
1693    /// ```text
1694    /// Projection: id
1695    /// ```
1696    /// ```
1697    /// use arrow::datatypes::{Field, Schema, DataType};
1698    /// use datafusion_expr::{lit, col, LogicalPlanBuilder, logical_plan::table_scan};
1699    /// let schema = Schema::new(vec![
1700    ///     Field::new("id", DataType::Int32, false),
1701    /// ]);
1702    /// let plan = table_scan(Some("t1"), &schema, None).unwrap()
1703    ///     .build().unwrap();
1704    ///
1705    /// // Format using display
1706    /// let display_string = format!("{}", plan.display());
1707    ///
1708    /// assert_eq!("TableScan: t1", display_string);
1709    /// ```
1710    pub fn display(&self) -> impl Display + '_ {
1711        // Boilerplate structure to wrap LogicalPlan with something
1712        // that that can be formatted
1713        struct Wrapper<'a>(&'a LogicalPlan);
1714        impl Display for Wrapper<'_> {
1715            fn fmt(&self, f: &mut Formatter) -> fmt::Result {
1716                match self.0 {
1717                    LogicalPlan::EmptyRelation(_) => write!(f, "EmptyRelation"),
1718                    LogicalPlan::RecursiveQuery(RecursiveQuery {
1719                        is_distinct, ..
1720                    }) => {
1721                        write!(f, "RecursiveQuery: is_distinct={is_distinct}")
1722                    }
1723                    LogicalPlan::Values(Values { ref values, .. }) => {
1724                        let str_values: Vec<_> = values
1725                            .iter()
1726                            // limit to only 5 values to avoid horrible display
1727                            .take(5)
1728                            .map(|row| {
1729                                let item = row
1730                                    .iter()
1731                                    .map(|expr| expr.to_string())
1732                                    .collect::<Vec<_>>()
1733                                    .join(", ");
1734                                format!("({item})")
1735                            })
1736                            .collect();
1737
1738                        let eclipse = if values.len() > 5 { "..." } else { "" };
1739                        write!(f, "Values: {}{}", str_values.join(", "), eclipse)
1740                    }
1741
1742                    LogicalPlan::TableScan(TableScan {
1743                        ref source,
1744                        ref table_name,
1745                        ref projection,
1746                        ref filters,
1747                        ref fetch,
1748                        ..
1749                    }) => {
1750                        let projected_fields = match projection {
1751                            Some(indices) => {
1752                                let schema = source.schema();
1753                                let names: Vec<&str> = indices
1754                                    .iter()
1755                                    .map(|i| schema.field(*i).name().as_str())
1756                                    .collect();
1757                                format!(" projection=[{}]", names.join(", "))
1758                            }
1759                            _ => "".to_string(),
1760                        };
1761
1762                        write!(f, "TableScan: {table_name}{projected_fields}")?;
1763
1764                        if !filters.is_empty() {
1765                            let mut full_filter = vec![];
1766                            let mut partial_filter = vec![];
1767                            let mut unsupported_filters = vec![];
1768                            let filters: Vec<&Expr> = filters.iter().collect();
1769
1770                            if let Ok(results) =
1771                                source.supports_filters_pushdown(&filters)
1772                            {
1773                                filters.iter().zip(results.iter()).for_each(
1774                                    |(x, res)| match res {
1775                                        TableProviderFilterPushDown::Exact => {
1776                                            full_filter.push(x)
1777                                        }
1778                                        TableProviderFilterPushDown::Inexact => {
1779                                            partial_filter.push(x)
1780                                        }
1781                                        TableProviderFilterPushDown::Unsupported => {
1782                                            unsupported_filters.push(x)
1783                                        }
1784                                    },
1785                                );
1786                            }
1787
1788                            if !full_filter.is_empty() {
1789                                write!(
1790                                    f,
1791                                    ", full_filters=[{}]",
1792                                    expr_vec_fmt!(full_filter)
1793                                )?;
1794                            };
1795                            if !partial_filter.is_empty() {
1796                                write!(
1797                                    f,
1798                                    ", partial_filters=[{}]",
1799                                    expr_vec_fmt!(partial_filter)
1800                                )?;
1801                            }
1802                            if !unsupported_filters.is_empty() {
1803                                write!(
1804                                    f,
1805                                    ", unsupported_filters=[{}]",
1806                                    expr_vec_fmt!(unsupported_filters)
1807                                )?;
1808                            }
1809                        }
1810
1811                        if let Some(n) = fetch {
1812                            write!(f, ", fetch={n}")?;
1813                        }
1814
1815                        Ok(())
1816                    }
1817                    LogicalPlan::Projection(Projection { ref expr, .. }) => {
1818                        write!(f, "Projection:")?;
1819                        for (i, expr_item) in expr.iter().enumerate() {
1820                            if i > 0 {
1821                                write!(f, ",")?;
1822                            }
1823                            write!(f, " {expr_item}")?;
1824                        }
1825                        Ok(())
1826                    }
1827                    LogicalPlan::Dml(DmlStatement { table_name, op, .. }) => {
1828                        write!(f, "Dml: op=[{op}] table=[{table_name}]")
1829                    }
1830                    LogicalPlan::Copy(CopyTo {
1831                        input: _,
1832                        output_url,
1833                        file_type,
1834                        options,
1835                        ..
1836                    }) => {
1837                        let op_str = options
1838                            .iter()
1839                            .map(|(k, v)| format!("{k} {v}"))
1840                            .collect::<Vec<String>>()
1841                            .join(", ");
1842
1843                        write!(f, "CopyTo: format={} output_url={output_url} options: ({op_str})", file_type.get_ext())
1844                    }
1845                    LogicalPlan::Ddl(ddl) => {
1846                        write!(f, "{}", ddl.display())
1847                    }
1848                    LogicalPlan::Filter(Filter {
1849                        predicate: ref expr,
1850                        ..
1851                    }) => write!(f, "Filter: {expr}"),
1852                    LogicalPlan::Window(Window {
1853                        ref window_expr, ..
1854                    }) => {
1855                        write!(
1856                            f,
1857                            "WindowAggr: windowExpr=[[{}]]",
1858                            expr_vec_fmt!(window_expr)
1859                        )
1860                    }
1861                    LogicalPlan::Aggregate(Aggregate {
1862                        ref group_expr,
1863                        ref aggr_expr,
1864                        ..
1865                    }) => write!(
1866                        f,
1867                        "Aggregate: groupBy=[[{}]], aggr=[[{}]]",
1868                        expr_vec_fmt!(group_expr),
1869                        expr_vec_fmt!(aggr_expr)
1870                    ),
1871                    LogicalPlan::Sort(Sort { expr, fetch, .. }) => {
1872                        write!(f, "Sort: ")?;
1873                        for (i, expr_item) in expr.iter().enumerate() {
1874                            if i > 0 {
1875                                write!(f, ", ")?;
1876                            }
1877                            write!(f, "{expr_item}")?;
1878                        }
1879                        if let Some(a) = fetch {
1880                            write!(f, ", fetch={a}")?;
1881                        }
1882
1883                        Ok(())
1884                    }
1885                    LogicalPlan::Join(Join {
1886                        on: ref keys,
1887                        filter,
1888                        join_constraint,
1889                        join_type,
1890                        ..
1891                    }) => {
1892                        let join_expr: Vec<String> =
1893                            keys.iter().map(|(l, r)| format!("{l} = {r}")).collect();
1894                        let filter_expr = filter
1895                            .as_ref()
1896                            .map(|expr| format!(" Filter: {expr}"))
1897                            .unwrap_or_else(|| "".to_string());
1898                        let join_type = if filter.is_none() && keys.is_empty() && matches!(join_type, JoinType::Inner) {
1899                            "Cross".to_string()
1900                        } else {
1901                            join_type.to_string()
1902                        };
1903                        match join_constraint {
1904                            JoinConstraint::On => {
1905                                write!(
1906                                    f,
1907                                    "{} Join: {}{}",
1908                                    join_type,
1909                                    join_expr.join(", "),
1910                                    filter_expr
1911                                )
1912                            }
1913                            JoinConstraint::Using => {
1914                                write!(
1915                                    f,
1916                                    "{} Join: Using {}{}",
1917                                    join_type,
1918                                    join_expr.join(", "),
1919                                    filter_expr,
1920                                )
1921                            }
1922                        }
1923                    }
1924                    LogicalPlan::Repartition(Repartition {
1925                        partitioning_scheme,
1926                        ..
1927                    }) => match partitioning_scheme {
1928                        Partitioning::RoundRobinBatch(n) => {
1929                            write!(f, "Repartition: RoundRobinBatch partition_count={n}")
1930                        }
1931                        Partitioning::Hash(expr, n) => {
1932                            let hash_expr: Vec<String> =
1933                                expr.iter().map(|e| format!("{e}")).collect();
1934                            write!(
1935                                f,
1936                                "Repartition: Hash({}) partition_count={}",
1937                                hash_expr.join(", "),
1938                                n
1939                            )
1940                        }
1941                        Partitioning::DistributeBy(expr) => {
1942                            let dist_by_expr: Vec<String> =
1943                                expr.iter().map(|e| format!("{e}")).collect();
1944                            write!(
1945                                f,
1946                                "Repartition: DistributeBy({})",
1947                                dist_by_expr.join(", "),
1948                            )
1949                        }
1950                    },
1951                    LogicalPlan::Limit(limit) => {
1952                        // Attempt to display `skip` and `fetch` as literals if possible, otherwise as expressions.
1953                        let skip_str = match limit.get_skip_type() {
1954                            Ok(SkipType::Literal(n)) => n.to_string(),
1955                            _ => limit.skip.as_ref().map_or_else(|| "None".to_string(), |x| x.to_string()),
1956                        };
1957                        let fetch_str = match limit.get_fetch_type() {
1958                            Ok(FetchType::Literal(Some(n))) => n.to_string(),
1959                            Ok(FetchType::Literal(None)) => "None".to_string(),
1960                            _ => limit.fetch.as_ref().map_or_else(|| "None".to_string(), |x| x.to_string())
1961                        };
1962                        write!(
1963                            f,
1964                            "Limit: skip={skip_str}, fetch={fetch_str}",
1965                        )
1966                    }
1967                    LogicalPlan::Subquery(Subquery { .. }) => {
1968                        write!(f, "Subquery:")
1969                    }
1970                    LogicalPlan::SubqueryAlias(SubqueryAlias { ref alias, .. }) => {
1971                        write!(f, "SubqueryAlias: {alias}")
1972                    }
1973                    LogicalPlan::Statement(statement) => {
1974                        write!(f, "{}", statement.display())
1975                    }
1976                    LogicalPlan::Distinct(distinct) => match distinct {
1977                        Distinct::All(_) => write!(f, "Distinct:"),
1978                        Distinct::On(DistinctOn {
1979                            on_expr,
1980                            select_expr,
1981                            sort_expr,
1982                            ..
1983                        }) => write!(
1984                            f,
1985                            "DistinctOn: on_expr=[[{}]], select_expr=[[{}]], sort_expr=[[{}]]",
1986                            expr_vec_fmt!(on_expr),
1987                            expr_vec_fmt!(select_expr),
1988                            if let Some(sort_expr) = sort_expr { expr_vec_fmt!(sort_expr) } else { "".to_string() },
1989                        ),
1990                    },
1991                    LogicalPlan::Explain { .. } => write!(f, "Explain"),
1992                    LogicalPlan::Analyze { .. } => write!(f, "Analyze"),
1993                    LogicalPlan::Union(_) => write!(f, "Union"),
1994                    LogicalPlan::Extension(e) => e.node.fmt_for_explain(f),
1995                    LogicalPlan::DescribeTable(DescribeTable { .. }) => {
1996                        write!(f, "DescribeTable")
1997                    }
1998                    LogicalPlan::Unnest(Unnest {
1999                        input: plan,
2000                        list_type_columns: list_col_indices,
2001                        struct_type_columns: struct_col_indices, .. }) => {
2002                        let input_columns = plan.schema().columns();
2003                        let list_type_columns = list_col_indices
2004                            .iter()
2005                            .map(|(i,unnest_info)|
2006                                format!("{}|depth={}", &input_columns[*i].to_string(),
2007                                unnest_info.depth))
2008                            .collect::<Vec<String>>();
2009                        let struct_type_columns = struct_col_indices
2010                            .iter()
2011                            .map(|i| &input_columns[*i])
2012                            .collect::<Vec<&Column>>();
2013                        // get items from input_columns indexed by list_col_indices
2014                        write!(f, "Unnest: lists[{}] structs[{}]",
2015                        expr_vec_fmt!(list_type_columns),
2016                        expr_vec_fmt!(struct_type_columns))
2017                    }
2018                }
2019            }
2020        }
2021        Wrapper(self)
2022    }
2023}
2024
2025impl Display for LogicalPlan {
2026    fn fmt(&self, f: &mut Formatter) -> fmt::Result {
2027        self.display_indent().fmt(f)
2028    }
2029}
2030
2031impl ToStringifiedPlan for LogicalPlan {
2032    fn to_stringified(&self, plan_type: PlanType) -> StringifiedPlan {
2033        StringifiedPlan::new(plan_type, self.display_indent().to_string())
2034    }
2035}
2036
2037/// Produces no rows: An empty relation with an empty schema
2038#[derive(Debug, Clone, PartialEq, Eq, Hash)]
2039pub struct EmptyRelation {
2040    /// Whether to produce a placeholder row
2041    pub produce_one_row: bool,
2042    /// The schema description of the output
2043    pub schema: DFSchemaRef,
2044}
2045
2046// Manual implementation needed because of `schema` field. Comparison excludes this field.
2047impl PartialOrd for EmptyRelation {
2048    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
2049        self.produce_one_row.partial_cmp(&other.produce_one_row)
2050    }
2051}
2052
2053/// A variadic query operation, Recursive CTE.
2054///
2055/// # Recursive Query Evaluation
2056///
2057/// From the [Postgres Docs]:
2058///
2059/// 1. Evaluate the non-recursive term. For `UNION` (but not `UNION ALL`),
2060///    discard duplicate rows. Include all remaining rows in the result of the
2061///    recursive query, and also place them in a temporary working table.
2062///
2063/// 2. So long as the working table is not empty, repeat these steps:
2064///
2065/// * Evaluate the recursive term, substituting the current contents of the
2066///   working table for the recursive self-reference. For `UNION` (but not `UNION
2067///   ALL`), discard duplicate rows and rows that duplicate any previous result
2068///   row. Include all remaining rows in the result of the recursive query, and
2069///   also place them in a temporary intermediate table.
2070///
2071/// * Replace the contents of the working table with the contents of the
2072///   intermediate table, then empty the intermediate table.
2073///
2074/// [Postgres Docs]: https://www.postgresql.org/docs/current/queries-with.html#QUERIES-WITH-RECURSIVE
2075#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Hash)]
2076pub struct RecursiveQuery {
2077    /// Name of the query
2078    pub name: String,
2079    /// The static term (initial contents of the working table)
2080    pub static_term: Arc<LogicalPlan>,
2081    /// The recursive term (evaluated on the contents of the working table until
2082    /// it returns an empty set)
2083    pub recursive_term: Arc<LogicalPlan>,
2084    /// Should the output of the recursive term be deduplicated (`UNION`) or
2085    /// not (`UNION ALL`).
2086    pub is_distinct: bool,
2087}
2088
2089/// Values expression. See
2090/// [Postgres VALUES](https://www.postgresql.org/docs/current/queries-values.html)
2091/// documentation for more details.
2092#[derive(Debug, Clone, PartialEq, Eq, Hash)]
2093pub struct Values {
2094    /// The table schema
2095    pub schema: DFSchemaRef,
2096    /// Values
2097    pub values: Vec<Vec<Expr>>,
2098}
2099
2100// Manual implementation needed because of `schema` field. Comparison excludes this field.
2101impl PartialOrd for Values {
2102    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
2103        self.values.partial_cmp(&other.values)
2104    }
2105}
2106
/// Evaluates an arbitrary list of expressions (essentially a
/// SELECT with an expression list) on its input.
///
/// `PartialEq` and `Hash` are derived over all fields (including `schema`);
/// `PartialOrd` is implemented manually and does not order by `schema`.
#[derive(Clone, PartialEq, Eq, Hash, Debug)]
// mark non_exhaustive to encourage use of the constructors
// (`try_new`, `try_new_with_schema`, `new_from_schema`)
#[non_exhaustive]
pub struct Projection {
    /// The list of expressions
    pub expr: Vec<Expr>,
    /// The incoming logical plan
    pub input: Arc<LogicalPlan>,
    /// The schema description of the output. `try_new_with_schema` checks
    /// that it has one field per expression unless `expr` contains a wildcard.
    pub schema: DFSchemaRef,
}
2120
2121// Manual implementation needed because of `schema` field. Comparison excludes this field.
2122impl PartialOrd for Projection {
2123    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
2124        match self.expr.partial_cmp(&other.expr) {
2125            Some(Ordering::Equal) => self.input.partial_cmp(&other.input),
2126            cmp => cmp,
2127        }
2128    }
2129}
2130
2131impl Projection {
2132    /// Create a new Projection
2133    pub fn try_new(expr: Vec<Expr>, input: Arc<LogicalPlan>) -> Result<Self> {
2134        let projection_schema = projection_schema(&input, &expr)?;
2135        Self::try_new_with_schema(expr, input, projection_schema)
2136    }
2137
2138    /// Create a new Projection using the specified output schema
2139    pub fn try_new_with_schema(
2140        expr: Vec<Expr>,
2141        input: Arc<LogicalPlan>,
2142        schema: DFSchemaRef,
2143    ) -> Result<Self> {
2144        #[expect(deprecated)]
2145        if !expr.iter().any(|e| matches!(e, Expr::Wildcard { .. }))
2146            && expr.len() != schema.fields().len()
2147        {
2148            return plan_err!("Projection has mismatch between number of expressions ({}) and number of fields in schema ({})", expr.len(), schema.fields().len());
2149        }
2150        Ok(Self {
2151            expr,
2152            input,
2153            schema,
2154        })
2155    }
2156
2157    /// Create a new Projection using the specified output schema
2158    pub fn new_from_schema(input: Arc<LogicalPlan>, schema: DFSchemaRef) -> Self {
2159        let expr: Vec<Expr> = schema.columns().into_iter().map(Expr::Column).collect();
2160        Self {
2161            expr,
2162            input,
2163            schema,
2164        }
2165    }
2166}
2167
2168/// Computes the schema of the result produced by applying a projection to the input logical plan.
2169///
2170/// # Arguments
2171///
2172/// * `input`: A reference to the input `LogicalPlan` for which the projection schema
2173///   will be computed.
2174/// * `exprs`: A slice of `Expr` expressions representing the projection operation to apply.
2175///
2176/// # Returns
2177///
2178/// A `Result` containing an `Arc<DFSchema>` representing the schema of the result
2179/// produced by the projection operation. If the schema computation is successful,
2180/// the `Result` will contain the schema; otherwise, it will contain an error.
2181pub fn projection_schema(input: &LogicalPlan, exprs: &[Expr]) -> Result<Arc<DFSchema>> {
2182    let metadata = input.schema().metadata().clone();
2183
2184    let schema =
2185        DFSchema::new_with_metadata(exprlist_to_fields(exprs, input)?, metadata)?
2186            .with_functional_dependencies(calc_func_dependencies_for_project(
2187                exprs, input,
2188            )?)?;
2189
2190    Ok(Arc::new(schema))
2191}
2192
/// Aliased subquery
///
/// `PartialEq` and `Hash` are derived over all fields (including `schema`);
/// `PartialOrd` is implemented manually and does not order by `schema`.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
// mark non_exhaustive to encourage use of `try_new`
#[non_exhaustive]
pub struct SubqueryAlias {
    /// The incoming logical plan
    pub input: Arc<LogicalPlan>,
    /// The alias for the input relation
    pub alias: TableReference,
    /// The input's schema with every field re-qualified by `alias`
    pub schema: DFSchemaRef,
}
2205
2206impl SubqueryAlias {
2207    pub fn try_new(
2208        plan: Arc<LogicalPlan>,
2209        alias: impl Into<TableReference>,
2210    ) -> Result<Self> {
2211        let alias = alias.into();
2212        let fields = change_redundant_column(plan.schema().fields());
2213        let meta_data = plan.schema().as_ref().metadata().clone();
2214        let schema: Schema =
2215            DFSchema::from_unqualified_fields(fields.into(), meta_data)?.into();
2216        // Since schema is the same, other than qualifier, we can use existing
2217        // functional dependencies:
2218        let func_dependencies = plan.schema().functional_dependencies().clone();
2219        let schema = DFSchemaRef::new(
2220            DFSchema::try_from_qualified_schema(alias.clone(), &schema)?
2221                .with_functional_dependencies(func_dependencies)?,
2222        );
2223        Ok(SubqueryAlias {
2224            input: plan,
2225            alias,
2226            schema,
2227        })
2228    }
2229}
2230
2231// Manual implementation needed because of `schema` field. Comparison excludes this field.
2232impl PartialOrd for SubqueryAlias {
2233    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
2234        match self.input.partial_cmp(&other.input) {
2235            Some(Ordering::Equal) => self.alias.partial_cmp(&other.alias),
2236            cmp => cmp,
2237        }
2238    }
2239}
2240
/// Filters rows from its input that do not match an
/// expression (essentially a WHERE clause with a predicate
/// expression).
///
/// Semantically, `<predicate>` is evaluated for each row of the input.
/// If the value of `<predicate>` is true, the input row is passed to
/// the output. If the value of `<predicate>` is false, the row is
/// discarded.
///
/// A `Filter` should not be created directly; use `try_new()` instead.
/// Its fields are only `pub` to support pattern matching.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Hash)]
#[non_exhaustive]
pub struct Filter {
    /// The predicate expression, which must have Boolean type.
    pub predicate: Expr,
    /// The incoming logical plan
    pub input: Arc<LogicalPlan>,
}
2260
2261impl Filter {
2262    /// Create a new filter operator.
2263    ///
2264    /// Notes: as Aliases have no effect on the output of a filter operator,
2265    /// they are removed from the predicate expression.
2266    pub fn try_new(predicate: Expr, input: Arc<LogicalPlan>) -> Result<Self> {
2267        Self::try_new_internal(predicate, input)
2268    }
2269
2270    /// Create a new filter operator for a having clause.
2271    /// This is similar to a filter, but its having flag is set to true.
2272    #[deprecated(since = "48.0.0", note = "Use `try_new` instead")]
2273    pub fn try_new_with_having(predicate: Expr, input: Arc<LogicalPlan>) -> Result<Self> {
2274        Self::try_new_internal(predicate, input)
2275    }
2276
2277    fn is_allowed_filter_type(data_type: &DataType) -> bool {
2278        match data_type {
2279            // Interpret NULL as a missing boolean value.
2280            DataType::Boolean | DataType::Null => true,
2281            DataType::Dictionary(_, value_type) => {
2282                Filter::is_allowed_filter_type(value_type.as_ref())
2283            }
2284            _ => false,
2285        }
2286    }
2287
2288    fn try_new_internal(predicate: Expr, input: Arc<LogicalPlan>) -> Result<Self> {
2289        // Filter predicates must return a boolean value so we try and validate that here.
2290        // Note that it is not always possible to resolve the predicate expression during plan
2291        // construction (such as with correlated subqueries) so we make a best effort here and
2292        // ignore errors resolving the expression against the schema.
2293        if let Ok(predicate_type) = predicate.get_type(input.schema()) {
2294            if !Filter::is_allowed_filter_type(&predicate_type) {
2295                return plan_err!(
2296                    "Cannot create filter with non-boolean predicate '{predicate}' returning {predicate_type}"
2297                );
2298            }
2299        }
2300
2301        Ok(Self {
2302            predicate: predicate.unalias_nested().data,
2303            input,
2304        })
2305    }
2306
2307    /// Is this filter guaranteed to return 0 or 1 row in a given instantiation?
2308    ///
2309    /// This function will return `true` if its predicate contains a conjunction of
2310    /// `col(a) = <expr>`, where its schema has a unique filter that is covered
2311    /// by this conjunction.
2312    ///
2313    /// For example, for the table:
2314    /// ```sql
2315    /// CREATE TABLE t (a INTEGER PRIMARY KEY, b INTEGER);
2316    /// ```
2317    /// `Filter(a = 2).is_scalar() == true`
2318    /// , whereas
2319    /// `Filter(b = 2).is_scalar() == false`
2320    /// and
2321    /// `Filter(a = 2 OR b = 2).is_scalar() == false`
2322    fn is_scalar(&self) -> bool {
2323        let schema = self.input.schema();
2324
2325        let functional_dependencies = self.input.schema().functional_dependencies();
2326        let unique_keys = functional_dependencies.iter().filter(|dep| {
2327            let nullable = dep.nullable
2328                && dep
2329                    .source_indices
2330                    .iter()
2331                    .any(|&source| schema.field(source).is_nullable());
2332            !nullable
2333                && dep.mode == Dependency::Single
2334                && dep.target_indices.len() == schema.fields().len()
2335        });
2336
2337        let exprs = split_conjunction(&self.predicate);
2338        let eq_pred_cols: HashSet<_> = exprs
2339            .iter()
2340            .filter_map(|expr| {
2341                let Expr::BinaryExpr(BinaryExpr {
2342                    left,
2343                    op: Operator::Eq,
2344                    right,
2345                }) = expr
2346                else {
2347                    return None;
2348                };
2349                // This is a no-op filter expression
2350                if left == right {
2351                    return None;
2352                }
2353
2354                match (left.as_ref(), right.as_ref()) {
2355                    (Expr::Column(_), Expr::Column(_)) => None,
2356                    (Expr::Column(c), _) | (_, Expr::Column(c)) => {
2357                        Some(schema.index_of_column(c).unwrap())
2358                    }
2359                    _ => None,
2360                }
2361            })
2362            .collect();
2363
2364        // If we have a functional dependence that is a subset of our predicate,
2365        // this filter is scalar
2366        for key in unique_keys {
2367            if key.source_indices.iter().all(|c| eq_pred_cols.contains(c)) {
2368                return true;
2369            }
2370        }
2371        false
2372    }
2373}
2374
/// Applies window functions (e.g. SUM or RANK), each with its own window
/// spec, to its input.
///
/// # Output Schema
///
/// The output schema is the input schema followed by the window function
/// expressions, in order.
///
/// For example, given the input schema `"A", "B", "C"` and the window function
/// `SUM(A) OVER (PARTITION BY B+1 ORDER BY C)`, the output schema will be `"A",
/// "B", "C", "SUM(A) OVER ..."` where `"SUM(A) OVER ..."` is the name of the
/// output column.
///
/// Note that the `PARTITION BY` expression "B+1" is not produced in the output
/// schema.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct Window {
    /// The incoming logical plan
    pub input: Arc<LogicalPlan>,
    /// The window function expressions; each contributes one output field,
    /// appended after the input's fields
    pub window_expr: Vec<Expr>,
    /// The schema description of the window output
    pub schema: DFSchemaRef,
}
2398
2399impl Window {
2400    /// Create a new window operator.
2401    pub fn try_new(window_expr: Vec<Expr>, input: Arc<LogicalPlan>) -> Result<Self> {
2402        let fields: Vec<(Option<TableReference>, Arc<Field>)> = input
2403            .schema()
2404            .iter()
2405            .map(|(q, f)| (q.cloned(), Arc::clone(f)))
2406            .collect();
2407        let input_len = fields.len();
2408        let mut window_fields = fields;
2409        let expr_fields = exprlist_to_fields(window_expr.as_slice(), &input)?;
2410        window_fields.extend_from_slice(expr_fields.as_slice());
2411        let metadata = input.schema().metadata().clone();
2412
2413        // Update functional dependencies for window:
2414        let mut window_func_dependencies =
2415            input.schema().functional_dependencies().clone();
2416        window_func_dependencies.extend_target_indices(window_fields.len());
2417
2418        // Since we know that ROW_NUMBER outputs will be unique (i.e. it consists
2419        // of consecutive numbers per partition), we can represent this fact with
2420        // functional dependencies.
2421        let mut new_dependencies = window_expr
2422            .iter()
2423            .enumerate()
2424            .filter_map(|(idx, expr)| {
2425                let Expr::WindowFunction(window_fun) = expr else {
2426                    return None;
2427                };
2428                let WindowFunction {
2429                    fun: WindowFunctionDefinition::WindowUDF(udwf),
2430                    params: WindowFunctionParams { partition_by, .. },
2431                } = window_fun.as_ref()
2432                else {
2433                    return None;
2434                };
2435                // When there is no PARTITION BY, row number will be unique
2436                // across the entire table.
2437                if udwf.name() == "row_number" && partition_by.is_empty() {
2438                    Some(idx + input_len)
2439                } else {
2440                    None
2441                }
2442            })
2443            .map(|idx| {
2444                FunctionalDependence::new(vec![idx], vec![], false)
2445                    .with_mode(Dependency::Single)
2446            })
2447            .collect::<Vec<_>>();
2448
2449        if !new_dependencies.is_empty() {
2450            for dependence in new_dependencies.iter_mut() {
2451                dependence.target_indices = (0..window_fields.len()).collect();
2452            }
2453            // Add the dependency introduced because of ROW_NUMBER window function to the functional dependency
2454            let new_deps = FunctionalDependencies::new(new_dependencies);
2455            window_func_dependencies.extend(new_deps);
2456        }
2457
2458        Self::try_new_with_schema(
2459            window_expr,
2460            input,
2461            Arc::new(
2462                DFSchema::new_with_metadata(window_fields, metadata)?
2463                    .with_functional_dependencies(window_func_dependencies)?,
2464            ),
2465        )
2466    }
2467
2468    pub fn try_new_with_schema(
2469        window_expr: Vec<Expr>,
2470        input: Arc<LogicalPlan>,
2471        schema: DFSchemaRef,
2472    ) -> Result<Self> {
2473        if window_expr.len() != schema.fields().len() - input.schema().fields().len() {
2474            return plan_err!(
2475                "Window has mismatch between number of expressions ({}) and number of fields in schema ({})",
2476                window_expr.len(),
2477                schema.fields().len() - input.schema().fields().len()
2478            );
2479        }
2480
2481        Ok(Window {
2482            input,
2483            window_expr,
2484            schema,
2485        })
2486    }
2487}
2488
2489// Manual implementation needed because of `schema` field. Comparison excludes this field.
2490impl PartialOrd for Window {
2491    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
2492        match self.input.partial_cmp(&other.input) {
2493            Some(Ordering::Equal) => self.window_expr.partial_cmp(&other.window_expr),
2494            cmp => cmp,
2495        }
2496    }
2497}
2498
/// Produces rows from a table provider by reference or from the context
///
/// `Debug`, `PartialEq`, `PartialOrd` and `Hash` are implemented manually
/// because `source` is a trait object. `source` is excluded from equality,
/// ordering and hashing (and printed as `"..."` by `Debug`).
#[derive(Clone)]
pub struct TableScan {
    /// The name of the table
    pub table_name: TableReference,
    /// The source of the table
    pub source: Arc<dyn TableSource>,
    /// Optional column indices (into the source's schema) to use as a projection
    pub projection: Option<Vec<usize>>,
    /// The schema description of the output; `try_new` qualifies every field
    /// with `table_name`
    pub projected_schema: DFSchemaRef,
    /// Optional expressions to be used as filters by the table provider
    pub filters: Vec<Expr>,
    /// Optional number of rows to read
    pub fetch: Option<usize>,
}
2515
2516impl Debug for TableScan {
2517    fn fmt(&self, f: &mut Formatter) -> fmt::Result {
2518        f.debug_struct("TableScan")
2519            .field("table_name", &self.table_name)
2520            .field("source", &"...")
2521            .field("projection", &self.projection)
2522            .field("projected_schema", &self.projected_schema)
2523            .field("filters", &self.filters)
2524            .field("fetch", &self.fetch)
2525            .finish_non_exhaustive()
2526    }
2527}
2528
2529impl PartialEq for TableScan {
2530    fn eq(&self, other: &Self) -> bool {
2531        self.table_name == other.table_name
2532            && self.projection == other.projection
2533            && self.projected_schema == other.projected_schema
2534            && self.filters == other.filters
2535            && self.fetch == other.fetch
2536    }
2537}
2538
2539impl Eq for TableScan {}
2540
2541// Manual implementation needed because of `source` and `projected_schema` fields.
2542// Comparison excludes these field.
2543impl PartialOrd for TableScan {
2544    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
2545        #[derive(PartialEq, PartialOrd)]
2546        struct ComparableTableScan<'a> {
2547            /// The name of the table
2548            pub table_name: &'a TableReference,
2549            /// Optional column indices to use as a projection
2550            pub projection: &'a Option<Vec<usize>>,
2551            /// Optional expressions to be used as filters by the table provider
2552            pub filters: &'a Vec<Expr>,
2553            /// Optional number of rows to read
2554            pub fetch: &'a Option<usize>,
2555        }
2556        let comparable_self = ComparableTableScan {
2557            table_name: &self.table_name,
2558            projection: &self.projection,
2559            filters: &self.filters,
2560            fetch: &self.fetch,
2561        };
2562        let comparable_other = ComparableTableScan {
2563            table_name: &other.table_name,
2564            projection: &other.projection,
2565            filters: &other.filters,
2566            fetch: &other.fetch,
2567        };
2568        comparable_self.partial_cmp(&comparable_other)
2569    }
2570}
2571
2572impl Hash for TableScan {
2573    fn hash<H: Hasher>(&self, state: &mut H) {
2574        self.table_name.hash(state);
2575        self.projection.hash(state);
2576        self.projected_schema.hash(state);
2577        self.filters.hash(state);
2578        self.fetch.hash(state);
2579    }
2580}
2581
2582impl TableScan {
2583    /// Initialize TableScan with appropriate schema from the given
2584    /// arguments.
2585    pub fn try_new(
2586        table_name: impl Into<TableReference>,
2587        table_source: Arc<dyn TableSource>,
2588        projection: Option<Vec<usize>>,
2589        filters: Vec<Expr>,
2590        fetch: Option<usize>,
2591    ) -> Result<Self> {
2592        let table_name = table_name.into();
2593
2594        if table_name.table().is_empty() {
2595            return plan_err!("table_name cannot be empty");
2596        }
2597        let schema = table_source.schema();
2598        let func_dependencies = FunctionalDependencies::new_from_constraints(
2599            table_source.constraints(),
2600            schema.fields.len(),
2601        );
2602        let projected_schema = projection
2603            .as_ref()
2604            .map(|p| {
2605                let projected_func_dependencies =
2606                    func_dependencies.project_functional_dependencies(p, p.len());
2607
2608                let df_schema = DFSchema::new_with_metadata(
2609                    p.iter()
2610                        .map(|i| {
2611                            (Some(table_name.clone()), Arc::new(schema.field(*i).clone()))
2612                        })
2613                        .collect(),
2614                    schema.metadata.clone(),
2615                )?;
2616                df_schema.with_functional_dependencies(projected_func_dependencies)
2617            })
2618            .unwrap_or_else(|| {
2619                let df_schema =
2620                    DFSchema::try_from_qualified_schema(table_name.clone(), &schema)?;
2621                df_schema.with_functional_dependencies(func_dependencies)
2622            })?;
2623        let projected_schema = Arc::new(projected_schema);
2624
2625        Ok(Self {
2626            table_name,
2627            source: table_source,
2628            projection,
2629            projected_schema,
2630            filters,
2631            fetch,
2632        })
2633    }
2634}
2635
/// Repartition the plan based on a partitioning scheme.
///
/// All traits are derived, so both fields participate in equality, ordering
/// and hashing.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Hash)]
pub struct Repartition {
    /// The incoming logical plan
    pub input: Arc<LogicalPlan>,
    /// The partitioning scheme
    pub partitioning_scheme: Partitioning,
}
2644
/// Union multiple inputs
///
/// `PartialEq` and `Hash` are derived over all fields (including `schema`);
/// `PartialOrd` is implemented manually and does not order by `schema`.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct Union {
    /// Inputs to merge (the constructors require at least two)
    pub inputs: Vec<Arc<LogicalPlan>>,
    /// Union schema. Should be the same for all inputs. When built by the
    /// constructors, its fields are unqualified.
    pub schema: DFSchemaRef,
}
2653
2654impl Union {
2655    /// Constructs new Union instance deriving schema from inputs.
2656    fn try_new(inputs: Vec<Arc<LogicalPlan>>) -> Result<Self> {
2657        let schema = Self::derive_schema_from_inputs(&inputs, false, false)?;
2658        Ok(Union { inputs, schema })
2659    }
2660
2661    /// Constructs new Union instance deriving schema from inputs.
2662    /// Inputs do not have to have matching types and produced schema will
2663    /// take type from the first input.
2664    // TODO (https://github.com/apache/datafusion/issues/14380): Avoid creating uncoerced union at all.
2665    pub fn try_new_with_loose_types(inputs: Vec<Arc<LogicalPlan>>) -> Result<Self> {
2666        let schema = Self::derive_schema_from_inputs(&inputs, true, false)?;
2667        Ok(Union { inputs, schema })
2668    }
2669
2670    /// Constructs a new Union instance that combines rows from different tables by name,
2671    /// instead of by position. This means that the specified inputs need not have schemas
2672    /// that are all the same width.
2673    pub fn try_new_by_name(inputs: Vec<Arc<LogicalPlan>>) -> Result<Self> {
2674        let schema = Self::derive_schema_from_inputs(&inputs, true, true)?;
2675        let inputs = Self::rewrite_inputs_from_schema(&schema, inputs)?;
2676
2677        Ok(Union { inputs, schema })
2678    }
2679
2680    /// When constructing a `UNION BY NAME`, we need to wrap inputs
2681    /// in an additional `Projection` to account for absence of columns
2682    /// in input schemas or differing projection orders.
2683    fn rewrite_inputs_from_schema(
2684        schema: &Arc<DFSchema>,
2685        inputs: Vec<Arc<LogicalPlan>>,
2686    ) -> Result<Vec<Arc<LogicalPlan>>> {
2687        let schema_width = schema.iter().count();
2688        let mut wrapped_inputs = Vec::with_capacity(inputs.len());
2689        for input in inputs {
2690            // Any columns that exist within the derived schema but do not exist
2691            // within an input's schema should be replaced with `NULL` aliased
2692            // to the appropriate column in the derived schema.
2693            let mut expr = Vec::with_capacity(schema_width);
2694            for column in schema.columns() {
2695                if input
2696                    .schema()
2697                    .has_column_with_unqualified_name(column.name())
2698                {
2699                    expr.push(Expr::Column(column));
2700                } else {
2701                    expr.push(
2702                        Expr::Literal(ScalarValue::Null, None).alias(column.name()),
2703                    );
2704                }
2705            }
2706            wrapped_inputs.push(Arc::new(LogicalPlan::Projection(
2707                Projection::try_new_with_schema(expr, input, Arc::clone(schema))?,
2708            )));
2709        }
2710
2711        Ok(wrapped_inputs)
2712    }
2713
2714    /// Constructs new Union instance deriving schema from inputs.
2715    ///
2716    /// If `loose_types` is true, inputs do not need to have matching types and
2717    /// the produced schema will use the type from the first input.
2718    /// TODO (<https://github.com/apache/datafusion/issues/14380>): This is not necessarily reasonable behavior.
2719    ///
2720    /// If `by_name` is `true`, input schemas need not be the same width. That is,
2721    /// the constructed schema follows `UNION BY NAME` semantics.
2722    fn derive_schema_from_inputs(
2723        inputs: &[Arc<LogicalPlan>],
2724        loose_types: bool,
2725        by_name: bool,
2726    ) -> Result<DFSchemaRef> {
2727        if inputs.len() < 2 {
2728            return plan_err!("UNION requires at least two inputs");
2729        }
2730
2731        if by_name {
2732            Self::derive_schema_from_inputs_by_name(inputs, loose_types)
2733        } else {
2734            Self::derive_schema_from_inputs_by_position(inputs, loose_types)
2735        }
2736    }
2737
2738    fn derive_schema_from_inputs_by_name(
2739        inputs: &[Arc<LogicalPlan>],
2740        loose_types: bool,
2741    ) -> Result<DFSchemaRef> {
2742        type FieldData<'a> =
2743            (&'a DataType, bool, Vec<&'a HashMap<String, String>>, usize);
2744        let mut cols: Vec<(&str, FieldData)> = Vec::new();
2745        for input in inputs.iter() {
2746            for field in input.schema().fields() {
2747                if let Some((_, (data_type, is_nullable, metadata, occurrences))) =
2748                    cols.iter_mut().find(|(name, _)| name == field.name())
2749                {
2750                    if !loose_types && *data_type != field.data_type() {
2751                        return plan_err!(
2752                            "Found different types for field {}",
2753                            field.name()
2754                        );
2755                    }
2756
2757                    metadata.push(field.metadata());
2758                    // If the field is nullable in any one of the inputs,
2759                    // then the field in the final schema is also nullable.
2760                    *is_nullable |= field.is_nullable();
2761                    *occurrences += 1;
2762                } else {
2763                    cols.push((
2764                        field.name(),
2765                        (
2766                            field.data_type(),
2767                            field.is_nullable(),
2768                            vec![field.metadata()],
2769                            1,
2770                        ),
2771                    ));
2772                }
2773            }
2774        }
2775
2776        let union_fields = cols
2777            .into_iter()
2778            .map(
2779                |(name, (data_type, is_nullable, unmerged_metadata, occurrences))| {
2780                    // If the final number of occurrences of the field is less
2781                    // than the number of inputs (i.e. the field is missing from
2782                    // one or more inputs), then it must be treated as nullable.
2783                    let final_is_nullable = if occurrences == inputs.len() {
2784                        is_nullable
2785                    } else {
2786                        true
2787                    };
2788
2789                    let mut field =
2790                        Field::new(name, data_type.clone(), final_is_nullable);
2791                    field.set_metadata(intersect_maps(unmerged_metadata));
2792
2793                    (None, Arc::new(field))
2794                },
2795            )
2796            .collect::<Vec<(Option<TableReference>, _)>>();
2797
2798        let union_schema_metadata =
2799            intersect_maps(inputs.iter().map(|input| input.schema().metadata()));
2800
2801        // Functional Dependencies are not preserved after UNION operation
2802        let schema = DFSchema::new_with_metadata(union_fields, union_schema_metadata)?;
2803        let schema = Arc::new(schema);
2804
2805        Ok(schema)
2806    }
2807
2808    fn derive_schema_from_inputs_by_position(
2809        inputs: &[Arc<LogicalPlan>],
2810        loose_types: bool,
2811    ) -> Result<DFSchemaRef> {
2812        let first_schema = inputs[0].schema();
2813        let fields_count = first_schema.fields().len();
2814        for input in inputs.iter().skip(1) {
2815            if fields_count != input.schema().fields().len() {
2816                return plan_err!(
2817                    "UNION queries have different number of columns: \
2818                    left has {} columns whereas right has {} columns",
2819                    fields_count,
2820                    input.schema().fields().len()
2821                );
2822            }
2823        }
2824
2825        let mut name_counts: HashMap<String, usize> = HashMap::new();
2826        let union_fields = (0..fields_count)
2827            .map(|i| {
2828                let fields = inputs
2829                    .iter()
2830                    .map(|input| input.schema().field(i))
2831                    .collect::<Vec<_>>();
2832                let first_field = fields[0];
2833                let base_name = first_field.name().to_string();
2834
2835                let data_type = if loose_types {
2836                    // TODO apply type coercion here, or document why it's better to defer
2837                    // temporarily use the data type from the left input and later rely on the analyzer to
2838                    // coerce the two schemas into a common one.
2839                    first_field.data_type()
2840                } else {
2841                    fields.iter().skip(1).try_fold(
2842                        first_field.data_type(),
2843                        |acc, field| {
2844                            if acc != field.data_type() {
2845                                return plan_err!(
2846                                    "UNION field {i} have different type in inputs: \
2847                                    left has {} whereas right has {}",
2848                                    first_field.data_type(),
2849                                    field.data_type()
2850                                );
2851                            }
2852                            Ok(acc)
2853                        },
2854                    )?
2855                };
2856                let nullable = fields.iter().any(|field| field.is_nullable());
2857
2858                // Generate unique field name
2859                let name = if let Some(count) = name_counts.get_mut(&base_name) {
2860                    *count += 1;
2861                    format!("{base_name}_{count}")
2862                } else {
2863                    name_counts.insert(base_name.clone(), 0);
2864                    base_name
2865                };
2866
2867                let mut field = Field::new(&name, data_type.clone(), nullable);
2868                let field_metadata =
2869                    intersect_maps(fields.iter().map(|field| field.metadata()));
2870                field.set_metadata(field_metadata);
2871                Ok((None, Arc::new(field)))
2872            })
2873            .collect::<Result<_>>()?;
2874        let union_schema_metadata =
2875            intersect_maps(inputs.iter().map(|input| input.schema().metadata()));
2876
2877        // Functional Dependencies are not preserved after UNION operation
2878        let schema = DFSchema::new_with_metadata(union_fields, union_schema_metadata)?;
2879        let schema = Arc::new(schema);
2880
2881        Ok(schema)
2882    }
2883}
2884
/// Returns the key/value pairs that appear, with identical values, in every
/// map yielded by `inputs`.
///
/// The first map is cloned as the starting point; each subsequent map then
/// drops every entry whose key it lacks or whose value differs. An empty
/// iterator yields an empty map.
fn intersect_maps<'a>(
    inputs: impl IntoIterator<Item = &'a HashMap<String, String>>,
) -> HashMap<String, String> {
    let mut maps = inputs.into_iter();
    let seed = match maps.next() {
        Some(first) => first.clone(),
        None => HashMap::new(),
    };
    maps.fold(seed, |mut common, other| {
        // The extra dereference below (`&*value`) is a workaround for https://github.com/rkyv/rkyv/issues/434.
        // When this crate is used in a workspace that enables the `rkyv-64` feature in the `chrono` crate,
        // comparing against the `&mut String` directly triggers a Rust compilation error:
        // error[E0277]: can't compare `Option<&std::string::String>` with `Option<&mut std::string::String>`.
        common.retain(|key, value| other.get(key) == Some(&*value));
        common
    })
}
2899
2900// Manual implementation needed because of `schema` field. Comparison excludes this field.
2901impl PartialOrd for Union {
2902    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
2903        self.inputs.partial_cmp(&other.inputs)
2904    }
2905}
2906
/// Describes the schema of a table (the logical node behind `DESCRIBE`).
///
/// # Example output:
///
/// ```sql
/// > describe traces;
/// +--------------------+-----------------------------+-------------+
/// | column_name        | data_type                   | is_nullable |
/// +--------------------+-----------------------------+-------------+
/// | attributes         | Utf8                        | YES         |
/// | duration_nano      | Int64                       | YES         |
/// | end_time_unix_nano | Int64                       | YES         |
/// | service.name       | Dictionary(Int32, Utf8)     | YES         |
/// | span.kind          | Utf8                        | YES         |
/// | span.name          | Utf8                        | YES         |
/// | span_id            | Dictionary(Int32, Utf8)     | YES         |
/// | time               | Timestamp(Nanosecond, None) | NO          |
/// | trace_id           | Dictionary(Int32, Utf8)     | YES         |
/// | otel.status_code   | Utf8                        | YES         |
/// | parent_span_id     | Utf8                        | YES         |
/// +--------------------+-----------------------------+-------------+
/// ```
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct DescribeTable {
    /// Schema of the table being described
    pub schema: Arc<Schema>,
    /// Schema of the rows produced by the describe output
    pub output_schema: DFSchemaRef,
}
2936
2937// Manual implementation of `PartialOrd`, returning none since there are no comparable types in
2938// `DescribeTable`. This allows `LogicalPlan` to derive `PartialOrd`.
2939impl PartialOrd for DescribeTable {
2940    fn partial_cmp(&self, _other: &Self) -> Option<Ordering> {
2941        // There is no relevant comparison for schemas
2942        None
2943    }
2944}
2945
/// Output formats that control how `EXPLAIN` plans are rendered.
///
/// Parsed case-insensitively from `indent`, `tree`, `pgjson` or `graphviz`
/// through the `FromStr` implementation.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum ExplainFormat {
    /// Indent mode
    ///
    /// Example:
    /// ```text
    /// > explain format indent select x from values (1) t(x);
    /// +---------------+-----------------------------------------------------+
    /// | plan_type     | plan                                                |
    /// +---------------+-----------------------------------------------------+
    /// | logical_plan  | SubqueryAlias: t                                    |
    /// |               |   Projection: column1 AS x                          |
    /// |               |     Values: (Int64(1))                              |
    /// | physical_plan | ProjectionExec: expr=[column1@0 as x]               |
    /// |               |   DataSourceExec: partitions=1, partition_sizes=[1] |
    /// |               |                                                     |
    /// +---------------+-----------------------------------------------------+
    /// ```
    Indent,
    /// Tree mode
    ///
    /// Example:
    /// ```text
    /// > explain format tree select x from values (1) t(x);
    /// +---------------+-------------------------------+
    /// | plan_type     | plan                          |
    /// +---------------+-------------------------------+
    /// | physical_plan | ┌───────────────────────────┐ |
    /// |               | │       ProjectionExec      │ |
    /// |               | │    --------------------   │ |
    /// |               | │        x: column1@0       │ |
    /// |               | └─────────────┬─────────────┘ |
    /// |               | ┌─────────────┴─────────────┐ |
    /// |               | │       DataSourceExec      │ |
    /// |               | │    --------------------   │ |
    /// |               | │         bytes: 128        │ |
    /// |               | │       format: memory      │ |
    /// |               | │          rows: 1          │ |
    /// |               | └───────────────────────────┘ |
    /// |               |                               |
    /// +---------------+-------------------------------+
    /// ```
    Tree,
    /// Postgres JSON mode
    ///
    /// Produces the plan as a displayable structure in the PostgreSQL JSON
    /// plan format.
    ///
    /// Users can use this format to visualize the plan in existing plan
    /// visualization tools, for example [dalibo](https://explain.dalibo.com/)
    ///
    /// Example:
    /// ```text
    /// > explain format pgjson select x from values (1) t(x);
    /// +--------------+--------------------------------------+
    /// | plan_type    | plan                                 |
    /// +--------------+--------------------------------------+
    /// | logical_plan | [                                    |
    /// |              |   {                                  |
    /// |              |     "Plan": {                        |
    /// |              |       "Alias": "t",                  |
    /// |              |       "Node Type": "Subquery",       |
    /// |              |       "Output": [                    |
    /// |              |         "x"                          |
    /// |              |       ],                             |
    /// |              |       "Plans": [                     |
    /// |              |         {                            |
    /// |              |           "Expressions": [           |
    /// |              |             "column1 AS x"           |
    /// |              |           ],                         |
    /// |              |           "Node Type": "Projection", |
    /// |              |           "Output": [                |
    /// |              |             "x"                      |
    /// |              |           ],                         |
    /// |              |           "Plans": [                 |
    /// |              |             {                        |
    /// |              |               "Node Type": "Values", |
    /// |              |               "Output": [            |
    /// |              |                 "column1"            |
    /// |              |               ],                     |
    /// |              |               "Plans": [],           |
    /// |              |               "Values": "(Int64(1))" |
    /// |              |             }                        |
    /// |              |           ]                          |
    /// |              |         }                            |
    /// |              |       ]                              |
    /// |              |     }                                |
    /// |              |   }                                  |
    /// |              | ]                                    |
    /// +--------------+--------------------------------------+
    /// ```
    PostgresJSON,
    /// Graphviz mode
    ///
    /// Example:
    /// ```text
    /// > explain format graphviz select x from values (1) t(x);
    /// +--------------+------------------------------------------------------------------------+
    /// | plan_type    | plan                                                                   |
    /// +--------------+------------------------------------------------------------------------+
    /// | logical_plan |                                                                        |
    /// |              | // Begin DataFusion GraphViz Plan,                                     |
    /// |              | // display it online here: https://dreampuf.github.io/GraphvizOnline   |
    /// |              |                                                                        |
    /// |              | digraph {                                                              |
    /// |              |   subgraph cluster_1                                                   |
    /// |              |   {                                                                    |
    /// |              |     graph[label="LogicalPlan"]                                         |
    /// |              |     2[shape=box label="SubqueryAlias: t"]                              |
    /// |              |     3[shape=box label="Projection: column1 AS x"]                      |
    /// |              |     2 -> 3 [arrowhead=none, arrowtail=normal, dir=back]                |
    /// |              |     4[shape=box label="Values: (Int64(1))"]                            |
    /// |              |     3 -> 4 [arrowhead=none, arrowtail=normal, dir=back]                |
    /// |              |   }                                                                    |
    /// |              |   subgraph cluster_5                                                   |
    /// |              |   {                                                                    |
    /// |              |     graph[label="Detailed LogicalPlan"]                                |
    /// |              |     6[shape=box label="SubqueryAlias: t\nSchema: [x:Int64;N]"]         |
    /// |              |     7[shape=box label="Projection: column1 AS x\nSchema: [x:Int64;N]"] |
    /// |              |     6 -> 7 [arrowhead=none, arrowtail=normal, dir=back]                |
    /// |              |     8[shape=box label="Values: (Int64(1))\nSchema: [column1:Int64;N]"] |
    /// |              |     7 -> 8 [arrowhead=none, arrowtail=normal, dir=back]                |
    /// |              |   }                                                                    |
    /// |              | }                                                                      |
    /// |              | // End DataFusion GraphViz Plan                                        |
    /// |              |                                                                        |
    /// +--------------+------------------------------------------------------------------------+
    /// ```
    Graphviz,
}
3076
3077/// Implement  parsing strings to `ExplainFormat`
3078impl FromStr for ExplainFormat {
3079    type Err = DataFusionError;
3080
3081    fn from_str(format: &str) -> std::result::Result<Self, Self::Err> {
3082        match format.to_lowercase().as_str() {
3083            "indent" => Ok(ExplainFormat::Indent),
3084            "tree" => Ok(ExplainFormat::Tree),
3085            "pgjson" => Ok(ExplainFormat::PostgresJSON),
3086            "graphviz" => Ok(ExplainFormat::Graphviz),
3087            _ => {
3088                plan_err!("Invalid explain format. Expected 'indent', 'tree', 'pgjson' or 'graphviz'. Got '{format}'")
3089            }
3090        }
3091    }
3092}
3093
3094/// Produces a relation with string representations of
3095/// various parts of the plan
3096///
3097/// See [the documentation] for more information
3098///
3099/// [the documentation]: https://datafusion.apache.org/user-guide/sql/explain.html
3100#[derive(Debug, Clone, PartialEq, Eq, Hash)]
3101pub struct Explain {
3102    /// Should extra (detailed, intermediate plans) be included?
3103    pub verbose: bool,
3104    /// Output format for explain, if specified.
3105    /// If none, defaults to `text`
3106    pub explain_format: ExplainFormat,
3107    /// The logical plan that is being EXPLAIN'd
3108    pub plan: Arc<LogicalPlan>,
3109    /// Represent the various stages plans have gone through
3110    pub stringified_plans: Vec<StringifiedPlan>,
3111    /// The output schema of the explain (2 columns of text)
3112    pub schema: DFSchemaRef,
3113    /// Used by physical planner to check if should proceed with planning
3114    pub logical_optimization_succeeded: bool,
3115}
3116
3117// Manual implementation needed because of `schema` field. Comparison excludes this field.
3118impl PartialOrd for Explain {
3119    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
3120        #[derive(PartialEq, PartialOrd)]
3121        struct ComparableExplain<'a> {
3122            /// Should extra (detailed, intermediate plans) be included?
3123            pub verbose: &'a bool,
3124            /// The logical plan that is being EXPLAIN'd
3125            pub plan: &'a Arc<LogicalPlan>,
3126            /// Represent the various stages plans have gone through
3127            pub stringified_plans: &'a Vec<StringifiedPlan>,
3128            /// Used by physical planner to check if should proceed with planning
3129            pub logical_optimization_succeeded: &'a bool,
3130        }
3131        let comparable_self = ComparableExplain {
3132            verbose: &self.verbose,
3133            plan: &self.plan,
3134            stringified_plans: &self.stringified_plans,
3135            logical_optimization_succeeded: &self.logical_optimization_succeeded,
3136        };
3137        let comparable_other = ComparableExplain {
3138            verbose: &other.verbose,
3139            plan: &other.plan,
3140            stringified_plans: &other.stringified_plans,
3141            logical_optimization_succeeded: &other.logical_optimization_succeeded,
3142        };
3143        comparable_self.partial_cmp(&comparable_other)
3144    }
3145}
3146
3147/// Runs the actual plan, and then prints the physical plan with
3148/// with execution metrics.
3149#[derive(Debug, Clone, PartialEq, Eq, Hash)]
3150pub struct Analyze {
3151    /// Should extra detail be included?
3152    pub verbose: bool,
3153    /// The logical plan that is being EXPLAIN ANALYZE'd
3154    pub input: Arc<LogicalPlan>,
3155    /// The output schema of the explain (2 columns of text)
3156    pub schema: DFSchemaRef,
3157}
3158
3159// Manual implementation needed because of `schema` field. Comparison excludes this field.
3160impl PartialOrd for Analyze {
3161    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
3162        match self.verbose.partial_cmp(&other.verbose) {
3163            Some(Ordering::Equal) => self.input.partial_cmp(&other.input),
3164            cmp => cmp,
3165        }
3166    }
3167}
3168
/// Extension operator defined outside of DataFusion, wrapping a
/// [`UserDefinedLogicalNode`] implementation.
// TODO(clippy): This clippy `allow` should be removed if
// the manual `PartialEq` is removed in favor of a derive.
// (see `PartialEq` the impl for details.)
#[allow(clippy::derived_hash_with_manual_eq)]
#[derive(Debug, Clone, Eq, Hash)]
pub struct Extension {
    /// The runtime extension operator (shared, dynamically dispatched node)
    pub node: Arc<dyn UserDefinedLogicalNode>,
}
3179
3180// `PartialEq` cannot be derived for types containing `Arc<dyn Trait>`.
3181// This manual implementation should be removed if
3182// https://github.com/rust-lang/rust/issues/39128 is fixed.
3183impl PartialEq for Extension {
3184    fn eq(&self, other: &Self) -> bool {
3185        self.node.eq(&other.node)
3186    }
3187}
3188
3189impl PartialOrd for Extension {
3190    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
3191        self.node.partial_cmp(&other.node)
3192    }
3193}
3194
/// Skips the first `skip` rows of its input, then produces at most `fetch`
/// rows and discards the rest.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Hash)]
pub struct Limit {
    /// Number of rows to skip before fetch.
    /// `None` is treated as skipping zero rows (see [`Limit::get_skip_type`]).
    pub skip: Option<Box<Expr>>,
    /// Maximum number of rows to fetch,
    /// None means fetching all rows
    pub fetch: Option<Box<Expr>>,
    /// The logical plan
    pub input: Arc<LogicalPlan>,
}
3206
/// Different types of skip expression in Limit plan, as classified by
/// [`Limit::get_skip_type`].
pub enum SkipType {
    /// The skip expression is a literal value (a missing or `NULL` skip is `0`).
    Literal(usize),
    /// The skip expression is not an `Int64` literal.
    /// Currently only supports expressions that can be folded into constants.
    UnsupportedExpr,
}
3214
/// Different types of fetch expression in Limit plan, as classified by
/// [`Limit::get_fetch_type`].
pub enum FetchType {
    /// The fetch expression is a literal value.
    /// `Literal(None)` means the fetch expression is not provided (or is a
    /// `NULL` literal), i.e. all rows are fetched.
    Literal(Option<usize>),
    /// The fetch expression is not an `Int64` literal.
    /// Currently only supports expressions that can be folded into constants.
    UnsupportedExpr,
}
3223
3224impl Limit {
3225    /// Get the skip type from the limit plan.
3226    pub fn get_skip_type(&self) -> Result<SkipType> {
3227        match self.skip.as_deref() {
3228            Some(expr) => match *expr {
3229                Expr::Literal(ScalarValue::Int64(s), _) => {
3230                    // `skip = NULL` is equivalent to `skip = 0`
3231                    let s = s.unwrap_or(0);
3232                    if s >= 0 {
3233                        Ok(SkipType::Literal(s as usize))
3234                    } else {
3235                        plan_err!("OFFSET must be >=0, '{}' was provided", s)
3236                    }
3237                }
3238                _ => Ok(SkipType::UnsupportedExpr),
3239            },
3240            // `skip = None` is equivalent to `skip = 0`
3241            None => Ok(SkipType::Literal(0)),
3242        }
3243    }
3244
3245    /// Get the fetch type from the limit plan.
3246    pub fn get_fetch_type(&self) -> Result<FetchType> {
3247        match self.fetch.as_deref() {
3248            Some(expr) => match *expr {
3249                Expr::Literal(ScalarValue::Int64(Some(s)), _) => {
3250                    if s >= 0 {
3251                        Ok(FetchType::Literal(Some(s as usize)))
3252                    } else {
3253                        plan_err!("LIMIT must be >= 0, '{}' was provided", s)
3254                    }
3255                }
3256                Expr::Literal(ScalarValue::Int64(None), _) => {
3257                    Ok(FetchType::Literal(None))
3258                }
3259                _ => Ok(FetchType::UnsupportedExpr),
3260            },
3261            None => Ok(FetchType::Literal(None)),
3262        }
3263    }
3264}
3265
/// Removes duplicate rows from the input
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Hash)]
pub enum Distinct {
    /// Plain `DISTINCT` referencing all selection expressions
    All(Arc<LogicalPlan>),
    /// The `Postgres` addition, allowing separate control over DISTINCT'd and selected columns
    /// (see [`DistinctOn`])
    On(DistinctOn),
}
3274
3275impl Distinct {
3276    /// return a reference to the nodes input
3277    pub fn input(&self) -> &Arc<LogicalPlan> {
3278        match self {
3279            Distinct::All(input) => input,
3280            Distinct::On(DistinctOn { input, .. }) => input,
3281        }
3282    }
3283}
3284
/// Removes duplicate rows from the input based on the `ON` expressions
/// (the Postgres `DISTINCT ON` extension). Build with [`DistinctOn::try_new`],
/// which validates the expressions and computes the output schema.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct DistinctOn {
    /// The `DISTINCT ON` clause expression list
    pub on_expr: Vec<Expr>,
    /// The selected projection expression list
    pub select_expr: Vec<Expr>,
    /// The `ORDER BY` clause, whose initial expressions must match those of the `ON` clause when
    /// present. Note that those matching expressions actually wrap the `ON` expressions with
    /// additional info pertaining to the sorting procedure (i.e. ASC/DESC, and NULLS FIRST/LAST).
    pub sort_expr: Option<Vec<SortExpr>>,
    /// The logical plan that is being DISTINCT'd
    pub input: Arc<LogicalPlan>,
    /// The schema description of the DISTINCT ON output
    pub schema: DFSchemaRef,
}
3301
3302impl DistinctOn {
3303    /// Create a new `DistinctOn` struct.
3304    pub fn try_new(
3305        on_expr: Vec<Expr>,
3306        select_expr: Vec<Expr>,
3307        sort_expr: Option<Vec<SortExpr>>,
3308        input: Arc<LogicalPlan>,
3309    ) -> Result<Self> {
3310        if on_expr.is_empty() {
3311            return plan_err!("No `ON` expressions provided");
3312        }
3313
3314        let on_expr = normalize_cols(on_expr, input.as_ref())?;
3315        let qualified_fields = exprlist_to_fields(select_expr.as_slice(), &input)?
3316            .into_iter()
3317            .collect();
3318
3319        let dfschema = DFSchema::new_with_metadata(
3320            qualified_fields,
3321            input.schema().metadata().clone(),
3322        )?;
3323
3324        let mut distinct_on = DistinctOn {
3325            on_expr,
3326            select_expr,
3327            sort_expr: None,
3328            input,
3329            schema: Arc::new(dfschema),
3330        };
3331
3332        if let Some(sort_expr) = sort_expr {
3333            distinct_on = distinct_on.with_sort_expr(sort_expr)?;
3334        }
3335
3336        Ok(distinct_on)
3337    }
3338
3339    /// Try to update `self` with a new sort expressions.
3340    ///
3341    /// Validates that the sort expressions are a super-set of the `ON` expressions.
3342    pub fn with_sort_expr(mut self, sort_expr: Vec<SortExpr>) -> Result<Self> {
3343        let sort_expr = normalize_sorts(sort_expr, self.input.as_ref())?;
3344
3345        // Check that the left-most sort expressions are the same as the `ON` expressions.
3346        let mut matched = true;
3347        for (on, sort) in self.on_expr.iter().zip(sort_expr.iter()) {
3348            if on != &sort.expr {
3349                matched = false;
3350                break;
3351            }
3352        }
3353
3354        if self.on_expr.len() > sort_expr.len() || !matched {
3355            return plan_err!(
3356                "SELECT DISTINCT ON expressions must match initial ORDER BY expressions"
3357            );
3358        }
3359
3360        self.sort_expr = Some(sort_expr);
3361        Ok(self)
3362    }
3363}
3364
3365// Manual implementation needed because of `schema` field. Comparison excludes this field.
3366impl PartialOrd for DistinctOn {
3367    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
3368        #[derive(PartialEq, PartialOrd)]
3369        struct ComparableDistinctOn<'a> {
3370            /// The `DISTINCT ON` clause expression list
3371            pub on_expr: &'a Vec<Expr>,
3372            /// The selected projection expression list
3373            pub select_expr: &'a Vec<Expr>,
3374            /// The `ORDER BY` clause, whose initial expressions must match those of the `ON` clause when
3375            /// present. Note that those matching expressions actually wrap the `ON` expressions with
3376            /// additional info pertaining to the sorting procedure (i.e. ASC/DESC, and NULLS FIRST/LAST).
3377            pub sort_expr: &'a Option<Vec<SortExpr>>,
3378            /// The logical plan that is being DISTINCT'd
3379            pub input: &'a Arc<LogicalPlan>,
3380        }
3381        let comparable_self = ComparableDistinctOn {
3382            on_expr: &self.on_expr,
3383            select_expr: &self.select_expr,
3384            sort_expr: &self.sort_expr,
3385            input: &self.input,
3386        };
3387        let comparable_other = ComparableDistinctOn {
3388            on_expr: &other.on_expr,
3389            select_expr: &other.select_expr,
3390            sort_expr: &other.sort_expr,
3391            input: &other.input,
3392        };
3393        comparable_self.partial_cmp(&comparable_other)
3394    }
3395}
3396
/// Aggregates its input based on a set of grouping and aggregate
/// expressions (e.g. SUM).
///
/// # Output Schema
///
/// The output schema is the group expressions followed by the aggregate
/// expressions in order. When the grouping is a single grouping set, the
/// internal grouping-id column ([`Aggregate::INTERNAL_GROUPING_ID`]) is placed
/// between the group expressions and the aggregate expressions.
///
/// For example, given the input schema `"A", "B", "C"` and the aggregate
/// `SUM(A) GROUP BY C+B`, the output schema will be `"C+B", "SUM(A)"` where
/// "C+B" and "SUM(A)" are the names of the output columns. Note that "C+B" is a
/// single new column
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
// mark non_exhaustive to encourage use of try_new/try_new_with_schema
#[non_exhaustive]
pub struct Aggregate {
    /// The incoming logical plan
    pub input: Arc<LogicalPlan>,
    /// Grouping expressions
    pub group_expr: Vec<Expr>,
    /// Aggregate expressions
    pub aggr_expr: Vec<Expr>,
    /// The schema description of the aggregate output
    pub schema: DFSchemaRef,
}
3422
3423impl Aggregate {
3424    /// Create a new aggregate operator.
3425    pub fn try_new(
3426        input: Arc<LogicalPlan>,
3427        group_expr: Vec<Expr>,
3428        aggr_expr: Vec<Expr>,
3429    ) -> Result<Self> {
3430        let group_expr = enumerate_grouping_sets(group_expr)?;
3431
3432        let is_grouping_set = matches!(group_expr.as_slice(), [Expr::GroupingSet(_)]);
3433
3434        let grouping_expr: Vec<&Expr> = grouping_set_to_exprlist(group_expr.as_slice())?;
3435
3436        let mut qualified_fields = exprlist_to_fields(grouping_expr, &input)?;
3437
3438        // Even columns that cannot be null will become nullable when used in a grouping set.
3439        if is_grouping_set {
3440            qualified_fields = qualified_fields
3441                .into_iter()
3442                .map(|(q, f)| (q, f.as_ref().clone().with_nullable(true).into()))
3443                .collect::<Vec<_>>();
3444            qualified_fields.push((
3445                None,
3446                Field::new(
3447                    Self::INTERNAL_GROUPING_ID,
3448                    Self::grouping_id_type(qualified_fields.len()),
3449                    false,
3450                )
3451                .into(),
3452            ));
3453        }
3454
3455        qualified_fields.extend(exprlist_to_fields(aggr_expr.as_slice(), &input)?);
3456
3457        let schema = DFSchema::new_with_metadata(
3458            qualified_fields,
3459            input.schema().metadata().clone(),
3460        )?;
3461
3462        Self::try_new_with_schema(input, group_expr, aggr_expr, Arc::new(schema))
3463    }
3464
3465    /// Create a new aggregate operator using the provided schema to avoid the overhead of
3466    /// building the schema again when the schema is already known.
3467    ///
3468    /// This method should only be called when you are absolutely sure that the schema being
3469    /// provided is correct for the aggregate. If in doubt, call [try_new](Self::try_new) instead.
3470    pub fn try_new_with_schema(
3471        input: Arc<LogicalPlan>,
3472        group_expr: Vec<Expr>,
3473        aggr_expr: Vec<Expr>,
3474        schema: DFSchemaRef,
3475    ) -> Result<Self> {
3476        if group_expr.is_empty() && aggr_expr.is_empty() {
3477            return plan_err!(
3478                "Aggregate requires at least one grouping or aggregate expression"
3479            );
3480        }
3481        let group_expr_count = grouping_set_expr_count(&group_expr)?;
3482        if schema.fields().len() != group_expr_count + aggr_expr.len() {
3483            return plan_err!(
3484                "Aggregate schema has wrong number of fields. Expected {} got {}",
3485                group_expr_count + aggr_expr.len(),
3486                schema.fields().len()
3487            );
3488        }
3489
3490        let aggregate_func_dependencies =
3491            calc_func_dependencies_for_aggregate(&group_expr, &input, &schema)?;
3492        let new_schema = schema.as_ref().clone();
3493        let schema = Arc::new(
3494            new_schema.with_functional_dependencies(aggregate_func_dependencies)?,
3495        );
3496        Ok(Self {
3497            input,
3498            group_expr,
3499            aggr_expr,
3500            schema,
3501        })
3502    }
3503
3504    fn is_grouping_set(&self) -> bool {
3505        matches!(self.group_expr.as_slice(), [Expr::GroupingSet(_)])
3506    }
3507
3508    /// Get the output expressions.
3509    fn output_expressions(&self) -> Result<Vec<&Expr>> {
3510        static INTERNAL_ID_EXPR: LazyLock<Expr> = LazyLock::new(|| {
3511            Expr::Column(Column::from_name(Aggregate::INTERNAL_GROUPING_ID))
3512        });
3513        let mut exprs = grouping_set_to_exprlist(self.group_expr.as_slice())?;
3514        if self.is_grouping_set() {
3515            exprs.push(&INTERNAL_ID_EXPR);
3516        }
3517        exprs.extend(self.aggr_expr.iter());
3518        debug_assert!(exprs.len() == self.schema.fields().len());
3519        Ok(exprs)
3520    }
3521
3522    /// Get the length of the group by expression in the output schema
3523    /// This is not simply group by expression length. Expression may be
3524    /// GroupingSet, etc. In these case we need to get inner expression lengths.
3525    pub fn group_expr_len(&self) -> Result<usize> {
3526        grouping_set_expr_count(&self.group_expr)
3527    }
3528
3529    /// Returns the data type of the grouping id.
3530    /// The grouping ID value is a bitmask where each set bit
3531    /// indicates that the corresponding grouping expression is
3532    /// null
3533    pub fn grouping_id_type(group_exprs: usize) -> DataType {
3534        if group_exprs <= 8 {
3535            DataType::UInt8
3536        } else if group_exprs <= 16 {
3537            DataType::UInt16
3538        } else if group_exprs <= 32 {
3539            DataType::UInt32
3540        } else {
3541            DataType::UInt64
3542        }
3543    }
3544
3545    /// Internal column used when the aggregation is a grouping set.
3546    ///
3547    /// This column contains a bitmask where each bit represents a grouping
3548    /// expression. The least significant bit corresponds to the rightmost
3549    /// grouping expression. A bit value of 0 indicates that the corresponding
3550    /// column is included in the grouping set, while a value of 1 means it is excluded.
3551    ///
3552    /// For example, for the grouping expressions CUBE(a, b), the grouping ID
3553    /// column will have the following values:
3554    ///     0b00: Both `a` and `b` are included
3555    ///     0b01: `b` is excluded
3556    ///     0b10: `a` is excluded
3557    ///     0b11: Both `a` and `b` are excluded
3558    ///
3559    /// This internal column is necessary because excluded columns are replaced
3560    /// with `NULL` values. To handle these cases correctly, we must distinguish
3561    /// between an actual `NULL` value in a column and a column being excluded from the set.
3562    pub const INTERNAL_GROUPING_ID: &'static str = "__grouping_id";
3563}
3564
3565// Manual implementation needed because of `schema` field. Comparison excludes this field.
3566impl PartialOrd for Aggregate {
3567    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
3568        match self.input.partial_cmp(&other.input) {
3569            Some(Ordering::Equal) => {
3570                match self.group_expr.partial_cmp(&other.group_expr) {
3571                    Some(Ordering::Equal) => self.aggr_expr.partial_cmp(&other.aggr_expr),
3572                    cmp => cmp,
3573                }
3574            }
3575            cmp => cmp,
3576        }
3577    }
3578}
3579
3580/// Checks whether any expression in `group_expr` contains `Expr::GroupingSet`.
3581fn contains_grouping_set(group_expr: &[Expr]) -> bool {
3582    group_expr
3583        .iter()
3584        .any(|expr| matches!(expr, Expr::GroupingSet(_)))
3585}
3586
3587/// Calculates functional dependencies for aggregate expressions.
3588fn calc_func_dependencies_for_aggregate(
3589    // Expressions in the GROUP BY clause:
3590    group_expr: &[Expr],
3591    // Input plan of the aggregate:
3592    input: &LogicalPlan,
3593    // Aggregate schema
3594    aggr_schema: &DFSchema,
3595) -> Result<FunctionalDependencies> {
3596    // We can do a case analysis on how to propagate functional dependencies based on
3597    // whether the GROUP BY in question contains a grouping set expression:
3598    // - If so, the functional dependencies will be empty because we cannot guarantee
3599    //   that GROUP BY expression results will be unique.
3600    // - Otherwise, it may be possible to propagate functional dependencies.
3601    if !contains_grouping_set(group_expr) {
3602        let group_by_expr_names = group_expr
3603            .iter()
3604            .map(|item| item.schema_name().to_string())
3605            .collect::<IndexSet<_>>()
3606            .into_iter()
3607            .collect::<Vec<_>>();
3608        let aggregate_func_dependencies = aggregate_functional_dependencies(
3609            input.schema(),
3610            &group_by_expr_names,
3611            aggr_schema,
3612        );
3613        Ok(aggregate_func_dependencies)
3614    } else {
3615        Ok(FunctionalDependencies::empty())
3616    }
3617}
3618
3619/// This function projects functional dependencies of the `input` plan according
3620/// to projection expressions `exprs`.
3621fn calc_func_dependencies_for_project(
3622    exprs: &[Expr],
3623    input: &LogicalPlan,
3624) -> Result<FunctionalDependencies> {
3625    let input_fields = input.schema().field_names();
3626    // Calculate expression indices (if present) in the input schema.
3627    let proj_indices = exprs
3628        .iter()
3629        .map(|expr| match expr {
3630            #[expect(deprecated)]
3631            Expr::Wildcard { qualifier, options } => {
3632                let wildcard_fields = exprlist_to_fields(
3633                    vec![&Expr::Wildcard {
3634                        qualifier: qualifier.clone(),
3635                        options: options.clone(),
3636                    }],
3637                    input,
3638                )?;
3639                Ok::<_, DataFusionError>(
3640                    wildcard_fields
3641                        .into_iter()
3642                        .filter_map(|(qualifier, f)| {
3643                            let flat_name = qualifier
3644                                .map(|t| format!("{}.{}", t, f.name()))
3645                                .unwrap_or_else(|| f.name().clone());
3646                            input_fields.iter().position(|item| *item == flat_name)
3647                        })
3648                        .collect::<Vec<_>>(),
3649                )
3650            }
3651            Expr::Alias(alias) => {
3652                let name = format!("{}", alias.expr);
3653                Ok(input_fields
3654                    .iter()
3655                    .position(|item| *item == name)
3656                    .map(|i| vec![i])
3657                    .unwrap_or(vec![]))
3658            }
3659            _ => {
3660                let name = format!("{expr}");
3661                Ok(input_fields
3662                    .iter()
3663                    .position(|item| *item == name)
3664                    .map(|i| vec![i])
3665                    .unwrap_or(vec![]))
3666            }
3667        })
3668        .collect::<Result<Vec<_>>>()?
3669        .into_iter()
3670        .flatten()
3671        .collect::<Vec<_>>();
3672
3673    Ok(input
3674        .schema()
3675        .functional_dependencies()
3676        .project_functional_dependencies(&proj_indices, exprs.len()))
3677}
3678
3679/// Sorts its input according to a list of sort expressions.
3680#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Hash)]
3681pub struct Sort {
3682    /// The sort expressions
3683    pub expr: Vec<SortExpr>,
3684    /// The incoming logical plan
3685    pub input: Arc<LogicalPlan>,
3686    /// Optional fetch limit
3687    pub fetch: Option<usize>,
3688}
3689
3690/// Join two logical plans on one or more join columns
3691#[derive(Debug, Clone, PartialEq, Eq, Hash)]
3692pub struct Join {
3693    /// Left input
3694    pub left: Arc<LogicalPlan>,
3695    /// Right input
3696    pub right: Arc<LogicalPlan>,
3697    /// Equijoin clause expressed as pairs of (left, right) join expressions
3698    pub on: Vec<(Expr, Expr)>,
3699    /// Filters applied during join (non-equi conditions)
3700    pub filter: Option<Expr>,
3701    /// Join type
3702    pub join_type: JoinType,
3703    /// Join constraint
3704    pub join_constraint: JoinConstraint,
3705    /// The output schema, containing fields from the left and right inputs
3706    pub schema: DFSchemaRef,
3707    /// If null_equals_null is true, null == null else null != null
3708    pub null_equals_null: bool,
3709}
3710
3711impl Join {
3712    /// Creates a new Join operator with automatically computed schema.
3713    ///
3714    /// This constructor computes the schema based on the join type and inputs,
3715    /// removing the need to manually specify the schema or call `recompute_schema`.
3716    ///
3717    /// # Arguments
3718    ///
3719    /// * `left` - Left input plan
3720    /// * `right` - Right input plan
3721    /// * `on` - Join condition as a vector of (left_expr, right_expr) pairs
3722    /// * `filter` - Optional filter expression (for non-equijoin conditions)
3723    /// * `join_type` - Type of join (Inner, Left, Right, etc.)
3724    /// * `join_constraint` - Join constraint (On, Using)
3725    /// * `null_equals_null` - Whether NULL = NULL in join comparisons
3726    ///
3727    /// # Returns
3728    ///
3729    /// A new Join operator with the computed schema
3730    pub fn try_new(
3731        left: Arc<LogicalPlan>,
3732        right: Arc<LogicalPlan>,
3733        on: Vec<(Expr, Expr)>,
3734        filter: Option<Expr>,
3735        join_type: JoinType,
3736        join_constraint: JoinConstraint,
3737        null_equals_null: bool,
3738    ) -> Result<Self> {
3739        let join_schema = build_join_schema(left.schema(), right.schema(), &join_type)?;
3740
3741        Ok(Join {
3742            left,
3743            right,
3744            on,
3745            filter,
3746            join_type,
3747            join_constraint,
3748            schema: Arc::new(join_schema),
3749            null_equals_null,
3750        })
3751    }
3752
3753    /// Create Join with input which wrapped with projection, this method is used to help create physical join.
3754    pub fn try_new_with_project_input(
3755        original: &LogicalPlan,
3756        left: Arc<LogicalPlan>,
3757        right: Arc<LogicalPlan>,
3758        column_on: (Vec<Column>, Vec<Column>),
3759    ) -> Result<Self> {
3760        let original_join = match original {
3761            LogicalPlan::Join(join) => join,
3762            _ => return plan_err!("Could not create join with project input"),
3763        };
3764
3765        let on: Vec<(Expr, Expr)> = column_on
3766            .0
3767            .into_iter()
3768            .zip(column_on.1)
3769            .map(|(l, r)| (Expr::Column(l), Expr::Column(r)))
3770            .collect();
3771        let join_schema =
3772            build_join_schema(left.schema(), right.schema(), &original_join.join_type)?;
3773
3774        Ok(Join {
3775            left,
3776            right,
3777            on,
3778            filter: original_join.filter.clone(),
3779            join_type: original_join.join_type,
3780            join_constraint: original_join.join_constraint,
3781            schema: Arc::new(join_schema),
3782            null_equals_null: original_join.null_equals_null,
3783        })
3784    }
3785}
3786
3787// Manual implementation needed because of `schema` field. Comparison excludes this field.
3788impl PartialOrd for Join {
3789    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
3790        #[derive(PartialEq, PartialOrd)]
3791        struct ComparableJoin<'a> {
3792            /// Left input
3793            pub left: &'a Arc<LogicalPlan>,
3794            /// Right input
3795            pub right: &'a Arc<LogicalPlan>,
3796            /// Equijoin clause expressed as pairs of (left, right) join expressions
3797            pub on: &'a Vec<(Expr, Expr)>,
3798            /// Filters applied during join (non-equi conditions)
3799            pub filter: &'a Option<Expr>,
3800            /// Join type
3801            pub join_type: &'a JoinType,
3802            /// Join constraint
3803            pub join_constraint: &'a JoinConstraint,
3804            /// If null_equals_null is true, null == null else null != null
3805            pub null_equals_null: &'a bool,
3806        }
3807        let comparable_self = ComparableJoin {
3808            left: &self.left,
3809            right: &self.right,
3810            on: &self.on,
3811            filter: &self.filter,
3812            join_type: &self.join_type,
3813            join_constraint: &self.join_constraint,
3814            null_equals_null: &self.null_equals_null,
3815        };
3816        let comparable_other = ComparableJoin {
3817            left: &other.left,
3818            right: &other.right,
3819            on: &other.on,
3820            filter: &other.filter,
3821            join_type: &other.join_type,
3822            join_constraint: &other.join_constraint,
3823            null_equals_null: &other.null_equals_null,
3824        };
3825        comparable_self.partial_cmp(&comparable_other)
3826    }
3827}
3828
3829/// Subquery
3830#[derive(Clone, PartialEq, Eq, PartialOrd, Hash)]
3831pub struct Subquery {
3832    /// The subquery
3833    pub subquery: Arc<LogicalPlan>,
3834    /// The outer references used in the subquery
3835    pub outer_ref_columns: Vec<Expr>,
3836    /// Span information for subquery projection columns
3837    pub spans: Spans,
3838}
3839
3840impl Normalizeable for Subquery {
3841    fn can_normalize(&self) -> bool {
3842        false
3843    }
3844}
3845
3846impl NormalizeEq for Subquery {
3847    fn normalize_eq(&self, other: &Self) -> bool {
3848        // TODO: may be implement NormalizeEq for LogicalPlan?
3849        *self.subquery == *other.subquery
3850            && self.outer_ref_columns.len() == other.outer_ref_columns.len()
3851            && self
3852                .outer_ref_columns
3853                .iter()
3854                .zip(other.outer_ref_columns.iter())
3855                .all(|(a, b)| a.normalize_eq(b))
3856    }
3857}
3858
3859impl Subquery {
3860    pub fn try_from_expr(plan: &Expr) -> Result<&Subquery> {
3861        match plan {
3862            Expr::ScalarSubquery(it) => Ok(it),
3863            Expr::Cast(cast) => Subquery::try_from_expr(cast.expr.as_ref()),
3864            _ => plan_err!("Could not coerce into ScalarSubquery!"),
3865        }
3866    }
3867
3868    pub fn with_plan(&self, plan: Arc<LogicalPlan>) -> Subquery {
3869        Subquery {
3870            subquery: plan,
3871            outer_ref_columns: self.outer_ref_columns.clone(),
3872            spans: Spans::new(),
3873        }
3874    }
3875}
3876
3877impl Debug for Subquery {
3878    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
3879        write!(f, "<subquery>")
3880    }
3881}
3882
3883/// Logical partitioning schemes supported by [`LogicalPlan::Repartition`]
3884///
3885/// See [`Partitioning`] for more details on partitioning
3886///
3887/// [`Partitioning`]: https://docs.rs/datafusion/latest/datafusion/physical_expr/enum.Partitioning.html#
3888#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Hash)]
3889pub enum Partitioning {
3890    /// Allocate batches using a round-robin algorithm and the specified number of partitions
3891    RoundRobinBatch(usize),
3892    /// Allocate rows based on a hash of one of more expressions and the specified number
3893    /// of partitions.
3894    Hash(Vec<Expr>, usize),
3895    /// The DISTRIBUTE BY clause is used to repartition the data based on the input expressions
3896    DistributeBy(Vec<Expr>),
3897}
3898
3899/// Represent the unnesting operation on a list column, such as the recursion depth and
3900/// the output column name after unnesting
3901///
3902/// Example: given `ColumnUnnestList { output_column: "output_name", depth: 2 }`
3903///
3904/// ```text
3905///   input             output_name
3906///  ┌─────────┐      ┌─────────┐
3907///  │{{1,2}}  │      │ 1       │
3908///  ├─────────┼─────►├─────────┤
3909///  │{{3}}    │      │ 2       │
3910///  ├─────────┤      ├─────────┤
3911///  │{{4},{5}}│      │ 3       │
3912///  └─────────┘      ├─────────┤
3913///                   │ 4       │
3914///                   ├─────────┤
3915///                   │ 5       │
3916///                   └─────────┘
3917/// ```
3918#[derive(Debug, Clone, PartialEq, Eq, Hash, PartialOrd)]
3919pub struct ColumnUnnestList {
3920    pub output_column: Column,
3921    pub depth: usize,
3922}
3923
3924impl Display for ColumnUnnestList {
3925    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
3926        write!(f, "{}|depth={}", self.output_column, self.depth)
3927    }
3928}
3929
3930/// Unnest a column that contains a nested list type. See
3931/// [`UnnestOptions`] for more details.
3932#[derive(Debug, Clone, PartialEq, Eq, Hash)]
3933pub struct Unnest {
3934    /// The incoming logical plan
3935    pub input: Arc<LogicalPlan>,
3936    /// Columns to run unnest on, can be a list of (List/Struct) columns
3937    pub exec_columns: Vec<Column>,
3938    /// refer to the indices(in the input schema) of columns
3939    /// that have type list to run unnest on
3940    pub list_type_columns: Vec<(usize, ColumnUnnestList)>,
3941    /// refer to the indices (in the input schema) of columns
3942    /// that have type struct to run unnest on
3943    pub struct_type_columns: Vec<usize>,
3944    /// Having items aligned with the output columns
3945    /// representing which column in the input schema each output column depends on
3946    pub dependency_indices: Vec<usize>,
3947    /// The output schema, containing the unnested field column.
3948    pub schema: DFSchemaRef,
3949    /// Options
3950    pub options: UnnestOptions,
3951}
3952
3953// Manual implementation needed because of `schema` field. Comparison excludes this field.
3954impl PartialOrd for Unnest {
3955    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
3956        #[derive(PartialEq, PartialOrd)]
3957        struct ComparableUnnest<'a> {
3958            /// The incoming logical plan
3959            pub input: &'a Arc<LogicalPlan>,
3960            /// Columns to run unnest on, can be a list of (List/Struct) columns
3961            pub exec_columns: &'a Vec<Column>,
3962            /// refer to the indices(in the input schema) of columns
3963            /// that have type list to run unnest on
3964            pub list_type_columns: &'a Vec<(usize, ColumnUnnestList)>,
3965            /// refer to the indices (in the input schema) of columns
3966            /// that have type struct to run unnest on
3967            pub struct_type_columns: &'a Vec<usize>,
3968            /// Having items aligned with the output columns
3969            /// representing which column in the input schema each output column depends on
3970            pub dependency_indices: &'a Vec<usize>,
3971            /// Options
3972            pub options: &'a UnnestOptions,
3973        }
3974        let comparable_self = ComparableUnnest {
3975            input: &self.input,
3976            exec_columns: &self.exec_columns,
3977            list_type_columns: &self.list_type_columns,
3978            struct_type_columns: &self.struct_type_columns,
3979            dependency_indices: &self.dependency_indices,
3980            options: &self.options,
3981        };
3982        let comparable_other = ComparableUnnest {
3983            input: &other.input,
3984            exec_columns: &other.exec_columns,
3985            list_type_columns: &other.list_type_columns,
3986            struct_type_columns: &other.struct_type_columns,
3987            dependency_indices: &other.dependency_indices,
3988            options: &other.options,
3989        };
3990        comparable_self.partial_cmp(&comparable_other)
3991    }
3992}
3993
3994#[cfg(test)]
3995mod tests {
3996
3997    use super::*;
3998    use crate::builder::LogicalTableSource;
3999    use crate::logical_plan::table_scan;
4000    use crate::{
4001        binary_expr, col, exists, in_subquery, lit, placeholder, scalar_subquery,
4002        GroupingSet,
4003    };
4004
4005    use datafusion_common::tree_node::{
4006        TransformedResult, TreeNodeRewriter, TreeNodeVisitor,
4007    };
4008    use datafusion_common::{not_impl_err, Constraint, ScalarValue};
4009    use insta::{assert_debug_snapshot, assert_snapshot};
4010
4011    use crate::test::function_stub::count;
4012
4013    fn employee_schema() -> Schema {
4014        Schema::new(vec![
4015            Field::new("id", DataType::Int32, false),
4016            Field::new("first_name", DataType::Utf8, false),
4017            Field::new("last_name", DataType::Utf8, false),
4018            Field::new("state", DataType::Utf8, false),
4019            Field::new("salary", DataType::Int32, false),
4020        ])
4021    }
4022
4023    fn display_plan() -> Result<LogicalPlan> {
4024        let plan1 = table_scan(Some("employee_csv"), &employee_schema(), Some(vec![3]))?
4025            .build()?;
4026
4027        table_scan(Some("employee_csv"), &employee_schema(), Some(vec![0, 3]))?
4028            .filter(in_subquery(col("state"), Arc::new(plan1)))?
4029            .project(vec![col("id")])?
4030            .build()
4031    }
4032
4033    #[test]
4034    fn test_display_indent() -> Result<()> {
4035        let plan = display_plan()?;
4036
4037        assert_snapshot!(plan.display_indent(), @r"
4038        Projection: employee_csv.id
4039          Filter: employee_csv.state IN (<subquery>)
4040            Subquery:
4041              TableScan: employee_csv projection=[state]
4042            TableScan: employee_csv projection=[id, state]
4043        ");
4044        Ok(())
4045    }
4046
4047    #[test]
4048    fn test_display_indent_schema() -> Result<()> {
4049        let plan = display_plan()?;
4050
4051        assert_snapshot!(plan.display_indent_schema(), @r"
4052        Projection: employee_csv.id [id:Int32]
4053          Filter: employee_csv.state IN (<subquery>) [id:Int32, state:Utf8]
4054            Subquery: [state:Utf8]
4055              TableScan: employee_csv projection=[state] [state:Utf8]
4056            TableScan: employee_csv projection=[id, state] [id:Int32, state:Utf8]
4057        ");
4058        Ok(())
4059    }
4060
4061    #[test]
4062    fn test_display_subquery_alias() -> Result<()> {
4063        let plan1 = table_scan(Some("employee_csv"), &employee_schema(), Some(vec![3]))?
4064            .build()?;
4065        let plan1 = Arc::new(plan1);
4066
4067        let plan =
4068            table_scan(Some("employee_csv"), &employee_schema(), Some(vec![0, 3]))?
4069                .project(vec![col("id"), exists(plan1).alias("exists")])?
4070                .build();
4071
4072        assert_snapshot!(plan?.display_indent(), @r"
4073        Projection: employee_csv.id, EXISTS (<subquery>) AS exists
4074          Subquery:
4075            TableScan: employee_csv projection=[state]
4076          TableScan: employee_csv projection=[id, state]
4077        ");
4078        Ok(())
4079    }
4080
4081    #[test]
4082    fn test_display_graphviz() -> Result<()> {
4083        let plan = display_plan()?;
4084
4085        // just test for a few key lines in the output rather than the
4086        // whole thing to make test maintenance easier.
4087        assert_snapshot!(plan.display_graphviz(), @r#"
4088        // Begin DataFusion GraphViz Plan,
4089        // display it online here: https://dreampuf.github.io/GraphvizOnline
4090
4091        digraph {
4092          subgraph cluster_1
4093          {
4094            graph[label="LogicalPlan"]
4095            2[shape=box label="Projection: employee_csv.id"]
4096            3[shape=box label="Filter: employee_csv.state IN (<subquery>)"]
4097            2 -> 3 [arrowhead=none, arrowtail=normal, dir=back]
4098            4[shape=box label="Subquery:"]
4099            3 -> 4 [arrowhead=none, arrowtail=normal, dir=back]
4100            5[shape=box label="TableScan: employee_csv projection=[state]"]
4101            4 -> 5 [arrowhead=none, arrowtail=normal, dir=back]
4102            6[shape=box label="TableScan: employee_csv projection=[id, state]"]
4103            3 -> 6 [arrowhead=none, arrowtail=normal, dir=back]
4104          }
4105          subgraph cluster_7
4106          {
4107            graph[label="Detailed LogicalPlan"]
4108            8[shape=box label="Projection: employee_csv.id\nSchema: [id:Int32]"]
4109            9[shape=box label="Filter: employee_csv.state IN (<subquery>)\nSchema: [id:Int32, state:Utf8]"]
4110            8 -> 9 [arrowhead=none, arrowtail=normal, dir=back]
4111            10[shape=box label="Subquery:\nSchema: [state:Utf8]"]
4112            9 -> 10 [arrowhead=none, arrowtail=normal, dir=back]
4113            11[shape=box label="TableScan: employee_csv projection=[state]\nSchema: [state:Utf8]"]
4114            10 -> 11 [arrowhead=none, arrowtail=normal, dir=back]
4115            12[shape=box label="TableScan: employee_csv projection=[id, state]\nSchema: [id:Int32, state:Utf8]"]
4116            9 -> 12 [arrowhead=none, arrowtail=normal, dir=back]
4117          }
4118        }
4119        // End DataFusion GraphViz Plan
4120        "#);
4121        Ok(())
4122    }
4123
4124    #[test]
4125    fn test_display_pg_json() -> Result<()> {
4126        let plan = display_plan()?;
4127
4128        assert_snapshot!(plan.display_pg_json(), @r#"
4129        [
4130          {
4131            "Plan": {
4132              "Expressions": [
4133                "employee_csv.id"
4134              ],
4135              "Node Type": "Projection",
4136              "Output": [
4137                "id"
4138              ],
4139              "Plans": [
4140                {
4141                  "Condition": "employee_csv.state IN (<subquery>)",
4142                  "Node Type": "Filter",
4143                  "Output": [
4144                    "id",
4145                    "state"
4146                  ],
4147                  "Plans": [
4148                    {
4149                      "Node Type": "Subquery",
4150                      "Output": [
4151                        "state"
4152                      ],
4153                      "Plans": [
4154                        {
4155                          "Node Type": "TableScan",
4156                          "Output": [
4157                            "state"
4158                          ],
4159                          "Plans": [],
4160                          "Relation Name": "employee_csv"
4161                        }
4162                      ]
4163                    },
4164                    {
4165                      "Node Type": "TableScan",
4166                      "Output": [
4167                        "id",
4168                        "state"
4169                      ],
4170                      "Plans": [],
4171                      "Relation Name": "employee_csv"
4172                    }
4173                  ]
4174                }
4175              ]
4176            }
4177          }
4178        ]
4179        "#);
4180        Ok(())
4181    }
4182
4183    /// Tests for the Visitor trait and walking logical plan nodes
4184    #[derive(Debug, Default)]
4185    struct OkVisitor {
4186        strings: Vec<String>,
4187    }
4188
4189    impl<'n> TreeNodeVisitor<'n> for OkVisitor {
4190        type Node = LogicalPlan;
4191
4192        fn f_down(&mut self, plan: &'n LogicalPlan) -> Result<TreeNodeRecursion> {
4193            let s = match plan {
4194                LogicalPlan::Projection { .. } => "pre_visit Projection",
4195                LogicalPlan::Filter { .. } => "pre_visit Filter",
4196                LogicalPlan::TableScan { .. } => "pre_visit TableScan",
4197                _ => {
4198                    return not_impl_err!("unknown plan type");
4199                }
4200            };
4201
4202            self.strings.push(s.into());
4203            Ok(TreeNodeRecursion::Continue)
4204        }
4205
4206        fn f_up(&mut self, plan: &'n LogicalPlan) -> Result<TreeNodeRecursion> {
4207            let s = match plan {
4208                LogicalPlan::Projection { .. } => "post_visit Projection",
4209                LogicalPlan::Filter { .. } => "post_visit Filter",
4210                LogicalPlan::TableScan { .. } => "post_visit TableScan",
4211                _ => {
4212                    return not_impl_err!("unknown plan type");
4213                }
4214            };
4215
4216            self.strings.push(s.into());
4217            Ok(TreeNodeRecursion::Continue)
4218        }
4219    }
4220
4221    #[test]
4222    fn visit_order() {
4223        let mut visitor = OkVisitor::default();
4224        let plan = test_plan();
4225        let res = plan.visit_with_subqueries(&mut visitor);
4226        assert!(res.is_ok());
4227
4228        assert_debug_snapshot!(visitor.strings, @r#"
4229        [
4230            "pre_visit Projection",
4231            "pre_visit Filter",
4232            "pre_visit TableScan",
4233            "post_visit TableScan",
4234            "post_visit Filter",
4235            "post_visit Projection",
4236        ]
4237        "#);
4238    }
4239
4240    #[derive(Debug, Default)]
4241    /// Counter than counts to zero and returns true when it gets there
4242    struct OptionalCounter {
4243        val: Option<usize>,
4244    }
4245
4246    impl OptionalCounter {
4247        fn new(val: usize) -> Self {
4248            Self { val: Some(val) }
4249        }
4250        // Decrements the counter by 1, if any, returning true if it hits zero
4251        fn dec(&mut self) -> bool {
4252            if Some(0) == self.val {
4253                true
4254            } else {
4255                self.val = self.val.take().map(|i| i - 1);
4256                false
4257            }
4258        }
4259    }
4260
4261    #[derive(Debug, Default)]
4262    /// Visitor that returns false after some number of visits
4263    struct StoppingVisitor {
4264        inner: OkVisitor,
4265        /// When Some(0) returns false from pre_visit
4266        return_false_from_pre_in: OptionalCounter,
4267        /// When Some(0) returns false from post_visit
4268        return_false_from_post_in: OptionalCounter,
4269    }
4270
4271    impl<'n> TreeNodeVisitor<'n> for StoppingVisitor {
4272        type Node = LogicalPlan;
4273
4274        fn f_down(&mut self, plan: &'n LogicalPlan) -> Result<TreeNodeRecursion> {
4275            if self.return_false_from_pre_in.dec() {
4276                return Ok(TreeNodeRecursion::Stop);
4277            }
4278            self.inner.f_down(plan)?;
4279
4280            Ok(TreeNodeRecursion::Continue)
4281        }
4282
4283        fn f_up(&mut self, plan: &'n LogicalPlan) -> Result<TreeNodeRecursion> {
4284            if self.return_false_from_post_in.dec() {
4285                return Ok(TreeNodeRecursion::Stop);
4286            }
4287
4288            self.inner.f_up(plan)
4289        }
4290    }
4291
4292    /// test early stopping in pre-visit
4293    #[test]
4294    fn early_stopping_pre_visit() {
4295        let mut visitor = StoppingVisitor {
4296            return_false_from_pre_in: OptionalCounter::new(2),
4297            ..Default::default()
4298        };
4299        let plan = test_plan();
4300        let res = plan.visit_with_subqueries(&mut visitor);
4301        assert!(res.is_ok());
4302
4303        assert_debug_snapshot!(
4304            visitor.inner.strings,
4305            @r#"
4306        [
4307            "pre_visit Projection",
4308            "pre_visit Filter",
4309        ]
4310        "#
4311        );
4312    }
4313
4314    #[test]
4315    fn early_stopping_post_visit() {
4316        let mut visitor = StoppingVisitor {
4317            return_false_from_post_in: OptionalCounter::new(1),
4318            ..Default::default()
4319        };
4320        let plan = test_plan();
4321        let res = plan.visit_with_subqueries(&mut visitor);
4322        assert!(res.is_ok());
4323
4324        assert_debug_snapshot!(
4325            visitor.inner.strings,
4326            @r#"
4327        [
4328            "pre_visit Projection",
4329            "pre_visit Filter",
4330            "pre_visit TableScan",
4331            "post_visit TableScan",
4332        ]
4333        "#
4334        );
4335    }
4336
4337    #[derive(Debug, Default)]
4338    /// Visitor that returns an error after some number of visits
4339    struct ErrorVisitor {
4340        inner: OkVisitor,
4341        /// When Some(0) returns false from pre_visit
4342        return_error_from_pre_in: OptionalCounter,
4343        /// When Some(0) returns false from post_visit
4344        return_error_from_post_in: OptionalCounter,
4345    }
4346
4347    impl<'n> TreeNodeVisitor<'n> for ErrorVisitor {
4348        type Node = LogicalPlan;
4349
4350        fn f_down(&mut self, plan: &'n LogicalPlan) -> Result<TreeNodeRecursion> {
4351            if self.return_error_from_pre_in.dec() {
4352                return not_impl_err!("Error in pre_visit");
4353            }
4354
4355            self.inner.f_down(plan)
4356        }
4357
4358        fn f_up(&mut self, plan: &'n LogicalPlan) -> Result<TreeNodeRecursion> {
4359            if self.return_error_from_post_in.dec() {
4360                return not_impl_err!("Error in post_visit");
4361            }
4362
4363            self.inner.f_up(plan)
4364        }
4365    }
4366
4367    #[test]
4368    fn error_pre_visit() {
4369        let mut visitor = ErrorVisitor {
4370            return_error_from_pre_in: OptionalCounter::new(2),
4371            ..Default::default()
4372        };
4373        let plan = test_plan();
4374        let res = plan.visit_with_subqueries(&mut visitor).unwrap_err();
4375        assert_snapshot!(
4376            res.strip_backtrace(),
4377            @"This feature is not implemented: Error in pre_visit"
4378        );
4379        assert_debug_snapshot!(
4380            visitor.inner.strings,
4381            @r#"
4382        [
4383            "pre_visit Projection",
4384            "pre_visit Filter",
4385        ]
4386        "#
4387        );
4388    }
4389
4390    #[test]
4391    fn error_post_visit() {
4392        let mut visitor = ErrorVisitor {
4393            return_error_from_post_in: OptionalCounter::new(1),
4394            ..Default::default()
4395        };
4396        let plan = test_plan();
4397        let res = plan.visit_with_subqueries(&mut visitor).unwrap_err();
4398        assert_snapshot!(
4399            res.strip_backtrace(),
4400            @"This feature is not implemented: Error in post_visit"
4401        );
4402        assert_debug_snapshot!(
4403            visitor.inner.strings,
4404            @r#"
4405        [
4406            "pre_visit Projection",
4407            "pre_visit Filter",
4408            "pre_visit TableScan",
4409            "post_visit TableScan",
4410        ]
4411        "#
4412        );
4413    }
4414
4415    #[test]
4416    fn projection_expr_schema_mismatch() -> Result<()> {
4417        let empty_schema = Arc::new(DFSchema::empty());
4418        let p = Projection::try_new_with_schema(
4419            vec![col("a")],
4420            Arc::new(LogicalPlan::EmptyRelation(EmptyRelation {
4421                produce_one_row: false,
4422                schema: Arc::clone(&empty_schema),
4423            })),
4424            empty_schema,
4425        );
4426        assert_snapshot!(p.unwrap_err().strip_backtrace(), @"Error during planning: Projection has mismatch between number of expressions (1) and number of fields in schema (0)");
4427        Ok(())
4428    }
4429
4430    fn test_plan() -> LogicalPlan {
4431        let schema = Schema::new(vec![
4432            Field::new("id", DataType::Int32, false),
4433            Field::new("state", DataType::Utf8, false),
4434        ]);
4435
4436        table_scan(TableReference::none(), &schema, Some(vec![0, 1]))
4437            .unwrap()
4438            .filter(col("state").eq(lit("CO")))
4439            .unwrap()
4440            .project(vec![col("id")])
4441            .unwrap()
4442            .build()
4443            .unwrap()
4444    }
4445
4446    #[test]
4447    fn test_replace_invalid_placeholder() {
4448        // test empty placeholder
4449        let schema = Schema::new(vec![Field::new("id", DataType::Int32, false)]);
4450
4451        let plan = table_scan(TableReference::none(), &schema, None)
4452            .unwrap()
4453            .filter(col("id").eq(placeholder("")))
4454            .unwrap()
4455            .build()
4456            .unwrap();
4457
4458        let param_values = vec![ScalarValue::Int32(Some(42))];
4459        plan.replace_params_with_values(&param_values.clone().into())
4460            .expect_err("unexpectedly succeeded to replace an invalid placeholder");
4461
4462        // test $0 placeholder
4463        let schema = Schema::new(vec![Field::new("id", DataType::Int32, false)]);
4464
4465        let plan = table_scan(TableReference::none(), &schema, None)
4466            .unwrap()
4467            .filter(col("id").eq(placeholder("$0")))
4468            .unwrap()
4469            .build()
4470            .unwrap();
4471
4472        plan.replace_params_with_values(&param_values.clone().into())
4473            .expect_err("unexpectedly succeeded to replace an invalid placeholder");
4474
4475        // test $00 placeholder
4476        let schema = Schema::new(vec![Field::new("id", DataType::Int32, false)]);
4477
4478        let plan = table_scan(TableReference::none(), &schema, None)
4479            .unwrap()
4480            .filter(col("id").eq(placeholder("$00")))
4481            .unwrap()
4482            .build()
4483            .unwrap();
4484
4485        plan.replace_params_with_values(&param_values.into())
4486            .expect_err("unexpectedly succeeded to replace an invalid placeholder");
4487    }
4488
4489    #[test]
4490    fn test_nullable_schema_after_grouping_set() {
4491        let schema = Schema::new(vec![
4492            Field::new("foo", DataType::Int32, false),
4493            Field::new("bar", DataType::Int32, false),
4494        ]);
4495
4496        let plan = table_scan(TableReference::none(), &schema, None)
4497            .unwrap()
4498            .aggregate(
4499                vec![Expr::GroupingSet(GroupingSet::GroupingSets(vec![
4500                    vec![col("foo")],
4501                    vec![col("bar")],
4502                ]))],
4503                vec![count(lit(true))],
4504            )
4505            .unwrap()
4506            .build()
4507            .unwrap();
4508
4509        let output_schema = plan.schema();
4510
4511        assert!(output_schema
4512            .field_with_name(None, "foo")
4513            .unwrap()
4514            .is_nullable(),);
4515        assert!(output_schema
4516            .field_with_name(None, "bar")
4517            .unwrap()
4518            .is_nullable());
4519    }
4520
4521    #[test]
4522    fn test_filter_is_scalar() {
4523        // test empty placeholder
4524        let schema =
4525            Arc::new(Schema::new(vec![Field::new("id", DataType::Int32, false)]));
4526
4527        let source = Arc::new(LogicalTableSource::new(schema));
4528        let schema = Arc::new(
4529            DFSchema::try_from_qualified_schema(
4530                TableReference::bare("tab"),
4531                &source.schema(),
4532            )
4533            .unwrap(),
4534        );
4535        let scan = Arc::new(LogicalPlan::TableScan(TableScan {
4536            table_name: TableReference::bare("tab"),
4537            source: Arc::clone(&source) as Arc<dyn TableSource>,
4538            projection: None,
4539            projected_schema: Arc::clone(&schema),
4540            filters: vec![],
4541            fetch: None,
4542        }));
4543        let col = schema.field_names()[0].clone();
4544
4545        let filter = Filter::try_new(
4546            Expr::Column(col.into()).eq(Expr::Literal(ScalarValue::Int32(Some(1)), None)),
4547            scan,
4548        )
4549        .unwrap();
4550        assert!(!filter.is_scalar());
4551        let unique_schema = Arc::new(
4552            schema
4553                .as_ref()
4554                .clone()
4555                .with_functional_dependencies(
4556                    FunctionalDependencies::new_from_constraints(
4557                        Some(&Constraints::new_unverified(vec![Constraint::Unique(
4558                            vec![0],
4559                        )])),
4560                        1,
4561                    ),
4562                )
4563                .unwrap(),
4564        );
4565        let scan = Arc::new(LogicalPlan::TableScan(TableScan {
4566            table_name: TableReference::bare("tab"),
4567            source,
4568            projection: None,
4569            projected_schema: Arc::clone(&unique_schema),
4570            filters: vec![],
4571            fetch: None,
4572        }));
4573        let col = schema.field_names()[0].clone();
4574
4575        let filter =
4576            Filter::try_new(Expr::Column(col.into()).eq(lit(1i32)), scan).unwrap();
4577        assert!(filter.is_scalar());
4578    }
4579
4580    #[test]
4581    fn test_transform_explain() {
4582        let schema = Schema::new(vec![
4583            Field::new("foo", DataType::Int32, false),
4584            Field::new("bar", DataType::Int32, false),
4585        ]);
4586
4587        let plan = table_scan(TableReference::none(), &schema, None)
4588            .unwrap()
4589            .explain(false, false)
4590            .unwrap()
4591            .build()
4592            .unwrap();
4593
4594        let external_filter = col("foo").eq(lit(true));
4595
4596        // after transformation, because plan is not the same anymore,
4597        // the parent plan is built again with call to LogicalPlan::with_new_inputs -> with_new_exprs
4598        let plan = plan
4599            .transform(|plan| match plan {
4600                LogicalPlan::TableScan(table) => {
4601                    let filter = Filter::try_new(
4602                        external_filter.clone(),
4603                        Arc::new(LogicalPlan::TableScan(table)),
4604                    )
4605                    .unwrap();
4606                    Ok(Transformed::yes(LogicalPlan::Filter(filter)))
4607                }
4608                x => Ok(Transformed::no(x)),
4609            })
4610            .data()
4611            .unwrap();
4612
4613        let actual = format!("{}", plan.display_indent());
4614        assert_snapshot!(actual, @r"
4615        Explain
4616          Filter: foo = Boolean(true)
4617            TableScan: ?table?
4618        ")
4619    }
4620
4621    #[test]
4622    fn test_plan_partial_ord() {
4623        let empty_relation = LogicalPlan::EmptyRelation(EmptyRelation {
4624            produce_one_row: false,
4625            schema: Arc::new(DFSchema::empty()),
4626        });
4627
4628        let describe_table = LogicalPlan::DescribeTable(DescribeTable {
4629            schema: Arc::new(Schema::new(vec![Field::new(
4630                "foo",
4631                DataType::Int32,
4632                false,
4633            )])),
4634            output_schema: DFSchemaRef::new(DFSchema::empty()),
4635        });
4636
4637        let describe_table_clone = LogicalPlan::DescribeTable(DescribeTable {
4638            schema: Arc::new(Schema::new(vec![Field::new(
4639                "foo",
4640                DataType::Int32,
4641                false,
4642            )])),
4643            output_schema: DFSchemaRef::new(DFSchema::empty()),
4644        });
4645
4646        assert_eq!(
4647            empty_relation.partial_cmp(&describe_table),
4648            Some(Ordering::Less)
4649        );
4650        assert_eq!(
4651            describe_table.partial_cmp(&empty_relation),
4652            Some(Ordering::Greater)
4653        );
4654        assert_eq!(describe_table.partial_cmp(&describe_table_clone), None);
4655    }
4656
4657    #[test]
4658    fn test_limit_with_new_children() {
4659        let input = Arc::new(LogicalPlan::Values(Values {
4660            schema: Arc::new(DFSchema::empty()),
4661            values: vec![vec![]],
4662        }));
4663        let cases = [
4664            LogicalPlan::Limit(Limit {
4665                skip: None,
4666                fetch: None,
4667                input: Arc::clone(&input),
4668            }),
4669            LogicalPlan::Limit(Limit {
4670                skip: None,
4671                fetch: Some(Box::new(Expr::Literal(
4672                    ScalarValue::new_ten(&DataType::UInt32).unwrap(),
4673                    None,
4674                ))),
4675                input: Arc::clone(&input),
4676            }),
4677            LogicalPlan::Limit(Limit {
4678                skip: Some(Box::new(Expr::Literal(
4679                    ScalarValue::new_ten(&DataType::UInt32).unwrap(),
4680                    None,
4681                ))),
4682                fetch: None,
4683                input: Arc::clone(&input),
4684            }),
4685            LogicalPlan::Limit(Limit {
4686                skip: Some(Box::new(Expr::Literal(
4687                    ScalarValue::new_one(&DataType::UInt32).unwrap(),
4688                    None,
4689                ))),
4690                fetch: Some(Box::new(Expr::Literal(
4691                    ScalarValue::new_ten(&DataType::UInt32).unwrap(),
4692                    None,
4693                ))),
4694                input,
4695            }),
4696        ];
4697
4698        for limit in cases {
4699            let new_limit = limit
4700                .with_new_exprs(
4701                    limit.expressions(),
4702                    limit.inputs().into_iter().cloned().collect(),
4703                )
4704                .unwrap();
4705            assert_eq!(limit, new_limit);
4706        }
4707    }
4708
4709    #[test]
4710    fn test_with_subqueries_jump() {
4711        // The test plan contains a `Project` node above a `Filter` node, and the
4712        // `Project` node contains a subquery plan with a `Filter` root node, so returning
4713        // `TreeNodeRecursion::Jump` on `Project` should cause not visiting any of the
4714        // `Filter`s.
4715        let subquery_schema =
4716            Schema::new(vec![Field::new("sub_id", DataType::Int32, false)]);
4717
4718        let subquery_plan =
4719            table_scan(TableReference::none(), &subquery_schema, Some(vec![0]))
4720                .unwrap()
4721                .filter(col("sub_id").eq(lit(0)))
4722                .unwrap()
4723                .build()
4724                .unwrap();
4725
4726        let schema = Schema::new(vec![Field::new("id", DataType::Int32, false)]);
4727
4728        let plan = table_scan(TableReference::none(), &schema, Some(vec![0]))
4729            .unwrap()
4730            .filter(col("id").eq(lit(0)))
4731            .unwrap()
4732            .project(vec![col("id"), scalar_subquery(Arc::new(subquery_plan))])
4733            .unwrap()
4734            .build()
4735            .unwrap();
4736
4737        let mut filter_found = false;
4738        plan.apply_with_subqueries(|plan| {
4739            match plan {
4740                LogicalPlan::Projection(..) => return Ok(TreeNodeRecursion::Jump),
4741                LogicalPlan::Filter(..) => filter_found = true,
4742                _ => {}
4743            }
4744            Ok(TreeNodeRecursion::Continue)
4745        })
4746        .unwrap();
4747        assert!(!filter_found);
4748
4749        struct ProjectJumpVisitor {
4750            filter_found: bool,
4751        }
4752
4753        impl ProjectJumpVisitor {
4754            fn new() -> Self {
4755                Self {
4756                    filter_found: false,
4757                }
4758            }
4759        }
4760
4761        impl<'n> TreeNodeVisitor<'n> for ProjectJumpVisitor {
4762            type Node = LogicalPlan;
4763
4764            fn f_down(&mut self, node: &'n Self::Node) -> Result<TreeNodeRecursion> {
4765                match node {
4766                    LogicalPlan::Projection(..) => return Ok(TreeNodeRecursion::Jump),
4767                    LogicalPlan::Filter(..) => self.filter_found = true,
4768                    _ => {}
4769                }
4770                Ok(TreeNodeRecursion::Continue)
4771            }
4772        }
4773
4774        let mut visitor = ProjectJumpVisitor::new();
4775        plan.visit_with_subqueries(&mut visitor).unwrap();
4776        assert!(!visitor.filter_found);
4777
4778        let mut filter_found = false;
4779        plan.clone()
4780            .transform_down_with_subqueries(|plan| {
4781                match plan {
4782                    LogicalPlan::Projection(..) => {
4783                        return Ok(Transformed::new(plan, false, TreeNodeRecursion::Jump))
4784                    }
4785                    LogicalPlan::Filter(..) => filter_found = true,
4786                    _ => {}
4787                }
4788                Ok(Transformed::no(plan))
4789            })
4790            .unwrap();
4791        assert!(!filter_found);
4792
4793        let mut filter_found = false;
4794        plan.clone()
4795            .transform_down_up_with_subqueries(
4796                |plan| {
4797                    match plan {
4798                        LogicalPlan::Projection(..) => {
4799                            return Ok(Transformed::new(
4800                                plan,
4801                                false,
4802                                TreeNodeRecursion::Jump,
4803                            ))
4804                        }
4805                        LogicalPlan::Filter(..) => filter_found = true,
4806                        _ => {}
4807                    }
4808                    Ok(Transformed::no(plan))
4809                },
4810                |plan| Ok(Transformed::no(plan)),
4811            )
4812            .unwrap();
4813        assert!(!filter_found);
4814
4815        struct ProjectJumpRewriter {
4816            filter_found: bool,
4817        }
4818
4819        impl ProjectJumpRewriter {
4820            fn new() -> Self {
4821                Self {
4822                    filter_found: false,
4823                }
4824            }
4825        }
4826
4827        impl TreeNodeRewriter for ProjectJumpRewriter {
4828            type Node = LogicalPlan;
4829
4830            fn f_down(&mut self, node: Self::Node) -> Result<Transformed<Self::Node>> {
4831                match node {
4832                    LogicalPlan::Projection(..) => {
4833                        return Ok(Transformed::new(node, false, TreeNodeRecursion::Jump))
4834                    }
4835                    LogicalPlan::Filter(..) => self.filter_found = true,
4836                    _ => {}
4837                }
4838                Ok(Transformed::no(node))
4839            }
4840        }
4841
4842        let mut rewriter = ProjectJumpRewriter::new();
4843        plan.rewrite_with_subqueries(&mut rewriter).unwrap();
4844        assert!(!rewriter.filter_found);
4845    }
4846
4847    #[test]
4848    fn test_with_unresolved_placeholders() {
4849        let field_name = "id";
4850        let placeholder_value = "$1";
4851        let schema = Schema::new(vec![Field::new(field_name, DataType::Int32, false)]);
4852
4853        let plan = table_scan(TableReference::none(), &schema, None)
4854            .unwrap()
4855            .filter(col(field_name).eq(placeholder(placeholder_value)))
4856            .unwrap()
4857            .build()
4858            .unwrap();
4859
4860        // Check that the placeholder parameters have not received a DataType.
4861        let params = plan.get_parameter_types().unwrap();
4862        assert_eq!(params.len(), 1);
4863
4864        let parameter_type = params.clone().get(placeholder_value).unwrap().clone();
4865        assert_eq!(parameter_type, None);
4866    }
4867
4868    #[test]
4869    fn test_join_with_new_exprs() -> Result<()> {
4870        fn create_test_join(
4871            on: Vec<(Expr, Expr)>,
4872            filter: Option<Expr>,
4873        ) -> Result<LogicalPlan> {
4874            let schema = Schema::new(vec![
4875                Field::new("a", DataType::Int32, false),
4876                Field::new("b", DataType::Int32, false),
4877            ]);
4878
4879            let left_schema = DFSchema::try_from_qualified_schema("t1", &schema)?;
4880            let right_schema = DFSchema::try_from_qualified_schema("t2", &schema)?;
4881
4882            Ok(LogicalPlan::Join(Join {
4883                left: Arc::new(
4884                    table_scan(Some("t1"), left_schema.as_arrow(), None)?.build()?,
4885                ),
4886                right: Arc::new(
4887                    table_scan(Some("t2"), right_schema.as_arrow(), None)?.build()?,
4888                ),
4889                on,
4890                filter,
4891                join_type: JoinType::Inner,
4892                join_constraint: JoinConstraint::On,
4893                schema: Arc::new(left_schema.join(&right_schema)?),
4894                null_equals_null: false,
4895            }))
4896        }
4897
4898        {
4899            let join = create_test_join(vec![(col("t1.a"), (col("t2.a")))], None)?;
4900            let LogicalPlan::Join(join) = join.with_new_exprs(
4901                join.expressions(),
4902                join.inputs().into_iter().cloned().collect(),
4903            )?
4904            else {
4905                unreachable!()
4906            };
4907            assert_eq!(join.on, vec![(col("t1.a"), (col("t2.a")))]);
4908            assert_eq!(join.filter, None);
4909        }
4910
4911        {
4912            let join = create_test_join(vec![], Some(col("t1.a").gt(col("t2.a"))))?;
4913            let LogicalPlan::Join(join) = join.with_new_exprs(
4914                join.expressions(),
4915                join.inputs().into_iter().cloned().collect(),
4916            )?
4917            else {
4918                unreachable!()
4919            };
4920            assert_eq!(join.on, vec![]);
4921            assert_eq!(join.filter, Some(col("t1.a").gt(col("t2.a"))));
4922        }
4923
4924        {
4925            let join = create_test_join(
4926                vec![(col("t1.a"), (col("t2.a")))],
4927                Some(col("t1.b").gt(col("t2.b"))),
4928            )?;
4929            let LogicalPlan::Join(join) = join.with_new_exprs(
4930                join.expressions(),
4931                join.inputs().into_iter().cloned().collect(),
4932            )?
4933            else {
4934                unreachable!()
4935            };
4936            assert_eq!(join.on, vec![(col("t1.a"), (col("t2.a")))]);
4937            assert_eq!(join.filter, Some(col("t1.b").gt(col("t2.b"))));
4938        }
4939
4940        {
4941            let join = create_test_join(
4942                vec![(col("t1.a"), (col("t2.a"))), (col("t1.b"), (col("t2.b")))],
4943                None,
4944            )?;
4945            let LogicalPlan::Join(join) = join.with_new_exprs(
4946                vec![
4947                    binary_expr(col("t1.a"), Operator::Plus, lit(1)),
4948                    binary_expr(col("t2.a"), Operator::Plus, lit(2)),
4949                    col("t1.b"),
4950                    col("t2.b"),
4951                    lit(true),
4952                ],
4953                join.inputs().into_iter().cloned().collect(),
4954            )?
4955            else {
4956                unreachable!()
4957            };
4958            assert_eq!(
4959                join.on,
4960                vec![
4961                    (
4962                        binary_expr(col("t1.a"), Operator::Plus, lit(1)),
4963                        binary_expr(col("t2.a"), Operator::Plus, lit(2))
4964                    ),
4965                    (col("t1.b"), (col("t2.b")))
4966                ]
4967            );
4968            assert_eq!(join.filter, Some(lit(true)));
4969        }
4970
4971        Ok(())
4972    }
4973
4974    #[test]
4975    fn test_join_try_new() -> Result<()> {
4976        let schema = Schema::new(vec![
4977            Field::new("a", DataType::Int32, false),
4978            Field::new("b", DataType::Int32, false),
4979        ]);
4980
4981        let left_scan = table_scan(Some("t1"), &schema, None)?.build()?;
4982
4983        let right_scan = table_scan(Some("t2"), &schema, None)?.build()?;
4984
4985        let join_types = vec![
4986            JoinType::Inner,
4987            JoinType::Left,
4988            JoinType::Right,
4989            JoinType::Full,
4990            JoinType::LeftSemi,
4991            JoinType::LeftAnti,
4992            JoinType::RightSemi,
4993            JoinType::RightAnti,
4994            JoinType::LeftMark,
4995        ];
4996
4997        for join_type in join_types {
4998            let join = Join::try_new(
4999                Arc::new(left_scan.clone()),
5000                Arc::new(right_scan.clone()),
5001                vec![(col("t1.a"), col("t2.a"))],
5002                Some(col("t1.b").gt(col("t2.b"))),
5003                join_type,
5004                JoinConstraint::On,
5005                false,
5006            )?;
5007
5008            match join_type {
5009                JoinType::LeftSemi | JoinType::LeftAnti => {
5010                    assert_eq!(join.schema.fields().len(), 2);
5011
5012                    let fields = join.schema.fields();
5013                    assert_eq!(
5014                        fields[0].name(),
5015                        "a",
5016                        "First field should be 'a' from left table"
5017                    );
5018                    assert_eq!(
5019                        fields[1].name(),
5020                        "b",
5021                        "Second field should be 'b' from left table"
5022                    );
5023                }
5024                JoinType::RightSemi | JoinType::RightAnti => {
5025                    assert_eq!(join.schema.fields().len(), 2);
5026
5027                    let fields = join.schema.fields();
5028                    assert_eq!(
5029                        fields[0].name(),
5030                        "a",
5031                        "First field should be 'a' from right table"
5032                    );
5033                    assert_eq!(
5034                        fields[1].name(),
5035                        "b",
5036                        "Second field should be 'b' from right table"
5037                    );
5038                }
5039                JoinType::LeftMark => {
5040                    assert_eq!(join.schema.fields().len(), 3);
5041
5042                    let fields = join.schema.fields();
5043                    assert_eq!(
5044                        fields[0].name(),
5045                        "a",
5046                        "First field should be 'a' from left table"
5047                    );
5048                    assert_eq!(
5049                        fields[1].name(),
5050                        "b",
5051                        "Second field should be 'b' from left table"
5052                    );
5053                    assert_eq!(
5054                        fields[2].name(),
5055                        "mark",
5056                        "Third field should be the mark column"
5057                    );
5058
5059                    assert!(!fields[0].is_nullable());
5060                    assert!(!fields[1].is_nullable());
5061                    assert!(!fields[2].is_nullable());
5062                }
5063                _ => {
5064                    assert_eq!(join.schema.fields().len(), 4);
5065
5066                    let fields = join.schema.fields();
5067                    assert_eq!(
5068                        fields[0].name(),
5069                        "a",
5070                        "First field should be 'a' from left table"
5071                    );
5072                    assert_eq!(
5073                        fields[1].name(),
5074                        "b",
5075                        "Second field should be 'b' from left table"
5076                    );
5077                    assert_eq!(
5078                        fields[2].name(),
5079                        "a",
5080                        "Third field should be 'a' from right table"
5081                    );
5082                    assert_eq!(
5083                        fields[3].name(),
5084                        "b",
5085                        "Fourth field should be 'b' from right table"
5086                    );
5087
5088                    if join_type == JoinType::Left {
5089                        // Left side fields (first two) shouldn't be nullable
5090                        assert!(!fields[0].is_nullable());
5091                        assert!(!fields[1].is_nullable());
5092                        // Right side fields (third and fourth) should be nullable
5093                        assert!(fields[2].is_nullable());
5094                        assert!(fields[3].is_nullable());
5095                    } else if join_type == JoinType::Right {
5096                        // Left side fields (first two) should be nullable
5097                        assert!(fields[0].is_nullable());
5098                        assert!(fields[1].is_nullable());
5099                        // Right side fields (third and fourth) shouldn't be nullable
5100                        assert!(!fields[2].is_nullable());
5101                        assert!(!fields[3].is_nullable());
5102                    } else if join_type == JoinType::Full {
5103                        assert!(fields[0].is_nullable());
5104                        assert!(fields[1].is_nullable());
5105                        assert!(fields[2].is_nullable());
5106                        assert!(fields[3].is_nullable());
5107                    }
5108                }
5109            }
5110
5111            assert_eq!(join.on, vec![(col("t1.a"), col("t2.a"))]);
5112            assert_eq!(join.filter, Some(col("t1.b").gt(col("t2.b"))));
5113            assert_eq!(join.join_type, join_type);
5114            assert_eq!(join.join_constraint, JoinConstraint::On);
5115            assert!(!join.null_equals_null);
5116        }
5117
5118        Ok(())
5119    }
5120
5121    #[test]
5122    fn test_join_try_new_with_using_constraint_and_overlapping_columns() -> Result<()> {
5123        let left_schema = Schema::new(vec![
5124            Field::new("id", DataType::Int32, false), // Common column in both tables
5125            Field::new("name", DataType::Utf8, false), // Unique to left
5126            Field::new("value", DataType::Int32, false), // Common column, different meaning
5127        ]);
5128
5129        let right_schema = Schema::new(vec![
5130            Field::new("id", DataType::Int32, false), // Common column in both tables
5131            Field::new("category", DataType::Utf8, false), // Unique to right
5132            Field::new("value", DataType::Float64, true), // Common column, different meaning
5133        ]);
5134
5135        let left_plan = table_scan(Some("t1"), &left_schema, None)?.build()?;
5136
5137        let right_plan = table_scan(Some("t2"), &right_schema, None)?.build()?;
5138
5139        // Test 1: USING constraint with a common column
5140        {
5141            // In the logical plan, both copies of the `id` column are preserved
5142            // The USING constraint is handled later during physical execution, where the common column appears once
5143            let join = Join::try_new(
5144                Arc::new(left_plan.clone()),
5145                Arc::new(right_plan.clone()),
5146                vec![(col("t1.id"), col("t2.id"))],
5147                None,
5148                JoinType::Inner,
5149                JoinConstraint::Using,
5150                false,
5151            )?;
5152
5153            let fields = join.schema.fields();
5154
5155            assert_eq!(fields.len(), 6);
5156
5157            assert_eq!(
5158                fields[0].name(),
5159                "id",
5160                "First field should be 'id' from left table"
5161            );
5162            assert_eq!(
5163                fields[1].name(),
5164                "name",
5165                "Second field should be 'name' from left table"
5166            );
5167            assert_eq!(
5168                fields[2].name(),
5169                "value",
5170                "Third field should be 'value' from left table"
5171            );
5172            assert_eq!(
5173                fields[3].name(),
5174                "id",
5175                "Fourth field should be 'id' from right table"
5176            );
5177            assert_eq!(
5178                fields[4].name(),
5179                "category",
5180                "Fifth field should be 'category' from right table"
5181            );
5182            assert_eq!(
5183                fields[5].name(),
5184                "value",
5185                "Sixth field should be 'value' from right table"
5186            );
5187
5188            assert_eq!(join.join_constraint, JoinConstraint::Using);
5189        }
5190
5191        // Test 2: Complex join condition with expressions
5192        {
5193            // Complex condition: join on id equality AND where left.value < right.value
5194            let join = Join::try_new(
5195                Arc::new(left_plan.clone()),
5196                Arc::new(right_plan.clone()),
5197                vec![(col("t1.id"), col("t2.id"))], // Equijoin condition
5198                Some(col("t1.value").lt(col("t2.value"))), // Non-equi filter condition
5199                JoinType::Inner,
5200                JoinConstraint::On,
5201                false,
5202            )?;
5203
5204            let fields = join.schema.fields();
5205            assert_eq!(fields.len(), 6);
5206
5207            assert_eq!(
5208                fields[0].name(),
5209                "id",
5210                "First field should be 'id' from left table"
5211            );
5212            assert_eq!(
5213                fields[1].name(),
5214                "name",
5215                "Second field should be 'name' from left table"
5216            );
5217            assert_eq!(
5218                fields[2].name(),
5219                "value",
5220                "Third field should be 'value' from left table"
5221            );
5222            assert_eq!(
5223                fields[3].name(),
5224                "id",
5225                "Fourth field should be 'id' from right table"
5226            );
5227            assert_eq!(
5228                fields[4].name(),
5229                "category",
5230                "Fifth field should be 'category' from right table"
5231            );
5232            assert_eq!(
5233                fields[5].name(),
5234                "value",
5235                "Sixth field should be 'value' from right table"
5236            );
5237
5238            assert_eq!(join.filter, Some(col("t1.value").lt(col("t2.value"))));
5239        }
5240
5241        // Test 3: Join with null equality behavior set to true
5242        {
5243            let join = Join::try_new(
5244                Arc::new(left_plan.clone()),
5245                Arc::new(right_plan.clone()),
5246                vec![(col("t1.id"), col("t2.id"))],
5247                None,
5248                JoinType::Inner,
5249                JoinConstraint::On,
5250                true,
5251            )?;
5252
5253            assert!(join.null_equals_null);
5254        }
5255
5256        Ok(())
5257    }
5258
5259    #[test]
5260    fn test_join_try_new_schema_validation() -> Result<()> {
5261        let left_schema = Schema::new(vec![
5262            Field::new("id", DataType::Int32, false),
5263            Field::new("name", DataType::Utf8, false),
5264            Field::new("value", DataType::Float64, true),
5265        ]);
5266
5267        let right_schema = Schema::new(vec![
5268            Field::new("id", DataType::Int32, false),
5269            Field::new("category", DataType::Utf8, true),
5270            Field::new("code", DataType::Int16, false),
5271        ]);
5272
5273        let left_plan = table_scan(Some("t1"), &left_schema, None)?.build()?;
5274
5275        let right_plan = table_scan(Some("t2"), &right_schema, None)?.build()?;
5276
5277        let join_types = vec![
5278            JoinType::Inner,
5279            JoinType::Left,
5280            JoinType::Right,
5281            JoinType::Full,
5282        ];
5283
5284        for join_type in join_types {
5285            let join = Join::try_new(
5286                Arc::new(left_plan.clone()),
5287                Arc::new(right_plan.clone()),
5288                vec![(col("t1.id"), col("t2.id"))],
5289                Some(col("t1.value").gt(lit(5.0))),
5290                join_type,
5291                JoinConstraint::On,
5292                false,
5293            )?;
5294
5295            let fields = join.schema.fields();
5296            assert_eq!(fields.len(), 6, "Expected 6 fields for {join_type:?} join");
5297
5298            for (i, field) in fields.iter().enumerate() {
5299                let expected_nullable = match (i, &join_type) {
5300                    // Left table fields (indices 0, 1, 2)
5301                    (0, JoinType::Right | JoinType::Full) => true, // id becomes nullable in RIGHT/FULL
5302                    (1, JoinType::Right | JoinType::Full) => true, // name becomes nullable in RIGHT/FULL
5303                    (2, _) => true, // value is already nullable
5304
5305                    // Right table fields (indices 3, 4, 5)
5306                    (3, JoinType::Left | JoinType::Full) => true, // id becomes nullable in LEFT/FULL
5307                    (4, _) => true, // category is already nullable
5308                    (5, JoinType::Left | JoinType::Full) => true, // code becomes nullable in LEFT/FULL
5309
5310                    _ => false,
5311                };
5312
5313                assert_eq!(
5314                    field.is_nullable(),
5315                    expected_nullable,
5316                    "Field {} ({}) nullability incorrect for {:?} join",
5317                    i,
5318                    field.name(),
5319                    join_type
5320                );
5321            }
5322        }
5323
5324        let using_join = Join::try_new(
5325            Arc::new(left_plan.clone()),
5326            Arc::new(right_plan.clone()),
5327            vec![(col("t1.id"), col("t2.id"))],
5328            None,
5329            JoinType::Inner,
5330            JoinConstraint::Using,
5331            false,
5332        )?;
5333
5334        assert_eq!(
5335            using_join.schema.fields().len(),
5336            6,
5337            "USING join should have all fields"
5338        );
5339        assert_eq!(using_join.join_constraint, JoinConstraint::Using);
5340
5341        Ok(())
5342    }
5343}