Skip to main content

datafusion_expr/logical_plan/
plan.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! Logical plan types
19
20use std::cmp::Ordering;
21use std::collections::{HashMap, HashSet};
22use std::fmt::{self, Debug, Display, Formatter};
23use std::hash::{Hash, Hasher};
24use std::sync::{Arc, LazyLock};
25
26use super::DdlStatement;
27use super::dml::CopyTo;
28use super::invariants::{
29    InvariantLevel, assert_always_invariants_at_current_node,
30    assert_executable_invariants,
31};
32use crate::builder::{unique_field_aliases, unnest_with_options};
33use crate::expr::{
34    Alias, Placeholder, Sort as SortExpr, WindowFunction, WindowFunctionParams,
35    intersect_metadata_for_union,
36};
37use crate::expr_rewriter::{
38    NamePreserver, create_col_from_scalar_expr, normalize_cols, normalize_sorts,
39};
40use crate::logical_plan::display::{GraphvizVisitor, IndentVisitor};
41use crate::logical_plan::extension::UserDefinedLogicalNode;
42use crate::logical_plan::{DmlStatement, Statement};
43use crate::utils::{
44    enumerate_grouping_sets, exprlist_to_fields, find_out_reference_exprs,
45    grouping_set_expr_count, grouping_set_to_exprlist, merge_schema, split_conjunction,
46};
47use crate::{
48    BinaryExpr, CreateMemoryTable, CreateView, Execute, Expr, ExprSchemable, GroupingSet,
49    LogicalPlanBuilder, Operator, Prepare, TableProviderFilterPushDown, TableSource,
50    WindowFunctionDefinition, build_join_schema, expr_vec_fmt, requalify_sides_if_needed,
51};
52
53use arrow::datatypes::{DataType, Field, FieldRef, Schema, SchemaRef};
54use datafusion_common::cse::{NormalizeEq, Normalizeable};
55use datafusion_common::format::ExplainFormat;
56use datafusion_common::metadata::check_metadata_with_storage_equal;
57use datafusion_common::tree_node::{
58    Transformed, TreeNode, TreeNodeContainer, TreeNodeRecursion,
59};
60use datafusion_common::{
61    Column, Constraints, DFSchema, DFSchemaRef, DataFusionError, Dependency,
62    FunctionalDependence, FunctionalDependencies, NullEquality, ParamValues, Result,
63    ScalarValue, Spans, TableReference, UnnestOptions, aggregate_functional_dependencies,
64    assert_eq_or_internal_err, assert_or_internal_err, internal_err, plan_err,
65};
66use indexmap::IndexSet;
67
68// backwards compatibility
69use crate::display::PgJsonVisitor;
70pub use datafusion_common::display::{PlanType, StringifiedPlan, ToStringifiedPlan};
71pub use datafusion_common::{JoinConstraint, JoinType};
72
73/// A `LogicalPlan` is a node in a tree of relational operators (such as
74/// Projection or Filter).
75///
76/// Represents transforming an input relation (table) to an output relation
77/// (table) with a potentially different schema. Plans form a dataflow tree
78/// where data flows from leaves up to the root to produce the query result.
79///
80/// `LogicalPlan`s can be created by the SQL query planner, the DataFrame API,
81/// or programmatically (for example custom query languages).
82///
83/// # See also:
84/// * [`Expr`]: For the expressions that are evaluated by the plan
85/// * [`LogicalPlanBuilder`]: For building `LogicalPlan`s
86/// * [`tree_node`]: To inspect and rewrite `LogicalPlan`s
87///
88/// [`tree_node`]: crate::logical_plan::tree_node
89///
90/// # Examples
91///
92/// ## Creating a LogicalPlan from SQL:
93///
94/// See [`SessionContext::sql`](https://docs.rs/datafusion/latest/datafusion/execution/context/struct.SessionContext.html#method.sql)
95///
96/// ## Creating a LogicalPlan from the DataFrame API:
97///
98/// See [`DataFrame::logical_plan`](https://docs.rs/datafusion/latest/datafusion/dataframe/struct.DataFrame.html#method.logical_plan)
99///
100/// ## Creating a LogicalPlan programmatically:
101///
102/// See [`LogicalPlanBuilder`]
103///
104/// # Visiting and Rewriting `LogicalPlan`s
105///
106/// Using the [`tree_node`] API, you can recursively walk all nodes in a
107/// `LogicalPlan`. For example, to find all column references in a plan:
108///
109/// ```
110/// # use std::collections::HashSet;
111/// # use arrow::datatypes::{DataType, Field, Schema};
112/// # use datafusion_expr::{Expr, col, lit, LogicalPlan, LogicalPlanBuilder, table_scan};
113/// # use datafusion_common::tree_node::{TreeNodeRecursion, TreeNode};
114/// # use datafusion_common::{Column, Result};
115/// # fn employee_schema() -> Schema {
116/// #    Schema::new(vec![
117/// #           Field::new("name", DataType::Utf8, false),
118/// #           Field::new("salary", DataType::Int32, false),
119/// #       ])
120/// #   }
/// // Projection(name)
122/// //   Filter(salary > 1000)
123/// //     TableScan(employee)
124/// # fn main() -> Result<()> {
125/// let plan = table_scan(Some("employee"), &employee_schema(), None)?
126///  .filter(col("salary").gt(lit(1000)))?
127///  .project(vec![col("name")])?
128///  .build()?;
129///
130/// // use apply to walk the plan and collect all expressions
131/// let mut expressions = HashSet::new();
132/// plan.apply(|node| {
133///   // collect all expressions in the plan
134///   node.apply_expressions(|expr| {
135///    expressions.insert(expr.clone());
136///    Ok(TreeNodeRecursion::Continue) // control walk of expressions
137///   })?;
138///   Ok(TreeNodeRecursion::Continue) // control walk of plan nodes
139/// }).unwrap();
140///
141/// // we found the expression in projection and filter
142/// assert_eq!(expressions.len(), 2);
143/// println!("Found expressions: {:?}", expressions);
144/// // found predicate in the Filter: employee.salary > 1000
145/// let salary = Expr::Column(Column::new(Some("employee"), "salary"));
146/// assert!(expressions.contains(&salary.gt(lit(1000))));
147/// // found projection in the Projection: employee.name
148/// let name = Expr::Column(Column::new(Some("employee"), "name"));
149/// assert!(expressions.contains(&name));
150/// # Ok(())
151/// # }
152/// ```
153///
154/// You can also rewrite plans using the [`tree_node`] API. For example, to
155/// replace the filter predicate in a plan:
156///
157/// ```
158/// # use std::collections::HashSet;
159/// # use arrow::datatypes::{DataType, Field, Schema};
160/// # use datafusion_expr::{Expr, col, lit, LogicalPlan, LogicalPlanBuilder, table_scan};
161/// # use datafusion_common::tree_node::{TreeNodeRecursion, TreeNode};
162/// # use datafusion_common::{Column, Result};
163/// # fn employee_schema() -> Schema {
164/// #    Schema::new(vec![
165/// #           Field::new("name", DataType::Utf8, false),
166/// #           Field::new("salary", DataType::Int32, false),
167/// #       ])
168/// #   }
/// // Projection(name)
170/// //   Filter(salary > 1000)
171/// //     TableScan(employee)
172/// # fn main() -> Result<()> {
173/// use datafusion_common::tree_node::Transformed;
174/// let plan = table_scan(Some("employee"), &employee_schema(), None)?
175///  .filter(col("salary").gt(lit(1000)))?
176///  .project(vec![col("name")])?
177///  .build()?;
178///
179/// // use transform to rewrite the plan
180/// let transformed_result = plan.transform(|node| {
181///   // when we see the filter node
182///   if let LogicalPlan::Filter(mut filter) = node {
183///     // replace predicate with salary < 2000
184///     filter.predicate = Expr::Column(Column::new(Some("employee"), "salary")).lt(lit(2000));
185///     let new_plan = LogicalPlan::Filter(filter);
186///     return Ok(Transformed::yes(new_plan)); // communicate the node was changed
187///   }
188///   // return the node unchanged
189///   Ok(Transformed::no(node))
190/// }).unwrap();
191///
192/// // Transformed result contains rewritten plan and information about
193/// // whether the plan was changed
194/// assert!(transformed_result.transformed);
195/// let rewritten_plan = transformed_result.data;
196///
/// // the filter predicate in the rewritten plan is now `salary < 2000`
198/// assert_eq!(rewritten_plan.display_indent().to_string(),
199/// "Projection: employee.name\
200/// \n  Filter: employee.salary < Int32(2000)\
201/// \n    TableScan: employee");
202/// # Ok(())
203/// # }
204/// ```
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Hash)]
pub enum LogicalPlan {
    /// Evaluates an arbitrary list of expressions (essentially a
    /// SELECT with an expression list) on its input.
    Projection(Projection),
    /// Filters rows from its input that do not match an
    /// expression (essentially a WHERE clause with a predicate
    /// expression).
    ///
    /// Semantically, `<predicate>` is evaluated for each row of the
    /// input. If the value of `<predicate>` is true, the input row is
    /// passed to the output. If the value of `<predicate>` is false
    /// (or null), the row is discarded.
    Filter(Filter),
    /// Windows input based on a set of window spec and window
    /// function (e.g. SUM or RANK).  This is used to implement SQL
    /// window functions, and the `OVER` clause.
    ///
    /// See [`Window`] for more details
    Window(Window),
    /// Aggregates its input based on a set of grouping and aggregate
    /// expressions (e.g. SUM). This is used to implement SQL aggregates
    /// and `GROUP BY`.
    ///
    /// See [`Aggregate`] for more details
    Aggregate(Aggregate),
    /// Sorts its input according to a list of sort expressions. This
    /// is used to implement SQL `ORDER BY`
    Sort(Sort),
    /// Join two logical plans on one or more join columns.
    /// This is used to implement SQL `JOIN`
    Join(Join),
    /// Repartitions the input based on a partitioning scheme. This is
    /// used to add parallelism and is sometimes referred to as an
    /// "exchange" operator in other systems
    Repartition(Repartition),
    /// Union multiple inputs with the same schema into a single
    /// output stream. This is used to implement SQL `UNION [ALL]` and
    /// `INTERSECT [ALL]`.
    Union(Union),
    /// Produces rows from a [`TableSource`], used to implement SQL
    /// `FROM` tables or views.
    TableScan(TableScan),
    /// An empty relation with an empty schema that produces either
    /// 0 or 1 row. This is used to implement SQL `SELECT`
    /// that has no values in the `FROM` clause.
    EmptyRelation(EmptyRelation),
    /// Produces the output of running another query.  This is used to
    /// implement SQL subqueries
    Subquery(Subquery),
    /// Aliased relation provides, or changes, the name of a relation.
    SubqueryAlias(SubqueryAlias),
    /// Skip some number of rows, and then fetch some number of rows.
    Limit(Limit),
    /// A DataFusion [`Statement`] such as `SET VARIABLE` or `START TRANSACTION`
    Statement(Statement),
    /// Values expression. See
    /// [Postgres VALUES](https://www.postgresql.org/docs/current/queries-values.html)
    /// documentation for more details. This is used to implement SQL such as
    /// `VALUES (1, 2), (3, 4)`
    Values(Values),
    /// Produces a relation with string representations of
    /// various parts of the plan. This is used to implement SQL `EXPLAIN`.
    Explain(Explain),
    /// Runs the input, and prints annotated physical plan as a string
    /// with execution metric. This is used to implement SQL
    /// `EXPLAIN ANALYZE`.
    Analyze(Analyze),
    /// Extension operator defined outside of DataFusion. This is used
    /// to extend DataFusion with custom relational operations that
    /// are not provided as built-in variants of this enum.
    Extension(Extension),
    /// Remove duplicate rows from the input. This is used to
    /// implement SQL `SELECT DISTINCT ...`.
    Distinct(Distinct),
    /// Data Manipulation Language (DML): Insert / Update / Delete
    Dml(DmlStatement),
    /// Data Definition Language (DDL): CREATE / DROP TABLES / VIEWS / SCHEMAS
    Ddl(DdlStatement),
    /// `COPY TO` for writing plan results to files
    Copy(CopyTo),
    /// Describe the schema of the table. This is used to implement the
    /// SQL `DESCRIBE` command from MySQL.
    DescribeTable(DescribeTable),
    /// Unnest a column that contains a nested list type such as an
    /// ARRAY. This is used to implement SQL `UNNEST`
    Unnest(Unnest),
    /// A variadic query (e.g. "Recursive CTEs")
    RecursiveQuery(RecursiveQuery),
}
294
295impl Default for LogicalPlan {
296    fn default() -> Self {
297        // `Default` is used as a transient placeholder on hot paths (e.g.
298        // `Box`/`Arc` `map_elements`), so use a shared empty schema to avoid
299        // allocating.
300        LogicalPlan::EmptyRelation(EmptyRelation {
301            produce_one_row: false,
302            schema: Arc::clone(DFSchema::empty_ref()),
303        })
304    }
305}
306
307impl<'a> TreeNodeContainer<'a, Self> for LogicalPlan {
308    fn apply_elements<F: FnMut(&'a Self) -> Result<TreeNodeRecursion>>(
309        &'a self,
310        mut f: F,
311    ) -> Result<TreeNodeRecursion> {
312        f(self)
313    }
314
315    fn map_elements<F: FnMut(Self) -> Result<Transformed<Self>>>(
316        self,
317        mut f: F,
318    ) -> Result<Transformed<Self>> {
319        f(self)
320    }
321}
322
323impl LogicalPlan {
324    /// Get a reference to the logical plan's schema
325    pub fn schema(&self) -> &DFSchemaRef {
326        match self {
327            LogicalPlan::EmptyRelation(EmptyRelation { schema, .. }) => schema,
328            LogicalPlan::Values(Values { schema, .. }) => schema,
329            LogicalPlan::TableScan(TableScan {
330                projected_schema, ..
331            }) => projected_schema,
332            LogicalPlan::Projection(Projection { schema, .. }) => schema,
333            LogicalPlan::Filter(Filter { input, .. }) => input.schema(),
334            LogicalPlan::Distinct(Distinct::All(input)) => input.schema(),
335            LogicalPlan::Distinct(Distinct::On(DistinctOn { schema, .. })) => schema,
336            LogicalPlan::Window(Window { schema, .. }) => schema,
337            LogicalPlan::Aggregate(Aggregate { schema, .. }) => schema,
338            LogicalPlan::Sort(Sort { input, .. }) => input.schema(),
339            LogicalPlan::Join(Join { schema, .. }) => schema,
340            LogicalPlan::Repartition(Repartition { input, .. }) => input.schema(),
341            LogicalPlan::Limit(Limit { input, .. }) => input.schema(),
342            LogicalPlan::Statement(statement) => statement.schema(),
343            LogicalPlan::Subquery(Subquery { subquery, .. }) => subquery.schema(),
344            LogicalPlan::SubqueryAlias(SubqueryAlias { schema, .. }) => schema,
345            LogicalPlan::Explain(explain) => &explain.schema,
346            LogicalPlan::Analyze(analyze) => &analyze.schema,
347            LogicalPlan::Extension(extension) => extension.node.schema(),
348            LogicalPlan::Union(Union { schema, .. }) => schema,
349            LogicalPlan::DescribeTable(DescribeTable { output_schema, .. }) => {
350                output_schema
351            }
352            LogicalPlan::Dml(DmlStatement { output_schema, .. }) => output_schema,
353            LogicalPlan::Copy(CopyTo { output_schema, .. }) => output_schema,
354            LogicalPlan::Ddl(ddl) => ddl.schema(),
355            LogicalPlan::Unnest(Unnest { schema, .. }) => schema,
356            LogicalPlan::RecursiveQuery(RecursiveQuery { static_term, .. }) => {
357                // we take the schema of the static term as the schema of the entire recursive query
358                static_term.schema()
359            }
360        }
361    }
362
363    /// Used for normalizing columns, as the fallback schemas to the main schema
364    /// of the plan.
365    pub fn fallback_normalize_schemas(&self) -> Vec<&DFSchema> {
366        match self {
367            LogicalPlan::Window(_)
368            | LogicalPlan::Projection(_)
369            | LogicalPlan::Aggregate(_)
370            | LogicalPlan::Unnest(_)
371            | LogicalPlan::Join(_) => self
372                .inputs()
373                .iter()
374                .map(|input| input.schema().as_ref())
375                .collect(),
376            _ => vec![],
377        }
378    }
379
380    /// Returns the (fixed) output schema for explain plans
381    pub fn explain_schema() -> SchemaRef {
382        SchemaRef::new(Schema::new(vec![
383            Field::new("plan_type", DataType::Utf8, false),
384            Field::new("plan", DataType::Utf8, false),
385        ]))
386    }
387
388    /// Returns the (fixed) output schema for `DESCRIBE` plans
389    pub fn describe_schema() -> Schema {
390        Schema::new(vec![
391            Field::new("column_name", DataType::Utf8, false),
392            Field::new("data_type", DataType::Utf8, false),
393            Field::new("is_nullable", DataType::Utf8, false),
394        ])
395    }
396
397    /// Returns all expressions (non-recursively) evaluated by the current
398    /// logical plan node. This does not include expressions in any children.
399    ///
400    /// Note this method `clone`s all the expressions. When possible, the
401    /// [`tree_node`] API should be used instead of this API.
402    ///
403    /// The returned expressions do not necessarily represent or even
404    /// contributed to the output schema of this node. For example,
405    /// `LogicalPlan::Filter` returns the filter expression even though the
406    /// output of a Filter has the same columns as the input.
407    ///
408    /// The expressions do contain all the columns that are used by this plan,
409    /// so if there are columns not referenced by these expressions then
410    /// DataFusion's optimizer attempts to optimize them away.
411    ///
412    /// [`tree_node`]: crate::logical_plan::tree_node
413    pub fn expressions(self: &LogicalPlan) -> Vec<Expr> {
414        let mut exprs = vec![];
415        self.apply_expressions(|e| {
416            exprs.push(e.clone());
417            Ok(TreeNodeRecursion::Continue)
418        })
419        // closure always returns OK
420        .unwrap();
421        exprs
422    }
423
424    /// Returns all the out reference(correlated) expressions (recursively) in the current
425    /// logical plan nodes and all its descendant nodes.
426    pub fn all_out_ref_exprs(self: &LogicalPlan) -> Vec<Expr> {
427        let mut exprs = vec![];
428        self.apply_expressions(|e| {
429            find_out_reference_exprs(e).into_iter().for_each(|e| {
430                if !exprs.contains(&e) {
431                    exprs.push(e)
432                }
433            });
434            Ok(TreeNodeRecursion::Continue)
435        })
436        // closure always returns OK
437        .unwrap();
438        self.inputs()
439            .into_iter()
440            .flat_map(|child| child.all_out_ref_exprs())
441            .for_each(|e| {
442                if !exprs.contains(&e) {
443                    exprs.push(e)
444                }
445            });
446        exprs
447    }
448
449    /// Returns all inputs / children of this `LogicalPlan` node.
450    ///
451    /// Note does not include inputs to inputs, or subqueries.
452    pub fn inputs(&self) -> Vec<&LogicalPlan> {
453        match self {
454            LogicalPlan::Projection(Projection { input, .. }) => vec![input],
455            LogicalPlan::Filter(Filter { input, .. }) => vec![input],
456            LogicalPlan::Repartition(Repartition { input, .. }) => vec![input],
457            LogicalPlan::Window(Window { input, .. }) => vec![input],
458            LogicalPlan::Aggregate(Aggregate { input, .. }) => vec![input],
459            LogicalPlan::Sort(Sort { input, .. }) => vec![input],
460            LogicalPlan::Join(Join { left, right, .. }) => vec![left, right],
461            LogicalPlan::Limit(Limit { input, .. }) => vec![input],
462            LogicalPlan::Subquery(Subquery { subquery, .. }) => vec![subquery],
463            LogicalPlan::SubqueryAlias(SubqueryAlias { input, .. }) => vec![input],
464            LogicalPlan::Extension(extension) => extension.node.inputs(),
465            LogicalPlan::Union(Union { inputs, .. }) => {
466                inputs.iter().map(|arc| arc.as_ref()).collect()
467            }
468            LogicalPlan::Distinct(
469                Distinct::All(input) | Distinct::On(DistinctOn { input, .. }),
470            ) => vec![input],
471            LogicalPlan::Explain(explain) => vec![&explain.plan],
472            LogicalPlan::Analyze(analyze) => vec![&analyze.input],
473            LogicalPlan::Dml(write) => vec![&write.input],
474            LogicalPlan::Copy(copy) => vec![&copy.input],
475            LogicalPlan::Ddl(ddl) => ddl.inputs(),
476            LogicalPlan::Unnest(Unnest { input, .. }) => vec![input],
477            LogicalPlan::RecursiveQuery(RecursiveQuery {
478                static_term,
479                recursive_term,
480                ..
481            }) => vec![static_term, recursive_term],
482            LogicalPlan::Statement(stmt) => stmt.inputs(),
483            // plans without inputs
484            LogicalPlan::TableScan { .. }
485            | LogicalPlan::EmptyRelation { .. }
486            | LogicalPlan::Values { .. }
487            | LogicalPlan::DescribeTable(_) => vec![],
488        }
489    }
490
491    /// returns all `Using` join columns in a logical plan
492    pub fn using_columns(&self) -> Result<Vec<HashSet<Column>>, DataFusionError> {
493        let mut using_columns: Vec<HashSet<Column>> = vec![];
494
495        self.apply_with_subqueries(|plan| {
496            if let LogicalPlan::Join(Join {
497                join_constraint: JoinConstraint::Using,
498                on,
499                ..
500            }) = plan
501            {
502                // The join keys in using-join must be columns.
503                let columns =
504                    on.iter().try_fold(HashSet::new(), |mut accumu, (l, r)| {
505                        let Some(l) = l.get_as_join_column() else {
506                            return internal_err!(
507                                "Invalid join key. Expected column, found {l:?}"
508                            );
509                        };
510                        let Some(r) = r.get_as_join_column() else {
511                            return internal_err!(
512                                "Invalid join key. Expected column, found {r:?}"
513                            );
514                        };
515                        accumu.insert(l.to_owned());
516                        accumu.insert(r.to_owned());
517                        Result::<_, DataFusionError>::Ok(accumu)
518                    })?;
519                using_columns.push(columns);
520            }
521            Ok(TreeNodeRecursion::Continue)
522        })?;
523
524        Ok(using_columns)
525    }
526
527    /// returns the first output expression of this `LogicalPlan` node.
528    pub fn head_output_expr(&self) -> Result<Option<Expr>> {
529        match self {
530            LogicalPlan::Projection(projection) => {
531                Ok(Some(projection.expr.as_slice()[0].clone()))
532            }
533            LogicalPlan::Aggregate(agg) => {
534                if agg.group_expr.is_empty() {
535                    Ok(Some(agg.aggr_expr.as_slice()[0].clone()))
536                } else {
537                    Ok(Some(agg.group_expr.as_slice()[0].clone()))
538                }
539            }
540            LogicalPlan::Distinct(Distinct::On(DistinctOn { select_expr, .. })) => {
541                Ok(Some(select_expr[0].clone()))
542            }
543            LogicalPlan::Filter(Filter { input, .. })
544            | LogicalPlan::Distinct(Distinct::All(input))
545            | LogicalPlan::Sort(Sort { input, .. })
546            | LogicalPlan::Limit(Limit { input, .. })
547            | LogicalPlan::Repartition(Repartition { input, .. })
548            | LogicalPlan::Window(Window { input, .. }) => input.head_output_expr(),
549            LogicalPlan::Join(Join {
550                left,
551                right,
552                join_type,
553                ..
554            }) => match join_type {
555                JoinType::Inner | JoinType::Left | JoinType::Right | JoinType::Full => {
556                    if left.schema().fields().is_empty() {
557                        right.head_output_expr()
558                    } else {
559                        left.head_output_expr()
560                    }
561                }
562                JoinType::LeftSemi | JoinType::LeftAnti | JoinType::LeftMark => {
563                    left.head_output_expr()
564                }
565                JoinType::RightSemi | JoinType::RightAnti | JoinType::RightMark => {
566                    right.head_output_expr()
567                }
568            },
569            LogicalPlan::RecursiveQuery(RecursiveQuery { static_term, .. }) => {
570                static_term.head_output_expr()
571            }
572            LogicalPlan::Union(union) => Ok(Some(Expr::Column(Column::from(
573                union.schema.qualified_field(0),
574            )))),
575            LogicalPlan::TableScan(table) => Ok(Some(Expr::Column(Column::from(
576                table.projected_schema.qualified_field(0),
577            )))),
578            LogicalPlan::SubqueryAlias(subquery_alias) => {
579                let expr_opt = subquery_alias.input.head_output_expr()?;
580                expr_opt
581                    .map(|expr| {
582                        Ok(Expr::Column(create_col_from_scalar_expr(
583                            &expr,
584                            subquery_alias.alias.to_string(),
585                        )?))
586                    })
587                    .map_or(Ok(None), |v| v.map(Some))
588            }
589            LogicalPlan::Subquery(_) => Ok(None),
590            LogicalPlan::EmptyRelation(_)
591            | LogicalPlan::Statement(_)
592            | LogicalPlan::Values(_)
593            | LogicalPlan::Explain(_)
594            | LogicalPlan::Analyze(_)
595            | LogicalPlan::Extension(_)
596            | LogicalPlan::Dml(_)
597            | LogicalPlan::Copy(_)
598            | LogicalPlan::Ddl(_)
599            | LogicalPlan::DescribeTable(_)
600            | LogicalPlan::Unnest(_) => Ok(None),
601        }
602    }
603
604    /// Recomputes schema and type information for this LogicalPlan if needed.
605    ///
606    /// Some `LogicalPlan`s may need to recompute their schema if the number or
607    /// type of expressions have been changed (for example due to type
608    /// coercion). For example [`LogicalPlan::Projection`]s schema depends on
609    /// its expressions.
610    ///
611    /// Some `LogicalPlan`s schema is unaffected by any changes to their
612    /// expressions. For example [`LogicalPlan::Filter`] schema is always the
613    /// same as its input schema.
614    ///
615    /// This is useful after modifying a plans `Expr`s (or input plans) via
616    /// methods such as [Self::map_children] and [Self::map_expressions]. Unlike
617    /// [Self::with_new_exprs], this method does not require a new set of
618    /// expressions or inputs plans.
619    ///
620    /// # Return value
621    /// Returns an error if there is some issue recomputing the schema.
622    ///
623    /// # Notes
624    ///
625    /// * Does not recursively recompute schema for input (child) plans.
626    pub fn recompute_schema(self) -> Result<Self> {
627        match self {
628            // Since expr may be different than the previous expr, schema of the projection
629            // may change. We need to use try_new method instead of try_new_with_schema method.
630            LogicalPlan::Projection(Projection {
631                expr,
632                input,
633                schema: _,
634            }) => Projection::try_new(expr, input).map(LogicalPlan::Projection),
635            LogicalPlan::Dml(_) => Ok(self),
636            LogicalPlan::Copy(_) => Ok(self),
637            LogicalPlan::Values(Values { schema, values }) => {
638                // todo it isn't clear why the schema is not recomputed here
639                Ok(LogicalPlan::Values(Values { schema, values }))
640            }
641            LogicalPlan::Filter(Filter { predicate, input }) => {
642                Filter::try_new(predicate, input).map(LogicalPlan::Filter)
643            }
644            LogicalPlan::Repartition(_) => Ok(self),
645            LogicalPlan::Window(Window {
646                input,
647                window_expr,
648                schema: _,
649            }) => Window::try_new(window_expr, input).map(LogicalPlan::Window),
650            LogicalPlan::Aggregate(Aggregate {
651                input,
652                group_expr,
653                aggr_expr,
654                schema: _,
655            }) => Aggregate::try_new(input, group_expr, aggr_expr)
656                .map(LogicalPlan::Aggregate),
657            LogicalPlan::Sort(_) => Ok(self),
658            LogicalPlan::Join(Join {
659                left,
660                right,
661                filter,
662                join_type,
663                join_constraint,
664                on,
665                schema: _,
666                null_equality,
667                null_aware,
668            }) => {
669                let schema =
670                    build_join_schema(left.schema(), right.schema(), &join_type)?;
671
672                let new_on: Vec<_> = on
673                    .into_iter()
674                    .map(|equi_expr| {
675                        // SimplifyExpression rule may add alias to the equi_expr.
676                        (equi_expr.0.unalias(), equi_expr.1.unalias())
677                    })
678                    .collect();
679
680                Ok(LogicalPlan::Join(Join {
681                    left,
682                    right,
683                    join_type,
684                    join_constraint,
685                    on: new_on,
686                    filter,
687                    schema: DFSchemaRef::new(schema),
688                    null_equality,
689                    null_aware,
690                }))
691            }
692            LogicalPlan::Subquery(_) => Ok(self),
693            LogicalPlan::SubqueryAlias(SubqueryAlias {
694                input,
695                alias,
696                schema: _,
697            }) => SubqueryAlias::try_new(input, alias).map(LogicalPlan::SubqueryAlias),
698            LogicalPlan::Limit(_) => Ok(self),
699            LogicalPlan::Ddl(_) => Ok(self),
700            LogicalPlan::Extension(Extension { node }) => {
701                // todo make an API that does not require cloning
702                // This requires a copy of the extension nodes expressions and inputs
703                let expr = node.expressions();
704                let inputs: Vec<_> = node.inputs().into_iter().cloned().collect();
705                Ok(LogicalPlan::Extension(Extension {
706                    node: node.with_exprs_and_inputs(expr, inputs)?,
707                }))
708            }
709            LogicalPlan::Union(Union { inputs, schema }) => {
710                let first_input_schema = inputs[0].schema();
711                if schema.fields().len() == first_input_schema.fields().len() {
712                    // If inputs are not pruned do not change schema
713                    Ok(LogicalPlan::Union(Union { inputs, schema }))
714                } else {
715                    // A note on `Union`s constructed via `try_new_by_name`:
716                    //
717                    // At this point, the schema for each input should have
718                    // the same width. Thus, we do not need to save whether a
719                    // `Union` was created `BY NAME`, and can safely rely on the
720                    // `try_new` initializer to derive the new schema based on
721                    // column positions.
722                    Ok(LogicalPlan::Union(Union::try_new(inputs)?))
723                }
724            }
725            LogicalPlan::Distinct(distinct) => {
726                let distinct = match distinct {
727                    Distinct::All(input) => Distinct::All(input),
728                    Distinct::On(DistinctOn {
729                        on_expr,
730                        select_expr,
731                        sort_expr,
732                        input,
733                        schema: _,
734                    }) => Distinct::On(DistinctOn::try_new(
735                        on_expr,
736                        select_expr,
737                        sort_expr,
738                        input,
739                    )?),
740                };
741                Ok(LogicalPlan::Distinct(distinct))
742            }
743            LogicalPlan::RecursiveQuery(_) => Ok(self),
744            LogicalPlan::Analyze(_) => Ok(self),
745            LogicalPlan::Explain(_) => Ok(self),
746            LogicalPlan::TableScan(_) => Ok(self),
747            LogicalPlan::EmptyRelation(_) => Ok(self),
748            LogicalPlan::Statement(_) => Ok(self),
749            LogicalPlan::DescribeTable(_) => Ok(self),
750            LogicalPlan::Unnest(Unnest {
751                input,
752                exec_columns,
753                options,
754                ..
755            }) => {
756                // Update schema with unnested column type.
757                unnest_with_options(Arc::unwrap_or_clone(input), exec_columns, options)
758            }
759        }
760    }
761
762    /// Returns a new `LogicalPlan` based on `self` with inputs and
763    /// expressions replaced.
764    ///
765    /// Note this method creates an entirely new node, which requires a large
766    /// amount of clone'ing. When possible, the [`tree_node`] API should be used
767    /// instead of this API.
768    ///
769    /// The exprs correspond to the same order of expressions returned
770    /// by [`Self::expressions`]. This function is used by optimizers
771    /// to rewrite plans using the following pattern:
772    ///
773    /// [`tree_node`]: crate::logical_plan::tree_node
774    ///
775    /// ```text
776    /// let new_inputs = optimize_children(..., plan, props);
777    ///
778    /// // get the plans expressions to optimize
779    /// let exprs = plan.expressions();
780    ///
781    /// // potentially rewrite plan expressions
782    /// let rewritten_exprs = rewrite_exprs(exprs);
783    ///
784    /// // create new plan using rewritten_exprs in same position
785    /// let new_plan = plan.new_with_exprs(rewritten_exprs, new_inputs);
786    /// ```
787    pub fn with_new_exprs(
788        &self,
789        mut expr: Vec<Expr>,
790        inputs: Vec<LogicalPlan>,
791    ) -> Result<LogicalPlan> {
792        match self {
793            // Since expr may be different than the previous expr, schema of the projection
794            // may change. We need to use try_new method instead of try_new_with_schema method.
795            LogicalPlan::Projection(Projection { .. }) => {
796                let input = self.only_input(inputs)?;
797                Projection::try_new(expr, Arc::new(input)).map(LogicalPlan::Projection)
798            }
799            LogicalPlan::Dml(DmlStatement {
800                table_name,
801                target,
802                op,
803                ..
804            }) => {
805                self.assert_no_expressions(expr)?;
806                let input = self.only_input(inputs)?;
807                Ok(LogicalPlan::Dml(DmlStatement::new(
808                    table_name.clone(),
809                    Arc::clone(target),
810                    op.clone(),
811                    Arc::new(input),
812                )))
813            }
814            LogicalPlan::Copy(CopyTo {
815                input: _,
816                output_url,
817                file_type,
818                options,
819                partition_by,
820                output_schema: _,
821            }) => {
822                self.assert_no_expressions(expr)?;
823                let input = self.only_input(inputs)?;
824                Ok(LogicalPlan::Copy(CopyTo::new(
825                    Arc::new(input),
826                    output_url.clone(),
827                    partition_by.clone(),
828                    Arc::clone(file_type),
829                    options.clone(),
830                )))
831            }
832            LogicalPlan::Values(Values { schema, .. }) => {
833                self.assert_no_inputs(inputs)?;
834                Ok(LogicalPlan::Values(Values {
835                    schema: Arc::clone(schema),
836                    values: expr
837                        .chunks_exact(schema.fields().len())
838                        .map(|s| s.to_vec())
839                        .collect(),
840                }))
841            }
842            LogicalPlan::Filter { .. } => {
843                let predicate = self.only_expr(expr)?;
844                let input = self.only_input(inputs)?;
845
846                Filter::try_new(predicate, Arc::new(input)).map(LogicalPlan::Filter)
847            }
848            LogicalPlan::Repartition(Repartition {
849                partitioning_scheme,
850                ..
851            }) => match partitioning_scheme {
852                Partitioning::RoundRobinBatch(n) => {
853                    self.assert_no_expressions(expr)?;
854                    let input = self.only_input(inputs)?;
855                    Ok(LogicalPlan::Repartition(Repartition {
856                        partitioning_scheme: Partitioning::RoundRobinBatch(*n),
857                        input: Arc::new(input),
858                    }))
859                }
860                Partitioning::Hash(_, n) => {
861                    let input = self.only_input(inputs)?;
862                    Ok(LogicalPlan::Repartition(Repartition {
863                        partitioning_scheme: Partitioning::Hash(expr, *n),
864                        input: Arc::new(input),
865                    }))
866                }
867                Partitioning::DistributeBy(_) => {
868                    let input = self.only_input(inputs)?;
869                    Ok(LogicalPlan::Repartition(Repartition {
870                        partitioning_scheme: Partitioning::DistributeBy(expr),
871                        input: Arc::new(input),
872                    }))
873                }
874            },
875            LogicalPlan::Window(Window { window_expr, .. }) => {
876                assert_eq!(window_expr.len(), expr.len());
877                let input = self.only_input(inputs)?;
878                Window::try_new(expr, Arc::new(input)).map(LogicalPlan::Window)
879            }
880            LogicalPlan::Aggregate(Aggregate { group_expr, .. }) => {
881                let input = self.only_input(inputs)?;
882                // group exprs are the first expressions
883                let agg_expr = expr.split_off(group_expr.len());
884
885                Aggregate::try_new(Arc::new(input), expr, agg_expr)
886                    .map(LogicalPlan::Aggregate)
887            }
888            LogicalPlan::Sort(Sort {
889                expr: sort_expr,
890                fetch,
891                ..
892            }) => {
893                let input = self.only_input(inputs)?;
894                Ok(LogicalPlan::Sort(Sort {
895                    expr: expr
896                        .into_iter()
897                        .zip(sort_expr.iter())
898                        .map(|(expr, sort)| sort.with_expr(expr))
899                        .collect(),
900                    input: Arc::new(input),
901                    fetch: *fetch,
902                }))
903            }
904            LogicalPlan::Join(Join {
905                join_type,
906                join_constraint,
907                on,
908                null_equality,
909                null_aware,
910                ..
911            }) => {
912                let (left, right) = self.only_two_inputs(inputs)?;
913                let schema = build_join_schema(left.schema(), right.schema(), join_type)?;
914
915                let equi_expr_count = on.len() * 2;
916                assert!(expr.len() >= equi_expr_count);
917
918                // Assume that the last expr, if any,
919                // is the filter_expr (non equality predicate from ON clause)
920                let filter_expr = if expr.len() > equi_expr_count {
921                    expr.pop()
922                } else {
923                    None
924                };
925
926                // The first part of expr is equi-exprs,
927                // and the struct of each equi-expr is like `left-expr = right-expr`.
928                assert_eq!(expr.len(), equi_expr_count);
929                let mut new_on = Vec::with_capacity(on.len());
930                let mut iter = expr.into_iter();
931                while let Some(left) = iter.next() {
932                    let Some(right) = iter.next() else {
933                        internal_err!(
934                            "Expected a pair of expressions to construct the join on expression"
935                        )?
936                    };
937
938                    // SimplifyExpression rule may add alias to the equi_expr.
939                    new_on.push((left.unalias(), right.unalias()));
940                }
941
942                Ok(LogicalPlan::Join(Join {
943                    left: Arc::new(left),
944                    right: Arc::new(right),
945                    join_type: *join_type,
946                    join_constraint: *join_constraint,
947                    on: new_on,
948                    filter: filter_expr,
949                    schema: DFSchemaRef::new(schema),
950                    null_equality: *null_equality,
951                    null_aware: *null_aware,
952                }))
953            }
954            LogicalPlan::Subquery(Subquery {
955                outer_ref_columns,
956                spans,
957                ..
958            }) => {
959                self.assert_no_expressions(expr)?;
960                let input = self.only_input(inputs)?;
961                let subquery = LogicalPlanBuilder::from(input).build()?;
962                Ok(LogicalPlan::Subquery(Subquery {
963                    subquery: Arc::new(subquery),
964                    outer_ref_columns: outer_ref_columns.clone(),
965                    spans: spans.clone(),
966                }))
967            }
968            LogicalPlan::SubqueryAlias(SubqueryAlias { alias, .. }) => {
969                self.assert_no_expressions(expr)?;
970                let input = self.only_input(inputs)?;
971                SubqueryAlias::try_new(Arc::new(input), alias.clone())
972                    .map(LogicalPlan::SubqueryAlias)
973            }
974            LogicalPlan::Limit(Limit { skip, fetch, .. }) => {
975                let old_expr_len = skip.iter().chain(fetch.iter()).count();
976                assert_eq_or_internal_err!(
977                    old_expr_len,
978                    expr.len(),
979                    "Invalid number of new Limit expressions: expected {}, got {}",
980                    old_expr_len,
981                    expr.len()
982                );
983                // `LogicalPlan::expressions()` returns in [skip, fetch] order, so we can pop from the end.
984                let new_fetch = fetch.as_ref().and_then(|_| expr.pop());
985                let new_skip = skip.as_ref().and_then(|_| expr.pop());
986                let input = self.only_input(inputs)?;
987                Ok(LogicalPlan::Limit(Limit {
988                    skip: new_skip.map(Box::new),
989                    fetch: new_fetch.map(Box::new),
990                    input: Arc::new(input),
991                }))
992            }
993            LogicalPlan::Ddl(DdlStatement::CreateMemoryTable(CreateMemoryTable {
994                name,
995                if_not_exists,
996                or_replace,
997                column_defaults,
998                temporary,
999                ..
1000            })) => {
1001                self.assert_no_expressions(expr)?;
1002                let input = self.only_input(inputs)?;
1003                Ok(LogicalPlan::Ddl(DdlStatement::CreateMemoryTable(
1004                    CreateMemoryTable {
1005                        input: Arc::new(input),
1006                        constraints: Constraints::default(),
1007                        name: name.clone(),
1008                        if_not_exists: *if_not_exists,
1009                        or_replace: *or_replace,
1010                        column_defaults: column_defaults.clone(),
1011                        temporary: *temporary,
1012                    },
1013                )))
1014            }
1015            LogicalPlan::Ddl(DdlStatement::CreateView(CreateView {
1016                name,
1017                or_replace,
1018                definition,
1019                temporary,
1020                ..
1021            })) => {
1022                self.assert_no_expressions(expr)?;
1023                let input = self.only_input(inputs)?;
1024                Ok(LogicalPlan::Ddl(DdlStatement::CreateView(CreateView {
1025                    input: Arc::new(input),
1026                    name: name.clone(),
1027                    or_replace: *or_replace,
1028                    temporary: *temporary,
1029                    definition: definition.clone(),
1030                })))
1031            }
1032            LogicalPlan::Extension(e) => Ok(LogicalPlan::Extension(Extension {
1033                node: e.node.with_exprs_and_inputs(expr, inputs)?,
1034            })),
1035            LogicalPlan::Union(Union { schema, .. }) => {
1036                self.assert_no_expressions(expr)?;
1037                let input_schema = inputs[0].schema();
1038                // If inputs are not pruned do not change schema.
1039                let schema = if schema.fields().len() == input_schema.fields().len() {
1040                    Arc::clone(schema)
1041                } else {
1042                    Arc::clone(input_schema)
1043                };
1044                Ok(LogicalPlan::Union(Union {
1045                    inputs: inputs.into_iter().map(Arc::new).collect(),
1046                    schema,
1047                }))
1048            }
1049            LogicalPlan::Distinct(distinct) => {
1050                let distinct = match distinct {
1051                    Distinct::All(_) => {
1052                        self.assert_no_expressions(expr)?;
1053                        let input = self.only_input(inputs)?;
1054                        Distinct::All(Arc::new(input))
1055                    }
1056                    Distinct::On(DistinctOn {
1057                        on_expr,
1058                        select_expr,
1059                        ..
1060                    }) => {
1061                        let input = self.only_input(inputs)?;
1062                        let sort_expr = expr.split_off(on_expr.len() + select_expr.len());
1063                        let select_expr = expr.split_off(on_expr.len());
1064                        assert!(
1065                            sort_expr.is_empty(),
1066                            "with_new_exprs for Distinct does not support sort expressions"
1067                        );
1068                        Distinct::On(DistinctOn::try_new(
1069                            expr,
1070                            select_expr,
1071                            None, // no sort expressions accepted
1072                            Arc::new(input),
1073                        )?)
1074                    }
1075                };
1076                Ok(LogicalPlan::Distinct(distinct))
1077            }
1078            LogicalPlan::RecursiveQuery(RecursiveQuery {
1079                name, is_distinct, ..
1080            }) => {
1081                self.assert_no_expressions(expr)?;
1082                let (static_term, recursive_term) = self.only_two_inputs(inputs)?;
1083                Ok(LogicalPlan::RecursiveQuery(RecursiveQuery {
1084                    name: name.clone(),
1085                    static_term: Arc::new(static_term),
1086                    recursive_term: Arc::new(recursive_term),
1087                    is_distinct: *is_distinct,
1088                }))
1089            }
1090            LogicalPlan::Analyze(a) => {
1091                self.assert_no_expressions(expr)?;
1092                let input = self.only_input(inputs)?;
1093                Ok(LogicalPlan::Analyze(Analyze {
1094                    verbose: a.verbose,
1095                    schema: Arc::clone(&a.schema),
1096                    input: Arc::new(input),
1097                }))
1098            }
1099            LogicalPlan::Explain(e) => {
1100                self.assert_no_expressions(expr)?;
1101                let input = self.only_input(inputs)?;
1102                Ok(LogicalPlan::Explain(Explain {
1103                    verbose: e.verbose,
1104                    plan: Arc::new(input),
1105                    explain_format: e.explain_format.clone(),
1106                    stringified_plans: e.stringified_plans.clone(),
1107                    schema: Arc::clone(&e.schema),
1108                    logical_optimization_succeeded: e.logical_optimization_succeeded,
1109                }))
1110            }
1111            LogicalPlan::Statement(Statement::Prepare(Prepare {
1112                name, fields, ..
1113            })) => {
1114                self.assert_no_expressions(expr)?;
1115                let input = self.only_input(inputs)?;
1116                Ok(LogicalPlan::Statement(Statement::Prepare(Prepare {
1117                    name: name.clone(),
1118                    fields: fields.clone(),
1119                    input: Arc::new(input),
1120                })))
1121            }
1122            LogicalPlan::Statement(Statement::Execute(Execute { name, .. })) => {
1123                self.assert_no_inputs(inputs)?;
1124                Ok(LogicalPlan::Statement(Statement::Execute(Execute {
1125                    name: name.clone(),
1126                    parameters: expr,
1127                })))
1128            }
1129            LogicalPlan::TableScan(ts) => {
1130                self.assert_no_inputs(inputs)?;
1131                Ok(LogicalPlan::TableScan(TableScan {
1132                    filters: expr,
1133                    ..ts.clone()
1134                }))
1135            }
1136            LogicalPlan::EmptyRelation(_)
1137            | LogicalPlan::Ddl(_)
1138            | LogicalPlan::Statement(_)
1139            | LogicalPlan::DescribeTable(_) => {
1140                // All of these plan types have no inputs / exprs so should not be called
1141                self.assert_no_expressions(expr)?;
1142                self.assert_no_inputs(inputs)?;
1143                Ok(self.clone())
1144            }
1145            LogicalPlan::Unnest(Unnest {
1146                exec_columns: columns,
1147                options,
1148                ..
1149            }) => {
1150                self.assert_no_expressions(expr)?;
1151                let input = self.only_input(inputs)?;
1152                // Update schema with unnested column type.
1153                let new_plan =
1154                    unnest_with_options(input, columns.clone(), options.clone())?;
1155                Ok(new_plan)
1156            }
1157        }
1158    }
1159
1160    /// checks that the plan conforms to the listed invariant level, returning an Error if not
1161    pub fn check_invariants(&self, check: InvariantLevel) -> Result<()> {
1162        match check {
1163            InvariantLevel::Always => assert_always_invariants_at_current_node(self),
1164            InvariantLevel::Executable => assert_executable_invariants(self),
1165        }
1166    }
1167
    /// Helper for [Self::with_new_exprs] to use when no expressions are expected.
    ///
    /// Returns `Ok(())` when `expr` is empty. Otherwise the
    /// `assert_or_internal_err!` check reports the violation (as an internal
    /// error, going by the macro name) with a message that includes the debug
    /// form of `self` and of the unexpected expressions.
    #[inline]
    #[expect(clippy::needless_pass_by_value)] // expr is moved intentionally to ensure it's not used again
    fn assert_no_expressions(&self, expr: Vec<Expr>) -> Result<()> {
        assert_or_internal_err!(
            expr.is_empty(),
            "{self:?} should have no exprs, got {:?}",
            expr
        );
        Ok(())
    }
1179
    /// Helper for [Self::with_new_exprs] to use when no inputs are expected.
    ///
    /// Returns `Ok(())` when `inputs` is empty. Otherwise the
    /// `assert_or_internal_err!` check reports the violation (as an internal
    /// error, going by the macro name) with a message that includes the debug
    /// form of `self` and of the unexpected inputs.
    #[inline]
    #[expect(clippy::needless_pass_by_value)] // inputs is moved intentionally to ensure it's not used again
    fn assert_no_inputs(&self, inputs: Vec<LogicalPlan>) -> Result<()> {
        assert_or_internal_err!(
            inputs.is_empty(),
            "{self:?} should have no inputs, got: {:?}",
            inputs
        );
        Ok(())
    }
1191
1192    /// Helper for [Self::with_new_exprs] to use when exactly one expression is expected.
1193    #[inline]
1194    fn only_expr(&self, mut expr: Vec<Expr>) -> Result<Expr> {
1195        assert_eq_or_internal_err!(
1196            expr.len(),
1197            1,
1198            "{self:?} should have exactly one expr, got {:?}",
1199            &expr
1200        );
1201        Ok(expr.remove(0))
1202    }
1203
1204    /// Helper for [Self::with_new_exprs] to use when exactly one input is expected.
1205    #[inline]
1206    fn only_input(&self, mut inputs: Vec<LogicalPlan>) -> Result<LogicalPlan> {
1207        assert_eq_or_internal_err!(
1208            inputs.len(),
1209            1,
1210            "{self:?} should have exactly one input, got {:?}",
1211            &inputs
1212        );
1213        Ok(inputs.remove(0))
1214    }
1215
1216    /// Helper for [Self::with_new_exprs] to use when exactly two inputs are expected.
1217    #[inline]
1218    fn only_two_inputs(
1219        &self,
1220        mut inputs: Vec<LogicalPlan>,
1221    ) -> Result<(LogicalPlan, LogicalPlan)> {
1222        assert_eq_or_internal_err!(
1223            inputs.len(),
1224            2,
1225            "{self:?} should have exactly two inputs, got {:?}",
1226            &inputs
1227        );
1228        let right = inputs.remove(1);
1229        let left = inputs.remove(0);
1230        Ok((left, right))
1231    }
1232
1233    /// Replaces placeholder param values (like `$1`, `$2`) in [`LogicalPlan`]
1234    /// with the specified `param_values`.
1235    ///
1236    /// [`Prepare`] statements are converted to
1237    /// their inner logical plan for execution.
1238    ///
1239    /// # Example
1240    /// ```
1241    /// # use arrow::datatypes::{Field, Schema, DataType};
1242    /// use datafusion_common::ScalarValue;
1243    /// # use datafusion_expr::{lit, col, LogicalPlanBuilder, logical_plan::table_scan, placeholder};
1244    /// # let schema = Schema::new(vec![
1245    /// #     Field::new("id", DataType::Int32, false),
1246    /// # ]);
1247    /// // Build SELECT * FROM t1 WHERE id = $1
1248    /// let plan = table_scan(Some("t1"), &schema, None).unwrap()
1249    ///     .filter(col("id").eq(placeholder("$1"))).unwrap()
1250    ///     .build().unwrap();
1251    ///
1252    /// assert_eq!(
1253    ///   "Filter: t1.id = $1\
1254    ///   \n  TableScan: t1",
1255    ///   plan.display_indent().to_string()
1256    /// );
1257    ///
1258    /// // Fill in the parameter $1 with a literal 3
1259    /// let plan = plan.with_param_values(vec![
1260    ///   ScalarValue::from(3i32) // value at index 0 --> $1
1261    /// ]).unwrap();
1262    ///
1263    /// assert_eq!(
1264    ///    "Filter: t1.id = Int32(3)\
1265    ///    \n  TableScan: t1",
1266    ///    plan.display_indent().to_string()
1267    ///  );
1268    ///
1269    /// // Note you can also used named parameters
1270    /// // Build SELECT * FROM t1 WHERE id = $my_param
1271    /// let plan = table_scan(Some("t1"), &schema, None).unwrap()
1272    ///     .filter(col("id").eq(placeholder("$my_param"))).unwrap()
1273    ///     .build().unwrap()
1274    ///     // Fill in the parameter $my_param with a literal 3
1275    ///     .with_param_values(vec![
1276    ///       ("my_param", ScalarValue::from(3i32)),
1277    ///     ]).unwrap();
1278    ///
1279    /// assert_eq!(
1280    ///    "Filter: t1.id = Int32(3)\
1281    ///    \n  TableScan: t1",
1282    ///    plan.display_indent().to_string()
1283    ///  );
1284    /// ```
1285    pub fn with_param_values(
1286        self,
1287        param_values: impl Into<ParamValues>,
1288    ) -> Result<LogicalPlan> {
1289        let param_values = param_values.into();
1290        let plan_with_values = self.replace_params_with_values(&param_values)?;
1291
1292        // unwrap Prepare
1293        Ok(
1294            if let LogicalPlan::Statement(Statement::Prepare(prepare_lp)) =
1295                plan_with_values
1296            {
1297                param_values.verify_fields(&prepare_lp.fields)?;
1298                // try and take ownership of the input if is not shared, clone otherwise
1299                Arc::unwrap_or_clone(prepare_lp.input)
1300            } else {
1301                plan_with_values
1302            },
1303        )
1304    }
1305
1306    /// Returns the maximum number of rows that this plan can output, if known.
1307    ///
1308    /// If `None`, the plan can return any number of rows.
1309    /// If `Some(n)` then the plan can return at most `n` rows but may return fewer.
1310    pub fn max_rows(self: &LogicalPlan) -> Option<usize> {
1311        match self {
1312            LogicalPlan::Projection(Projection { input, .. }) => input.max_rows(),
1313            LogicalPlan::Filter(filter) => {
1314                if filter.is_scalar() {
1315                    Some(1)
1316                } else {
1317                    filter.input.max_rows()
1318                }
1319            }
1320            LogicalPlan::Window(Window { input, .. }) => input.max_rows(),
1321            LogicalPlan::Aggregate(Aggregate {
1322                input, group_expr, ..
1323            }) => {
1324                // Empty group_expr will return Some(1)
1325                if group_expr
1326                    .iter()
1327                    .all(|expr| matches!(expr, Expr::Literal(_, _)))
1328                {
1329                    Some(1)
1330                } else {
1331                    input.max_rows()
1332                }
1333            }
1334            LogicalPlan::Sort(Sort { input, fetch, .. }) => {
1335                match (fetch, input.max_rows()) {
1336                    (Some(fetch_limit), Some(input_max)) => {
1337                        Some(input_max.min(*fetch_limit))
1338                    }
1339                    (Some(fetch_limit), None) => Some(*fetch_limit),
1340                    (None, Some(input_max)) => Some(input_max),
1341                    (None, None) => None,
1342                }
1343            }
1344            LogicalPlan::Join(Join {
1345                left,
1346                right,
1347                join_type,
1348                ..
1349            }) => match join_type {
1350                JoinType::Inner => Some(left.max_rows()? * right.max_rows()?),
1351                JoinType::Left | JoinType::Right | JoinType::Full => {
1352                    match (left.max_rows()?, right.max_rows()?, join_type) {
1353                        (0, 0, _) => Some(0),
1354                        (max_rows, 0, JoinType::Left | JoinType::Full) => Some(max_rows),
1355                        (0, max_rows, JoinType::Right | JoinType::Full) => Some(max_rows),
1356                        (left_max, right_max, _) => Some(left_max * right_max),
1357                    }
1358                }
1359                JoinType::LeftSemi | JoinType::LeftAnti | JoinType::LeftMark => {
1360                    left.max_rows()
1361                }
1362                JoinType::RightSemi | JoinType::RightAnti | JoinType::RightMark => {
1363                    right.max_rows()
1364                }
1365            },
1366            LogicalPlan::Repartition(Repartition { input, .. }) => input.max_rows(),
1367            LogicalPlan::Union(Union { inputs, .. }) => {
1368                inputs.iter().try_fold(0usize, |mut acc, plan| {
1369                    acc += plan.max_rows()?;
1370                    Some(acc)
1371                })
1372            }
1373            LogicalPlan::TableScan(TableScan { fetch, .. }) => *fetch,
1374            LogicalPlan::EmptyRelation(_) => Some(0),
1375            LogicalPlan::RecursiveQuery(_) => None,
1376            LogicalPlan::Subquery(_) => None,
1377            LogicalPlan::SubqueryAlias(SubqueryAlias { input, .. }) => input.max_rows(),
1378            LogicalPlan::Limit(limit) => match limit.get_fetch_type() {
1379                Ok(FetchType::Literal(s)) => s,
1380                _ => None,
1381            },
1382            LogicalPlan::Distinct(
1383                Distinct::All(input) | Distinct::On(DistinctOn { input, .. }),
1384            ) => input.max_rows(),
1385            LogicalPlan::Values(v) => Some(v.values.len()),
1386            LogicalPlan::Unnest(_) => None,
1387            LogicalPlan::Ddl(_)
1388            | LogicalPlan::Explain(_)
1389            | LogicalPlan::Analyze(_)
1390            | LogicalPlan::Dml(_)
1391            | LogicalPlan::Copy(_)
1392            | LogicalPlan::DescribeTable(_)
1393            | LogicalPlan::Statement(_)
1394            | LogicalPlan::Extension(_) => None,
1395        }
1396    }
1397
1398    /// Returns the skip (offset) of this plan node, if it has one.
1399    ///
1400    /// Only [`LogicalPlan::Limit`] carries a skip value; all other variants
1401    /// return `Ok(None)`. Returns `Ok(None)` for a zero skip.
1402    pub fn skip(&self) -> Result<Option<usize>> {
1403        match self {
1404            LogicalPlan::Limit(limit) => match limit.get_skip_type()? {
1405                SkipType::Literal(0) => Ok(None),
1406                SkipType::Literal(n) => Ok(Some(n)),
1407                SkipType::UnsupportedExpr => Ok(None),
1408            },
1409            LogicalPlan::Sort(_) => Ok(None),
1410            LogicalPlan::TableScan(_) => Ok(None),
1411            LogicalPlan::Projection(_) => Ok(None),
1412            LogicalPlan::Filter(_) => Ok(None),
1413            LogicalPlan::Window(_) => Ok(None),
1414            LogicalPlan::Aggregate(_) => Ok(None),
1415            LogicalPlan::Join(_) => Ok(None),
1416            LogicalPlan::Repartition(_) => Ok(None),
1417            LogicalPlan::Union(_) => Ok(None),
1418            LogicalPlan::EmptyRelation(_) => Ok(None),
1419            LogicalPlan::Subquery(_) => Ok(None),
1420            LogicalPlan::SubqueryAlias(_) => Ok(None),
1421            LogicalPlan::Statement(_) => Ok(None),
1422            LogicalPlan::Values(_) => Ok(None),
1423            LogicalPlan::Explain(_) => Ok(None),
1424            LogicalPlan::Analyze(_) => Ok(None),
1425            LogicalPlan::Extension(_) => Ok(None),
1426            LogicalPlan::Distinct(_) => Ok(None),
1427            LogicalPlan::Dml(_) => Ok(None),
1428            LogicalPlan::Ddl(_) => Ok(None),
1429            LogicalPlan::Copy(_) => Ok(None),
1430            LogicalPlan::DescribeTable(_) => Ok(None),
1431            LogicalPlan::Unnest(_) => Ok(None),
1432            LogicalPlan::RecursiveQuery(_) => Ok(None),
1433        }
1434    }
1435
1436    /// Returns the fetch (limit) of this plan node, if it has one.
1437    ///
1438    /// [`LogicalPlan::Sort`], [`LogicalPlan::TableScan`], and
1439    /// [`LogicalPlan::Limit`] may carry a fetch value; all other variants
1440    /// return `Ok(None)`.
1441    pub fn fetch(&self) -> Result<Option<usize>> {
1442        match self {
1443            LogicalPlan::Sort(Sort { fetch, .. }) => Ok(*fetch),
1444            LogicalPlan::TableScan(TableScan { fetch, .. }) => Ok(*fetch),
1445            LogicalPlan::Limit(limit) => match limit.get_fetch_type()? {
1446                FetchType::Literal(s) => Ok(s),
1447                FetchType::UnsupportedExpr => Ok(None),
1448            },
1449            LogicalPlan::Projection(_) => Ok(None),
1450            LogicalPlan::Filter(_) => Ok(None),
1451            LogicalPlan::Window(_) => Ok(None),
1452            LogicalPlan::Aggregate(_) => Ok(None),
1453            LogicalPlan::Join(_) => Ok(None),
1454            LogicalPlan::Repartition(_) => Ok(None),
1455            LogicalPlan::Union(_) => Ok(None),
1456            LogicalPlan::EmptyRelation(_) => Ok(None),
1457            LogicalPlan::Subquery(_) => Ok(None),
1458            LogicalPlan::SubqueryAlias(_) => Ok(None),
1459            LogicalPlan::Statement(_) => Ok(None),
1460            LogicalPlan::Values(_) => Ok(None),
1461            LogicalPlan::Explain(_) => Ok(None),
1462            LogicalPlan::Analyze(_) => Ok(None),
1463            LogicalPlan::Extension(_) => Ok(None),
1464            LogicalPlan::Distinct(_) => Ok(None),
1465            LogicalPlan::Dml(_) => Ok(None),
1466            LogicalPlan::Ddl(_) => Ok(None),
1467            LogicalPlan::Copy(_) => Ok(None),
1468            LogicalPlan::DescribeTable(_) => Ok(None),
1469            LogicalPlan::Unnest(_) => Ok(None),
1470            LogicalPlan::RecursiveQuery(_) => Ok(None),
1471        }
1472    }
1473
1474    /// If this node's expressions contains any references to an outer subquery
1475    pub fn contains_outer_reference(&self) -> bool {
1476        let mut contains = false;
1477        self.apply_expressions(|expr| {
1478            Ok(if expr.contains_outer() {
1479                contains = true;
1480                TreeNodeRecursion::Stop
1481            } else {
1482                TreeNodeRecursion::Continue
1483            })
1484        })
1485        .unwrap();
1486        contains
1487    }
1488
1489    /// Get the output expressions and their corresponding columns.
1490    ///
1491    /// The parent node may reference the output columns of the plan by expressions, such as
1492    /// projection over aggregate or window functions. This method helps to convert the
1493    /// referenced expressions into columns.
1494    ///
1495    /// See also: [`crate::utils::columnize_expr`]
1496    pub fn columnized_output_exprs(&self) -> Result<Vec<(&Expr, Column)>> {
1497        match self {
1498            LogicalPlan::Aggregate(aggregate) => Ok(aggregate
1499                .output_expressions()?
1500                .into_iter()
1501                .zip(self.schema().columns())
1502                .collect()),
1503            LogicalPlan::Window(Window {
1504                window_expr,
1505                input,
1506                schema,
1507            }) => {
1508                // The input could be another Window, so the result should also include the input's. For Example:
1509                // `EXPLAIN SELECT RANK() OVER (PARTITION BY a ORDER BY b), SUM(b) OVER (PARTITION BY a) FROM t`
1510                // Its plan is:
1511                // Projection: RANK() PARTITION BY [t.a] ORDER BY [t.b ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW, SUM(t.b) PARTITION BY [t.a] ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING
1512                //   WindowAggr: windowExpr=[[SUM(CAST(t.b AS Int64)) PARTITION BY [t.a] ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING]]
1513                //     WindowAggr: windowExpr=[[RANK() PARTITION BY [t.a] ORDER BY [t.b ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW]]/
1514                //       TableScan: t projection=[a, b]
1515                let mut output_exprs = input.columnized_output_exprs()?;
1516                let input_len = input.schema().fields().len();
1517                output_exprs.extend(
1518                    window_expr
1519                        .iter()
1520                        .zip(schema.columns().into_iter().skip(input_len)),
1521                );
1522                Ok(output_exprs)
1523            }
1524            _ => Ok(vec![]),
1525        }
1526    }
1527}
1528
1529impl LogicalPlan {
1530    /// Return a `LogicalPlan` with all placeholders (e.g $1 $2,
1531    /// ...) replaced with corresponding values provided in
1532    /// `params_values`
1533    ///
1534    /// See [`Self::with_param_values`] for examples and usage with an owned
1535    /// `ParamValues`
1536    pub fn replace_params_with_values(
1537        self,
1538        param_values: &ParamValues,
1539    ) -> Result<LogicalPlan> {
1540        self.transform_up_with_subqueries(|plan| {
1541            let schema = Arc::clone(plan.schema());
1542            let name_preserver = NamePreserver::new(&plan);
1543            plan.map_expressions(|e| {
1544                let (e, has_placeholder) = e.infer_placeholder_types(&schema)?;
1545                if !has_placeholder {
1546                    // Performance optimization:
1547                    // avoid NamePreserver copy and second pass over expression
1548                    // if no placeholders.
1549                    Ok(Transformed::no(e))
1550                } else {
1551                    let original_name = name_preserver.save(&e);
1552                    let transformed_expr = e.transform_up(|e| {
1553                        if let Expr::Placeholder(Placeholder { id, .. }) = e {
1554                            let (value, metadata) = param_values
1555                                .get_placeholders_with_values(&id)?
1556                                .into_inner();
1557                            Ok(Transformed::yes(Expr::Literal(value, metadata)))
1558                        } else {
1559                            Ok(Transformed::no(e))
1560                        }
1561                    })?;
1562                    // Preserve name to avoid breaking column references to this expression
1563                    Ok(transformed_expr.update_data(|expr| original_name.restore(expr)))
1564                }
1565            })?
1566            .map_data(|plan| plan.update_schema_data_type())
1567        })
1568        .map(|res| res.data)
1569    }
1570
1571    /// Recompute schema fields' data type after replacing params, ensuring fields data type can be
1572    /// updated according to the new parameters.
1573    ///
1574    /// Unlike `recompute_schema()`, this method rebuilds VALUES plans entirely to properly infer
1575    /// types types from literal values after placeholder substitution.
1576    fn update_schema_data_type(self) -> Result<LogicalPlan> {
1577        match self {
1578            // Build `LogicalPlan::Values` from the values for type inference.
1579            // We can't use `recompute_schema` because it skips recomputing for
1580            // `LogicalPlan::Values`.
1581            LogicalPlan::Values(Values { values, schema: _ }) => {
1582                LogicalPlanBuilder::values(values)?.build()
1583            }
1584            // other plans can just use `recompute_schema` directly.
1585            plan => plan.recompute_schema(),
1586        }
1587    }
1588
1589    /// Walk the logical plan, find any `Placeholder` tokens, and return a set of their names.
1590    pub fn get_parameter_names(&self) -> Result<HashSet<String>> {
1591        let mut param_names = HashSet::new();
1592        self.apply_with_subqueries(|plan| {
1593            plan.apply_expressions(|expr| {
1594                expr.apply(|expr| {
1595                    if let Expr::Placeholder(Placeholder { id, .. }) = expr {
1596                        param_names.insert(id.clone());
1597                    }
1598                    Ok(TreeNodeRecursion::Continue)
1599                })
1600            })
1601        })
1602        .map(|_| param_names)
1603    }
1604
1605    /// Walk the logical plan, find any `Placeholder` tokens, and return a map of their IDs and DataTypes
1606    ///
1607    /// Note that this will drop any extension or field metadata attached to parameters. Use
1608    /// [`LogicalPlan::get_parameter_fields`] to keep extension metadata.
1609    pub fn get_parameter_types(
1610        &self,
1611    ) -> Result<HashMap<String, Option<DataType>>, DataFusionError> {
1612        let mut parameter_fields = self.get_parameter_fields()?;
1613        Ok(parameter_fields
1614            .drain()
1615            .map(|(name, maybe_field)| {
1616                (name, maybe_field.map(|field| field.data_type().clone()))
1617            })
1618            .collect())
1619    }
1620
1621    /// Walk the logical plan, find any `Placeholder` tokens, and return a map of their IDs and FieldRefs
1622    pub fn get_parameter_fields(
1623        &self,
1624    ) -> Result<HashMap<String, Option<FieldRef>>, DataFusionError> {
1625        let mut param_types: HashMap<String, Option<FieldRef>> = HashMap::new();
1626
1627        self.apply_with_subqueries(|plan| {
1628            plan.apply_expressions(|expr| {
1629                expr.apply(|expr| {
1630                    if let Expr::Placeholder(Placeholder { id, field }) = expr {
1631                        let prev = param_types.get(id);
1632                        match (prev, field) {
1633                            (Some(Some(prev)), Some(field)) => {
1634                                check_metadata_with_storage_equal(
1635                                    (field.data_type(), Some(field.metadata())),
1636                                    (prev.data_type(), Some(prev.metadata())),
1637                                    "parameter",
1638                                    &format!(": Conflicting types for id {id}"),
1639                                )?;
1640                            }
1641                            (_, Some(field)) => {
1642                                param_types.insert(id.clone(), Some(Arc::clone(field)));
1643                            }
1644                            _ => {
1645                                param_types.insert(id.clone(), None);
1646                            }
1647                        }
1648                    }
1649                    Ok(TreeNodeRecursion::Continue)
1650                })
1651            })
1652        })
1653        .map(|_| param_types)
1654    }
1655
1656    // ------------
1657    // Various implementations for printing out LogicalPlans
1658    // ------------
1659
1660    /// Return a `format`able structure that produces a single line
1661    /// per node.
1662    ///
1663    /// # Example
1664    ///
1665    /// ```text
1666    /// Projection: employee.id
1667    ///    Filter: employee.state Eq Utf8(\"CO\")\
1668    ///       CsvScan: employee projection=Some([0, 3])
1669    /// ```
1670    ///
1671    /// ```
1672    /// use arrow::datatypes::{DataType, Field, Schema};
1673    /// use datafusion_expr::{col, lit, logical_plan::table_scan, LogicalPlanBuilder};
1674    /// let schema = Schema::new(vec![Field::new("id", DataType::Int32, false)]);
1675    /// let plan = table_scan(Some("t1"), &schema, None)
1676    ///     .unwrap()
1677    ///     .filter(col("id").eq(lit(5)))
1678    ///     .unwrap()
1679    ///     .build()
1680    ///     .unwrap();
1681    ///
1682    /// // Format using display_indent
1683    /// let display_string = format!("{}", plan.display_indent());
1684    ///
1685    /// assert_eq!("Filter: t1.id = Int32(5)\n  TableScan: t1", display_string);
1686    /// ```
1687    pub fn display_indent(&self) -> impl Display + '_ {
1688        // Boilerplate structure to wrap LogicalPlan with something
1689        // that that can be formatted
1690        struct Wrapper<'a>(&'a LogicalPlan);
1691        impl Display for Wrapper<'_> {
1692            fn fmt(&self, f: &mut Formatter) -> fmt::Result {
1693                let with_schema = false;
1694                let mut visitor = IndentVisitor::new(f, with_schema);
1695                match self.0.visit_with_subqueries(&mut visitor) {
1696                    Ok(_) => Ok(()),
1697                    Err(_) => Err(fmt::Error),
1698                }
1699            }
1700        }
1701        Wrapper(self)
1702    }
1703
1704    /// Return a `format`able structure that produces a single line
1705    /// per node that includes the output schema. For example:
1706    ///
1707    /// ```text
1708    /// Projection: employee.id [id:Int32]\
1709    ///    Filter: employee.state = Utf8(\"CO\") [id:Int32, state:Utf8]\
1710    ///      TableScan: employee projection=[0, 3] [id:Int32, state:Utf8]";
1711    /// ```
1712    ///
1713    /// ```
1714    /// use arrow::datatypes::{DataType, Field, Schema};
1715    /// use datafusion_expr::{col, lit, logical_plan::table_scan, LogicalPlanBuilder};
1716    /// let schema = Schema::new(vec![Field::new("id", DataType::Int32, false)]);
1717    /// let plan = table_scan(Some("t1"), &schema, None)
1718    ///     .unwrap()
1719    ///     .filter(col("id").eq(lit(5)))
1720    ///     .unwrap()
1721    ///     .build()
1722    ///     .unwrap();
1723    ///
1724    /// // Format using display_indent_schema
1725    /// let display_string = format!("{}", plan.display_indent_schema());
1726    ///
1727    /// assert_eq!(
1728    ///     "Filter: t1.id = Int32(5) [id:Int32]\
1729    ///             \n  TableScan: t1 [id:Int32]",
1730    ///     display_string
1731    /// );
1732    /// ```
1733    pub fn display_indent_schema(&self) -> impl Display + '_ {
1734        // Boilerplate structure to wrap LogicalPlan with something
1735        // that that can be formatted
1736        struct Wrapper<'a>(&'a LogicalPlan);
1737        impl Display for Wrapper<'_> {
1738            fn fmt(&self, f: &mut Formatter) -> fmt::Result {
1739                let with_schema = true;
1740                let mut visitor = IndentVisitor::new(f, with_schema);
1741                match self.0.visit_with_subqueries(&mut visitor) {
1742                    Ok(_) => Ok(()),
1743                    Err(_) => Err(fmt::Error),
1744                }
1745            }
1746        }
1747        Wrapper(self)
1748    }
1749
1750    /// Return a displayable structure that produces plan in postgresql JSON format.
1751    ///
1752    /// Users can use this format to visualize the plan in existing plan visualization tools, for example [dalibo](https://explain.dalibo.com/)
1753    pub fn display_pg_json(&self) -> impl Display + '_ {
1754        // Boilerplate structure to wrap LogicalPlan with something
1755        // that that can be formatted
1756        struct Wrapper<'a>(&'a LogicalPlan);
1757        impl Display for Wrapper<'_> {
1758            fn fmt(&self, f: &mut Formatter) -> fmt::Result {
1759                let mut visitor = PgJsonVisitor::new(f);
1760                visitor.with_schema(true);
1761                match self.0.visit_with_subqueries(&mut visitor) {
1762                    Ok(_) => Ok(()),
1763                    Err(_) => Err(fmt::Error),
1764                }
1765            }
1766        }
1767        Wrapper(self)
1768    }
1769
1770    /// Return a `format`able structure that produces lines meant for
1771    /// graphical display using the `DOT` language. This format can be
1772    /// visualized using software from
1773    /// [`graphviz`](https://graphviz.org/)
1774    ///
1775    /// This currently produces two graphs -- one with the basic
1776    /// structure, and one with additional details such as schema.
1777    ///
1778    /// ```
1779    /// use arrow::datatypes::{DataType, Field, Schema};
1780    /// use datafusion_expr::{col, lit, logical_plan::table_scan, LogicalPlanBuilder};
1781    /// let schema = Schema::new(vec![Field::new("id", DataType::Int32, false)]);
1782    /// let plan = table_scan(Some("t1"), &schema, None)
1783    ///     .unwrap()
1784    ///     .filter(col("id").eq(lit(5)))
1785    ///     .unwrap()
1786    ///     .build()
1787    ///     .unwrap();
1788    ///
1789    /// // Format using display_graphviz
1790    /// let graphviz_string = format!("{}", plan.display_graphviz());
1791    /// ```
1792    ///
1793    /// If graphviz string is saved to a file such as `/tmp/example.dot`, the following
1794    /// commands can be used to render it as a pdf:
1795    ///
1796    /// ```bash
1797    ///   dot -Tpdf < /tmp/example.dot  > /tmp/example.pdf
1798    /// ```
1799    pub fn display_graphviz(&self) -> impl Display + '_ {
1800        // Boilerplate structure to wrap LogicalPlan with something
1801        // that that can be formatted
1802        struct Wrapper<'a>(&'a LogicalPlan);
1803        impl Display for Wrapper<'_> {
1804            fn fmt(&self, f: &mut Formatter) -> fmt::Result {
1805                let mut visitor = GraphvizVisitor::new(f);
1806
1807                visitor.start_graph()?;
1808
1809                visitor.pre_visit_plan("LogicalPlan")?;
1810                self.0
1811                    .visit_with_subqueries(&mut visitor)
1812                    .map_err(|_| fmt::Error)?;
1813                visitor.post_visit_plan()?;
1814
1815                visitor.set_with_schema(true);
1816                visitor.pre_visit_plan("Detailed LogicalPlan")?;
1817                self.0
1818                    .visit_with_subqueries(&mut visitor)
1819                    .map_err(|_| fmt::Error)?;
1820                visitor.post_visit_plan()?;
1821
1822                visitor.end_graph()?;
1823                Ok(())
1824            }
1825        }
1826        Wrapper(self)
1827    }
1828
    /// Return a `format`able structure with a human readable
1830    /// description of this LogicalPlan node per node, not including
1831    /// children. For example:
1832    ///
1833    /// ```text
1834    /// Projection: id
1835    /// ```
1836    /// ```
1837    /// use arrow::datatypes::{DataType, Field, Schema};
1838    /// use datafusion_expr::{col, lit, logical_plan::table_scan, LogicalPlanBuilder};
1839    /// let schema = Schema::new(vec![Field::new("id", DataType::Int32, false)]);
1840    /// let plan = table_scan(Some("t1"), &schema, None)
1841    ///     .unwrap()
1842    ///     .build()
1843    ///     .unwrap();
1844    ///
1845    /// // Format using display
1846    /// let display_string = format!("{}", plan.display());
1847    ///
1848    /// assert_eq!("TableScan: t1", display_string);
1849    /// ```
1850    pub fn display(&self) -> impl Display + '_ {
1851        // Boilerplate structure to wrap LogicalPlan with something
1852        // that that can be formatted
1853        struct Wrapper<'a>(&'a LogicalPlan);
1854        impl Display for Wrapper<'_> {
1855            fn fmt(&self, f: &mut Formatter) -> fmt::Result {
1856                match self.0 {
1857                    LogicalPlan::EmptyRelation(EmptyRelation {
1858                        produce_one_row,
1859                        schema: _,
1860                    }) => {
1861                        let rows = if *produce_one_row { 1 } else { 0 };
1862                        write!(f, "EmptyRelation: rows={rows}")
1863                    }
1864                    LogicalPlan::RecursiveQuery(RecursiveQuery {
1865                        is_distinct, ..
1866                    }) => {
1867                        write!(f, "RecursiveQuery: is_distinct={is_distinct}")
1868                    }
1869                    LogicalPlan::Values(Values { values, .. }) => {
1870                        let str_values: Vec<_> = values
1871                            .iter()
1872                            // limit to only 5 values to avoid horrible display
1873                            .take(5)
1874                            .map(|row| {
1875                                let item = row
1876                                    .iter()
1877                                    .map(|expr| expr.to_string())
1878                                    .collect::<Vec<_>>()
1879                                    .join(", ");
1880                                format!("({item})")
1881                            })
1882                            .collect();
1883
1884                        let eclipse = if values.len() > 5 { "..." } else { "" };
1885                        write!(f, "Values: {}{}", str_values.join(", "), eclipse)
1886                    }
1887
1888                    LogicalPlan::TableScan(TableScan {
1889                        source,
1890                        table_name,
1891                        projection,
1892                        filters,
1893                        fetch,
1894                        ..
1895                    }) => {
1896                        let projected_fields = match projection {
1897                            Some(indices) => {
1898                                let schema = source.schema();
1899                                let names: Vec<&str> = indices
1900                                    .iter()
1901                                    .map(|i| schema.field(*i).name().as_str())
1902                                    .collect();
1903                                format!(" projection=[{}]", names.join(", "))
1904                            }
1905                            _ => "".to_string(),
1906                        };
1907
1908                        write!(f, "TableScan: {table_name}{projected_fields}")?;
1909
1910                        if !filters.is_empty() {
1911                            let mut full_filter = vec![];
1912                            let mut partial_filter = vec![];
1913                            let mut unsupported_filters = vec![];
1914                            let filters: Vec<&Expr> = filters.iter().collect();
1915
1916                            if let Ok(results) =
1917                                source.supports_filters_pushdown(&filters)
1918                            {
1919                                filters.iter().zip(results.iter()).for_each(
1920                                    |(x, res)| match res {
1921                                        TableProviderFilterPushDown::Exact => {
1922                                            full_filter.push(x)
1923                                        }
1924                                        TableProviderFilterPushDown::Inexact => {
1925                                            partial_filter.push(x)
1926                                        }
1927                                        TableProviderFilterPushDown::Unsupported => {
1928                                            unsupported_filters.push(x)
1929                                        }
1930                                    },
1931                                );
1932                            }
1933
1934                            if !full_filter.is_empty() {
1935                                write!(
1936                                    f,
1937                                    ", full_filters=[{}]",
1938                                    expr_vec_fmt!(full_filter)
1939                                )?;
1940                            };
1941                            if !partial_filter.is_empty() {
1942                                write!(
1943                                    f,
1944                                    ", partial_filters=[{}]",
1945                                    expr_vec_fmt!(partial_filter)
1946                                )?;
1947                            }
1948                            if !unsupported_filters.is_empty() {
1949                                write!(
1950                                    f,
1951                                    ", unsupported_filters=[{}]",
1952                                    expr_vec_fmt!(unsupported_filters)
1953                                )?;
1954                            }
1955                        }
1956
1957                        if let Some(n) = fetch {
1958                            write!(f, ", fetch={n}")?;
1959                        }
1960
1961                        Ok(())
1962                    }
1963                    LogicalPlan::Projection(Projection { expr, .. }) => {
1964                        write!(f, "Projection:")?;
1965                        for (i, expr_item) in expr.iter().enumerate() {
1966                            if i > 0 {
1967                                write!(f, ",")?;
1968                            }
1969                            write!(f, " {expr_item}")?;
1970                        }
1971                        Ok(())
1972                    }
1973                    LogicalPlan::Dml(DmlStatement { table_name, op, .. }) => {
1974                        write!(f, "Dml: op=[{op}] table=[{table_name}]")
1975                    }
1976                    LogicalPlan::Copy(CopyTo {
1977                        input: _,
1978                        output_url,
1979                        file_type,
1980                        options,
1981                        ..
1982                    }) => {
1983                        let op_str = options
1984                            .iter()
1985                            .map(|(k, v)| format!("{k} {v}"))
1986                            .collect::<Vec<String>>()
1987                            .join(", ");
1988
1989                        write!(
1990                            f,
1991                            "CopyTo: format={} output_url={output_url} options: ({op_str})",
1992                            file_type.get_ext()
1993                        )
1994                    }
1995                    LogicalPlan::Ddl(ddl) => {
1996                        write!(f, "{}", ddl.display())
1997                    }
1998                    LogicalPlan::Filter(Filter {
1999                        predicate: expr, ..
2000                    }) => write!(f, "Filter: {expr}"),
2001                    LogicalPlan::Window(Window { window_expr, .. }) => {
2002                        write!(
2003                            f,
2004                            "WindowAggr: windowExpr=[[{}]]",
2005                            expr_vec_fmt!(window_expr)
2006                        )
2007                    }
2008                    LogicalPlan::Aggregate(Aggregate {
2009                        group_expr,
2010                        aggr_expr,
2011                        ..
2012                    }) => write!(
2013                        f,
2014                        "Aggregate: groupBy=[[{}]], aggr=[[{}]]",
2015                        expr_vec_fmt!(group_expr),
2016                        expr_vec_fmt!(aggr_expr)
2017                    ),
2018                    LogicalPlan::Sort(Sort { expr, fetch, .. }) => {
2019                        write!(f, "Sort: ")?;
2020                        for (i, expr_item) in expr.iter().enumerate() {
2021                            if i > 0 {
2022                                write!(f, ", ")?;
2023                            }
2024                            write!(f, "{expr_item}")?;
2025                        }
2026                        if let Some(a) = fetch {
2027                            write!(f, ", fetch={a}")?;
2028                        }
2029
2030                        Ok(())
2031                    }
2032                    LogicalPlan::Join(Join {
2033                        on: keys,
2034                        filter,
2035                        join_constraint,
2036                        join_type,
2037                        ..
2038                    }) => {
2039                        let join_expr: Vec<String> =
2040                            keys.iter().map(|(l, r)| format!("{l} = {r}")).collect();
2041                        let filter_expr = filter
2042                            .as_ref()
2043                            .map(|expr| format!(" Filter: {expr}"))
2044                            .unwrap_or_else(|| "".to_string());
2045                        let join_type = if filter.is_none()
2046                            && keys.is_empty()
2047                            && *join_type == JoinType::Inner
2048                        {
2049                            "Cross".to_string()
2050                        } else {
2051                            join_type.to_string()
2052                        };
2053                        match join_constraint {
2054                            JoinConstraint::On => {
2055                                write!(f, "{join_type} Join:",)?;
2056                                if !join_expr.is_empty() || !filter_expr.is_empty() {
2057                                    write!(
2058                                        f,
2059                                        " {}{}",
2060                                        join_expr.join(", "),
2061                                        filter_expr
2062                                    )?;
2063                                }
2064                                Ok(())
2065                            }
2066                            JoinConstraint::Using => {
2067                                write!(
2068                                    f,
2069                                    "{} Join: Using {}{}",
2070                                    join_type,
2071                                    join_expr.join(", "),
2072                                    filter_expr,
2073                                )
2074                            }
2075                        }
2076                    }
2077                    LogicalPlan::Repartition(Repartition {
2078                        partitioning_scheme,
2079                        ..
2080                    }) => match partitioning_scheme {
2081                        Partitioning::RoundRobinBatch(n) => {
2082                            write!(f, "Repartition: RoundRobinBatch partition_count={n}")
2083                        }
2084                        Partitioning::Hash(expr, n) => {
2085                            let hash_expr: Vec<String> =
2086                                expr.iter().map(|e| format!("{e}")).collect();
2087                            write!(
2088                                f,
2089                                "Repartition: Hash({}) partition_count={}",
2090                                hash_expr.join(", "),
2091                                n
2092                            )
2093                        }
2094                        Partitioning::DistributeBy(expr) => {
2095                            let dist_by_expr: Vec<String> =
2096                                expr.iter().map(|e| format!("{e}")).collect();
2097                            write!(
2098                                f,
2099                                "Repartition: DistributeBy({})",
2100                                dist_by_expr.join(", "),
2101                            )
2102                        }
2103                    },
2104                    LogicalPlan::Limit(limit) => {
2105                        // Attempt to display `skip` and `fetch` as literals if possible, otherwise as expressions.
2106                        let skip_str = match limit.get_skip_type() {
2107                            Ok(SkipType::Literal(n)) => n.to_string(),
2108                            _ => limit
2109                                .skip
2110                                .as_ref()
2111                                .map_or_else(|| "None".to_string(), |x| x.to_string()),
2112                        };
2113                        let fetch_str = match limit.get_fetch_type() {
2114                            Ok(FetchType::Literal(Some(n))) => n.to_string(),
2115                            Ok(FetchType::Literal(None)) => "None".to_string(),
2116                            _ => limit
2117                                .fetch
2118                                .as_ref()
2119                                .map_or_else(|| "None".to_string(), |x| x.to_string()),
2120                        };
2121                        write!(f, "Limit: skip={skip_str}, fetch={fetch_str}",)
2122                    }
2123                    LogicalPlan::Subquery(Subquery { .. }) => {
2124                        write!(f, "Subquery:")
2125                    }
2126                    LogicalPlan::SubqueryAlias(SubqueryAlias { alias, .. }) => {
2127                        write!(f, "SubqueryAlias: {alias}")
2128                    }
2129                    LogicalPlan::Statement(statement) => {
2130                        write!(f, "{}", statement.display())
2131                    }
2132                    LogicalPlan::Distinct(distinct) => match distinct {
2133                        Distinct::All(_) => write!(f, "Distinct:"),
2134                        Distinct::On(DistinctOn {
2135                            on_expr,
2136                            select_expr,
2137                            sort_expr,
2138                            ..
2139                        }) => write!(
2140                            f,
2141                            "DistinctOn: on_expr=[[{}]], select_expr=[[{}]], sort_expr=[[{}]]",
2142                            expr_vec_fmt!(on_expr),
2143                            expr_vec_fmt!(select_expr),
2144                            if let Some(sort_expr) = sort_expr {
2145                                expr_vec_fmt!(sort_expr)
2146                            } else {
2147                                "".to_string()
2148                            },
2149                        ),
2150                    },
2151                    LogicalPlan::Explain { .. } => write!(f, "Explain"),
2152                    LogicalPlan::Analyze { .. } => write!(f, "Analyze"),
2153                    LogicalPlan::Union(_) => write!(f, "Union"),
2154                    LogicalPlan::Extension(e) => e.node.fmt_for_explain(f),
2155                    LogicalPlan::DescribeTable(DescribeTable { .. }) => {
2156                        write!(f, "DescribeTable")
2157                    }
2158                    LogicalPlan::Unnest(Unnest {
2159                        input: plan,
2160                        list_type_columns: list_col_indices,
2161                        struct_type_columns: struct_col_indices,
2162                        ..
2163                    }) => {
2164                        let input_columns = plan.schema().columns();
2165                        let list_type_columns = list_col_indices
2166                            .iter()
2167                            .map(|(i, unnest_info)| {
2168                                format!(
2169                                    "{}|depth={}",
2170                                    &input_columns[*i].to_string(),
2171                                    unnest_info.depth
2172                                )
2173                            })
2174                            .collect::<Vec<String>>();
2175                        let struct_type_columns = struct_col_indices
2176                            .iter()
2177                            .map(|i| &input_columns[*i])
2178                            .collect::<Vec<&Column>>();
2179                        // get items from input_columns indexed by list_col_indices
2180                        write!(
2181                            f,
2182                            "Unnest: lists[{}] structs[{}]",
2183                            expr_vec_fmt!(list_type_columns),
2184                            expr_vec_fmt!(struct_type_columns)
2185                        )
2186                    }
2187                }
2188            }
2189        }
2190        Wrapper(self)
2191    }
2192
2193    /// Return a `LogicalPLan` with all [`LambdaVariable`]'s resolved
2194    ///
2195    /// [`LambdaVariable`]: crate::expr::LambdaVariable
2196    pub fn resolve_lambda_variables(self) -> Result<Transformed<LogicalPlan>> {
2197        self.transform_with_subqueries(|plan| {
2198            let schema = merge_schema(&plan.inputs());
2199
2200            plan.map_expressions(|expr| expr.resolve_lambda_variables(&schema))
2201        })
2202    }
2203}
2204
2205impl Display for LogicalPlan {
2206    fn fmt(&self, f: &mut Formatter) -> fmt::Result {
2207        self.display_indent().fmt(f)
2208    }
2209}
2210
2211impl ToStringifiedPlan for LogicalPlan {
2212    fn to_stringified(&self, plan_type: PlanType) -> StringifiedPlan {
2213        StringifiedPlan::new(plan_type, self.display_indent().to_string())
2214    }
2215}
2216
/// Relation that produces 0 or 1 placeholder rows with the specified output schema.
///
/// In most cases the output schema for `EmptyRelation` is empty;
/// however, it can be non-empty, typically when produced by optimizer rules.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct EmptyRelation {
    /// Whether to produce a placeholder row
    pub produce_one_row: bool,
    /// The schema description of the output
    pub schema: DFSchemaRef,
}
2227
2228// Manual implementation needed because of `schema` field. Comparison excludes this field.
2229impl PartialOrd for EmptyRelation {
2230    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
2231        self.produce_one_row
2232            .partial_cmp(&other.produce_one_row)
2233            // TODO (https://github.com/apache/datafusion/issues/17477) avoid recomparing all fields
2234            .filter(|cmp| *cmp != Ordering::Equal || self == other)
2235    }
2236}
2237
/// A variadic query operation, Recursive CTE.
///
/// # Recursive Query Evaluation
///
/// From the [Postgres Docs]:
///
/// 1. Evaluate the non-recursive term. For `UNION` (but not `UNION ALL`),
///    discard duplicate rows. Include all remaining rows in the result of the
///    recursive query, and also place them in a temporary working table.
///
/// 2. So long as the working table is not empty, repeat these steps:
///
/// * Evaluate the recursive term, substituting the current contents of the
///   working table for the recursive self-reference. For `UNION` (but not `UNION
///   ALL`), discard duplicate rows and rows that duplicate any previous result
///   row. Include all remaining rows in the result of the recursive query, and
///   also place them in a temporary intermediate table.
///
/// * Replace the contents of the working table with the contents of the
///   intermediate table, then empty the intermediate table.
///
/// # Ordering
///
/// `PartialOrd` is derived, so values are compared field by field in
/// declaration order: `name`, `static_term`, `recursive_term`, `is_distinct`.
///
/// [Postgres Docs]: https://www.postgresql.org/docs/current/queries-with.html#QUERIES-WITH-RECURSIVE
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Hash)]
pub struct RecursiveQuery {
    /// Name of the query
    pub name: String,
    /// The static term (initial contents of the working table)
    pub static_term: Arc<LogicalPlan>,
    /// The recursive term (evaluated on the contents of the working table until
    /// it returns an empty set)
    pub recursive_term: Arc<LogicalPlan>,
    /// Should the output of the recursive term be deduplicated (`UNION`) or
    /// not (`UNION ALL`).
    pub is_distinct: bool,
}
2273
/// Values expression. See
/// [Postgres VALUES](https://www.postgresql.org/docs/current/queries-values.html)
/// documentation for more details.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct Values {
    /// The table schema
    pub schema: DFSchemaRef,
    /// The rows of the relation: one inner `Vec` of expressions per row
    /// (NOTE(review): row-major layout assumed from the Postgres `VALUES` model — confirm)
    pub values: Vec<Vec<Expr>>,
}
2284
2285// Manual implementation needed because of `schema` field. Comparison excludes this field.
2286impl PartialOrd for Values {
2287    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
2288        self.values
2289            .partial_cmp(&other.values)
2290            // TODO (https://github.com/apache/datafusion/issues/17477) avoid recomparing all fields
2291            .filter(|cmp| *cmp != Ordering::Equal || self == other)
2292    }
2293}
2294
/// Evaluates an arbitrary list of expressions (essentially a
/// SELECT with an expression list) on its input.
///
/// Prefer the constructors (`try_new`, `try_new_with_schema`,
/// `new_from_schema`) over building the struct by hand.
#[derive(Clone, PartialEq, Eq, Hash, Debug)]
// mark non_exhaustive to encourage use of try_new/new()
#[non_exhaustive]
pub struct Projection {
    /// The list of expressions
    pub expr: Vec<Expr>,
    /// The incoming logical plan
    pub input: Arc<LogicalPlan>,
    /// The schema description of the output
    pub schema: DFSchemaRef,
}
2308
2309// Manual implementation needed because of `schema` field. Comparison excludes this field.
2310impl PartialOrd for Projection {
2311    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
2312        match self.expr.partial_cmp(&other.expr) {
2313            Some(Ordering::Equal) => self.input.partial_cmp(&other.input),
2314            cmp => cmp,
2315        }
2316        // TODO (https://github.com/apache/datafusion/issues/17477) avoid recomparing all fields
2317        .filter(|cmp| *cmp != Ordering::Equal || self == other)
2318    }
2319}
2320
2321impl Projection {
2322    /// Create a new Projection
2323    pub fn try_new(expr: Vec<Expr>, input: Arc<LogicalPlan>) -> Result<Self> {
2324        let projection_schema = projection_schema(&input, &expr)?;
2325        Self::try_new_with_schema(expr, input, projection_schema)
2326    }
2327
2328    /// Create a new Projection using the specified output schema
2329    pub fn try_new_with_schema(
2330        expr: Vec<Expr>,
2331        input: Arc<LogicalPlan>,
2332        schema: DFSchemaRef,
2333    ) -> Result<Self> {
2334        #[expect(deprecated)]
2335        if !expr.iter().any(|e| matches!(e, Expr::Wildcard { .. }))
2336            && expr.len() != schema.fields().len()
2337        {
2338            return plan_err!(
2339                "Projection has mismatch between number of expressions ({}) and number of fields in schema ({})",
2340                expr.len(),
2341                schema.fields().len()
2342            );
2343        }
2344        Ok(Self {
2345            expr,
2346            input,
2347            schema,
2348        })
2349    }
2350
2351    /// Create a new Projection using the specified output schema
2352    pub fn new_from_schema(input: Arc<LogicalPlan>, schema: DFSchemaRef) -> Self {
2353        let expr: Vec<Expr> = schema.columns().into_iter().map(Expr::Column).collect();
2354        Self {
2355            expr,
2356            input,
2357            schema,
2358        }
2359    }
2360}
2361
2362/// Computes the schema of the result produced by applying a projection to the input logical plan.
2363///
2364/// # Arguments
2365///
2366/// * `input`: A reference to the input `LogicalPlan` for which the projection schema
2367///   will be computed.
2368/// * `exprs`: A slice of `Expr` expressions representing the projection operation to apply.
2369///
2370/// # Metadata Handling
2371///
2372/// - **Schema-level metadata**: Passed through unchanged from the input schema
2373/// - **Field-level metadata**: Determined by each expression via [`exprlist_to_fields`], which
2374///   calls [`Expr::to_field`] to handle expression-specific metadata (literals, aliases, etc.)
2375///
2376/// # Returns
2377///
2378/// A `Result` containing an `Arc<DFSchema>` representing the schema of the result
2379/// produced by the projection operation. If the schema computation is successful,
2380/// the `Result` will contain the schema; otherwise, it will contain an error.
2381pub fn projection_schema(input: &LogicalPlan, exprs: &[Expr]) -> Result<Arc<DFSchema>> {
2382    // Preserve input schema metadata at the schema level
2383    let metadata = input.schema().metadata().clone();
2384
2385    // Convert expressions to fields with Field properties determined by `Expr::to_field`
2386    let schema =
2387        DFSchema::new_with_metadata(exprlist_to_fields(exprs, input)?, metadata)?
2388            .with_functional_dependencies(calc_func_dependencies_for_project(
2389                exprs, input,
2390            )?)?;
2391
2392    Ok(Arc::new(schema))
2393}
2394
/// Aliased subquery
///
/// Prefer `SubqueryAlias::try_new` over building the struct by hand: it
/// computes `schema` from the input plan and the alias.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
// mark non_exhaustive to encourage use of try_new/new()
#[non_exhaustive]
pub struct SubqueryAlias {
    /// The incoming logical plan
    pub input: Arc<LogicalPlan>,
    /// The alias for the input relation
    pub alias: TableReference,
    /// The schema with qualified field names
    pub schema: DFSchemaRef,
}
2407
2408impl SubqueryAlias {
2409    pub fn try_new(
2410        plan: Arc<LogicalPlan>,
2411        alias: impl Into<TableReference>,
2412    ) -> Result<Self> {
2413        let alias = alias.into();
2414
2415        // Since SubqueryAlias will replace all field qualification for the output schema of `plan`,
2416        // no field must share the same column name as this would lead to ambiguity when referencing
2417        // columns in parent logical nodes.
2418
2419        // Compute unique aliases, if any, for each column of the input's schema.
2420        let aliases = unique_field_aliases(plan.schema().fields());
2421        let is_projection_needed = aliases.iter().any(Option::is_some);
2422
2423        // Insert a projection node, if needed, to make sure aliases are applied.
2424        let plan = if is_projection_needed {
2425            let projection_expressions = aliases
2426                .iter()
2427                .zip(plan.schema().iter())
2428                .map(|(alias, (qualifier, field))| {
2429                    let column =
2430                        Expr::Column(Column::new(qualifier.cloned(), field.name()));
2431                    match alias {
2432                        None => column,
2433                        Some(alias) => {
2434                            Expr::Alias(Alias::new(column, qualifier.cloned(), alias))
2435                        }
2436                    }
2437                })
2438                .collect();
2439            let projection = Projection::try_new(projection_expressions, plan)?;
2440            Arc::new(LogicalPlan::Projection(projection))
2441        } else {
2442            plan
2443        };
2444
2445        // Requalify fields with the new `alias`.
2446        let fields = plan.schema().fields().clone();
2447        let meta_data = plan.schema().metadata().clone();
2448        let func_dependencies = plan.schema().functional_dependencies().clone();
2449
2450        let schema = DFSchema::from_unqualified_fields(fields, meta_data)?;
2451        let schema = schema.as_arrow();
2452
2453        let schema = DFSchemaRef::new(
2454            DFSchema::try_from_qualified_schema(alias.clone(), schema)?
2455                .with_functional_dependencies(func_dependencies)?,
2456        );
2457        Ok(SubqueryAlias {
2458            input: plan,
2459            alias,
2460            schema,
2461        })
2462    }
2463}
2464
2465// Manual implementation needed because of `schema` field. Comparison excludes this field.
2466impl PartialOrd for SubqueryAlias {
2467    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
2468        match self.input.partial_cmp(&other.input) {
2469            Some(Ordering::Equal) => self.alias.partial_cmp(&other.alias),
2470            cmp => cmp,
2471        }
2472        // TODO (https://github.com/apache/datafusion/issues/17477) avoid recomparing all fields
2473        .filter(|cmp| *cmp != Ordering::Equal || self == other)
2474    }
2475}
2476
/// Filters rows from its input that do not match an
/// expression (essentially a WHERE clause with a predicate
/// expression).
///
/// Semantically, `<predicate>` is evaluated for each row of the input;
/// If the value of `<predicate>` is true, the input row is passed to
/// the output. If the value of `<predicate>` is false, the row is
/// discarded.
///
/// A `Filter` should not be created directly; use `try_new()` instead.
/// The fields are only `pub` to support pattern matching.
///
/// `PartialOrd` is derived, so filters are compared by `predicate` first
/// and `input` second.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Hash)]
#[non_exhaustive]
pub struct Filter {
    /// The predicate expression, which must have Boolean type.
    pub predicate: Expr,
    /// The incoming logical plan
    pub input: Arc<LogicalPlan>,
}
2496
2497impl Filter {
2498    /// Create a new filter operator.
2499    ///
2500    /// Notes: as Aliases have no effect on the output of a filter operator,
2501    /// they are removed from the predicate expression.
2502    pub fn try_new(predicate: Expr, input: Arc<LogicalPlan>) -> Result<Self> {
2503        Self::try_new_internal(predicate, input)
2504    }
2505
2506    /// Create a new filter operator for a having clause.
2507    /// This is similar to a filter, but its having flag is set to true.
2508    #[deprecated(since = "48.0.0", note = "Use `try_new` instead")]
2509    pub fn try_new_with_having(predicate: Expr, input: Arc<LogicalPlan>) -> Result<Self> {
2510        Self::try_new_internal(predicate, input)
2511    }
2512
2513    fn is_allowed_filter_type(data_type: &DataType) -> bool {
2514        match data_type {
2515            // Interpret NULL as a missing boolean value.
2516            DataType::Boolean | DataType::Null => true,
2517            DataType::Dictionary(_, value_type) => {
2518                Filter::is_allowed_filter_type(value_type.as_ref())
2519            }
2520            _ => false,
2521        }
2522    }
2523
2524    fn try_new_internal(predicate: Expr, input: Arc<LogicalPlan>) -> Result<Self> {
2525        // Filter predicates must return a boolean value so we try and validate that here.
2526        // Note that it is not always possible to resolve the predicate expression during plan
2527        // construction (such as with correlated subqueries) so we make a best effort here and
2528        // ignore errors resolving the expression against the schema.
2529        if let Ok(predicate_type) = predicate.get_type(input.schema())
2530            && !Filter::is_allowed_filter_type(&predicate_type)
2531        {
2532            return plan_err!(
2533                "Cannot create filter with non-boolean predicate '{predicate}' returning {predicate_type}"
2534            );
2535        }
2536
2537        Ok(Self {
2538            predicate: predicate.unalias_nested().data,
2539            input,
2540        })
2541    }
2542
2543    /// Is this filter guaranteed to return 0 or 1 row in a given instantiation?
2544    ///
2545    /// This function will return `true` if its predicate contains a conjunction of
2546    /// `col(a) = <expr>`, where its schema has a unique filter that is covered
2547    /// by this conjunction.
2548    ///
2549    /// For example, for the table:
2550    /// ```sql
2551    /// CREATE TABLE t (a INTEGER PRIMARY KEY, b INTEGER);
2552    /// ```
2553    /// `Filter(a = 2).is_scalar() == true`
2554    /// , whereas
2555    /// `Filter(b = 2).is_scalar() == false`
2556    /// and
2557    /// `Filter(a = 2 OR b = 2).is_scalar() == false`
2558    fn is_scalar(&self) -> bool {
2559        let schema = self.input.schema();
2560
2561        let functional_dependencies = self.input.schema().functional_dependencies();
2562        let unique_keys = functional_dependencies.iter().filter(|dep| {
2563            let nullable = dep.nullable
2564                && dep
2565                    .source_indices
2566                    .iter()
2567                    .any(|&source| schema.field(source).is_nullable());
2568            !nullable
2569                && dep.mode == Dependency::Single
2570                && dep.target_indices.len() == schema.fields().len()
2571        });
2572
2573        let exprs = split_conjunction(&self.predicate);
2574        let eq_pred_cols: HashSet<_> = exprs
2575            .iter()
2576            .filter_map(|expr| {
2577                let Expr::BinaryExpr(BinaryExpr {
2578                    left,
2579                    op: Operator::Eq,
2580                    right,
2581                }) = expr
2582                else {
2583                    return None;
2584                };
2585                // This is a no-op filter expression
2586                if left == right {
2587                    return None;
2588                }
2589
2590                match (left.as_ref(), right.as_ref()) {
2591                    (Expr::Column(_), Expr::Column(_)) => None,
2592                    (Expr::Column(c), _) | (_, Expr::Column(c)) => {
2593                        Some(schema.index_of_column(c).unwrap())
2594                    }
2595                    _ => None,
2596                }
2597            })
2598            .collect();
2599
2600        // If we have a functional dependence that is a subset of our predicate,
2601        // this filter is scalar
2602        for key in unique_keys {
2603            if key.source_indices.iter().all(|c| eq_pred_cols.contains(c)) {
2604                return true;
2605            }
2606        }
2607        false
2608    }
2609}
2610
/// Evaluates window functions (e.g. SUM or RANK) over its input, based on a
/// set of window specifications.
///
/// # Output Schema
///
/// The output schema is the input schema followed by the window function
/// expressions, in order.
///
/// For example, given the input schema `"A", "B", "C"` and the window function
/// `SUM(A) OVER (PARTITION BY B+1 ORDER BY C)`, the output schema will be `"A",
/// "B", "C", "SUM(A) OVER ..."` where `"SUM(A) OVER ..."` is the name of the
/// output column.
///
/// Note that the `PARTITION BY` expression "B+1" is not produced in the output
/// schema.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct Window {
    /// The incoming logical plan
    pub input: Arc<LogicalPlan>,
    /// The window function expressions
    pub window_expr: Vec<Expr>,
    /// The schema description of the window output
    pub schema: DFSchemaRef,
}
2634
2635impl Window {
2636    /// Create a new window operator.
2637    pub fn try_new(window_expr: Vec<Expr>, input: Arc<LogicalPlan>) -> Result<Self> {
2638        let fields: Vec<(Option<TableReference>, Arc<Field>)> = input
2639            .schema()
2640            .iter()
2641            .map(|(q, f)| (q.cloned(), Arc::clone(f)))
2642            .collect();
2643        let input_len = fields.len();
2644        let mut window_fields = fields;
2645        let expr_fields = exprlist_to_fields(window_expr.as_slice(), &input)?;
2646        window_fields.extend_from_slice(expr_fields.as_slice());
2647        let metadata = input.schema().metadata().clone();
2648
2649        // Update functional dependencies for window:
2650        let mut window_func_dependencies =
2651            input.schema().functional_dependencies().clone();
2652        window_func_dependencies.extend_target_indices(window_fields.len());
2653
2654        // Since we know that ROW_NUMBER outputs will be unique (i.e. it consists
2655        // of consecutive numbers per partition), we can represent this fact with
2656        // functional dependencies.
2657        let mut new_dependencies = window_expr
2658            .iter()
2659            .enumerate()
2660            .filter_map(|(idx, expr)| {
2661                let Expr::WindowFunction(window_fun) = expr else {
2662                    return None;
2663                };
2664                let WindowFunction {
2665                    fun: WindowFunctionDefinition::WindowUDF(udwf),
2666                    params: WindowFunctionParams { partition_by, .. },
2667                } = window_fun.as_ref()
2668                else {
2669                    return None;
2670                };
2671                // When there is no PARTITION BY, row number will be unique
2672                // across the entire table.
2673                if udwf.name() == "row_number" && partition_by.is_empty() {
2674                    Some(idx + input_len)
2675                } else {
2676                    None
2677                }
2678            })
2679            .map(|idx| {
2680                FunctionalDependence::new(vec![idx], vec![], false)
2681                    .with_mode(Dependency::Single)
2682            })
2683            .collect::<Vec<_>>();
2684
2685        if !new_dependencies.is_empty() {
2686            for dependence in new_dependencies.iter_mut() {
2687                dependence.target_indices = (0..window_fields.len()).collect();
2688            }
2689            // Add the dependency introduced because of ROW_NUMBER window function to the functional dependency
2690            let new_deps = FunctionalDependencies::new(new_dependencies);
2691            window_func_dependencies.extend(new_deps);
2692        }
2693
2694        // Validate that FILTER clauses are only used with aggregate window functions
2695        if let Some(e) = window_expr.iter().find(|e| {
2696            matches!(
2697                e,
2698                Expr::WindowFunction(wf)
2699                    if !matches!(wf.fun, WindowFunctionDefinition::AggregateUDF(_))
2700                        && wf.params.filter.is_some()
2701            )
2702        }) {
2703            return plan_err!(
2704                "FILTER clause can only be used with aggregate window functions. Found in '{e}'"
2705            );
2706        }
2707
2708        Self::try_new_with_schema(
2709            window_expr,
2710            input,
2711            Arc::new(
2712                DFSchema::new_with_metadata(window_fields, metadata)?
2713                    .with_functional_dependencies(window_func_dependencies)?,
2714            ),
2715        )
2716    }
2717
2718    /// Create a new window function using the provided schema to avoid the overhead of
2719    /// building the schema again when the schema is already known.
2720    ///
2721    /// This method should only be called when you are absolutely sure that the schema being
2722    /// provided is correct for the window function. If in doubt, call [try_new](Self::try_new) instead.
2723    pub fn try_new_with_schema(
2724        window_expr: Vec<Expr>,
2725        input: Arc<LogicalPlan>,
2726        schema: DFSchemaRef,
2727    ) -> Result<Self> {
2728        let input_fields_count = input.schema().fields().len();
2729        if schema.fields().len() != input_fields_count + window_expr.len() {
2730            return plan_err!(
2731                "Window schema has wrong number of fields. Expected {} got {}",
2732                input_fields_count + window_expr.len(),
2733                schema.fields().len()
2734            );
2735        }
2736
2737        Ok(Window {
2738            input,
2739            window_expr,
2740            schema,
2741        })
2742    }
2743}
2744
2745// Manual implementation needed because of `schema` field. Comparison excludes this field.
2746impl PartialOrd for Window {
2747    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
2748        match self.input.partial_cmp(&other.input)? {
2749            Ordering::Equal => {} // continue
2750            not_equal => return Some(not_equal),
2751        }
2752
2753        match self.window_expr.partial_cmp(&other.window_expr)? {
2754            Ordering::Equal => {} // continue
2755            not_equal => return Some(not_equal),
2756        }
2757
2758        // Contract for PartialOrd and PartialEq consistency requires that
2759        // a == b if and only if partial_cmp(a, b) == Some(Equal).
2760        if self == other {
2761            Some(Ordering::Equal)
2762        } else {
2763            None
2764        }
2765    }
2766}
2767
/// Produces rows from a table provider by reference or from the context
///
/// `Debug`, `PartialEq`, `Eq`, `PartialOrd` and `Hash` are implemented by hand
/// for this type rather than derived; none of those implementations looks
/// into `source`.
#[derive(Clone)]
pub struct TableScan {
    /// The name of the table
    pub table_name: TableReference,
    /// The source of the table
    pub source: Arc<dyn TableSource>,
    /// Optional column indices to use as a projection
    pub projection: Option<Vec<usize>>,
    /// The schema description of the output
    pub projected_schema: DFSchemaRef,
    /// Optional expressions to be used as filters by the table provider
    pub filters: Vec<Expr>,
    /// Optional number of rows to read
    pub fetch: Option<usize>,
}
2784
2785impl Debug for TableScan {
2786    fn fmt(&self, f: &mut Formatter) -> fmt::Result {
2787        f.debug_struct("TableScan")
2788            .field("table_name", &self.table_name)
2789            .field("source", &"...")
2790            .field("projection", &self.projection)
2791            .field("projected_schema", &self.projected_schema)
2792            .field("filters", &self.filters)
2793            .field("fetch", &self.fetch)
2794            .finish_non_exhaustive()
2795    }
2796}
2797
2798impl PartialEq for TableScan {
2799    fn eq(&self, other: &Self) -> bool {
2800        self.table_name == other.table_name
2801            && self.projection == other.projection
2802            && self.projected_schema == other.projected_schema
2803            && self.filters == other.filters
2804            && self.fetch == other.fetch
2805    }
2806}
2807
// `Eq` adds no methods; equality itself is defined by the manual `PartialEq`
// implementation for `TableScan`.
impl Eq for TableScan {}
2809
2810// Manual implementation needed because of `source` and `projected_schema` fields.
2811// Comparison excludes these field.
2812impl PartialOrd for TableScan {
2813    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
2814        #[derive(PartialEq, PartialOrd)]
2815        struct ComparableTableScan<'a> {
2816            /// The name of the table
2817            pub table_name: &'a TableReference,
2818            /// Optional column indices to use as a projection
2819            pub projection: &'a Option<Vec<usize>>,
2820            /// Optional expressions to be used as filters by the table provider
2821            pub filters: &'a Vec<Expr>,
2822            /// Optional number of rows to read
2823            pub fetch: &'a Option<usize>,
2824        }
2825        let comparable_self = ComparableTableScan {
2826            table_name: &self.table_name,
2827            projection: &self.projection,
2828            filters: &self.filters,
2829            fetch: &self.fetch,
2830        };
2831        let comparable_other = ComparableTableScan {
2832            table_name: &other.table_name,
2833            projection: &other.projection,
2834            filters: &other.filters,
2835            fetch: &other.fetch,
2836        };
2837        comparable_self
2838            .partial_cmp(&comparable_other)
2839            // TODO (https://github.com/apache/datafusion/issues/17477) avoid recomparing all fields
2840            .filter(|cmp| *cmp != Ordering::Equal || self == other)
2841    }
2842}
2843
2844impl Hash for TableScan {
2845    fn hash<H: Hasher>(&self, state: &mut H) {
2846        self.table_name.hash(state);
2847        self.projection.hash(state);
2848        self.projected_schema.hash(state);
2849        self.filters.hash(state);
2850        self.fetch.hash(state);
2851    }
2852}
2853
2854impl TableScan {
2855    /// Initialize TableScan with appropriate schema from the given
2856    /// arguments.
2857    pub fn try_new(
2858        table_name: impl Into<TableReference>,
2859        table_source: Arc<dyn TableSource>,
2860        projection: Option<Vec<usize>>,
2861        filters: Vec<Expr>,
2862        fetch: Option<usize>,
2863    ) -> Result<Self> {
2864        let table_name = table_name.into();
2865
2866        if table_name.table().is_empty() {
2867            return plan_err!("table_name cannot be empty");
2868        }
2869        let schema = table_source.schema();
2870        let func_dependencies = FunctionalDependencies::new_from_constraints(
2871            table_source.constraints(),
2872            schema.fields.len(),
2873        );
2874        let projected_schema = projection
2875            .as_ref()
2876            .map(|p| {
2877                let projected_func_dependencies =
2878                    func_dependencies.project_functional_dependencies(p, p.len());
2879
2880                let df_schema = DFSchema::new_with_metadata(
2881                    p.iter()
2882                        .map(|i| {
2883                            (Some(table_name.clone()), Arc::clone(&schema.fields()[*i]))
2884                        })
2885                        .collect(),
2886                    schema.metadata.clone(),
2887                )?;
2888                df_schema.with_functional_dependencies(projected_func_dependencies)
2889            })
2890            .unwrap_or_else(|| {
2891                let df_schema =
2892                    DFSchema::try_from_qualified_schema(table_name.clone(), &schema)?;
2893                df_schema.with_functional_dependencies(func_dependencies)
2894            })?;
2895        let projected_schema = Arc::new(projected_schema);
2896
2897        Ok(Self {
2898            table_name,
2899            source: table_source,
2900            projection,
2901            projected_schema,
2902            filters,
2903            fetch,
2904        })
2905    }
2906}
2907
/// Repartition the plan based on a partitioning scheme.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Hash)]
pub struct Repartition {
    /// The incoming logical plan
    pub input: Arc<LogicalPlan>,
    /// The partitioning scheme
    pub partitioning_scheme: Partitioning,
}
2916
/// Union multiple inputs
///
/// The constructors on this type derive `schema` from `inputs` and require
/// at least two inputs.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct Union {
    /// Inputs to merge
    pub inputs: Vec<Arc<LogicalPlan>>,
    /// Union schema. Should be the same for all inputs.
    pub schema: DFSchemaRef,
}
2925
2926impl Union {
2927    /// Constructs new Union instance deriving schema from inputs.
2928    /// Schema data types must match exactly.
2929    pub fn try_new(inputs: Vec<Arc<LogicalPlan>>) -> Result<Self> {
2930        let schema = Self::derive_schema_from_inputs(&inputs, false, false)?;
2931        Ok(Union { inputs, schema })
2932    }
2933
2934    /// Constructs new Union instance deriving schema from inputs.
2935    /// Inputs do not have to have matching types and produced schema will
2936    /// take type from the first input.
2937    // TODO (https://github.com/apache/datafusion/issues/14380): Avoid creating uncoerced union at all.
2938    pub fn try_new_with_loose_types(inputs: Vec<Arc<LogicalPlan>>) -> Result<Self> {
2939        let schema = Self::derive_schema_from_inputs(&inputs, true, false)?;
2940        Ok(Union { inputs, schema })
2941    }
2942
2943    /// Constructs a new Union instance that combines rows from different tables by name,
2944    /// instead of by position. This means that the specified inputs need not have schemas
2945    /// that are all the same width.
2946    pub fn try_new_by_name(inputs: Vec<Arc<LogicalPlan>>) -> Result<Self> {
2947        let schema = Self::derive_schema_from_inputs(&inputs, true, true)?;
2948        let inputs = Self::rewrite_inputs_from_schema(&schema, inputs)?;
2949
2950        Ok(Union { inputs, schema })
2951    }
2952
2953    /// When constructing a `UNION BY NAME`, we need to wrap inputs
2954    /// in an additional `Projection` to account for absence of columns
2955    /// in input schemas or differing projection orders.
2956    fn rewrite_inputs_from_schema(
2957        schema: &Arc<DFSchema>,
2958        inputs: Vec<Arc<LogicalPlan>>,
2959    ) -> Result<Vec<Arc<LogicalPlan>>> {
2960        let schema_width = schema.iter().count();
2961        let mut wrapped_inputs = Vec::with_capacity(inputs.len());
2962        for input in inputs {
2963            // Any columns that exist within the derived schema but do not exist
2964            // within an input's schema should be replaced with `NULL` aliased
2965            // to the appropriate column in the derived schema.
2966            let mut expr = Vec::with_capacity(schema_width);
2967            for column in schema.columns() {
2968                if input
2969                    .schema()
2970                    .has_column_with_unqualified_name(column.name())
2971                {
2972                    expr.push(Expr::Column(column));
2973                } else {
2974                    expr.push(
2975                        Expr::Literal(ScalarValue::Null, None).alias(column.name()),
2976                    );
2977                }
2978            }
2979            wrapped_inputs.push(Arc::new(LogicalPlan::Projection(
2980                Projection::try_new_with_schema(expr, input, Arc::clone(schema))?,
2981            )));
2982        }
2983
2984        Ok(wrapped_inputs)
2985    }
2986
2987    /// Constructs new Union instance deriving schema from inputs.
2988    ///
2989    /// If `loose_types` is true, inputs do not need to have matching types and
2990    /// the produced schema will use the type from the first input.
2991    /// TODO (<https://github.com/apache/datafusion/issues/14380>): This is not necessarily reasonable behavior.
2992    ///
2993    /// If `by_name` is `true`, input schemas need not be the same width. That is,
2994    /// the constructed schema follows `UNION BY NAME` semantics.
2995    fn derive_schema_from_inputs(
2996        inputs: &[Arc<LogicalPlan>],
2997        loose_types: bool,
2998        by_name: bool,
2999    ) -> Result<DFSchemaRef> {
3000        if inputs.len() < 2 {
3001            return plan_err!("UNION requires at least two inputs");
3002        }
3003
3004        if by_name {
3005            Self::derive_schema_from_inputs_by_name(inputs, loose_types)
3006        } else {
3007            Self::derive_schema_from_inputs_by_position(inputs, loose_types)
3008        }
3009    }
3010
3011    fn derive_schema_from_inputs_by_name(
3012        inputs: &[Arc<LogicalPlan>],
3013        loose_types: bool,
3014    ) -> Result<DFSchemaRef> {
3015        type FieldData<'a> =
3016            (&'a DataType, bool, Vec<&'a HashMap<String, String>>, usize);
3017        let mut cols: Vec<(&str, FieldData)> = Vec::new();
3018        for input in inputs.iter() {
3019            for field in input.schema().fields() {
3020                if let Some((_, (data_type, is_nullable, metadata, occurrences))) =
3021                    cols.iter_mut().find(|(name, _)| name == field.name())
3022                {
3023                    if !loose_types && *data_type != field.data_type() {
3024                        return plan_err!(
3025                            "Found different types for field {}",
3026                            field.name()
3027                        );
3028                    }
3029
3030                    metadata.push(field.metadata());
3031                    // If the field is nullable in any one of the inputs,
3032                    // then the field in the final schema is also nullable.
3033                    *is_nullable |= field.is_nullable();
3034                    *occurrences += 1;
3035                } else {
3036                    cols.push((
3037                        field.name(),
3038                        (
3039                            field.data_type(),
3040                            field.is_nullable(),
3041                            vec![field.metadata()],
3042                            1,
3043                        ),
3044                    ));
3045                }
3046            }
3047        }
3048
3049        let union_fields = cols
3050            .into_iter()
3051            .map(
3052                |(name, (data_type, is_nullable, unmerged_metadata, occurrences))| {
3053                    // If the final number of occurrences of the field is less
3054                    // than the number of inputs (i.e. the field is missing from
3055                    // one or more inputs), then it must be treated as nullable.
3056                    let final_is_nullable = if occurrences == inputs.len() {
3057                        is_nullable
3058                    } else {
3059                        true
3060                    };
3061
3062                    let mut field =
3063                        Field::new(name, data_type.clone(), final_is_nullable);
3064                    field.set_metadata(intersect_metadata_for_union(unmerged_metadata));
3065
3066                    (None, Arc::new(field))
3067                },
3068            )
3069            .collect::<Vec<(Option<TableReference>, _)>>();
3070
3071        let union_schema_metadata = intersect_metadata_for_union(
3072            inputs.iter().map(|input| input.schema().metadata()),
3073        );
3074
3075        // Functional Dependencies are not preserved after UNION operation
3076        let schema = DFSchema::new_with_metadata(union_fields, union_schema_metadata)?;
3077        let schema = Arc::new(schema);
3078
3079        Ok(schema)
3080    }
3081
3082    fn derive_schema_from_inputs_by_position(
3083        inputs: &[Arc<LogicalPlan>],
3084        loose_types: bool,
3085    ) -> Result<DFSchemaRef> {
3086        let first_schema = inputs[0].schema();
3087        let fields_count = first_schema.fields().len();
3088        for input in inputs.iter().skip(1) {
3089            if fields_count != input.schema().fields().len() {
3090                return plan_err!(
3091                    "UNION queries have different number of columns: \
3092                    left has {} columns whereas right has {} columns",
3093                    fields_count,
3094                    input.schema().fields().len()
3095                );
3096            }
3097        }
3098
3099        let mut name_counts: HashMap<String, usize> = HashMap::new();
3100        let union_fields = (0..fields_count)
3101            .map(|i| {
3102                let fields = inputs
3103                    .iter()
3104                    .map(|input| input.schema().field(i))
3105                    .collect::<Vec<_>>();
3106                let first_field = fields[0];
3107                let base_name = first_field.name().to_string();
3108
3109                let data_type = if loose_types {
3110                    // TODO apply type coercion here, or document why it's better to defer
3111                    // temporarily use the data type from the left input and later rely on the analyzer to
3112                    // coerce the two schemas into a common one.
3113                    first_field.data_type()
3114                } else {
3115                    fields.iter().skip(1).try_fold(
3116                        first_field.data_type(),
3117                        |acc, field| {
3118                            if acc != field.data_type() {
3119                                return plan_err!(
3120                                    "UNION field {i} have different type in inputs: \
3121                                    left has {} whereas right has {}",
3122                                    first_field.data_type(),
3123                                    field.data_type()
3124                                );
3125                            }
3126                            Ok(acc)
3127                        },
3128                    )?
3129                };
3130                let nullable = fields.iter().any(|field| field.is_nullable());
3131
3132                // Generate unique field name
3133                let name = if let Some(count) = name_counts.get_mut(&base_name) {
3134                    *count += 1;
3135                    format!("{base_name}_{count}")
3136                } else {
3137                    name_counts.insert(base_name.clone(), 0);
3138                    base_name
3139                };
3140
3141                let mut field = Field::new(&name, data_type.clone(), nullable);
3142                let field_metadata = intersect_metadata_for_union(
3143                    fields.iter().map(|field| field.metadata()),
3144                );
3145                field.set_metadata(field_metadata);
3146                Ok((None, Arc::new(field)))
3147            })
3148            .collect::<Result<_>>()?;
3149        let union_schema_metadata = intersect_metadata_for_union(
3150            inputs.iter().map(|input| input.schema().metadata()),
3151        );
3152
3153        // Functional Dependencies are not preserved after UNION operation
3154        let schema = DFSchema::new_with_metadata(union_fields, union_schema_metadata)?;
3155        let schema = Arc::new(schema);
3156
3157        Ok(schema)
3158    }
3159}
3160
3161// Manual implementation needed because of `schema` field. Comparison excludes this field.
3162impl PartialOrd for Union {
3163    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
3164        self.inputs
3165            .partial_cmp(&other.inputs)
3166            // TODO (https://github.com/apache/datafusion/issues/17477) avoid recomparing all fields
3167            .filter(|cmp| *cmp != Ordering::Equal || self == other)
3168    }
3169}
3170
/// Describe the schema of a table
///
/// # Example output:
///
/// ```sql
/// > describe traces;
/// +--------------------+-----------------------------+-------------+
/// | column_name        | data_type                   | is_nullable |
/// +--------------------+-----------------------------+-------------+
/// | attributes         | Utf8                        | YES         |
/// | duration_nano      | Int64                       | YES         |
/// | end_time_unix_nano | Int64                       | YES         |
/// | service.name       | Dictionary(Int32, Utf8)     | YES         |
/// | span.kind          | Utf8                        | YES         |
/// | span.name          | Utf8                        | YES         |
/// | span_id            | Dictionary(Int32, Utf8)     | YES         |
/// | time               | Timestamp(Nanosecond, None) | NO          |
/// | trace_id           | Dictionary(Int32, Utf8)     | YES         |
/// | otel.status_code   | Utf8                        | YES         |
/// | parent_span_id     | Utf8                        | YES         |
/// +--------------------+-----------------------------+-------------+
/// ```
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct DescribeTable {
    /// Schema of the table being described
    pub schema: Arc<Schema>,
    /// Schema of the `DESCRIBE` output relation
    pub output_schema: DFSchemaRef,
}
3200
// Manual implementation of `PartialOrd`, returning `None` since there are no comparable types in
// `DescribeTable`. This allows `LogicalPlan` to derive `PartialOrd`.
//
// NOTE(review): `None` is returned even when `self == other`, whereas the
// other manual `PartialOrd` impls in this file report `Some(Equal)` for equal
// values — confirm that this difference is intentional.
impl PartialOrd for DescribeTable {
    fn partial_cmp(&self, _other: &Self) -> Option<Ordering> {
        // There is no relevant comparison for schemas
        None
    }
}
3209
/// Options for EXPLAIN
///
/// The `Default` value has `verbose` and `analyze` turned off and uses
/// [`ExplainFormat::Indent`]; the `with_*` methods override single options.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct ExplainOption {
    /// Include detailed debug info
    pub verbose: bool,
    /// Actually execute the plan and report metrics
    pub analyze: bool,
    /// Output syntax/format
    pub format: ExplainFormat,
}
3220
3221impl Default for ExplainOption {
3222    fn default() -> Self {
3223        ExplainOption {
3224            verbose: false,
3225            analyze: false,
3226            format: ExplainFormat::Indent,
3227        }
3228    }
3229}
3230
3231impl ExplainOption {
3232    /// Builder‐style setter for `verbose`
3233    pub fn with_verbose(mut self, verbose: bool) -> Self {
3234        self.verbose = verbose;
3235        self
3236    }
3237
3238    /// Builder‐style setter for `analyze`
3239    pub fn with_analyze(mut self, analyze: bool) -> Self {
3240        self.analyze = analyze;
3241        self
3242    }
3243
3244    /// Builder‐style setter for `format`
3245    pub fn with_format(mut self, format: ExplainFormat) -> Self {
3246        self.format = format;
3247        self
3248    }
3249}
3250
/// Produces a relation with string representations of
/// various parts of the plan
///
/// See [the documentation] for more information
///
/// [the documentation]: https://datafusion.apache.org/user-guide/sql/explain.html
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct Explain {
    /// Should extra (detailed, intermediate plans) be included?
    pub verbose: bool,
    /// Output format for explain, if specified.
    /// If none, defaults to `text`
    pub explain_format: ExplainFormat,
    /// The logical plan that is being EXPLAIN'd
    pub plan: Arc<LogicalPlan>,
    /// The plan as rendered at each of the stages it has gone through
    pub stringified_plans: Vec<StringifiedPlan>,
    /// The output schema of the explain (2 columns of text)
    pub schema: DFSchemaRef,
    /// Used by the physical planner to check whether it should proceed with planning
    pub logical_optimization_succeeded: bool,
}
3273
3274// Manual implementation needed because of `schema` field. Comparison excludes this field.
3275impl PartialOrd for Explain {
3276    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
3277        #[derive(PartialEq, PartialOrd)]
3278        struct ComparableExplain<'a> {
3279            /// Should extra (detailed, intermediate plans) be included?
3280            pub verbose: &'a bool,
3281            /// The logical plan that is being EXPLAIN'd
3282            pub plan: &'a Arc<LogicalPlan>,
3283            /// Represent the various stages plans have gone through
3284            pub stringified_plans: &'a Vec<StringifiedPlan>,
3285            /// Used by physical planner to check if should proceed with planning
3286            pub logical_optimization_succeeded: &'a bool,
3287        }
3288        let comparable_self = ComparableExplain {
3289            verbose: &self.verbose,
3290            plan: &self.plan,
3291            stringified_plans: &self.stringified_plans,
3292            logical_optimization_succeeded: &self.logical_optimization_succeeded,
3293        };
3294        let comparable_other = ComparableExplain {
3295            verbose: &other.verbose,
3296            plan: &other.plan,
3297            stringified_plans: &other.stringified_plans,
3298            logical_optimization_succeeded: &other.logical_optimization_succeeded,
3299        };
3300        comparable_self
3301            .partial_cmp(&comparable_other)
3302            // TODO (https://github.com/apache/datafusion/issues/17477) avoid recomparing all fields
3303            .filter(|cmp| *cmp != Ordering::Equal || self == other)
3304    }
3305}
3306
/// Runs the actual plan, and then prints the physical plan
/// with execution metrics.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct Analyze {
    /// Should extra detail be included?
    pub verbose: bool,
    /// The logical plan that is being EXPLAIN ANALYZE'd
    pub input: Arc<LogicalPlan>,
    /// The output schema of the explain (2 columns of text)
    pub schema: DFSchemaRef,
}
3318
3319// Manual implementation needed because of `schema` field. Comparison excludes this field.
3320impl PartialOrd for Analyze {
3321    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
3322        match self.verbose.partial_cmp(&other.verbose) {
3323            Some(Ordering::Equal) => self.input.partial_cmp(&other.input),
3324            cmp => cmp,
3325        }
3326        // TODO (https://github.com/apache/datafusion/issues/17477) avoid recomparing all fields
3327        .filter(|cmp| *cmp != Ordering::Equal || self == other)
3328    }
3329}
3330
/// Extension operator defined outside of DataFusion
///
/// Equality and ordering of this wrapper are delegated to the wrapped node.
// TODO(clippy): This clippy `allow` should be removed if
// the manual `PartialEq` is removed in favor of a derive.
// (see `PartialEq` the impl for details.)
#[allow(clippy::allow_attributes)]
#[allow(clippy::derived_hash_with_manual_eq)]
#[derive(Debug, Clone, Eq, Hash)]
pub struct Extension {
    /// The runtime extension operator
    pub node: Arc<dyn UserDefinedLogicalNode>,
}
3342
// `PartialEq` cannot be derived for types containing `Arc<dyn Trait>`.
// This manual implementation should be removed if
// https://github.com/rust-lang/rust/issues/39128 is fixed.
impl PartialEq for Extension {
    /// Two extensions are equal when their wrapped nodes compare equal.
    fn eq(&self, other: &Self) -> bool {
        // NOTE(review): presumably kept as an explicit method call (not `==`)
        // because of the compiler issue linked above — confirm before changing.
        self.node.eq(&other.node)
    }
}
3351
// Ordering of an `Extension` is delegated entirely to the wrapped node.
impl PartialOrd for Extension {
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        self.node.partial_cmp(&other.node)
    }
}
3357
/// Produces the first `n` tuples from its input and discards the rest.
///
/// `skip` and `fetch` are stored as expressions; see [`Limit::get_skip_type`]
/// and [`Limit::get_fetch_type`] for extracting literal values from them.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Hash)]
pub struct Limit {
    /// Number of rows to skip before fetch
    pub skip: Option<Box<Expr>>,
    /// Maximum number of rows to fetch,
    /// None means fetching all rows
    pub fetch: Option<Box<Expr>>,
    /// The logical plan
    pub input: Arc<LogicalPlan>,
}
3369
/// Different types of skip expression in Limit plan.
///
/// Returned by [`Limit::get_skip_type`].
pub enum SkipType {
    /// The skip expression is a literal value.
    /// A missing or `NULL` skip expression is reported as `Literal(0)`.
    Literal(usize),
    /// Currently only supports expressions that can be folded into constants.
    UnsupportedExpr,
}
3377
/// Different types of fetch expression in Limit plan.
///
/// Returned by [`Limit::get_fetch_type`].
pub enum FetchType {
    /// The fetch expression is a literal value.
    /// `Literal(None)` means the fetch expression is not provided
    /// (or is a `NULL` literal).
    Literal(Option<usize>),
    /// Currently only supports expressions that can be folded into constants.
    UnsupportedExpr,
}
3386
3387impl Limit {
3388    /// Get the skip type from the limit plan.
3389    pub fn get_skip_type(&self) -> Result<SkipType> {
3390        match self.skip.as_deref() {
3391            Some(expr) => match *expr {
3392                Expr::Literal(ScalarValue::Int64(s), _) => {
3393                    // `skip = NULL` is equivalent to `skip = 0`
3394                    let s = s.unwrap_or(0);
3395                    if s >= 0 {
3396                        Ok(SkipType::Literal(s as usize))
3397                    } else {
3398                        plan_err!("OFFSET must be >=0, '{}' was provided", s)
3399                    }
3400                }
3401                _ => Ok(SkipType::UnsupportedExpr),
3402            },
3403            // `skip = None` is equivalent to `skip = 0`
3404            None => Ok(SkipType::Literal(0)),
3405        }
3406    }
3407
3408    /// Get the fetch type from the limit plan.
3409    pub fn get_fetch_type(&self) -> Result<FetchType> {
3410        match self.fetch.as_deref() {
3411            Some(expr) => match *expr {
3412                Expr::Literal(ScalarValue::Int64(Some(s)), _) => {
3413                    if s >= 0 {
3414                        Ok(FetchType::Literal(Some(s as usize)))
3415                    } else {
3416                        plan_err!("LIMIT must be >= 0, '{}' was provided", s)
3417                    }
3418                }
3419                Expr::Literal(ScalarValue::Int64(None), _) => {
3420                    Ok(FetchType::Literal(None))
3421                }
3422                _ => Ok(FetchType::UnsupportedExpr),
3423            },
3424            None => Ok(FetchType::Literal(None)),
3425        }
3426    }
3427}
3428
/// Removes duplicate rows from the input
///
/// Use [`Distinct::input`] to reach the input plan of either variant.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Hash)]
pub enum Distinct {
    /// Plain `DISTINCT` referencing all selection expressions
    All(Arc<LogicalPlan>),
    /// The `Postgres` addition, allowing separate control over DISTINCT'd and selected columns
    On(DistinctOn),
}
3437
3438impl Distinct {
3439    /// return a reference to the nodes input
3440    pub fn input(&self) -> &Arc<LogicalPlan> {
3441        match self {
3442            Distinct::All(input) => input,
3443            Distinct::On(DistinctOn { input, .. }) => input,
3444        }
3445    }
3446}
3447
/// Removes duplicate rows from the input, `DISTINCT ON` flavor.
///
/// This is the payload of [`Distinct::On`].
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct DistinctOn {
    /// The `DISTINCT ON` clause expression list
    pub on_expr: Vec<Expr>,
    /// The selected projection expression list
    pub select_expr: Vec<Expr>,
    /// The `ORDER BY` clause, whose initial expressions must match those of the `ON` clause when
    /// present. Note that those matching expressions actually wrap the `ON` expressions with
    /// additional info pertaining to the sorting procedure (i.e. ASC/DESC, and NULLS FIRST/LAST).
    pub sort_expr: Option<Vec<SortExpr>>,
    /// The logical plan that is being DISTINCT'd
    pub input: Arc<LogicalPlan>,
    /// The schema description of the DISTINCT ON output
    pub schema: DFSchemaRef,
}
3464
3465impl DistinctOn {
3466    /// Create a new `DistinctOn` struct.
3467    pub fn try_new(
3468        on_expr: Vec<Expr>,
3469        select_expr: Vec<Expr>,
3470        sort_expr: Option<Vec<SortExpr>>,
3471        input: Arc<LogicalPlan>,
3472    ) -> Result<Self> {
3473        if on_expr.is_empty() {
3474            return plan_err!("No `ON` expressions provided");
3475        }
3476
3477        let on_expr = normalize_cols(on_expr, input.as_ref())?;
3478        let qualified_fields = exprlist_to_fields(select_expr.as_slice(), &input)?
3479            .into_iter()
3480            .collect();
3481
3482        let dfschema = DFSchema::new_with_metadata(
3483            qualified_fields,
3484            input.schema().metadata().clone(),
3485        )?;
3486
3487        let mut distinct_on = DistinctOn {
3488            on_expr,
3489            select_expr,
3490            sort_expr: None,
3491            input,
3492            schema: Arc::new(dfschema),
3493        };
3494
3495        if let Some(sort_expr) = sort_expr {
3496            distinct_on = distinct_on.with_sort_expr(sort_expr)?;
3497        }
3498
3499        Ok(distinct_on)
3500    }
3501
3502    /// Try to update `self` with a new sort expressions.
3503    ///
3504    /// Validates that the sort expressions are a super-set of the `ON` expressions.
3505    pub fn with_sort_expr(mut self, sort_expr: Vec<SortExpr>) -> Result<Self> {
3506        let sort_expr = normalize_sorts(sort_expr, self.input.as_ref())?;
3507
3508        // Check that the left-most sort expressions are the same as the `ON` expressions.
3509        let mut matched = true;
3510        for (on, sort) in self.on_expr.iter().zip(sort_expr.iter()) {
3511            if on != &sort.expr {
3512                matched = false;
3513                break;
3514            }
3515        }
3516
3517        if self.on_expr.len() > sort_expr.len() || !matched {
3518            return plan_err!(
3519                "SELECT DISTINCT ON expressions must match initial ORDER BY expressions"
3520            );
3521        }
3522
3523        self.sort_expr = Some(sort_expr);
3524        Ok(self)
3525    }
3526}
3527
3528// Manual implementation needed because of `schema` field. Comparison excludes this field.
3529impl PartialOrd for DistinctOn {
3530    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
3531        #[derive(PartialEq, PartialOrd)]
3532        struct ComparableDistinctOn<'a> {
3533            /// The `DISTINCT ON` clause expression list
3534            pub on_expr: &'a Vec<Expr>,
3535            /// The selected projection expression list
3536            pub select_expr: &'a Vec<Expr>,
3537            /// The `ORDER BY` clause, whose initial expressions must match those of the `ON` clause when
3538            /// present. Note that those matching expressions actually wrap the `ON` expressions with
3539            /// additional info pertaining to the sorting procedure (i.e. ASC/DESC, and NULLS FIRST/LAST).
3540            pub sort_expr: &'a Option<Vec<SortExpr>>,
3541            /// The logical plan that is being DISTINCT'd
3542            pub input: &'a Arc<LogicalPlan>,
3543        }
3544        let comparable_self = ComparableDistinctOn {
3545            on_expr: &self.on_expr,
3546            select_expr: &self.select_expr,
3547            sort_expr: &self.sort_expr,
3548            input: &self.input,
3549        };
3550        let comparable_other = ComparableDistinctOn {
3551            on_expr: &other.on_expr,
3552            select_expr: &other.select_expr,
3553            sort_expr: &other.sort_expr,
3554            input: &other.input,
3555        };
3556        comparable_self
3557            .partial_cmp(&comparable_other)
3558            // TODO (https://github.com/apache/datafusion/issues/17477) avoid recomparing all fields
3559            .filter(|cmp| *cmp != Ordering::Equal || self == other)
3560    }
3561}
3562
/// Aggregates its input based on a set of grouping and aggregate
/// expressions (e.g. SUM).
///
/// # Output Schema
///
/// The output schema is the group expressions followed by the aggregate
/// expressions in order.
///
/// For example, given the input schema `"A", "B", "C"` and the aggregate
/// `SUM(A) GROUP BY C+B`, the output schema will be `"C+B", "SUM(A)"` where
/// "C+B" and "SUM(A)" are the names of the output columns. Note that "C+B" is a
/// single new column.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
// mark non_exhaustive to encourage use of try_new/new()
#[non_exhaustive]
pub struct Aggregate {
    /// The incoming logical plan
    pub input: Arc<LogicalPlan>,
    /// Grouping expressions
    pub group_expr: Vec<Expr>,
    /// Aggregate expressions.
    ///
    /// Note these *must* be either [`Expr::AggregateFunction`] or [`Expr::Alias`]
    pub aggr_expr: Vec<Expr>,
    /// The schema description of the aggregate output
    pub schema: DFSchemaRef,
}
3590
3591impl Aggregate {
3592    /// Create a new aggregate operator.
3593    pub fn try_new(
3594        input: Arc<LogicalPlan>,
3595        group_expr: Vec<Expr>,
3596        aggr_expr: Vec<Expr>,
3597    ) -> Result<Self> {
3598        let group_expr = enumerate_grouping_sets(group_expr)?;
3599
3600        let is_grouping_set = matches!(group_expr.as_slice(), [Expr::GroupingSet(_)]);
3601
3602        let grouping_expr: Vec<&Expr> = grouping_set_to_exprlist(group_expr.as_slice())?;
3603
3604        let mut qualified_fields = exprlist_to_fields(grouping_expr, &input)?;
3605
3606        // Even columns that cannot be null will become nullable when used in a grouping set.
3607        if is_grouping_set {
3608            qualified_fields = qualified_fields
3609                .into_iter()
3610                .map(|(q, f)| (q, f.as_ref().clone().with_nullable(true).into()))
3611                .collect::<Vec<_>>();
3612            let max_ordinal = max_grouping_set_duplicate_ordinal(&group_expr);
3613            qualified_fields.push((
3614                None,
3615                Field::new(
3616                    Self::INTERNAL_GROUPING_ID,
3617                    Self::grouping_id_type(qualified_fields.len(), max_ordinal),
3618                    false,
3619                )
3620                .into(),
3621            ));
3622        }
3623
3624        qualified_fields.extend(exprlist_to_fields(aggr_expr.as_slice(), &input)?);
3625
3626        let schema = DFSchema::new_with_metadata(
3627            qualified_fields,
3628            input.schema().metadata().clone(),
3629        )?;
3630
3631        Self::try_new_with_schema(input, group_expr, aggr_expr, Arc::new(schema))
3632    }
3633
3634    /// Create a new aggregate operator using the provided schema to avoid the overhead of
3635    /// building the schema again when the schema is already known.
3636    ///
3637    /// This method should only be called when you are absolutely sure that the schema being
3638    /// provided is correct for the aggregate. If in doubt, call [try_new](Self::try_new) instead.
3639    pub fn try_new_with_schema(
3640        input: Arc<LogicalPlan>,
3641        group_expr: Vec<Expr>,
3642        aggr_expr: Vec<Expr>,
3643        schema: DFSchemaRef,
3644    ) -> Result<Self> {
3645        if group_expr.is_empty() && aggr_expr.is_empty() {
3646            return plan_err!(
3647                "Aggregate requires at least one grouping or aggregate expression. \
3648                Aggregate without grouping expressions nor aggregate expressions is \
3649                logically equivalent to, but less efficient than, VALUES producing \
3650                single row. Please use VALUES instead."
3651            );
3652        }
3653        let group_expr_count = grouping_set_expr_count(&group_expr)?;
3654        if schema.fields().len() != group_expr_count + aggr_expr.len() {
3655            return plan_err!(
3656                "Aggregate schema has wrong number of fields. Expected {} got {}",
3657                group_expr_count + aggr_expr.len(),
3658                schema.fields().len()
3659            );
3660        }
3661
3662        let aggregate_func_dependencies =
3663            calc_func_dependencies_for_aggregate(&group_expr, &input, &schema)?;
3664        let new_schema = Arc::unwrap_or_clone(schema);
3665        let schema = Arc::new(
3666            new_schema.with_functional_dependencies(aggregate_func_dependencies)?,
3667        );
3668        Ok(Self {
3669            input,
3670            group_expr,
3671            aggr_expr,
3672            schema,
3673        })
3674    }
3675
3676    fn is_grouping_set(&self) -> bool {
3677        matches!(self.group_expr.as_slice(), [Expr::GroupingSet(_)])
3678    }
3679
3680    /// Get the output expressions.
3681    fn output_expressions(&self) -> Result<Vec<&Expr>> {
3682        static INTERNAL_ID_EXPR: LazyLock<Expr> = LazyLock::new(|| {
3683            Expr::Column(Column::from_name(Aggregate::INTERNAL_GROUPING_ID))
3684        });
3685        let mut exprs = grouping_set_to_exprlist(self.group_expr.as_slice())?;
3686        if self.is_grouping_set() {
3687            exprs.push(&INTERNAL_ID_EXPR);
3688        }
3689        exprs.extend(self.aggr_expr.iter());
3690        debug_assert!(exprs.len() == self.schema.fields().len());
3691        Ok(exprs)
3692    }
3693
3694    /// Get the length of the group by expression in the output schema
3695    /// This is not simply group by expression length. Expression may be
3696    /// GroupingSet, etc. In these case we need to get inner expression lengths.
3697    pub fn group_expr_len(&self) -> Result<usize> {
3698        grouping_set_expr_count(&self.group_expr)
3699    }
3700
3701    /// Returns the data type of the grouping id.
3702    ///
3703    /// The grouping ID packs two pieces of information into a single integer:
3704    /// - The low `group_exprs` bits are the semantic bitmask (a set bit means the
3705    ///   corresponding grouping expression is NULL for this grouping set).
3706    /// - The bits above position `group_exprs` encode a duplicate ordinal that
3707    ///   distinguishes multiple occurrences of the same grouping set pattern.
3708    ///
3709    /// `max_ordinal` is the highest ordinal value that will appear (0 when there
3710    /// are no duplicate grouping sets).  The type is chosen to be the smallest
3711    /// unsigned integer that can represent both parts.
3712    pub fn grouping_id_type(group_exprs: usize, max_ordinal: usize) -> DataType {
3713        let ordinal_bits = usize::BITS as usize - max_ordinal.leading_zeros() as usize;
3714        let total_bits = group_exprs + ordinal_bits;
3715        if total_bits <= 8 {
3716            DataType::UInt8
3717        } else if total_bits <= 16 {
3718            DataType::UInt16
3719        } else if total_bits <= 32 {
3720            DataType::UInt32
3721        } else {
3722            DataType::UInt64
3723        }
3724    }
3725
3726    /// Internal column used when the aggregation is a grouping set.
3727    ///
3728    /// This column packs two values into a single unsigned integer:
3729    ///
3730    /// - **Low bits (positions 0 .. n-1)**: a semantic bitmask where each bit
3731    ///   represents one of the `n` grouping expressions.  The least significant
3732    ///   bit corresponds to the rightmost grouping expression.  A `1` bit means
3733    ///   the corresponding column is replaced with `NULL` for this grouping set;
3734    ///   a `0` bit means it is included.
3735    /// - **High bits (positions n and above)**: a *duplicate ordinal* that
3736    ///   distinguishes multiple occurrences of the same semantic grouping set
3737    ///   pattern within a single query.  The ordinal is `0` for the first
3738    ///   occurrence, `1` for the second, and so on.
3739    ///
3740    /// The integer type is chosen by [`Self::grouping_id_type`] to be the
3741    /// smallest `UInt8 / UInt16 / UInt32 / UInt64` that can represent both
3742    /// parts.
3743    ///
3744    /// For example, for the grouping expressions CUBE(a, b) (no duplicates),
3745    /// the grouping ID column will have the following values:
3746    ///     0b00: Both `a` and `b` are included
3747    ///     0b01: `b` is excluded
3748    ///     0b10: `a` is excluded
3749    ///     0b11: Both `a` and `b` are excluded
3750    ///
3751    /// When the same set appears twice and `n = 2`, the duplicate ordinal is
3752    /// packed into bit 2:
3753    ///     first occurrence:  `0b0_01` (ordinal = 0, mask = 0b01)
3754    ///     second occurrence: `0b1_01` (ordinal = 1, mask = 0b01)
3755    ///
3756    /// The GROUPING function always masks the value with `(1 << n) - 1` before
3757    /// interpreting it so the ordinal bits are invisible to user-facing SQL.
3758    pub const INTERNAL_GROUPING_ID: &'static str = "__grouping_id";
3759}
3760
3761// Manual implementation needed because of `schema` field. Comparison excludes this field.
3762impl PartialOrd for Aggregate {
3763    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
3764        match self.input.partial_cmp(&other.input) {
3765            Some(Ordering::Equal) => {
3766                match self.group_expr.partial_cmp(&other.group_expr) {
3767                    Some(Ordering::Equal) => self.aggr_expr.partial_cmp(&other.aggr_expr),
3768                    cmp => cmp,
3769                }
3770            }
3771            cmp => cmp,
3772        }
3773        // TODO (https://github.com/apache/datafusion/issues/17477) avoid recomparing all fields
3774        .filter(|cmp| *cmp != Ordering::Equal || self == other)
3775    }
3776}
3777
3778/// Returns the highest duplicate ordinal across all grouping sets in `group_expr`.
3779///
3780/// The ordinal for each occurrence of a grouping set pattern is its 0-based
3781/// index among identical entries. For example, if the same set appears three
3782/// times, the ordinals are 0, 1, 2 and this function returns 2.
3783/// Returns 0 when no grouping set is duplicated.
3784#[allow(clippy::allow_attributes, clippy::mutable_key_type)] // Expr contains Arc with interior mutability but is intentionally used as hash key
3785fn max_grouping_set_duplicate_ordinal(group_expr: &[Expr]) -> usize {
3786    if let Some(Expr::GroupingSet(GroupingSet::GroupingSets(sets))) = group_expr.first() {
3787        let mut counts: HashMap<&[Expr], usize> = HashMap::new();
3788        for set in sets {
3789            *counts.entry(set).or_insert(0) += 1;
3790        }
3791        counts.into_values().max().unwrap_or(0).saturating_sub(1)
3792    } else {
3793        0
3794    }
3795}
3796
3797/// Checks whether any expression in `group_expr` contains `Expr::GroupingSet`.
3798fn contains_grouping_set(group_expr: &[Expr]) -> bool {
3799    group_expr
3800        .iter()
3801        .any(|expr| matches!(expr, Expr::GroupingSet(_)))
3802}
3803
3804/// Calculates functional dependencies for aggregate expressions.
3805fn calc_func_dependencies_for_aggregate(
3806    // Expressions in the GROUP BY clause:
3807    group_expr: &[Expr],
3808    // Input plan of the aggregate:
3809    input: &LogicalPlan,
3810    // Aggregate schema
3811    aggr_schema: &DFSchema,
3812) -> Result<FunctionalDependencies> {
3813    // We can do a case analysis on how to propagate functional dependencies based on
3814    // whether the GROUP BY in question contains a grouping set expression:
3815    // - If so, the functional dependencies will be empty because we cannot guarantee
3816    //   that GROUP BY expression results will be unique.
3817    // - Otherwise, it may be possible to propagate functional dependencies.
3818    if !contains_grouping_set(group_expr) {
3819        let group_by_expr_names = group_expr
3820            .iter()
3821            .map(|item| item.schema_name().to_string())
3822            .collect::<IndexSet<_>>()
3823            .into_iter()
3824            .collect::<Vec<_>>();
3825        let aggregate_func_dependencies = aggregate_functional_dependencies(
3826            input.schema(),
3827            &group_by_expr_names,
3828            aggr_schema,
3829        );
3830        Ok(aggregate_func_dependencies)
3831    } else {
3832        Ok(FunctionalDependencies::empty())
3833    }
3834}
3835
3836/// This function projects functional dependencies of the `input` plan according
3837/// to projection expressions `exprs`.
3838fn calc_func_dependencies_for_project(
3839    exprs: &[Expr],
3840    input: &LogicalPlan,
3841) -> Result<FunctionalDependencies> {
3842    let input_fields = input.schema().field_names();
3843    // Calculate expression indices (if present) in the input schema.
3844    let proj_indices = exprs
3845        .iter()
3846        .map(|expr| match expr {
3847            #[expect(deprecated)]
3848            Expr::Wildcard { qualifier, options } => {
3849                let wildcard_fields = exprlist_to_fields(
3850                    vec![&Expr::Wildcard {
3851                        qualifier: qualifier.clone(),
3852                        options: options.clone(),
3853                    }],
3854                    input,
3855                )?;
3856                Ok::<_, DataFusionError>(
3857                    wildcard_fields
3858                        .into_iter()
3859                        .filter_map(|(qualifier, f)| {
3860                            let flat_name = qualifier
3861                                .map(|t| format!("{}.{}", t, f.name()))
3862                                .unwrap_or_else(|| f.name().clone());
3863                            input_fields.iter().position(|item| *item == flat_name)
3864                        })
3865                        .collect::<Vec<_>>(),
3866                )
3867            }
3868            Expr::Alias(alias) => {
3869                let name = format!("{}", alias.expr);
3870                Ok(input_fields
3871                    .iter()
3872                    .position(|item| *item == name)
3873                    .map(|i| vec![i])
3874                    .unwrap_or(vec![]))
3875            }
3876            _ => {
3877                let name = format!("{expr}");
3878                Ok(input_fields
3879                    .iter()
3880                    .position(|item| *item == name)
3881                    .map(|i| vec![i])
3882                    .unwrap_or(vec![]))
3883            }
3884        })
3885        .collect::<Result<Vec<_>>>()?
3886        .into_iter()
3887        .flatten()
3888        .collect::<Vec<_>>();
3889
3890    Ok(input
3891        .schema()
3892        .functional_dependencies()
3893        .project_functional_dependencies(&proj_indices, exprs.len()))
3894}
3895
/// Sorts its input according to a list of sort expressions.
///
/// All comparison and hashing traits are derived, so every field takes part
/// in equality, ordering and hashing.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Hash)]
pub struct Sort {
    /// The sort expressions
    pub expr: Vec<SortExpr>,
    /// The incoming logical plan
    pub input: Arc<LogicalPlan>,
    /// Optional fetch limit
    // NOTE(review): presumably the maximum number of rows to emit after
    // sorting, with `None` meaning no limit; confirm against the planner.
    pub fetch: Option<usize>,
}
3906
/// Join two logical plans on one or more join columns
///
/// `PartialOrd` is implemented by hand for this type because the `schema`
/// field is left out of the ordering comparison.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct Join {
    /// Left input
    pub left: Arc<LogicalPlan>,
    /// Right input
    pub right: Arc<LogicalPlan>,
    /// Equijoin clause expressed as pairs of (left, right) join expressions
    pub on: Vec<(Expr, Expr)>,
    /// Filters applied during join (non-equi conditions)
    pub filter: Option<Expr>,
    /// Join type
    pub join_type: JoinType,
    /// Join constraint
    pub join_constraint: JoinConstraint,
    /// The output schema, containing fields from the left and right inputs.
    ///
    /// [`Join::try_new`] computes it from both input schemas and the join type.
    pub schema: DFSchemaRef,
    /// Defines the null equality for the join.
    pub null_equality: NullEquality,
    /// Whether this is a null-aware anti join (for NOT IN semantics).
    ///
    /// Only applies to LeftAnti joins. When true, implements SQL NOT IN semantics where:
    /// - If the right side (subquery) contains any NULL in join keys, no rows are output
    /// - Left side rows with NULL in join keys are not output
    ///
    /// This is required for correct NOT IN subquery behavior with three-valued logic.
    pub null_aware: bool,
}
3935
3936impl Join {
3937    /// Creates a new Join operator with automatically computed schema.
3938    ///
3939    /// This constructor computes the schema based on the join type and inputs,
3940    /// removing the need to manually specify the schema or call `recompute_schema`.
3941    ///
3942    /// # Arguments
3943    ///
3944    /// * `left` - Left input plan
3945    /// * `right` - Right input plan
3946    /// * `on` - Join condition as a vector of (left_expr, right_expr) pairs
3947    /// * `filter` - Optional filter expression (for non-equijoin conditions)
3948    /// * `join_type` - Type of join (Inner, Left, Right, etc.)
3949    /// * `join_constraint` - Join constraint (On, Using)
3950    /// * `null_equality` - How to handle nulls in join comparisons
3951    /// * `null_aware` - Whether this is a null-aware anti join (for NOT IN semantics)
3952    ///
3953    /// # Returns
3954    ///
3955    /// A new Join operator with the computed schema
3956    #[expect(clippy::too_many_arguments)]
3957    pub fn try_new(
3958        left: Arc<LogicalPlan>,
3959        right: Arc<LogicalPlan>,
3960        on: Vec<(Expr, Expr)>,
3961        filter: Option<Expr>,
3962        join_type: JoinType,
3963        join_constraint: JoinConstraint,
3964        null_equality: NullEquality,
3965        null_aware: bool,
3966    ) -> Result<Self> {
3967        let join_schema = build_join_schema(left.schema(), right.schema(), &join_type)?;
3968
3969        Ok(Join {
3970            left,
3971            right,
3972            on,
3973            filter,
3974            join_type,
3975            join_constraint,
3976            schema: Arc::new(join_schema),
3977            null_equality,
3978            null_aware,
3979        })
3980    }
3981
3982    /// Create Join with input which wrapped with projection, this method is used in physical planning only to help
3983    /// create the physical join.
3984    pub fn try_new_with_project_input(
3985        original: &LogicalPlan,
3986        left: Arc<LogicalPlan>,
3987        right: Arc<LogicalPlan>,
3988        column_on: (Vec<Column>, Vec<Column>),
3989    ) -> Result<(Self, bool)> {
3990        let original_join = match original {
3991            LogicalPlan::Join(join) => join,
3992            _ => return plan_err!("Could not create join with project input"),
3993        };
3994
3995        let mut left_sch = LogicalPlanBuilder::from(Arc::clone(&left));
3996        let mut right_sch = LogicalPlanBuilder::from(Arc::clone(&right));
3997
3998        let mut requalified = false;
3999
4000        // By definition, the resulting schema of an inner/left/right & full join will have first the left side fields and then the right,
4001        // potentially having duplicate field names. Note this will only qualify fields if they have not been qualified before.
4002        if original_join.join_type == JoinType::Inner
4003            || original_join.join_type == JoinType::Left
4004            || original_join.join_type == JoinType::Right
4005            || original_join.join_type == JoinType::Full
4006        {
4007            (left_sch, right_sch, requalified) =
4008                requalify_sides_if_needed(left_sch.clone(), right_sch.clone())?;
4009        }
4010
4011        let on: Vec<(Expr, Expr)> = column_on
4012            .0
4013            .into_iter()
4014            .zip(column_on.1)
4015            .map(|(l, r)| (Expr::Column(l), Expr::Column(r)))
4016            .collect();
4017
4018        let join_schema = build_join_schema(
4019            left_sch.schema(),
4020            right_sch.schema(),
4021            &original_join.join_type,
4022        )?;
4023
4024        Ok((
4025            Join {
4026                left,
4027                right,
4028                on,
4029                filter: original_join.filter.clone(),
4030                join_type: original_join.join_type,
4031                join_constraint: original_join.join_constraint,
4032                schema: Arc::new(join_schema),
4033                null_equality: original_join.null_equality,
4034                null_aware: original_join.null_aware,
4035            },
4036            requalified,
4037        ))
4038    }
4039}
4040
4041// Manual implementation needed because of `schema` field. Comparison excludes this field.
4042impl PartialOrd for Join {
4043    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
4044        #[derive(PartialEq, PartialOrd)]
4045        struct ComparableJoin<'a> {
4046            /// Left input
4047            pub left: &'a Arc<LogicalPlan>,
4048            /// Right input
4049            pub right: &'a Arc<LogicalPlan>,
4050            /// Equijoin clause expressed as pairs of (left, right) join expressions
4051            pub on: &'a Vec<(Expr, Expr)>,
4052            /// Filters applied during join (non-equi conditions)
4053            pub filter: &'a Option<Expr>,
4054            /// Join type
4055            pub join_type: &'a JoinType,
4056            /// Join constraint
4057            pub join_constraint: &'a JoinConstraint,
4058            /// The null handling behavior for equalities
4059            pub null_equality: &'a NullEquality,
4060        }
4061        let comparable_self = ComparableJoin {
4062            left: &self.left,
4063            right: &self.right,
4064            on: &self.on,
4065            filter: &self.filter,
4066            join_type: &self.join_type,
4067            join_constraint: &self.join_constraint,
4068            null_equality: &self.null_equality,
4069        };
4070        let comparable_other = ComparableJoin {
4071            left: &other.left,
4072            right: &other.right,
4073            on: &other.on,
4074            filter: &other.filter,
4075            join_type: &other.join_type,
4076            join_constraint: &other.join_constraint,
4077            null_equality: &other.null_equality,
4078        };
4079        comparable_self
4080            .partial_cmp(&comparable_other)
4081            // TODO (https://github.com/apache/datafusion/issues/17477) avoid recomparing all fields
4082            .filter(|cmp| *cmp != Ordering::Equal || self == other)
4083    }
4084}
4085
/// A subquery: a nested logical plan together with the outer-query
/// expressions it refers to.
///
/// `Debug` is implemented by hand rather than derived.
#[derive(Clone, PartialEq, Eq, PartialOrd, Hash)]
pub struct Subquery {
    /// The subquery
    pub subquery: Arc<LogicalPlan>,
    /// The outer references used in the subquery
    pub outer_ref_columns: Vec<Expr>,
    /// Span information for subquery projection columns
    pub spans: Spans,
}
4096
impl Normalizeable for Subquery {
    /// Always returns `false`: a subquery never reports itself as normalizable.
    fn can_normalize(&self) -> bool {
        false
    }
}
4102
4103impl NormalizeEq for Subquery {
4104    fn normalize_eq(&self, other: &Self) -> bool {
4105        // TODO: may be implement NormalizeEq for LogicalPlan?
4106        *self.subquery == *other.subquery
4107            && self.outer_ref_columns.len() == other.outer_ref_columns.len()
4108            && self
4109                .outer_ref_columns
4110                .iter()
4111                .zip(other.outer_ref_columns.iter())
4112                .all(|(a, b)| a.normalize_eq(b))
4113    }
4114}
4115
4116impl Subquery {
4117    pub fn try_from_expr(plan: &Expr) -> Result<&Subquery> {
4118        match plan {
4119            Expr::ScalarSubquery(it) => Ok(it),
4120            Expr::Cast(cast) => Subquery::try_from_expr(cast.expr.as_ref()),
4121            _ => plan_err!("Could not coerce into ScalarSubquery!"),
4122        }
4123    }
4124
4125    pub fn with_plan(&self, plan: Arc<LogicalPlan>) -> Subquery {
4126        Subquery {
4127            subquery: plan,
4128            outer_ref_columns: self.outer_ref_columns.clone(),
4129            spans: Spans::new(),
4130        }
4131    }
4132}
4133
4134impl Debug for Subquery {
4135    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
4136        write!(f, "<subquery>")
4137    }
4138}
4139
/// Logical partitioning schemes supported by [`LogicalPlan::Repartition`]
///
/// See [`Partitioning`] for more details on partitioning
///
/// [`Partitioning`]: https://docs.rs/datafusion/latest/datafusion/physical_expr/enum.Partitioning.html#
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Hash)]
pub enum Partitioning {
    /// Allocate batches using a round-robin algorithm and the specified number of partitions
    RoundRobinBatch(usize),
    /// Allocate rows based on a hash of one or more expressions and the specified number
    /// of partitions.
    Hash(Vec<Expr>, usize),
    /// The DISTRIBUTE BY clause is used to repartition the data based on the input expressions
    DistributeBy(Vec<Expr>),
}
4155
/// Describes one unnesting operation on a list column: how many levels of
/// nesting to remove and the name of the output column after unnesting.
///
/// Example: given `ColumnUnnestList { output_column: "output_name", depth: 2 }`
///
/// ```text
///   input             output_name
///  ┌─────────┐      ┌─────────┐
///  │{{1,2}}  │      │ 1       │
///  ├─────────┼─────►├─────────┤
///  │{{3}}    │      │ 2       │
///  ├─────────┤      ├─────────┤
///  │{{4},{5}}│      │ 3       │
///  └─────────┘      ├─────────┤
///                   │ 4       │
///                   ├─────────┤
///                   │ 5       │
///                   └─────────┘
/// ```
#[derive(Debug, Clone, PartialEq, Eq, Hash, PartialOrd)]
pub struct ColumnUnnestList {
    /// Column that receives the unnested values
    pub output_column: Column,
    /// Recursion depth, i.e. how many list levels are unnested
    pub depth: usize,
}
4180
4181impl Display for ColumnUnnestList {
4182    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
4183        write!(f, "{}|depth={}", self.output_column, self.depth)
4184    }
4185}
4186
/// Unnest a column that contains a nested list type. See
/// [`UnnestOptions`] for more details.
///
/// `PartialOrd` is implemented by hand for this type because the `schema`
/// field is left out of the ordering comparison.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct Unnest {
    /// The incoming logical plan
    pub input: Arc<LogicalPlan>,
    /// Columns to run unnest on, can be a list of (List/Struct) columns
    pub exec_columns: Vec<Column>,
    /// Pairs of (index in the input schema, unnest description) for the
    /// columns of list type to run unnest on. One input index may appear
    /// several times, once per requested depth.
    pub list_type_columns: Vec<(usize, ColumnUnnestList)>,
    /// Indices (in the input schema) of the columns of struct type to run
    /// unnest on
    pub struct_type_columns: Vec<usize>,
    /// One entry per output column, giving the index of the input schema
    /// column that the output column depends on
    pub dependency_indices: Vec<usize>,
    /// The output schema, containing the unnested field column.
    pub schema: DFSchemaRef,
    /// Options
    pub options: UnnestOptions,
}
4209
4210// Manual implementation needed because of `schema` field. Comparison excludes this field.
4211impl PartialOrd for Unnest {
4212    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
4213        #[derive(PartialEq, PartialOrd)]
4214        struct ComparableUnnest<'a> {
4215            /// The incoming logical plan
4216            pub input: &'a Arc<LogicalPlan>,
4217            /// Columns to run unnest on, can be a list of (List/Struct) columns
4218            pub exec_columns: &'a Vec<Column>,
4219            /// refer to the indices(in the input schema) of columns
4220            /// that have type list to run unnest on
4221            pub list_type_columns: &'a Vec<(usize, ColumnUnnestList)>,
4222            /// refer to the indices (in the input schema) of columns
4223            /// that have type struct to run unnest on
4224            pub struct_type_columns: &'a Vec<usize>,
4225            /// Having items aligned with the output columns
4226            /// representing which column in the input schema each output column depends on
4227            pub dependency_indices: &'a Vec<usize>,
4228            /// Options
4229            pub options: &'a UnnestOptions,
4230        }
4231        let comparable_self = ComparableUnnest {
4232            input: &self.input,
4233            exec_columns: &self.exec_columns,
4234            list_type_columns: &self.list_type_columns,
4235            struct_type_columns: &self.struct_type_columns,
4236            dependency_indices: &self.dependency_indices,
4237            options: &self.options,
4238        };
4239        let comparable_other = ComparableUnnest {
4240            input: &other.input,
4241            exec_columns: &other.exec_columns,
4242            list_type_columns: &other.list_type_columns,
4243            struct_type_columns: &other.struct_type_columns,
4244            dependency_indices: &other.dependency_indices,
4245            options: &other.options,
4246        };
4247        comparable_self
4248            .partial_cmp(&comparable_other)
4249            // TODO (https://github.com/apache/datafusion/issues/17477) avoid recomparing all fields
4250            .filter(|cmp| *cmp != Ordering::Equal || self == other)
4251    }
4252}
4253
impl Unnest {
    /// Builds an unnest node over `input` for the columns in `exec_columns`,
    /// deriving the output schema and the bookkeeping vectors.
    ///
    /// Every input field that is not unnested passes through unchanged. An
    /// unnested field is replaced by one output column per entry of
    /// `options.recursions` that targets it, or, when there is no such entry,
    /// by the result of unnesting it once (depth 1).
    ///
    /// The output schema reuses the metadata and the functional dependencies
    /// of the input schema.
    ///
    /// # Errors
    ///
    /// Returns a plan error when `exec_columns` is empty, and propagates
    /// errors from resolving a column in the input schema, from
    /// `get_unnested_columns`, and from building the schema.
    ///
    /// # Panics
    ///
    /// Panics if `get_unnested_columns` returns no column for a recursion
    /// entry, because the first element is unwrapped.
    pub fn try_new(
        input: Arc<LogicalPlan>,
        exec_columns: Vec<Column>,
        options: UnnestOptions,
    ) -> Result<Self> {
        if exec_columns.is_empty() {
            return plan_err!("unnest plan requires at least 1 column to unnest");
        }

        let mut list_columns: Vec<(usize, ColumnUnnestList)> = vec![];
        let mut struct_columns = vec![];
        // Input schema index -> column to unnest. Being a map, two columns
        // that resolve to the same index collapse into a single entry.
        let indices_to_unnest = exec_columns
            .iter()
            .map(|c| Ok((input.schema().index_of_column(c)?, c)))
            .collect::<Result<HashMap<usize, &Column>>>()?;

        let input_schema = input.schema();

        let mut dependency_indices = vec![];
        // Transform input schema into new schema
        // Given this comprehensive example
        //
        // input schema:
        // 1.col1_unnest_placeholder: list[list[int]],
        // 2.col1: list[list[int]]
        // 3.col2: list[int]
        // with unnest on unnest(col1,depth=2), unnest(col1,depth=1) and unnest(col2,depth=1)
        // output schema:
        // 1.unnest_col1_depth_2: int
        // 2.unnest_col1_depth_1: list[int]
        // 3.col1: list[list[int]]
        // 4.unnest_col2_depth_1: int
        // Meaning the placeholder column will be replaced by its unnested variation(s), note
        // the plural.
        let fields = input_schema
            .iter()
            .enumerate()
            .map(|(index, (original_qualifier, original_field))| {
                match indices_to_unnest.get(&index) {
                    Some(column_to_unnest) => {
                        // Explicit recursion requests that target this column.
                        let recursions_on_column = options
                            .recursions
                            .iter()
                            .filter(|p| -> bool { &p.input_column == *column_to_unnest })
                            .collect::<Vec<_>>();
                        // One output column per recursion request; each request is
                        // also recorded as a list column to unnest.
                        let mut transformed_columns = recursions_on_column
                            .iter()
                            .map(|r| {
                                list_columns.push((
                                    index,
                                    ColumnUnnestList {
                                        output_column: r.output_column.clone(),
                                        depth: r.depth,
                                    },
                                ));
                                Ok(get_unnested_columns(
                                    &r.output_column.name,
                                    original_field.data_type(),
                                    r.depth,
                                )?
                                .into_iter()
                                .next()
                                .unwrap()) // unnesting a list column always yields exactly one column
                            })
                            .collect::<Result<Vec<(Column, Arc<Field>)>>>()?;
                        // No explicit recursion: unnest once and classify the
                        // column by its data type.
                        if transformed_columns.is_empty() {
                            transformed_columns = get_unnested_columns(
                                &column_to_unnest.name,
                                original_field.data_type(),
                                1,
                            )?;
                            match original_field.data_type() {
                                DataType::Struct(_) => {
                                    struct_columns.push(index);
                                }
                                DataType::List(_)
                                | DataType::FixedSizeList(_, _)
                                | DataType::LargeList(_)
                                | DataType::ListView(_)
                                | DataType::LargeListView(_) => {
                                    list_columns.push((
                                        index,
                                        ColumnUnnestList {
                                            output_column: Column::from_name(
                                                &column_to_unnest.name,
                                            ),
                                            depth: 1,
                                        },
                                    ));
                                }
                                // Any other type is recorded in neither list.
                                _ => {}
                            };
                        }

                        // new columns dependent on the same original index
                        dependency_indices.extend(std::iter::repeat_n(
                            index,
                            transformed_columns.len(),
                        ));
                        Ok(transformed_columns
                            .iter()
                            .map(|(col, field)| {
                                (col.relation.to_owned(), field.to_owned())
                            })
                            .collect())
                    }
                    None => {
                        // Not unnested: the field passes through and depends on itself.
                        dependency_indices.push(index);
                        Ok(vec![(
                            original_qualifier.cloned(),
                            Arc::clone(original_field),
                        )])
                    }
                }
            })
            .collect::<Result<Vec<_>>>()?
            .into_iter()
            .flatten()
            .collect::<Vec<_>>();

        let metadata = input_schema.metadata().clone();
        let df_schema = DFSchema::new_with_metadata(fields, metadata)?;
        // We can use the existing functional dependencies:
        let deps = input_schema.functional_dependencies().clone();
        let schema = Arc::new(df_schema.with_functional_dependencies(deps)?);

        Ok(Unnest {
            input,
            exec_columns,
            list_type_columns: list_columns,
            struct_type_columns: struct_columns,
            dependency_indices,
            schema,
            options,
        })
    }
}
4392
4393// Based on data type, either struct or a variant of list
4394// return a set of columns as the result of unnesting
4395// the input columns.
4396// For example, given a column with name "a",
4397// - List(Element) returns ["a"] with data type Element
4398// - Struct(field1, field2) returns ["a.field1","a.field2"]
4399// For list data type, an argument depth is used to specify
4400// the recursion level
4401fn get_unnested_columns(
4402    col_name: &String,
4403    data_type: &DataType,
4404    depth: usize,
4405) -> Result<Vec<(Column, Arc<Field>)>> {
4406    let mut qualified_columns = Vec::with_capacity(1);
4407
4408    match data_type {
4409        DataType::List(_)
4410        | DataType::FixedSizeList(_, _)
4411        | DataType::LargeList(_)
4412        | DataType::ListView(_)
4413        | DataType::LargeListView(_) => {
4414            let data_type = get_unnested_list_datatype_recursive(data_type, depth)?;
4415            let new_field = Arc::new(Field::new(
4416                col_name, data_type,
4417                // Unnesting may produce NULLs even if the list is not null.
4418                // For example: unnest([1], []) -> 1, null
4419                true,
4420            ));
4421            let column = Column::from_name(col_name);
4422            // let column = Column::from((None, &new_field));
4423            qualified_columns.push((column, new_field));
4424        }
4425        DataType::Struct(fields) => {
4426            qualified_columns.extend(fields.iter().map(|f| {
4427                let new_name = format!("{}.{}", col_name, f.name());
4428                let column = Column::from_name(&new_name);
4429                let new_field = f.as_ref().clone().with_name(new_name);
4430                // let column = Column::from((None, &f));
4431                (column, Arc::new(new_field))
4432            }))
4433        }
4434        _ => {
4435            return internal_err!("trying to unnest on invalid data type {data_type}");
4436        }
4437    };
4438    Ok(qualified_columns)
4439}
4440
4441// Get the data type of a multi-dimensional type after unnesting it
4442// with a given depth
4443fn get_unnested_list_datatype_recursive(
4444    data_type: &DataType,
4445    depth: usize,
4446) -> Result<DataType> {
4447    match data_type {
4448        DataType::List(field)
4449        | DataType::FixedSizeList(field, _)
4450        | DataType::LargeList(field)
4451        | DataType::ListView(field)
4452        | DataType::LargeListView(field) => {
4453            if depth == 1 {
4454                return Ok(field.data_type().clone());
4455            }
4456            return get_unnested_list_datatype_recursive(field.data_type(), depth - 1);
4457        }
4458        _ => {}
4459    };
4460
4461    internal_err!("trying to unnest on invalid data type {data_type}")
4462}
4463
4464#[cfg(test)]
4465mod tests {
4466    use super::*;
4467    use crate::builder::LogicalTableSource;
4468    use crate::logical_plan::table_scan;
4469    use crate::select_expr::SelectExpr;
4470    use crate::test::function_stub::{count, count_udaf};
4471    use crate::{
4472        GroupingSet, binary_expr, col, exists, in_subquery, lit, placeholder,
4473        scalar_subquery,
4474    };
4475    use datafusion_common::metadata::ScalarAndMetadata;
4476    use datafusion_common::tree_node::{
4477        TransformedResult, TreeNodeRewriter, TreeNodeVisitor,
4478    };
4479    use datafusion_common::{Constraint, not_impl_err};
4480    use insta::{assert_debug_snapshot, assert_snapshot};
4481    use std::hash::DefaultHasher;
4482
4483    fn employee_schema() -> Schema {
4484        Schema::new(vec![
4485            Field::new("id", DataType::Int32, false),
4486            Field::new("first_name", DataType::Utf8, false),
4487            Field::new("last_name", DataType::Utf8, false),
4488            Field::new("state", DataType::Utf8, false),
4489            Field::new("salary", DataType::Int32, false),
4490        ])
4491    }
4492
4493    fn display_plan() -> Result<LogicalPlan> {
4494        let plan1 = table_scan(Some("employee_csv"), &employee_schema(), Some(vec![3]))?
4495            .build()?;
4496
4497        table_scan(Some("employee_csv"), &employee_schema(), Some(vec![0, 3]))?
4498            .filter(in_subquery(col("state"), Arc::new(plan1)))?
4499            .project(vec![col("id")])?
4500            .build()
4501    }
4502
4503    #[test]
4504    fn test_display_indent() -> Result<()> {
4505        let plan = display_plan()?;
4506
4507        assert_snapshot!(plan.display_indent(), @r"
4508        Projection: employee_csv.id
4509          Filter: employee_csv.state IN (<subquery>)
4510            Subquery:
4511              TableScan: employee_csv projection=[state]
4512            TableScan: employee_csv projection=[id, state]
4513        ");
4514        Ok(())
4515    }
4516
4517    #[test]
4518    fn test_display_indent_schema() -> Result<()> {
4519        let plan = display_plan()?;
4520
4521        assert_snapshot!(plan.display_indent_schema(), @r"
4522        Projection: employee_csv.id [id:Int32]
4523          Filter: employee_csv.state IN (<subquery>) [id:Int32, state:Utf8]
4524            Subquery: [state:Utf8]
4525              TableScan: employee_csv projection=[state] [state:Utf8]
4526            TableScan: employee_csv projection=[id, state] [id:Int32, state:Utf8]
4527        ");
4528        Ok(())
4529    }
4530
4531    #[test]
4532    fn test_display_subquery_alias() -> Result<()> {
4533        let plan1 = table_scan(Some("employee_csv"), &employee_schema(), Some(vec![3]))?
4534            .build()?;
4535        let plan1 = Arc::new(plan1);
4536
4537        let plan =
4538            table_scan(Some("employee_csv"), &employee_schema(), Some(vec![0, 3]))?
4539                .project(vec![col("id"), exists(plan1).alias("exists")])?
4540                .build();
4541
4542        assert_snapshot!(plan?.display_indent(), @r"
4543        Projection: employee_csv.id, EXISTS (<subquery>) AS exists
4544          Subquery:
4545            TableScan: employee_csv projection=[state]
4546          TableScan: employee_csv projection=[id, state]
4547        ");
4548        Ok(())
4549    }
4550
4551    #[test]
4552    fn test_display_graphviz() -> Result<()> {
4553        let plan = display_plan()?;
4554
4555        // just test for a few key lines in the output rather than the
4556        // whole thing to make test maintenance easier.
4557        assert_snapshot!(plan.display_graphviz(), @r#"
4558        // Begin DataFusion GraphViz Plan,
4559        // display it online here: https://dreampuf.github.io/GraphvizOnline
4560
4561        digraph {
4562          subgraph cluster_1
4563          {
4564            graph[label="LogicalPlan"]
4565            2[shape=box label="Projection: employee_csv.id"]
4566            3[shape=box label="Filter: employee_csv.state IN (<subquery>)"]
4567            2 -> 3 [arrowhead=none, arrowtail=normal, dir=back]
4568            4[shape=box label="Subquery:"]
4569            3 -> 4 [arrowhead=none, arrowtail=normal, dir=back]
4570            5[shape=box label="TableScan: employee_csv projection=[state]"]
4571            4 -> 5 [arrowhead=none, arrowtail=normal, dir=back]
4572            6[shape=box label="TableScan: employee_csv projection=[id, state]"]
4573            3 -> 6 [arrowhead=none, arrowtail=normal, dir=back]
4574          }
4575          subgraph cluster_7
4576          {
4577            graph[label="Detailed LogicalPlan"]
4578            8[shape=box label="Projection: employee_csv.id\nSchema: [id:Int32]"]
4579            9[shape=box label="Filter: employee_csv.state IN (<subquery>)\nSchema: [id:Int32, state:Utf8]"]
4580            8 -> 9 [arrowhead=none, arrowtail=normal, dir=back]
4581            10[shape=box label="Subquery:\nSchema: [state:Utf8]"]
4582            9 -> 10 [arrowhead=none, arrowtail=normal, dir=back]
4583            11[shape=box label="TableScan: employee_csv projection=[state]\nSchema: [state:Utf8]"]
4584            10 -> 11 [arrowhead=none, arrowtail=normal, dir=back]
4585            12[shape=box label="TableScan: employee_csv projection=[id, state]\nSchema: [id:Int32, state:Utf8]"]
4586            9 -> 12 [arrowhead=none, arrowtail=normal, dir=back]
4587          }
4588        }
4589        // End DataFusion GraphViz Plan
4590        "#);
4591        Ok(())
4592    }
4593
4594    #[test]
4595    fn test_display_pg_json() -> Result<()> {
4596        let plan = display_plan()?;
4597
4598        assert_snapshot!(plan.display_pg_json(), @r#"
4599        [
4600          {
4601            "Plan": {
4602              "Node Type": "Projection",
4603              "Expressions": [
4604                "employee_csv.id"
4605              ],
4606              "Plans": [
4607                {
4608                  "Node Type": "Filter",
4609                  "Condition": "employee_csv.state IN (<subquery>)",
4610                  "Plans": [
4611                    {
4612                      "Node Type": "Subquery",
4613                      "Plans": [
4614                        {
4615                          "Node Type": "TableScan",
4616                          "Relation Name": "employee_csv",
4617                          "Plans": [],
4618                          "Output": [
4619                            "state"
4620                          ]
4621                        }
4622                      ],
4623                      "Output": [
4624                        "state"
4625                      ]
4626                    },
4627                    {
4628                      "Node Type": "TableScan",
4629                      "Relation Name": "employee_csv",
4630                      "Plans": [],
4631                      "Output": [
4632                        "id",
4633                        "state"
4634                      ]
4635                    }
4636                  ],
4637                  "Output": [
4638                    "id",
4639                    "state"
4640                  ]
4641                }
4642              ],
4643              "Output": [
4644                "id"
4645              ]
4646            }
4647          }
4648        ]
4649        "#);
4650        Ok(())
4651    }
4652
4653    /// Tests for the Visitor trait and walking logical plan nodes
4654    #[derive(Debug, Default)]
4655    struct OkVisitor {
4656        strings: Vec<String>,
4657    }
4658
4659    impl<'n> TreeNodeVisitor<'n> for OkVisitor {
4660        type Node = LogicalPlan;
4661
4662        fn f_down(&mut self, plan: &'n LogicalPlan) -> Result<TreeNodeRecursion> {
4663            let s = match plan {
4664                LogicalPlan::Projection { .. } => "pre_visit Projection",
4665                LogicalPlan::Filter { .. } => "pre_visit Filter",
4666                LogicalPlan::TableScan { .. } => "pre_visit TableScan",
4667                _ => {
4668                    return not_impl_err!("unknown plan type");
4669                }
4670            };
4671
4672            self.strings.push(s.into());
4673            Ok(TreeNodeRecursion::Continue)
4674        }
4675
4676        fn f_up(&mut self, plan: &'n LogicalPlan) -> Result<TreeNodeRecursion> {
4677            let s = match plan {
4678                LogicalPlan::Projection { .. } => "post_visit Projection",
4679                LogicalPlan::Filter { .. } => "post_visit Filter",
4680                LogicalPlan::TableScan { .. } => "post_visit TableScan",
4681                _ => {
4682                    return not_impl_err!("unknown plan type");
4683                }
4684            };
4685
4686            self.strings.push(s.into());
4687            Ok(TreeNodeRecursion::Continue)
4688        }
4689    }
4690
4691    #[test]
4692    fn visit_order() {
4693        let mut visitor = OkVisitor::default();
4694        let plan = test_plan();
4695        let res = plan.visit_with_subqueries(&mut visitor);
4696        assert!(res.is_ok());
4697
4698        assert_debug_snapshot!(visitor.strings, @r#"
4699        [
4700            "pre_visit Projection",
4701            "pre_visit Filter",
4702            "pre_visit TableScan",
4703            "post_visit TableScan",
4704            "post_visit Filter",
4705            "post_visit Projection",
4706        ]
4707        "#);
4708    }
4709
4710    #[derive(Debug, Default)]
4711    /// Counter than counts to zero and returns true when it gets there
4712    struct OptionalCounter {
4713        val: Option<usize>,
4714    }
4715
4716    impl OptionalCounter {
4717        fn new(val: usize) -> Self {
4718            Self { val: Some(val) }
4719        }
4720        // Decrements the counter by 1, if any, returning true if it hits zero
4721        fn dec(&mut self) -> bool {
4722            if Some(0) == self.val {
4723                true
4724            } else {
4725                self.val = self.val.take().map(|i| i - 1);
4726                false
4727            }
4728        }
4729    }
4730
4731    #[derive(Debug, Default)]
4732    /// Visitor that returns false after some number of visits
4733    struct StoppingVisitor {
4734        inner: OkVisitor,
4735        /// When Some(0) returns false from pre_visit
4736        return_false_from_pre_in: OptionalCounter,
4737        /// When Some(0) returns false from post_visit
4738        return_false_from_post_in: OptionalCounter,
4739    }
4740
4741    impl<'n> TreeNodeVisitor<'n> for StoppingVisitor {
4742        type Node = LogicalPlan;
4743
4744        fn f_down(&mut self, plan: &'n LogicalPlan) -> Result<TreeNodeRecursion> {
4745            if self.return_false_from_pre_in.dec() {
4746                return Ok(TreeNodeRecursion::Stop);
4747            }
4748            self.inner.f_down(plan)?;
4749
4750            Ok(TreeNodeRecursion::Continue)
4751        }
4752
4753        fn f_up(&mut self, plan: &'n LogicalPlan) -> Result<TreeNodeRecursion> {
4754            if self.return_false_from_post_in.dec() {
4755                return Ok(TreeNodeRecursion::Stop);
4756            }
4757
4758            self.inner.f_up(plan)
4759        }
4760    }
4761
4762    /// test early stopping in pre-visit
4763    #[test]
4764    fn early_stopping_pre_visit() {
4765        let mut visitor = StoppingVisitor {
4766            return_false_from_pre_in: OptionalCounter::new(2),
4767            ..Default::default()
4768        };
4769        let plan = test_plan();
4770        let res = plan.visit_with_subqueries(&mut visitor);
4771        assert!(res.is_ok());
4772
4773        assert_debug_snapshot!(
4774            visitor.inner.strings,
4775            @r#"
4776        [
4777            "pre_visit Projection",
4778            "pre_visit Filter",
4779        ]
4780        "#
4781        );
4782    }
4783
4784    #[test]
4785    fn early_stopping_post_visit() {
4786        let mut visitor = StoppingVisitor {
4787            return_false_from_post_in: OptionalCounter::new(1),
4788            ..Default::default()
4789        };
4790        let plan = test_plan();
4791        let res = plan.visit_with_subqueries(&mut visitor);
4792        assert!(res.is_ok());
4793
4794        assert_debug_snapshot!(
4795            visitor.inner.strings,
4796            @r#"
4797        [
4798            "pre_visit Projection",
4799            "pre_visit Filter",
4800            "pre_visit TableScan",
4801            "post_visit TableScan",
4802        ]
4803        "#
4804        );
4805    }
4806
4807    #[derive(Debug, Default)]
4808    /// Visitor that returns an error after some number of visits
4809    struct ErrorVisitor {
4810        inner: OkVisitor,
4811        /// When Some(0) returns false from pre_visit
4812        return_error_from_pre_in: OptionalCounter,
4813        /// When Some(0) returns false from post_visit
4814        return_error_from_post_in: OptionalCounter,
4815    }
4816
4817    impl<'n> TreeNodeVisitor<'n> for ErrorVisitor {
4818        type Node = LogicalPlan;
4819
4820        fn f_down(&mut self, plan: &'n LogicalPlan) -> Result<TreeNodeRecursion> {
4821            if self.return_error_from_pre_in.dec() {
4822                return not_impl_err!("Error in pre_visit");
4823            }
4824
4825            self.inner.f_down(plan)
4826        }
4827
4828        fn f_up(&mut self, plan: &'n LogicalPlan) -> Result<TreeNodeRecursion> {
4829            if self.return_error_from_post_in.dec() {
4830                return not_impl_err!("Error in post_visit");
4831            }
4832
4833            self.inner.f_up(plan)
4834        }
4835    }
4836
4837    #[test]
4838    fn error_pre_visit() {
4839        let mut visitor = ErrorVisitor {
4840            return_error_from_pre_in: OptionalCounter::new(2),
4841            ..Default::default()
4842        };
4843        let plan = test_plan();
4844        let res = plan.visit_with_subqueries(&mut visitor).unwrap_err();
4845        assert_snapshot!(
4846            res.strip_backtrace(),
4847            @"This feature is not implemented: Error in pre_visit"
4848        );
4849        assert_debug_snapshot!(
4850            visitor.inner.strings,
4851            @r#"
4852        [
4853            "pre_visit Projection",
4854            "pre_visit Filter",
4855        ]
4856        "#
4857        );
4858    }
4859
4860    #[test]
4861    fn error_post_visit() {
4862        let mut visitor = ErrorVisitor {
4863            return_error_from_post_in: OptionalCounter::new(1),
4864            ..Default::default()
4865        };
4866        let plan = test_plan();
4867        let res = plan.visit_with_subqueries(&mut visitor).unwrap_err();
4868        assert_snapshot!(
4869            res.strip_backtrace(),
4870            @"This feature is not implemented: Error in post_visit"
4871        );
4872        assert_debug_snapshot!(
4873            visitor.inner.strings,
4874            @r#"
4875        [
4876            "pre_visit Projection",
4877            "pre_visit Filter",
4878            "pre_visit TableScan",
4879            "post_visit TableScan",
4880        ]
4881        "#
4882        );
4883    }
4884
4885    #[test]
4886    fn test_partial_eq_hash_and_partial_ord() {
4887        let empty_values = Arc::new(LogicalPlan::EmptyRelation(EmptyRelation {
4888            produce_one_row: true,
4889            schema: Arc::new(DFSchema::empty()),
4890        }));
4891
4892        let count_window_function = |schema| {
4893            Window::try_new_with_schema(
4894                vec![Expr::WindowFunction(Box::new(WindowFunction::new(
4895                    WindowFunctionDefinition::AggregateUDF(count_udaf()),
4896                    vec![],
4897                )))],
4898                Arc::clone(&empty_values),
4899                Arc::new(schema),
4900            )
4901            .unwrap()
4902        };
4903
4904        let schema_without_metadata = || {
4905            DFSchema::from_unqualified_fields(
4906                vec![Field::new("count", DataType::Int64, false)].into(),
4907                HashMap::new(),
4908            )
4909            .unwrap()
4910        };
4911
4912        let schema_with_metadata = || {
4913            DFSchema::from_unqualified_fields(
4914                vec![Field::new("count", DataType::Int64, false)].into(),
4915                [("key".to_string(), "value".to_string())].into(),
4916            )
4917            .unwrap()
4918        };
4919
4920        // A Window
4921        let f = count_window_function(schema_without_metadata());
4922
4923        // Same like `f`, different instance
4924        let f2 = count_window_function(schema_without_metadata());
4925        assert_eq!(f, f2);
4926        assert_eq!(hash(&f), hash(&f2));
4927        assert_eq!(f.partial_cmp(&f2), Some(Ordering::Equal));
4928
4929        // Same like `f`, except for schema metadata
4930        let o = count_window_function(schema_with_metadata());
4931        assert_ne!(f, o);
4932        assert_ne!(hash(&f), hash(&o)); // hash can collide for different values but does not collide in this test
4933        assert_eq!(f.partial_cmp(&o), None);
4934    }
4935
4936    fn hash<T: Hash>(value: &T) -> u64 {
4937        let hasher = &mut DefaultHasher::new();
4938        value.hash(hasher);
4939        hasher.finish()
4940    }
4941
4942    #[test]
4943    fn projection_expr_schema_mismatch() -> Result<()> {
4944        let empty_schema = Arc::new(DFSchema::empty());
4945        let p = Projection::try_new_with_schema(
4946            vec![col("a")],
4947            Arc::new(LogicalPlan::EmptyRelation(EmptyRelation {
4948                produce_one_row: false,
4949                schema: Arc::clone(&empty_schema),
4950            })),
4951            empty_schema,
4952        );
4953        assert_snapshot!(p.unwrap_err().strip_backtrace(), @"Error during planning: Projection has mismatch between number of expressions (1) and number of fields in schema (0)");
4954        Ok(())
4955    }
4956
4957    fn test_plan() -> LogicalPlan {
4958        let schema = Schema::new(vec![
4959            Field::new("id", DataType::Int32, false),
4960            Field::new("state", DataType::Utf8, false),
4961        ]);
4962
4963        table_scan(TableReference::none(), &schema, Some(vec![0, 1]))
4964            .unwrap()
4965            .filter(col("state").eq(lit("CO")))
4966            .unwrap()
4967            .project(vec![col("id")])
4968            .unwrap()
4969            .build()
4970            .unwrap()
4971    }
4972
4973    #[test]
4974    fn test_replace_invalid_placeholder() {
4975        // test empty placeholder
4976        let schema = Schema::new(vec![Field::new("id", DataType::Int32, false)]);
4977
4978        let plan = table_scan(TableReference::none(), &schema, None)
4979            .unwrap()
4980            .filter(col("id").eq(placeholder("")))
4981            .unwrap()
4982            .build()
4983            .unwrap();
4984
4985        let param_values = vec![ScalarValue::Int32(Some(42))];
4986        plan.replace_params_with_values(&param_values.clone().into())
4987            .expect_err("unexpectedly succeeded to replace an invalid placeholder");
4988
4989        // test $0 placeholder
4990        let schema = Schema::new(vec![Field::new("id", DataType::Int32, false)]);
4991
4992        let plan = table_scan(TableReference::none(), &schema, None)
4993            .unwrap()
4994            .filter(col("id").eq(placeholder("$0")))
4995            .unwrap()
4996            .build()
4997            .unwrap();
4998
4999        plan.replace_params_with_values(&param_values.clone().into())
5000            .expect_err("unexpectedly succeeded to replace an invalid placeholder");
5001
5002        // test $00 placeholder
5003        let schema = Schema::new(vec![Field::new("id", DataType::Int32, false)]);
5004
5005        let plan = table_scan(TableReference::none(), &schema, None)
5006            .unwrap()
5007            .filter(col("id").eq(placeholder("$00")))
5008            .unwrap()
5009            .build()
5010            .unwrap();
5011
5012        plan.replace_params_with_values(&param_values.into())
5013            .expect_err("unexpectedly succeeded to replace an invalid placeholder");
5014    }
5015
5016    #[test]
5017    fn test_replace_placeholder_mismatched_metadata() {
5018        let schema = Schema::new(vec![Field::new("id", DataType::Int32, false)]);
5019
5020        // Create a prepared statement with explicit fields that do not have metadata
5021        let plan = table_scan(TableReference::none(), &schema, None)
5022            .unwrap()
5023            .filter(col("id").eq(placeholder("$1")))
5024            .unwrap()
5025            .build()
5026            .unwrap();
5027        let prepared_builder = LogicalPlanBuilder::new(plan)
5028            .prepare(
5029                "".to_string(),
5030                vec![Field::new("", DataType::Int32, true).into()],
5031            )
5032            .unwrap();
5033
5034        // Attempt to bind a parameter with metadata
5035        let mut scalar_meta = HashMap::new();
5036        scalar_meta.insert("some_key".to_string(), "some_value".to_string());
5037        let param_values = ParamValues::List(vec![ScalarAndMetadata::new(
5038            ScalarValue::Int32(Some(42)),
5039            Some(scalar_meta.into()),
5040        )]);
5041        prepared_builder
5042            .plan()
5043            .clone()
5044            .with_param_values(param_values)
5045            .expect_err("prepared field metadata mismatch unexpectedly succeeded");
5046    }
5047
5048    #[test]
5049    fn test_replace_placeholder_empty_relation_valid_schema() {
5050        // SELECT $1, $2;
5051        let plan = LogicalPlanBuilder::empty(false)
5052            .project(vec![
5053                SelectExpr::from(placeholder("$1")),
5054                SelectExpr::from(placeholder("$2")),
5055            ])
5056            .unwrap()
5057            .build()
5058            .unwrap();
5059
5060        // original
5061        assert_snapshot!(plan.display_indent_schema(), @r"
5062        Projection: $1, $2 [$1:Null;N, $2:Null;N]
5063          EmptyRelation: rows=0 []
5064        ");
5065
5066        let plan = plan
5067            .with_param_values(vec![ScalarValue::from(1i32), ScalarValue::from("s")])
5068            .unwrap();
5069
5070        // replaced
5071        assert_snapshot!(plan.display_indent_schema(), @r#"
5072        Projection: Int32(1) AS $1, Utf8("s") AS $2 [$1:Int32, $2:Utf8]
5073          EmptyRelation: rows=0 []
5074        "#);
5075    }
5076
5077    #[test]
5078    fn test_nullable_schema_after_grouping_set() {
5079        let schema = Schema::new(vec![
5080            Field::new("foo", DataType::Int32, false),
5081            Field::new("bar", DataType::Int32, false),
5082        ]);
5083
5084        let plan = table_scan(TableReference::none(), &schema, None)
5085            .unwrap()
5086            .aggregate(
5087                vec![Expr::GroupingSet(GroupingSet::GroupingSets(vec![
5088                    vec![col("foo")],
5089                    vec![col("bar")],
5090                ]))],
5091                vec![count(lit(true))],
5092            )
5093            .unwrap()
5094            .build()
5095            .unwrap();
5096
5097        let output_schema = plan.schema();
5098
5099        assert!(
5100            output_schema
5101                .field_with_name(None, "foo")
5102                .unwrap()
5103                .is_nullable(),
5104        );
5105        assert!(
5106            output_schema
5107                .field_with_name(None, "bar")
5108                .unwrap()
5109                .is_nullable()
5110        );
5111    }
5112
5113    #[test]
5114    fn grouping_id_type_accounts_for_duplicate_ordinal_bits() {
5115        // 8 grouping columns fit in UInt8 when there are no duplicate ordinals,
5116        // but adding one duplicate ordinal bit widens the type to UInt16.
5117        assert_eq!(Aggregate::grouping_id_type(8, 0), DataType::UInt8);
5118        assert_eq!(Aggregate::grouping_id_type(8, 1), DataType::UInt16);
5119    }
5120
5121    #[test]
5122    fn test_filter_is_scalar() {
5123        // test empty placeholder
5124        let schema =
5125            Arc::new(Schema::new(vec![Field::new("id", DataType::Int32, false)]));
5126
5127        let source = Arc::new(LogicalTableSource::new(schema));
5128        let schema = Arc::new(
5129            DFSchema::try_from_qualified_schema(
5130                TableReference::bare("tab"),
5131                &source.schema(),
5132            )
5133            .unwrap(),
5134        );
5135        let scan = Arc::new(LogicalPlan::TableScan(TableScan {
5136            table_name: TableReference::bare("tab"),
5137            source: Arc::clone(&source) as Arc<dyn TableSource>,
5138            projection: None,
5139            projected_schema: Arc::clone(&schema),
5140            filters: vec![],
5141            fetch: None,
5142        }));
5143        let col = schema.field_names()[0].clone();
5144
5145        let filter = Filter::try_new(
5146            Expr::Column(col.into()).eq(Expr::Literal(ScalarValue::Int32(Some(1)), None)),
5147            scan,
5148        )
5149        .unwrap();
5150        assert!(!filter.is_scalar());
5151        let unique_schema = Arc::new(
5152            schema
5153                .as_ref()
5154                .clone()
5155                .with_functional_dependencies(
5156                    FunctionalDependencies::new_from_constraints(
5157                        Some(&Constraints::new_unverified(vec![Constraint::Unique(
5158                            vec![0],
5159                        )])),
5160                        1,
5161                    ),
5162                )
5163                .unwrap(),
5164        );
5165        let scan = Arc::new(LogicalPlan::TableScan(TableScan {
5166            table_name: TableReference::bare("tab"),
5167            source,
5168            projection: None,
5169            projected_schema: Arc::clone(&unique_schema),
5170            filters: vec![],
5171            fetch: None,
5172        }));
5173        let col = schema.field_names()[0].clone();
5174
5175        let filter =
5176            Filter::try_new(Expr::Column(col.into()).eq(lit(1i32)), scan).unwrap();
5177        assert!(filter.is_scalar());
5178    }
5179
5180    #[test]
5181    fn test_transform_explain() {
5182        let schema = Schema::new(vec![
5183            Field::new("foo", DataType::Int32, false),
5184            Field::new("bar", DataType::Int32, false),
5185        ]);
5186
5187        let plan = table_scan(TableReference::none(), &schema, None)
5188            .unwrap()
5189            .explain(false, false)
5190            .unwrap()
5191            .build()
5192            .unwrap();
5193
5194        let external_filter = col("foo").eq(lit(true));
5195
5196        // after transformation, because plan is not the same anymore,
5197        // the parent plan is built again with call to LogicalPlan::with_new_inputs -> with_new_exprs
5198        let plan = plan
5199            .transform(|plan| match plan {
5200                LogicalPlan::TableScan(table) => {
5201                    let filter = Filter::try_new(
5202                        external_filter.clone(),
5203                        Arc::new(LogicalPlan::TableScan(table)),
5204                    )
5205                    .unwrap();
5206                    Ok(Transformed::yes(LogicalPlan::Filter(filter)))
5207                }
5208                x => Ok(Transformed::no(x)),
5209            })
5210            .data()
5211            .unwrap();
5212
5213        let actual = format!("{}", plan.display_indent());
5214        assert_snapshot!(actual, @r"
5215        Explain
5216          Filter: foo = Boolean(true)
5217            TableScan: ?table?
5218        ")
5219    }
5220
5221    #[test]
5222    fn test_plan_partial_ord() {
5223        let empty_relation = LogicalPlan::EmptyRelation(EmptyRelation {
5224            produce_one_row: false,
5225            schema: Arc::new(DFSchema::empty()),
5226        });
5227
5228        let describe_table = LogicalPlan::DescribeTable(DescribeTable {
5229            schema: Arc::new(Schema::new(vec![Field::new(
5230                "foo",
5231                DataType::Int32,
5232                false,
5233            )])),
5234            output_schema: DFSchemaRef::new(DFSchema::empty()),
5235        });
5236
5237        let describe_table_clone = LogicalPlan::DescribeTable(DescribeTable {
5238            schema: Arc::new(Schema::new(vec![Field::new(
5239                "foo",
5240                DataType::Int32,
5241                false,
5242            )])),
5243            output_schema: DFSchemaRef::new(DFSchema::empty()),
5244        });
5245
5246        assert_eq!(
5247            empty_relation.partial_cmp(&describe_table),
5248            Some(Ordering::Less)
5249        );
5250        assert_eq!(
5251            describe_table.partial_cmp(&empty_relation),
5252            Some(Ordering::Greater)
5253        );
5254        assert_eq!(describe_table.partial_cmp(&describe_table_clone), None);
5255    }
5256
5257    #[test]
5258    fn test_limit_with_new_children() {
5259        let input = Arc::new(LogicalPlan::Values(Values {
5260            schema: Arc::new(DFSchema::empty()),
5261            values: vec![vec![]],
5262        }));
5263        let cases = [
5264            LogicalPlan::Limit(Limit {
5265                skip: None,
5266                fetch: None,
5267                input: Arc::clone(&input),
5268            }),
5269            LogicalPlan::Limit(Limit {
5270                skip: None,
5271                fetch: Some(Box::new(Expr::Literal(
5272                    ScalarValue::new_ten(&DataType::UInt32).unwrap(),
5273                    None,
5274                ))),
5275                input: Arc::clone(&input),
5276            }),
5277            LogicalPlan::Limit(Limit {
5278                skip: Some(Box::new(Expr::Literal(
5279                    ScalarValue::new_ten(&DataType::UInt32).unwrap(),
5280                    None,
5281                ))),
5282                fetch: None,
5283                input: Arc::clone(&input),
5284            }),
5285            LogicalPlan::Limit(Limit {
5286                skip: Some(Box::new(Expr::Literal(
5287                    ScalarValue::new_one(&DataType::UInt32).unwrap(),
5288                    None,
5289                ))),
5290                fetch: Some(Box::new(Expr::Literal(
5291                    ScalarValue::new_ten(&DataType::UInt32).unwrap(),
5292                    None,
5293                ))),
5294                input,
5295            }),
5296        ];
5297
5298        for limit in cases {
5299            let new_limit = limit
5300                .with_new_exprs(
5301                    limit.expressions(),
5302                    limit.inputs().into_iter().cloned().collect(),
5303                )
5304                .unwrap();
5305            assert_eq!(limit, new_limit);
5306        }
5307    }
5308
5309    #[test]
5310    fn test_with_subqueries_jump() {
5311        // The test plan contains a `Project` node above a `Filter` node, and the
5312        // `Project` node contains a subquery plan with a `Filter` root node, so returning
5313        // `TreeNodeRecursion::Jump` on `Project` should cause not visiting any of the
5314        // `Filter`s.
5315        let subquery_schema =
5316            Schema::new(vec![Field::new("sub_id", DataType::Int32, false)]);
5317
5318        let subquery_plan =
5319            table_scan(TableReference::none(), &subquery_schema, Some(vec![0]))
5320                .unwrap()
5321                .filter(col("sub_id").eq(lit(0)))
5322                .unwrap()
5323                .build()
5324                .unwrap();
5325
5326        let schema = Schema::new(vec![Field::new("id", DataType::Int32, false)]);
5327
5328        let plan = table_scan(TableReference::none(), &schema, Some(vec![0]))
5329            .unwrap()
5330            .filter(col("id").eq(lit(0)))
5331            .unwrap()
5332            .project(vec![col("id"), scalar_subquery(Arc::new(subquery_plan))])
5333            .unwrap()
5334            .build()
5335            .unwrap();
5336
5337        let mut filter_found = false;
5338        plan.apply_with_subqueries(|plan| {
5339            match plan {
5340                LogicalPlan::Projection(..) => return Ok(TreeNodeRecursion::Jump),
5341                LogicalPlan::Filter(..) => filter_found = true,
5342                _ => {}
5343            }
5344            Ok(TreeNodeRecursion::Continue)
5345        })
5346        .unwrap();
5347        assert!(!filter_found);
5348
5349        struct ProjectJumpVisitor {
5350            filter_found: bool,
5351        }
5352
5353        impl ProjectJumpVisitor {
5354            fn new() -> Self {
5355                Self {
5356                    filter_found: false,
5357                }
5358            }
5359        }
5360
5361        impl<'n> TreeNodeVisitor<'n> for ProjectJumpVisitor {
5362            type Node = LogicalPlan;
5363
5364            fn f_down(&mut self, node: &'n Self::Node) -> Result<TreeNodeRecursion> {
5365                match node {
5366                    LogicalPlan::Projection(..) => return Ok(TreeNodeRecursion::Jump),
5367                    LogicalPlan::Filter(..) => self.filter_found = true,
5368                    _ => {}
5369                }
5370                Ok(TreeNodeRecursion::Continue)
5371            }
5372        }
5373
5374        let mut visitor = ProjectJumpVisitor::new();
5375        plan.visit_with_subqueries(&mut visitor).unwrap();
5376        assert!(!visitor.filter_found);
5377
5378        let mut filter_found = false;
5379        plan.clone()
5380            .transform_down_with_subqueries(|plan| {
5381                match plan {
5382                    LogicalPlan::Projection(..) => {
5383                        return Ok(Transformed::new(
5384                            plan,
5385                            false,
5386                            TreeNodeRecursion::Jump,
5387                        ));
5388                    }
5389                    LogicalPlan::Filter(..) => filter_found = true,
5390                    _ => {}
5391                }
5392                Ok(Transformed::no(plan))
5393            })
5394            .unwrap();
5395        assert!(!filter_found);
5396
5397        let mut filter_found = false;
5398        plan.clone()
5399            .transform_down_up_with_subqueries(
5400                |plan| {
5401                    match plan {
5402                        LogicalPlan::Projection(..) => {
5403                            return Ok(Transformed::new(
5404                                plan,
5405                                false,
5406                                TreeNodeRecursion::Jump,
5407                            ));
5408                        }
5409                        LogicalPlan::Filter(..) => filter_found = true,
5410                        _ => {}
5411                    }
5412                    Ok(Transformed::no(plan))
5413                },
5414                |plan| Ok(Transformed::no(plan)),
5415            )
5416            .unwrap();
5417        assert!(!filter_found);
5418
5419        struct ProjectJumpRewriter {
5420            filter_found: bool,
5421        }
5422
5423        impl ProjectJumpRewriter {
5424            fn new() -> Self {
5425                Self {
5426                    filter_found: false,
5427                }
5428            }
5429        }
5430
5431        impl TreeNodeRewriter for ProjectJumpRewriter {
5432            type Node = LogicalPlan;
5433
5434            fn f_down(&mut self, node: Self::Node) -> Result<Transformed<Self::Node>> {
5435                match node {
5436                    LogicalPlan::Projection(..) => {
5437                        return Ok(Transformed::new(
5438                            node,
5439                            false,
5440                            TreeNodeRecursion::Jump,
5441                        ));
5442                    }
5443                    LogicalPlan::Filter(..) => self.filter_found = true,
5444                    _ => {}
5445                }
5446                Ok(Transformed::no(node))
5447            }
5448        }
5449
5450        let mut rewriter = ProjectJumpRewriter::new();
5451        plan.rewrite_with_subqueries(&mut rewriter).unwrap();
5452        assert!(!rewriter.filter_found);
5453    }
5454
5455    #[test]
5456    fn test_with_unresolved_placeholders() {
5457        let field_name = "id";
5458        let placeholder_value = "$1";
5459        let schema = Schema::new(vec![Field::new(field_name, DataType::Int32, false)]);
5460
5461        let plan = table_scan(TableReference::none(), &schema, None)
5462            .unwrap()
5463            .filter(col(field_name).eq(placeholder(placeholder_value)))
5464            .unwrap()
5465            .build()
5466            .unwrap();
5467
5468        // Check that the placeholder parameters have not received a DataType.
5469        let params = plan.get_parameter_fields().unwrap();
5470        assert_eq!(params.len(), 1);
5471
5472        let parameter_type = params.clone().get(placeholder_value).unwrap().clone();
5473        assert_eq!(parameter_type, None);
5474    }
5475
5476    #[test]
5477    fn test_join_with_new_exprs() -> Result<()> {
5478        fn create_test_join(
5479            on: Vec<(Expr, Expr)>,
5480            filter: Option<Expr>,
5481        ) -> Result<LogicalPlan> {
5482            let schema = Schema::new(vec![
5483                Field::new("a", DataType::Int32, false),
5484                Field::new("b", DataType::Int32, false),
5485            ]);
5486
5487            let left_schema = DFSchema::try_from_qualified_schema("t1", &schema)?;
5488            let right_schema = DFSchema::try_from_qualified_schema("t2", &schema)?;
5489
5490            Ok(LogicalPlan::Join(Join {
5491                left: Arc::new(
5492                    table_scan(Some("t1"), left_schema.as_arrow(), None)?.build()?,
5493                ),
5494                right: Arc::new(
5495                    table_scan(Some("t2"), right_schema.as_arrow(), None)?.build()?,
5496                ),
5497                on,
5498                filter,
5499                join_type: JoinType::Inner,
5500                join_constraint: JoinConstraint::On,
5501                schema: Arc::new(left_schema.join(&right_schema)?),
5502                null_equality: NullEquality::NullEqualsNothing,
5503                null_aware: false,
5504            }))
5505        }
5506
5507        {
5508            let join = create_test_join(vec![(col("t1.a"), (col("t2.a")))], None)?;
5509            let LogicalPlan::Join(join) = join.with_new_exprs(
5510                join.expressions(),
5511                join.inputs().into_iter().cloned().collect(),
5512            )?
5513            else {
5514                unreachable!()
5515            };
5516            assert_eq!(join.on, vec![(col("t1.a"), (col("t2.a")))]);
5517            assert_eq!(join.filter, None);
5518        }
5519
5520        {
5521            let join = create_test_join(vec![], Some(col("t1.a").gt(col("t2.a"))))?;
5522            let LogicalPlan::Join(join) = join.with_new_exprs(
5523                join.expressions(),
5524                join.inputs().into_iter().cloned().collect(),
5525            )?
5526            else {
5527                unreachable!()
5528            };
5529            assert_eq!(join.on, vec![]);
5530            assert_eq!(join.filter, Some(col("t1.a").gt(col("t2.a"))));
5531        }
5532
5533        {
5534            let join = create_test_join(
5535                vec![(col("t1.a"), (col("t2.a")))],
5536                Some(col("t1.b").gt(col("t2.b"))),
5537            )?;
5538            let LogicalPlan::Join(join) = join.with_new_exprs(
5539                join.expressions(),
5540                join.inputs().into_iter().cloned().collect(),
5541            )?
5542            else {
5543                unreachable!()
5544            };
5545            assert_eq!(join.on, vec![(col("t1.a"), (col("t2.a")))]);
5546            assert_eq!(join.filter, Some(col("t1.b").gt(col("t2.b"))));
5547        }
5548
5549        {
5550            let join = create_test_join(
5551                vec![(col("t1.a"), (col("t2.a"))), (col("t1.b"), (col("t2.b")))],
5552                None,
5553            )?;
5554            let LogicalPlan::Join(join) = join.with_new_exprs(
5555                vec![
5556                    binary_expr(col("t1.a"), Operator::Plus, lit(1)),
5557                    binary_expr(col("t2.a"), Operator::Plus, lit(2)),
5558                    col("t1.b"),
5559                    col("t2.b"),
5560                    lit(true),
5561                ],
5562                join.inputs().into_iter().cloned().collect(),
5563            )?
5564            else {
5565                unreachable!()
5566            };
5567            assert_eq!(
5568                join.on,
5569                vec![
5570                    (
5571                        binary_expr(col("t1.a"), Operator::Plus, lit(1)),
5572                        binary_expr(col("t2.a"), Operator::Plus, lit(2))
5573                    ),
5574                    (col("t1.b"), (col("t2.b")))
5575                ]
5576            );
5577            assert_eq!(join.filter, Some(lit(true)));
5578        }
5579
5580        Ok(())
5581    }
5582
5583    #[test]
5584    fn test_join_try_new() -> Result<()> {
5585        let schema = Schema::new(vec![
5586            Field::new("a", DataType::Int32, false),
5587            Field::new("b", DataType::Int32, false),
5588        ]);
5589
5590        let left_scan = table_scan(Some("t1"), &schema, None)?.build()?;
5591
5592        let right_scan = table_scan(Some("t2"), &schema, None)?.build()?;
5593
5594        let join_types = vec![
5595            JoinType::Inner,
5596            JoinType::Left,
5597            JoinType::Right,
5598            JoinType::Full,
5599            JoinType::LeftSemi,
5600            JoinType::LeftAnti,
5601            JoinType::RightSemi,
5602            JoinType::RightAnti,
5603            JoinType::LeftMark,
5604        ];
5605
5606        for join_type in join_types {
5607            let join = Join::try_new(
5608                Arc::new(left_scan.clone()),
5609                Arc::new(right_scan.clone()),
5610                vec![(col("t1.a"), col("t2.a"))],
5611                Some(col("t1.b").gt(col("t2.b"))),
5612                join_type,
5613                JoinConstraint::On,
5614                NullEquality::NullEqualsNothing,
5615                false,
5616            )?;
5617
5618            match join_type {
5619                JoinType::LeftSemi | JoinType::LeftAnti => {
5620                    assert_eq!(join.schema.fields().len(), 2);
5621
5622                    let fields = join.schema.fields();
5623                    assert_eq!(
5624                        fields[0].name(),
5625                        "a",
5626                        "First field should be 'a' from left table"
5627                    );
5628                    assert_eq!(
5629                        fields[1].name(),
5630                        "b",
5631                        "Second field should be 'b' from left table"
5632                    );
5633                }
5634                JoinType::RightSemi | JoinType::RightAnti => {
5635                    assert_eq!(join.schema.fields().len(), 2);
5636
5637                    let fields = join.schema.fields();
5638                    assert_eq!(
5639                        fields[0].name(),
5640                        "a",
5641                        "First field should be 'a' from right table"
5642                    );
5643                    assert_eq!(
5644                        fields[1].name(),
5645                        "b",
5646                        "Second field should be 'b' from right table"
5647                    );
5648                }
5649                JoinType::LeftMark => {
5650                    assert_eq!(join.schema.fields().len(), 3);
5651
5652                    let fields = join.schema.fields();
5653                    assert_eq!(
5654                        fields[0].name(),
5655                        "a",
5656                        "First field should be 'a' from left table"
5657                    );
5658                    assert_eq!(
5659                        fields[1].name(),
5660                        "b",
5661                        "Second field should be 'b' from left table"
5662                    );
5663                    assert_eq!(
5664                        fields[2].name(),
5665                        "mark",
5666                        "Third field should be the mark column"
5667                    );
5668
5669                    assert!(!fields[0].is_nullable());
5670                    assert!(!fields[1].is_nullable());
5671                    assert!(!fields[2].is_nullable());
5672                }
5673                _ => {
5674                    assert_eq!(join.schema.fields().len(), 4);
5675
5676                    let fields = join.schema.fields();
5677                    assert_eq!(
5678                        fields[0].name(),
5679                        "a",
5680                        "First field should be 'a' from left table"
5681                    );
5682                    assert_eq!(
5683                        fields[1].name(),
5684                        "b",
5685                        "Second field should be 'b' from left table"
5686                    );
5687                    assert_eq!(
5688                        fields[2].name(),
5689                        "a",
5690                        "Third field should be 'a' from right table"
5691                    );
5692                    assert_eq!(
5693                        fields[3].name(),
5694                        "b",
5695                        "Fourth field should be 'b' from right table"
5696                    );
5697
5698                    if join_type == JoinType::Left {
5699                        // Left side fields (first two) shouldn't be nullable
5700                        assert!(!fields[0].is_nullable());
5701                        assert!(!fields[1].is_nullable());
5702                        // Right side fields (third and fourth) should be nullable
5703                        assert!(fields[2].is_nullable());
5704                        assert!(fields[3].is_nullable());
5705                    } else if join_type == JoinType::Right {
5706                        // Left side fields (first two) should be nullable
5707                        assert!(fields[0].is_nullable());
5708                        assert!(fields[1].is_nullable());
5709                        // Right side fields (third and fourth) shouldn't be nullable
5710                        assert!(!fields[2].is_nullable());
5711                        assert!(!fields[3].is_nullable());
5712                    } else if join_type == JoinType::Full {
5713                        assert!(fields[0].is_nullable());
5714                        assert!(fields[1].is_nullable());
5715                        assert!(fields[2].is_nullable());
5716                        assert!(fields[3].is_nullable());
5717                    }
5718                }
5719            }
5720
5721            assert_eq!(join.on, vec![(col("t1.a"), col("t2.a"))]);
5722            assert_eq!(join.filter, Some(col("t1.b").gt(col("t2.b"))));
5723            assert_eq!(join.join_type, join_type);
5724            assert_eq!(join.join_constraint, JoinConstraint::On);
5725            assert_eq!(join.null_equality, NullEquality::NullEqualsNothing);
5726        }
5727
5728        Ok(())
5729    }
5730
5731    #[test]
5732    fn test_join_try_new_with_using_constraint_and_overlapping_columns() -> Result<()> {
5733        let left_schema = Schema::new(vec![
5734            Field::new("id", DataType::Int32, false), // Common column in both tables
5735            Field::new("name", DataType::Utf8, false), // Unique to left
5736            Field::new("value", DataType::Int32, false), // Common column, different meaning
5737        ]);
5738
5739        let right_schema = Schema::new(vec![
5740            Field::new("id", DataType::Int32, false), // Common column in both tables
5741            Field::new("category", DataType::Utf8, false), // Unique to right
5742            Field::new("value", DataType::Float64, true), // Common column, different meaning
5743        ]);
5744
5745        let left_plan = table_scan(Some("t1"), &left_schema, None)?.build()?;
5746
5747        let right_plan = table_scan(Some("t2"), &right_schema, None)?.build()?;
5748
5749        // Test 1: USING constraint with a common column
5750        {
5751            // In the logical plan, both copies of the `id` column are preserved
5752            // The USING constraint is handled later during physical execution, where the common column appears once
5753            let join = Join::try_new(
5754                Arc::new(left_plan.clone()),
5755                Arc::new(right_plan.clone()),
5756                vec![(col("t1.id"), col("t2.id"))],
5757                None,
5758                JoinType::Inner,
5759                JoinConstraint::Using,
5760                NullEquality::NullEqualsNothing,
5761                false,
5762            )?;
5763
5764            let fields = join.schema.fields();
5765
5766            assert_eq!(fields.len(), 6);
5767
5768            assert_eq!(
5769                fields[0].name(),
5770                "id",
5771                "First field should be 'id' from left table"
5772            );
5773            assert_eq!(
5774                fields[1].name(),
5775                "name",
5776                "Second field should be 'name' from left table"
5777            );
5778            assert_eq!(
5779                fields[2].name(),
5780                "value",
5781                "Third field should be 'value' from left table"
5782            );
5783            assert_eq!(
5784                fields[3].name(),
5785                "id",
5786                "Fourth field should be 'id' from right table"
5787            );
5788            assert_eq!(
5789                fields[4].name(),
5790                "category",
5791                "Fifth field should be 'category' from right table"
5792            );
5793            assert_eq!(
5794                fields[5].name(),
5795                "value",
5796                "Sixth field should be 'value' from right table"
5797            );
5798
5799            assert_eq!(join.join_constraint, JoinConstraint::Using);
5800        }
5801
5802        // Test 2: Complex join condition with expressions
5803        {
5804            // Complex condition: join on id equality AND where left.value < right.value
5805            let join = Join::try_new(
5806                Arc::new(left_plan.clone()),
5807                Arc::new(right_plan.clone()),
5808                vec![(col("t1.id"), col("t2.id"))], // Equijoin condition
5809                Some(col("t1.value").lt(col("t2.value"))), // Non-equi filter condition
5810                JoinType::Inner,
5811                JoinConstraint::On,
5812                NullEquality::NullEqualsNothing,
5813                false,
5814            )?;
5815
5816            let fields = join.schema.fields();
5817            assert_eq!(fields.len(), 6);
5818
5819            assert_eq!(
5820                fields[0].name(),
5821                "id",
5822                "First field should be 'id' from left table"
5823            );
5824            assert_eq!(
5825                fields[1].name(),
5826                "name",
5827                "Second field should be 'name' from left table"
5828            );
5829            assert_eq!(
5830                fields[2].name(),
5831                "value",
5832                "Third field should be 'value' from left table"
5833            );
5834            assert_eq!(
5835                fields[3].name(),
5836                "id",
5837                "Fourth field should be 'id' from right table"
5838            );
5839            assert_eq!(
5840                fields[4].name(),
5841                "category",
5842                "Fifth field should be 'category' from right table"
5843            );
5844            assert_eq!(
5845                fields[5].name(),
5846                "value",
5847                "Sixth field should be 'value' from right table"
5848            );
5849
5850            assert_eq!(join.filter, Some(col("t1.value").lt(col("t2.value"))));
5851        }
5852
5853        // Test 3: Join with null equality behavior set to true
5854        {
5855            let join = Join::try_new(
5856                Arc::new(left_plan.clone()),
5857                Arc::new(right_plan.clone()),
5858                vec![(col("t1.id"), col("t2.id"))],
5859                None,
5860                JoinType::Inner,
5861                JoinConstraint::On,
5862                NullEquality::NullEqualsNull,
5863                false,
5864            )?;
5865
5866            assert_eq!(join.null_equality, NullEquality::NullEqualsNull);
5867        }
5868
5869        Ok(())
5870    }
5871
5872    #[test]
5873    fn test_join_try_new_schema_validation() -> Result<()> {
5874        let left_schema = Schema::new(vec![
5875            Field::new("id", DataType::Int32, false),
5876            Field::new("name", DataType::Utf8, false),
5877            Field::new("value", DataType::Float64, true),
5878        ]);
5879
5880        let right_schema = Schema::new(vec![
5881            Field::new("id", DataType::Int32, false),
5882            Field::new("category", DataType::Utf8, true),
5883            Field::new("code", DataType::Int16, false),
5884        ]);
5885
5886        let left_plan = table_scan(Some("t1"), &left_schema, None)?.build()?;
5887
5888        let right_plan = table_scan(Some("t2"), &right_schema, None)?.build()?;
5889
5890        let join_types = vec![
5891            JoinType::Inner,
5892            JoinType::Left,
5893            JoinType::Right,
5894            JoinType::Full,
5895        ];
5896
5897        for join_type in join_types {
5898            let join = Join::try_new(
5899                Arc::new(left_plan.clone()),
5900                Arc::new(right_plan.clone()),
5901                vec![(col("t1.id"), col("t2.id"))],
5902                Some(col("t1.value").gt(lit(5.0))),
5903                join_type,
5904                JoinConstraint::On,
5905                NullEquality::NullEqualsNothing,
5906                false,
5907            )?;
5908
5909            let fields = join.schema.fields();
5910            assert_eq!(fields.len(), 6, "Expected 6 fields for {join_type} join");
5911
5912            for (i, field) in fields.iter().enumerate() {
5913                let expected_nullable = match (i, &join_type) {
5914                    // Left table fields (indices 0, 1, 2)
5915                    (0, JoinType::Right | JoinType::Full) => true, // id becomes nullable in RIGHT/FULL
5916                    (1, JoinType::Right | JoinType::Full) => true, // name becomes nullable in RIGHT/FULL
5917                    (2, _) => true, // value is already nullable
5918
5919                    // Right table fields (indices 3, 4, 5)
5920                    (3, JoinType::Left | JoinType::Full) => true, // id becomes nullable in LEFT/FULL
5921                    (4, _) => true, // category is already nullable
5922                    (5, JoinType::Left | JoinType::Full) => true, // code becomes nullable in LEFT/FULL
5923
5924                    _ => false,
5925                };
5926
5927                assert_eq!(
5928                    field.is_nullable(),
5929                    expected_nullable,
5930                    "Field {} ({}) nullability incorrect for {:?} join",
5931                    i,
5932                    field.name(),
5933                    join_type
5934                );
5935            }
5936        }
5937
5938        let using_join = Join::try_new(
5939            Arc::new(left_plan.clone()),
5940            Arc::new(right_plan.clone()),
5941            vec![(col("t1.id"), col("t2.id"))],
5942            None,
5943            JoinType::Inner,
5944            JoinConstraint::Using,
5945            NullEquality::NullEqualsNothing,
5946            false,
5947        )?;
5948
5949        assert_eq!(
5950            using_join.schema.fields().len(),
5951            6,
5952            "USING join should have all fields"
5953        );
5954        assert_eq!(using_join.join_constraint, JoinConstraint::Using);
5955
5956        Ok(())
5957    }
5958}