// datafusion_expr/logical_plan/plan.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! Logical plan types
19
20use std::cmp::Ordering;
21use std::collections::{HashMap, HashSet};
22use std::fmt::{self, Debug, Display, Formatter};
23use std::hash::{Hash, Hasher};
24use std::sync::{Arc, LazyLock};
25
26use super::DdlStatement;
27use super::dml::CopyTo;
28use super::invariants::{
29    InvariantLevel, assert_always_invariants_at_current_node,
30    assert_executable_invariants,
31};
32use crate::builder::{unique_field_aliases, unnest_with_options};
33use crate::expr::{
34    Alias, Placeholder, Sort as SortExpr, WindowFunction, WindowFunctionParams,
35    intersect_metadata_for_union,
36};
37use crate::expr_rewriter::{
38    NamePreserver, create_col_from_scalar_expr, normalize_cols, normalize_sorts,
39};
40use crate::logical_plan::display::{GraphvizVisitor, IndentVisitor};
41use crate::logical_plan::extension::UserDefinedLogicalNode;
42use crate::logical_plan::{DmlStatement, Statement};
43use crate::utils::{
44    enumerate_grouping_sets, exprlist_to_fields, find_out_reference_exprs,
45    grouping_set_expr_count, grouping_set_to_exprlist, split_conjunction,
46};
47use crate::{
48    BinaryExpr, CreateMemoryTable, CreateView, Execute, Expr, ExprSchemable,
49    LogicalPlanBuilder, Operator, Prepare, TableProviderFilterPushDown, TableSource,
50    WindowFunctionDefinition, build_join_schema, expr_vec_fmt, requalify_sides_if_needed,
51};
52
53use arrow::datatypes::{DataType, Field, FieldRef, Schema, SchemaRef};
54use datafusion_common::cse::{NormalizeEq, Normalizeable};
55use datafusion_common::format::ExplainFormat;
56use datafusion_common::metadata::check_metadata_with_storage_equal;
57use datafusion_common::tree_node::{
58    Transformed, TreeNode, TreeNodeContainer, TreeNodeRecursion,
59};
60use datafusion_common::{
61    Column, Constraints, DFSchema, DFSchemaRef, DataFusionError, Dependency,
62    FunctionalDependence, FunctionalDependencies, NullEquality, ParamValues, Result,
63    ScalarValue, Spans, TableReference, UnnestOptions, aggregate_functional_dependencies,
64    assert_eq_or_internal_err, assert_or_internal_err, internal_err, plan_err,
65};
66use indexmap::IndexSet;
67
68// backwards compatibility
69use crate::display::PgJsonVisitor;
70pub use datafusion_common::display::{PlanType, StringifiedPlan, ToStringifiedPlan};
71pub use datafusion_common::{JoinConstraint, JoinType};
72
73/// A `LogicalPlan` is a node in a tree of relational operators (such as
74/// Projection or Filter).
75///
76/// Represents transforming an input relation (table) to an output relation
77/// (table) with a potentially different schema. Plans form a dataflow tree
78/// where data flows from leaves up to the root to produce the query result.
79///
80/// `LogicalPlan`s can be created by the SQL query planner, the DataFrame API,
81/// or programmatically (for example custom query languages).
82///
83/// # See also:
84/// * [`Expr`]: For the expressions that are evaluated by the plan
85/// * [`LogicalPlanBuilder`]: For building `LogicalPlan`s
86/// * [`tree_node`]: To inspect and rewrite `LogicalPlan`s
87///
88/// [`tree_node`]: crate::logical_plan::tree_node
89///
90/// # Examples
91///
92/// ## Creating a LogicalPlan from SQL:
93///
94/// See [`SessionContext::sql`](https://docs.rs/datafusion/latest/datafusion/execution/context/struct.SessionContext.html#method.sql)
95///
96/// ## Creating a LogicalPlan from the DataFrame API:
97///
98/// See [`DataFrame::logical_plan`](https://docs.rs/datafusion/latest/datafusion/dataframe/struct.DataFrame.html#method.logical_plan)
99///
100/// ## Creating a LogicalPlan programmatically:
101///
102/// See [`LogicalPlanBuilder`]
103///
104/// # Visiting and Rewriting `LogicalPlan`s
105///
106/// Using the [`tree_node`] API, you can recursively walk all nodes in a
107/// `LogicalPlan`. For example, to find all column references in a plan:
108///
109/// ```
110/// # use std::collections::HashSet;
111/// # use arrow::datatypes::{DataType, Field, Schema};
112/// # use datafusion_expr::{Expr, col, lit, LogicalPlan, LogicalPlanBuilder, table_scan};
113/// # use datafusion_common::tree_node::{TreeNodeRecursion, TreeNode};
114/// # use datafusion_common::{Column, Result};
115/// # fn employee_schema() -> Schema {
116/// #    Schema::new(vec![
117/// #           Field::new("name", DataType::Utf8, false),
118/// #           Field::new("salary", DataType::Int32, false),
119/// #       ])
120/// #   }
121/// // Projection(name, salary)
122/// //   Filter(salary > 1000)
123/// //     TableScan(employee)
124/// # fn main() -> Result<()> {
125/// let plan = table_scan(Some("employee"), &employee_schema(), None)?
126///  .filter(col("salary").gt(lit(1000)))?
127///  .project(vec![col("name")])?
128///  .build()?;
129///
130/// // use apply to walk the plan and collect all expressions
131/// let mut expressions = HashSet::new();
132/// plan.apply(|node| {
133///   // collect all expressions in the plan
134///   node.apply_expressions(|expr| {
135///    expressions.insert(expr.clone());
136///    Ok(TreeNodeRecursion::Continue) // control walk of expressions
137///   })?;
138///   Ok(TreeNodeRecursion::Continue) // control walk of plan nodes
139/// }).unwrap();
140///
141/// // we found the expression in projection and filter
142/// assert_eq!(expressions.len(), 2);
143/// println!("Found expressions: {:?}", expressions);
144/// // found predicate in the Filter: employee.salary > 1000
145/// let salary = Expr::Column(Column::new(Some("employee"), "salary"));
146/// assert!(expressions.contains(&salary.gt(lit(1000))));
147/// // found projection in the Projection: employee.name
148/// let name = Expr::Column(Column::new(Some("employee"), "name"));
149/// assert!(expressions.contains(&name));
150/// # Ok(())
151/// # }
152/// ```
153///
154/// You can also rewrite plans using the [`tree_node`] API. For example, to
155/// replace the filter predicate in a plan:
156///
157/// ```
158/// # use std::collections::HashSet;
159/// # use arrow::datatypes::{DataType, Field, Schema};
160/// # use datafusion_expr::{Expr, col, lit, LogicalPlan, LogicalPlanBuilder, table_scan};
161/// # use datafusion_common::tree_node::{TreeNodeRecursion, TreeNode};
162/// # use datafusion_common::{Column, Result};
163/// # fn employee_schema() -> Schema {
164/// #    Schema::new(vec![
165/// #           Field::new("name", DataType::Utf8, false),
166/// #           Field::new("salary", DataType::Int32, false),
167/// #       ])
168/// #   }
169/// // Projection(name, salary)
170/// //   Filter(salary > 1000)
171/// //     TableScan(employee)
172/// # fn main() -> Result<()> {
173/// use datafusion_common::tree_node::Transformed;
174/// let plan = table_scan(Some("employee"), &employee_schema(), None)?
175///  .filter(col("salary").gt(lit(1000)))?
176///  .project(vec![col("name")])?
177///  .build()?;
178///
179/// // use transform to rewrite the plan
180/// let transformed_result = plan.transform(|node| {
181///   // when we see the filter node
182///   if let LogicalPlan::Filter(mut filter) = node {
183///     // replace predicate with salary < 2000
184///     filter.predicate = Expr::Column(Column::new(Some("employee"), "salary")).lt(lit(2000));
185///     let new_plan = LogicalPlan::Filter(filter);
186///     return Ok(Transformed::yes(new_plan)); // communicate the node was changed
187///   }
188///   // return the node unchanged
189///   Ok(Transformed::no(node))
190/// }).unwrap();
191///
192/// // Transformed result contains rewritten plan and information about
193/// // whether the plan was changed
194/// assert!(transformed_result.transformed);
195/// let rewritten_plan = transformed_result.data;
196///
197/// // we found the filter
198/// assert_eq!(rewritten_plan.display_indent().to_string(),
199/// "Projection: employee.name\
200/// \n  Filter: employee.salary < Int32(2000)\
201/// \n    TableScan: employee");
202/// # Ok(())
203/// # }
204/// ```
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Hash)]
pub enum LogicalPlan {
    /// Evaluates an arbitrary list of expressions (essentially a
    /// SELECT with an expression list) on its input.
    Projection(Projection),
    /// Filters rows from its input that do not match an
    /// expression (essentially a WHERE clause with a predicate
    /// expression).
    ///
    /// Semantically, `<predicate>` is evaluated for each row of the
    /// input; If the value of `<predicate>` is true, the input row is
    /// passed to the output. If the value of `<predicate>` is false
    /// (or null), the row is discarded.
    Filter(Filter),
    /// Windows input based on a set of window spec and window
    /// function (e.g. SUM or RANK).  This is used to implement SQL
    /// window functions, and the `OVER` clause.
    ///
    /// See [`Window`] for more details
    Window(Window),
    /// Aggregates its input based on a set of grouping and aggregate
    /// expressions (e.g. SUM). This is used to implement SQL aggregates
    /// and `GROUP BY`.
    ///
    /// See [`Aggregate`] for more details
    Aggregate(Aggregate),
    /// Sorts its input according to a list of sort expressions. This
    /// is used to implement SQL `ORDER BY`
    Sort(Sort),
    /// Join two logical plans on one or more join columns.
    /// This is used to implement SQL `JOIN`
    Join(Join),
    /// Repartitions the input based on a partitioning scheme. This is
    /// used to add parallelism and is sometimes referred to as an
    /// "exchange" operator in other systems
    Repartition(Repartition),
    /// Union multiple inputs with the same schema into a single
    /// output stream. This is used to implement SQL `UNION [ALL]` and
    /// `INTERSECT [ALL]`.
    Union(Union),
    /// Produces rows from a [`TableSource`], used to implement SQL
    /// `FROM` tables or views.
    TableScan(TableScan),
    /// An empty relation with an empty schema that produces 0 or 1
    /// row (depending on its `produce_one_row` flag). This is used to
    /// implement SQL `SELECT` that has no values in the `FROM` clause.
    EmptyRelation(EmptyRelation),
    /// Produces the output of running another query.  This is used to
    /// implement SQL subqueries
    Subquery(Subquery),
    /// Aliased relation provides, or changes, the name of a relation.
    SubqueryAlias(SubqueryAlias),
    /// Skip some number of rows, and then fetch some number of rows.
    Limit(Limit),
    /// A DataFusion [`Statement`] such as `SET VARIABLE` or `START TRANSACTION`
    Statement(Statement),
    /// Values expression. See
    /// [Postgres VALUES](https://www.postgresql.org/docs/current/queries-values.html)
    /// documentation for more details. This is used to implement SQL such as
    /// `VALUES (1, 2), (3, 4)`
    Values(Values),
    /// Produces a relation with string representations of
    /// various parts of the plan. This is used to implement SQL `EXPLAIN`.
    Explain(Explain),
    /// Runs the input, and prints annotated physical plan as a string
    /// with execution metric. This is used to implement SQL
    /// `EXPLAIN ANALYZE`.
    Analyze(Analyze),
    /// Extension operator defined outside of DataFusion. This is used
    /// to extend DataFusion with custom relational operations that are
    /// not otherwise built in (see [`UserDefinedLogicalNode`]).
    Extension(Extension),
    /// Remove duplicate rows from the input. This is used to
    /// implement SQL `SELECT DISTINCT ...`.
    Distinct(Distinct),
    /// Data Manipulation Language (DML): Insert / Update / Delete
    Dml(DmlStatement),
    /// Data Definition Language (DDL): CREATE / DROP TABLES / VIEWS / SCHEMAS
    Ddl(DdlStatement),
    /// `COPY TO` for writing plan results to files
    Copy(CopyTo),
    /// Describe the schema of the table. This is used to implement the
    /// SQL `DESCRIBE` command from MySQL.
    DescribeTable(DescribeTable),
    /// Unnest a column that contains a nested list type such as an
    /// ARRAY. This is used to implement SQL `UNNEST`
    Unnest(Unnest),
    /// A variadic query (e.g. "Recursive CTEs")
    RecursiveQuery(RecursiveQuery),
}
294
295impl Default for LogicalPlan {
296    fn default() -> Self {
297        LogicalPlan::EmptyRelation(EmptyRelation {
298            produce_one_row: false,
299            schema: Arc::new(DFSchema::empty()),
300        })
301    }
302}
303
impl<'a> TreeNodeContainer<'a, Self> for LogicalPlan {
    // A `LogicalPlan` acts as a container holding exactly one element:
    // itself. Both operations therefore just invoke the callback once.
    fn apply_elements<F: FnMut(&'a Self) -> Result<TreeNodeRecursion>>(
        &'a self,
        mut f: F,
    ) -> Result<TreeNodeRecursion> {
        f(self)
    }

    fn map_elements<F: FnMut(Self) -> Result<Transformed<Self>>>(
        self,
        mut f: F,
    ) -> Result<Transformed<Self>> {
        f(self)
    }
}
319
320impl LogicalPlan {
    /// Get a reference to the logical plan's schema
    ///
    /// Nodes that define their own output shape (e.g. `Projection`,
    /// `Aggregate`, `Join`, `Union`) store a schema and return it directly;
    /// nodes that pass rows through unchanged (e.g. `Filter`, `Sort`,
    /// `Limit`, `Repartition`) delegate to the schema of their input.
    pub fn schema(&self) -> &DFSchemaRef {
        match self {
            LogicalPlan::EmptyRelation(EmptyRelation { schema, .. }) => schema,
            LogicalPlan::Values(Values { schema, .. }) => schema,
            // A scan's output is its *projected* schema, not the full table schema
            LogicalPlan::TableScan(TableScan {
                projected_schema, ..
            }) => projected_schema,
            LogicalPlan::Projection(Projection { schema, .. }) => schema,
            LogicalPlan::Filter(Filter { input, .. }) => input.schema(),
            LogicalPlan::Distinct(Distinct::All(input)) => input.schema(),
            LogicalPlan::Distinct(Distinct::On(DistinctOn { schema, .. })) => schema,
            LogicalPlan::Window(Window { schema, .. }) => schema,
            LogicalPlan::Aggregate(Aggregate { schema, .. }) => schema,
            LogicalPlan::Sort(Sort { input, .. }) => input.schema(),
            LogicalPlan::Join(Join { schema, .. }) => schema,
            LogicalPlan::Repartition(Repartition { input, .. }) => input.schema(),
            LogicalPlan::Limit(Limit { input, .. }) => input.schema(),
            LogicalPlan::Statement(statement) => statement.schema(),
            LogicalPlan::Subquery(Subquery { subquery, .. }) => subquery.schema(),
            LogicalPlan::SubqueryAlias(SubqueryAlias { schema, .. }) => schema,
            LogicalPlan::Explain(explain) => &explain.schema,
            LogicalPlan::Analyze(analyze) => &analyze.schema,
            // Extension nodes report their own schema
            LogicalPlan::Extension(extension) => extension.node.schema(),
            LogicalPlan::Union(Union { schema, .. }) => schema,
            LogicalPlan::DescribeTable(DescribeTable { output_schema, .. }) => {
                output_schema
            }
            LogicalPlan::Dml(DmlStatement { output_schema, .. }) => output_schema,
            LogicalPlan::Copy(CopyTo { output_schema, .. }) => output_schema,
            LogicalPlan::Ddl(ddl) => ddl.schema(),
            LogicalPlan::Unnest(Unnest { schema, .. }) => schema,
            LogicalPlan::RecursiveQuery(RecursiveQuery { static_term, .. }) => {
                // we take the schema of the static term as the schema of the entire recursive query
                static_term.schema()
            }
        }
    }
359
360    /// Used for normalizing columns, as the fallback schemas to the main schema
361    /// of the plan.
362    pub fn fallback_normalize_schemas(&self) -> Vec<&DFSchema> {
363        match self {
364            LogicalPlan::Window(_)
365            | LogicalPlan::Projection(_)
366            | LogicalPlan::Aggregate(_)
367            | LogicalPlan::Unnest(_)
368            | LogicalPlan::Join(_) => self
369                .inputs()
370                .iter()
371                .map(|input| input.schema().as_ref())
372                .collect(),
373            _ => vec![],
374        }
375    }
376
377    /// Returns the (fixed) output schema for explain plans
378    pub fn explain_schema() -> SchemaRef {
379        SchemaRef::new(Schema::new(vec![
380            Field::new("plan_type", DataType::Utf8, false),
381            Field::new("plan", DataType::Utf8, false),
382        ]))
383    }
384
385    /// Returns the (fixed) output schema for `DESCRIBE` plans
386    pub fn describe_schema() -> Schema {
387        Schema::new(vec![
388            Field::new("column_name", DataType::Utf8, false),
389            Field::new("data_type", DataType::Utf8, false),
390            Field::new("is_nullable", DataType::Utf8, false),
391        ])
392    }
393
394    /// Returns all expressions (non-recursively) evaluated by the current
395    /// logical plan node. This does not include expressions in any children.
396    ///
397    /// Note this method `clone`s all the expressions. When possible, the
398    /// [`tree_node`] API should be used instead of this API.
399    ///
400    /// The returned expressions do not necessarily represent or even
401    /// contributed to the output schema of this node. For example,
402    /// `LogicalPlan::Filter` returns the filter expression even though the
403    /// output of a Filter has the same columns as the input.
404    ///
405    /// The expressions do contain all the columns that are used by this plan,
406    /// so if there are columns not referenced by these expressions then
407    /// DataFusion's optimizer attempts to optimize them away.
408    ///
409    /// [`tree_node`]: crate::logical_plan::tree_node
410    pub fn expressions(self: &LogicalPlan) -> Vec<Expr> {
411        let mut exprs = vec![];
412        self.apply_expressions(|e| {
413            exprs.push(e.clone());
414            Ok(TreeNodeRecursion::Continue)
415        })
416        // closure always returns OK
417        .unwrap();
418        exprs
419    }
420
421    /// Returns all the out reference(correlated) expressions (recursively) in the current
422    /// logical plan nodes and all its descendant nodes.
423    pub fn all_out_ref_exprs(self: &LogicalPlan) -> Vec<Expr> {
424        let mut exprs = vec![];
425        self.apply_expressions(|e| {
426            find_out_reference_exprs(e).into_iter().for_each(|e| {
427                if !exprs.contains(&e) {
428                    exprs.push(e)
429                }
430            });
431            Ok(TreeNodeRecursion::Continue)
432        })
433        // closure always returns OK
434        .unwrap();
435        self.inputs()
436            .into_iter()
437            .flat_map(|child| child.all_out_ref_exprs())
438            .for_each(|e| {
439                if !exprs.contains(&e) {
440                    exprs.push(e)
441                }
442            });
443        exprs
444    }
445
    /// Returns all inputs / children of this `LogicalPlan` node.
    ///
    /// Note does not include inputs to inputs, or subqueries.
    pub fn inputs(&self) -> Vec<&LogicalPlan> {
        match self {
            // single-input nodes
            LogicalPlan::Projection(Projection { input, .. }) => vec![input],
            LogicalPlan::Filter(Filter { input, .. }) => vec![input],
            LogicalPlan::Repartition(Repartition { input, .. }) => vec![input],
            LogicalPlan::Window(Window { input, .. }) => vec![input],
            LogicalPlan::Aggregate(Aggregate { input, .. }) => vec![input],
            LogicalPlan::Sort(Sort { input, .. }) => vec![input],
            // two-input node
            LogicalPlan::Join(Join { left, right, .. }) => vec![left, right],
            LogicalPlan::Limit(Limit { input, .. }) => vec![input],
            LogicalPlan::Subquery(Subquery { subquery, .. }) => vec![subquery],
            LogicalPlan::SubqueryAlias(SubqueryAlias { input, .. }) => vec![input],
            // extension nodes report their own inputs
            LogicalPlan::Extension(extension) => extension.node.inputs(),
            // variadic node: all union branches are inputs
            LogicalPlan::Union(Union { inputs, .. }) => {
                inputs.iter().map(|arc| arc.as_ref()).collect()
            }
            LogicalPlan::Distinct(
                Distinct::All(input) | Distinct::On(DistinctOn { input, .. }),
            ) => vec![input],
            LogicalPlan::Explain(explain) => vec![&explain.plan],
            LogicalPlan::Analyze(analyze) => vec![&analyze.input],
            LogicalPlan::Dml(write) => vec![&write.input],
            LogicalPlan::Copy(copy) => vec![&copy.input],
            LogicalPlan::Ddl(ddl) => ddl.inputs(),
            LogicalPlan::Unnest(Unnest { input, .. }) => vec![input],
            // recursive queries have both their static and recursive terms as inputs
            LogicalPlan::RecursiveQuery(RecursiveQuery {
                static_term,
                recursive_term,
                ..
            }) => vec![static_term, recursive_term],
            LogicalPlan::Statement(stmt) => stmt.inputs(),
            // plans without inputs
            LogicalPlan::TableScan { .. }
            | LogicalPlan::EmptyRelation { .. }
            | LogicalPlan::Values { .. }
            | LogicalPlan::DescribeTable(_) => vec![],
        }
    }
487
488    /// returns all `Using` join columns in a logical plan
489    pub fn using_columns(&self) -> Result<Vec<HashSet<Column>>, DataFusionError> {
490        let mut using_columns: Vec<HashSet<Column>> = vec![];
491
492        self.apply_with_subqueries(|plan| {
493            if let LogicalPlan::Join(Join {
494                join_constraint: JoinConstraint::Using,
495                on,
496                ..
497            }) = plan
498            {
499                // The join keys in using-join must be columns.
500                let columns =
501                    on.iter().try_fold(HashSet::new(), |mut accumu, (l, r)| {
502                        let Some(l) = l.get_as_join_column() else {
503                            return internal_err!(
504                                "Invalid join key. Expected column, found {l:?}"
505                            );
506                        };
507                        let Some(r) = r.get_as_join_column() else {
508                            return internal_err!(
509                                "Invalid join key. Expected column, found {r:?}"
510                            );
511                        };
512                        accumu.insert(l.to_owned());
513                        accumu.insert(r.to_owned());
514                        Result::<_, DataFusionError>::Ok(accumu)
515                    })?;
516                using_columns.push(columns);
517            }
518            Ok(TreeNodeRecursion::Continue)
519        })?;
520
521        Ok(using_columns)
522    }
523
    /// returns the first output expression of this `LogicalPlan` node.
    ///
    /// Returns `Ok(None)` for nodes that have no meaningful output
    /// expression (e.g. DDL/DML statements, `EXPLAIN`, subqueries).
    pub fn head_output_expr(&self) -> Result<Option<Expr>> {
        match self {
            LogicalPlan::Projection(projection) => {
                Ok(Some(projection.expr.as_slice()[0].clone()))
            }
            LogicalPlan::Aggregate(agg) => {
                // Group expressions come before aggregate expressions in the
                // output, so prefer the first group expression when present.
                if agg.group_expr.is_empty() {
                    Ok(Some(agg.aggr_expr.as_slice()[0].clone()))
                } else {
                    Ok(Some(agg.group_expr.as_slice()[0].clone()))
                }
            }
            LogicalPlan::Distinct(Distinct::On(DistinctOn { select_expr, .. })) => {
                Ok(Some(select_expr[0].clone()))
            }
            // These nodes do not change the leading output column; delegate
            // to the input.
            LogicalPlan::Filter(Filter { input, .. })
            | LogicalPlan::Distinct(Distinct::All(input))
            | LogicalPlan::Sort(Sort { input, .. })
            | LogicalPlan::Limit(Limit { input, .. })
            | LogicalPlan::Repartition(Repartition { input, .. })
            | LogicalPlan::Window(Window { input, .. }) => input.head_output_expr(),
            LogicalPlan::Join(Join {
                left,
                right,
                join_type,
                ..
            }) => match join_type {
                // Full-width joins: the left side's columns come first,
                // unless the left side has no columns at all.
                JoinType::Inner | JoinType::Left | JoinType::Right | JoinType::Full => {
                    if left.schema().fields().is_empty() {
                        right.head_output_expr()
                    } else {
                        left.head_output_expr()
                    }
                }
                // Left semi/anti/mark joins only output the left side.
                JoinType::LeftSemi | JoinType::LeftAnti | JoinType::LeftMark => {
                    left.head_output_expr()
                }
                // Right semi/anti/mark joins only output the right side.
                JoinType::RightSemi | JoinType::RightAnti | JoinType::RightMark => {
                    right.head_output_expr()
                }
            },
            // The static term defines the recursive query's schema.
            LogicalPlan::RecursiveQuery(RecursiveQuery { static_term, .. }) => {
                static_term.head_output_expr()
            }
            LogicalPlan::Union(union) => Ok(Some(Expr::Column(Column::from(
                union.schema.qualified_field(0),
            )))),
            LogicalPlan::TableScan(table) => Ok(Some(Expr::Column(Column::from(
                table.projected_schema.qualified_field(0),
            )))),
            LogicalPlan::SubqueryAlias(subquery_alias) => {
                // Re-qualify the input's head expression with the alias name.
                let expr_opt = subquery_alias.input.head_output_expr()?;
                expr_opt
                    .map(|expr| {
                        Ok(Expr::Column(create_col_from_scalar_expr(
                            &expr,
                            subquery_alias.alias.to_string(),
                        )?))
                    })
                    .map_or(Ok(None), |v| v.map(Some))
            }
            LogicalPlan::Subquery(_) => Ok(None),
            // Remaining node kinds have no head output expression.
            LogicalPlan::EmptyRelation(_)
            | LogicalPlan::Statement(_)
            | LogicalPlan::Values(_)
            | LogicalPlan::Explain(_)
            | LogicalPlan::Analyze(_)
            | LogicalPlan::Extension(_)
            | LogicalPlan::Dml(_)
            | LogicalPlan::Copy(_)
            | LogicalPlan::Ddl(_)
            | LogicalPlan::DescribeTable(_)
            | LogicalPlan::Unnest(_) => Ok(None),
        }
    }
600
601    /// Recomputes schema and type information for this LogicalPlan if needed.
602    ///
603    /// Some `LogicalPlan`s may need to recompute their schema if the number or
604    /// type of expressions have been changed (for example due to type
605    /// coercion). For example [`LogicalPlan::Projection`]s schema depends on
606    /// its expressions.
607    ///
608    /// Some `LogicalPlan`s schema is unaffected by any changes to their
609    /// expressions. For example [`LogicalPlan::Filter`] schema is always the
610    /// same as its input schema.
611    ///
612    /// This is useful after modifying a plans `Expr`s (or input plans) via
613    /// methods such as [Self::map_children] and [Self::map_expressions]. Unlike
614    /// [Self::with_new_exprs], this method does not require a new set of
615    /// expressions or inputs plans.
616    ///
617    /// # Return value
618    /// Returns an error if there is some issue recomputing the schema.
619    ///
620    /// # Notes
621    ///
622    /// * Does not recursively recompute schema for input (child) plans.
623    pub fn recompute_schema(self) -> Result<Self> {
624        match self {
625            // Since expr may be different than the previous expr, schema of the projection
626            // may change. We need to use try_new method instead of try_new_with_schema method.
627            LogicalPlan::Projection(Projection {
628                expr,
629                input,
630                schema: _,
631            }) => Projection::try_new(expr, input).map(LogicalPlan::Projection),
632            LogicalPlan::Dml(_) => Ok(self),
633            LogicalPlan::Copy(_) => Ok(self),
634            LogicalPlan::Values(Values { schema, values }) => {
635                // todo it isn't clear why the schema is not recomputed here
636                Ok(LogicalPlan::Values(Values { schema, values }))
637            }
638            LogicalPlan::Filter(Filter { predicate, input }) => {
639                Filter::try_new(predicate, input).map(LogicalPlan::Filter)
640            }
641            LogicalPlan::Repartition(_) => Ok(self),
642            LogicalPlan::Window(Window {
643                input,
644                window_expr,
645                schema: _,
646            }) => Window::try_new(window_expr, input).map(LogicalPlan::Window),
647            LogicalPlan::Aggregate(Aggregate {
648                input,
649                group_expr,
650                aggr_expr,
651                schema: _,
652            }) => Aggregate::try_new(input, group_expr, aggr_expr)
653                .map(LogicalPlan::Aggregate),
654            LogicalPlan::Sort(_) => Ok(self),
655            LogicalPlan::Join(Join {
656                left,
657                right,
658                filter,
659                join_type,
660                join_constraint,
661                on,
662                schema: _,
663                null_equality,
664            }) => {
665                let schema =
666                    build_join_schema(left.schema(), right.schema(), &join_type)?;
667
668                let new_on: Vec<_> = on
669                    .into_iter()
670                    .map(|equi_expr| {
671                        // SimplifyExpression rule may add alias to the equi_expr.
672                        (equi_expr.0.unalias(), equi_expr.1.unalias())
673                    })
674                    .collect();
675
676                Ok(LogicalPlan::Join(Join {
677                    left,
678                    right,
679                    join_type,
680                    join_constraint,
681                    on: new_on,
682                    filter,
683                    schema: DFSchemaRef::new(schema),
684                    null_equality,
685                }))
686            }
687            LogicalPlan::Subquery(_) => Ok(self),
688            LogicalPlan::SubqueryAlias(SubqueryAlias {
689                input,
690                alias,
691                schema: _,
692            }) => SubqueryAlias::try_new(input, alias).map(LogicalPlan::SubqueryAlias),
693            LogicalPlan::Limit(_) => Ok(self),
694            LogicalPlan::Ddl(_) => Ok(self),
695            LogicalPlan::Extension(Extension { node }) => {
696                // todo make an API that does not require cloning
697                // This requires a copy of the extension nodes expressions and inputs
698                let expr = node.expressions();
699                let inputs: Vec<_> = node.inputs().into_iter().cloned().collect();
700                Ok(LogicalPlan::Extension(Extension {
701                    node: node.with_exprs_and_inputs(expr, inputs)?,
702                }))
703            }
704            LogicalPlan::Union(Union { inputs, schema }) => {
705                let first_input_schema = inputs[0].schema();
706                if schema.fields().len() == first_input_schema.fields().len() {
707                    // If inputs are not pruned do not change schema
708                    Ok(LogicalPlan::Union(Union { inputs, schema }))
709                } else {
710                    // A note on `Union`s constructed via `try_new_by_name`:
711                    //
712                    // At this point, the schema for each input should have
713                    // the same width. Thus, we do not need to save whether a
714                    // `Union` was created `BY NAME`, and can safely rely on the
715                    // `try_new` initializer to derive the new schema based on
716                    // column positions.
717                    Ok(LogicalPlan::Union(Union::try_new(inputs)?))
718                }
719            }
720            LogicalPlan::Distinct(distinct) => {
721                let distinct = match distinct {
722                    Distinct::All(input) => Distinct::All(input),
723                    Distinct::On(DistinctOn {
724                        on_expr,
725                        select_expr,
726                        sort_expr,
727                        input,
728                        schema: _,
729                    }) => Distinct::On(DistinctOn::try_new(
730                        on_expr,
731                        select_expr,
732                        sort_expr,
733                        input,
734                    )?),
735                };
736                Ok(LogicalPlan::Distinct(distinct))
737            }
738            LogicalPlan::RecursiveQuery(_) => Ok(self),
739            LogicalPlan::Analyze(_) => Ok(self),
740            LogicalPlan::Explain(_) => Ok(self),
741            LogicalPlan::TableScan(_) => Ok(self),
742            LogicalPlan::EmptyRelation(_) => Ok(self),
743            LogicalPlan::Statement(_) => Ok(self),
744            LogicalPlan::DescribeTable(_) => Ok(self),
745            LogicalPlan::Unnest(Unnest {
746                input,
747                exec_columns,
748                options,
749                ..
750            }) => {
751                // Update schema with unnested column type.
752                unnest_with_options(Arc::unwrap_or_clone(input), exec_columns, options)
753            }
754        }
755    }
756
757    /// Returns a new `LogicalPlan` based on `self` with inputs and
758    /// expressions replaced.
759    ///
760    /// Note this method creates an entirely new node, which requires a large
761    /// amount of clone'ing. When possible, the [`tree_node`] API should be used
762    /// instead of this API.
763    ///
764    /// The exprs correspond to the same order of expressions returned
765    /// by [`Self::expressions`]. This function is used by optimizers
766    /// to rewrite plans using the following pattern:
767    ///
768    /// [`tree_node`]: crate::logical_plan::tree_node
769    ///
770    /// ```text
771    /// let new_inputs = optimize_children(..., plan, props);
772    ///
773    /// // get the plans expressions to optimize
774    /// let exprs = plan.expressions();
775    ///
776    /// // potentially rewrite plan expressions
777    /// let rewritten_exprs = rewrite_exprs(exprs);
778    ///
779    /// // create new plan using rewritten_exprs in same position
780    /// let new_plan = plan.new_with_exprs(rewritten_exprs, new_inputs);
781    /// ```
782    pub fn with_new_exprs(
783        &self,
784        mut expr: Vec<Expr>,
785        inputs: Vec<LogicalPlan>,
786    ) -> Result<LogicalPlan> {
787        match self {
788            // Since expr may be different than the previous expr, schema of the projection
789            // may change. We need to use try_new method instead of try_new_with_schema method.
790            LogicalPlan::Projection(Projection { .. }) => {
791                let input = self.only_input(inputs)?;
792                Projection::try_new(expr, Arc::new(input)).map(LogicalPlan::Projection)
793            }
794            LogicalPlan::Dml(DmlStatement {
795                table_name,
796                target,
797                op,
798                ..
799            }) => {
800                self.assert_no_expressions(expr)?;
801                let input = self.only_input(inputs)?;
802                Ok(LogicalPlan::Dml(DmlStatement::new(
803                    table_name.clone(),
804                    Arc::clone(target),
805                    op.clone(),
806                    Arc::new(input),
807                )))
808            }
809            LogicalPlan::Copy(CopyTo {
810                input: _,
811                output_url,
812                file_type,
813                options,
814                partition_by,
815                output_schema: _,
816            }) => {
817                self.assert_no_expressions(expr)?;
818                let input = self.only_input(inputs)?;
819                Ok(LogicalPlan::Copy(CopyTo::new(
820                    Arc::new(input),
821                    output_url.clone(),
822                    partition_by.clone(),
823                    Arc::clone(file_type),
824                    options.clone(),
825                )))
826            }
827            LogicalPlan::Values(Values { schema, .. }) => {
828                self.assert_no_inputs(inputs)?;
829                Ok(LogicalPlan::Values(Values {
830                    schema: Arc::clone(schema),
831                    values: expr
832                        .chunks_exact(schema.fields().len())
833                        .map(|s| s.to_vec())
834                        .collect(),
835                }))
836            }
837            LogicalPlan::Filter { .. } => {
838                let predicate = self.only_expr(expr)?;
839                let input = self.only_input(inputs)?;
840
841                Filter::try_new(predicate, Arc::new(input)).map(LogicalPlan::Filter)
842            }
843            LogicalPlan::Repartition(Repartition {
844                partitioning_scheme,
845                ..
846            }) => match partitioning_scheme {
847                Partitioning::RoundRobinBatch(n) => {
848                    self.assert_no_expressions(expr)?;
849                    let input = self.only_input(inputs)?;
850                    Ok(LogicalPlan::Repartition(Repartition {
851                        partitioning_scheme: Partitioning::RoundRobinBatch(*n),
852                        input: Arc::new(input),
853                    }))
854                }
855                Partitioning::Hash(_, n) => {
856                    let input = self.only_input(inputs)?;
857                    Ok(LogicalPlan::Repartition(Repartition {
858                        partitioning_scheme: Partitioning::Hash(expr, *n),
859                        input: Arc::new(input),
860                    }))
861                }
862                Partitioning::DistributeBy(_) => {
863                    let input = self.only_input(inputs)?;
864                    Ok(LogicalPlan::Repartition(Repartition {
865                        partitioning_scheme: Partitioning::DistributeBy(expr),
866                        input: Arc::new(input),
867                    }))
868                }
869            },
870            LogicalPlan::Window(Window { window_expr, .. }) => {
871                assert_eq!(window_expr.len(), expr.len());
872                let input = self.only_input(inputs)?;
873                Window::try_new(expr, Arc::new(input)).map(LogicalPlan::Window)
874            }
875            LogicalPlan::Aggregate(Aggregate { group_expr, .. }) => {
876                let input = self.only_input(inputs)?;
877                // group exprs are the first expressions
878                let agg_expr = expr.split_off(group_expr.len());
879
880                Aggregate::try_new(Arc::new(input), expr, agg_expr)
881                    .map(LogicalPlan::Aggregate)
882            }
883            LogicalPlan::Sort(Sort {
884                expr: sort_expr,
885                fetch,
886                ..
887            }) => {
888                let input = self.only_input(inputs)?;
889                Ok(LogicalPlan::Sort(Sort {
890                    expr: expr
891                        .into_iter()
892                        .zip(sort_expr.iter())
893                        .map(|(expr, sort)| sort.with_expr(expr))
894                        .collect(),
895                    input: Arc::new(input),
896                    fetch: *fetch,
897                }))
898            }
899            LogicalPlan::Join(Join {
900                join_type,
901                join_constraint,
902                on,
903                null_equality,
904                ..
905            }) => {
906                let (left, right) = self.only_two_inputs(inputs)?;
907                let schema = build_join_schema(left.schema(), right.schema(), join_type)?;
908
909                let equi_expr_count = on.len() * 2;
910                assert!(expr.len() >= equi_expr_count);
911
912                // Assume that the last expr, if any,
913                // is the filter_expr (non equality predicate from ON clause)
914                let filter_expr = if expr.len() > equi_expr_count {
915                    expr.pop()
916                } else {
917                    None
918                };
919
920                // The first part of expr is equi-exprs,
921                // and the struct of each equi-expr is like `left-expr = right-expr`.
922                assert_eq!(expr.len(), equi_expr_count);
923                let mut new_on = Vec::with_capacity(on.len());
924                let mut iter = expr.into_iter();
925                while let Some(left) = iter.next() {
926                    let Some(right) = iter.next() else {
927                        internal_err!(
928                            "Expected a pair of expressions to construct the join on expression"
929                        )?
930                    };
931
932                    // SimplifyExpression rule may add alias to the equi_expr.
933                    new_on.push((left.unalias(), right.unalias()));
934                }
935
936                Ok(LogicalPlan::Join(Join {
937                    left: Arc::new(left),
938                    right: Arc::new(right),
939                    join_type: *join_type,
940                    join_constraint: *join_constraint,
941                    on: new_on,
942                    filter: filter_expr,
943                    schema: DFSchemaRef::new(schema),
944                    null_equality: *null_equality,
945                }))
946            }
947            LogicalPlan::Subquery(Subquery {
948                outer_ref_columns,
949                spans,
950                ..
951            }) => {
952                self.assert_no_expressions(expr)?;
953                let input = self.only_input(inputs)?;
954                let subquery = LogicalPlanBuilder::from(input).build()?;
955                Ok(LogicalPlan::Subquery(Subquery {
956                    subquery: Arc::new(subquery),
957                    outer_ref_columns: outer_ref_columns.clone(),
958                    spans: spans.clone(),
959                }))
960            }
961            LogicalPlan::SubqueryAlias(SubqueryAlias { alias, .. }) => {
962                self.assert_no_expressions(expr)?;
963                let input = self.only_input(inputs)?;
964                SubqueryAlias::try_new(Arc::new(input), alias.clone())
965                    .map(LogicalPlan::SubqueryAlias)
966            }
967            LogicalPlan::Limit(Limit { skip, fetch, .. }) => {
968                let old_expr_len = skip.iter().chain(fetch.iter()).count();
969                assert_eq_or_internal_err!(
970                    old_expr_len,
971                    expr.len(),
972                    "Invalid number of new Limit expressions: expected {}, got {}",
973                    old_expr_len,
974                    expr.len()
975                );
976                // `LogicalPlan::expressions()` returns in [skip, fetch] order, so we can pop from the end.
977                let new_fetch = fetch.as_ref().and_then(|_| expr.pop());
978                let new_skip = skip.as_ref().and_then(|_| expr.pop());
979                let input = self.only_input(inputs)?;
980                Ok(LogicalPlan::Limit(Limit {
981                    skip: new_skip.map(Box::new),
982                    fetch: new_fetch.map(Box::new),
983                    input: Arc::new(input),
984                }))
985            }
986            LogicalPlan::Ddl(DdlStatement::CreateMemoryTable(CreateMemoryTable {
987                name,
988                if_not_exists,
989                or_replace,
990                column_defaults,
991                temporary,
992                ..
993            })) => {
994                self.assert_no_expressions(expr)?;
995                let input = self.only_input(inputs)?;
996                Ok(LogicalPlan::Ddl(DdlStatement::CreateMemoryTable(
997                    CreateMemoryTable {
998                        input: Arc::new(input),
999                        constraints: Constraints::default(),
1000                        name: name.clone(),
1001                        if_not_exists: *if_not_exists,
1002                        or_replace: *or_replace,
1003                        column_defaults: column_defaults.clone(),
1004                        temporary: *temporary,
1005                    },
1006                )))
1007            }
1008            LogicalPlan::Ddl(DdlStatement::CreateView(CreateView {
1009                name,
1010                or_replace,
1011                definition,
1012                temporary,
1013                ..
1014            })) => {
1015                self.assert_no_expressions(expr)?;
1016                let input = self.only_input(inputs)?;
1017                Ok(LogicalPlan::Ddl(DdlStatement::CreateView(CreateView {
1018                    input: Arc::new(input),
1019                    name: name.clone(),
1020                    or_replace: *or_replace,
1021                    temporary: *temporary,
1022                    definition: definition.clone(),
1023                })))
1024            }
1025            LogicalPlan::Extension(e) => Ok(LogicalPlan::Extension(Extension {
1026                node: e.node.with_exprs_and_inputs(expr, inputs)?,
1027            })),
1028            LogicalPlan::Union(Union { schema, .. }) => {
1029                self.assert_no_expressions(expr)?;
1030                let input_schema = inputs[0].schema();
1031                // If inputs are not pruned do not change schema.
1032                let schema = if schema.fields().len() == input_schema.fields().len() {
1033                    Arc::clone(schema)
1034                } else {
1035                    Arc::clone(input_schema)
1036                };
1037                Ok(LogicalPlan::Union(Union {
1038                    inputs: inputs.into_iter().map(Arc::new).collect(),
1039                    schema,
1040                }))
1041            }
1042            LogicalPlan::Distinct(distinct) => {
1043                let distinct = match distinct {
1044                    Distinct::All(_) => {
1045                        self.assert_no_expressions(expr)?;
1046                        let input = self.only_input(inputs)?;
1047                        Distinct::All(Arc::new(input))
1048                    }
1049                    Distinct::On(DistinctOn {
1050                        on_expr,
1051                        select_expr,
1052                        ..
1053                    }) => {
1054                        let input = self.only_input(inputs)?;
1055                        let sort_expr = expr.split_off(on_expr.len() + select_expr.len());
1056                        let select_expr = expr.split_off(on_expr.len());
1057                        assert!(
1058                            sort_expr.is_empty(),
1059                            "with_new_exprs for Distinct does not support sort expressions"
1060                        );
1061                        Distinct::On(DistinctOn::try_new(
1062                            expr,
1063                            select_expr,
1064                            None, // no sort expressions accepted
1065                            Arc::new(input),
1066                        )?)
1067                    }
1068                };
1069                Ok(LogicalPlan::Distinct(distinct))
1070            }
1071            LogicalPlan::RecursiveQuery(RecursiveQuery {
1072                name, is_distinct, ..
1073            }) => {
1074                self.assert_no_expressions(expr)?;
1075                let (static_term, recursive_term) = self.only_two_inputs(inputs)?;
1076                Ok(LogicalPlan::RecursiveQuery(RecursiveQuery {
1077                    name: name.clone(),
1078                    static_term: Arc::new(static_term),
1079                    recursive_term: Arc::new(recursive_term),
1080                    is_distinct: *is_distinct,
1081                }))
1082            }
1083            LogicalPlan::Analyze(a) => {
1084                self.assert_no_expressions(expr)?;
1085                let input = self.only_input(inputs)?;
1086                Ok(LogicalPlan::Analyze(Analyze {
1087                    verbose: a.verbose,
1088                    schema: Arc::clone(&a.schema),
1089                    input: Arc::new(input),
1090                }))
1091            }
1092            LogicalPlan::Explain(e) => {
1093                self.assert_no_expressions(expr)?;
1094                let input = self.only_input(inputs)?;
1095                Ok(LogicalPlan::Explain(Explain {
1096                    verbose: e.verbose,
1097                    plan: Arc::new(input),
1098                    explain_format: e.explain_format.clone(),
1099                    stringified_plans: e.stringified_plans.clone(),
1100                    schema: Arc::clone(&e.schema),
1101                    logical_optimization_succeeded: e.logical_optimization_succeeded,
1102                }))
1103            }
1104            LogicalPlan::Statement(Statement::Prepare(Prepare {
1105                name, fields, ..
1106            })) => {
1107                self.assert_no_expressions(expr)?;
1108                let input = self.only_input(inputs)?;
1109                Ok(LogicalPlan::Statement(Statement::Prepare(Prepare {
1110                    name: name.clone(),
1111                    fields: fields.clone(),
1112                    input: Arc::new(input),
1113                })))
1114            }
1115            LogicalPlan::Statement(Statement::Execute(Execute { name, .. })) => {
1116                self.assert_no_inputs(inputs)?;
1117                Ok(LogicalPlan::Statement(Statement::Execute(Execute {
1118                    name: name.clone(),
1119                    parameters: expr,
1120                })))
1121            }
1122            LogicalPlan::TableScan(ts) => {
1123                self.assert_no_inputs(inputs)?;
1124                Ok(LogicalPlan::TableScan(TableScan {
1125                    filters: expr,
1126                    ..ts.clone()
1127                }))
1128            }
1129            LogicalPlan::EmptyRelation(_)
1130            | LogicalPlan::Ddl(_)
1131            | LogicalPlan::Statement(_)
1132            | LogicalPlan::DescribeTable(_) => {
1133                // All of these plan types have no inputs / exprs so should not be called
1134                self.assert_no_expressions(expr)?;
1135                self.assert_no_inputs(inputs)?;
1136                Ok(self.clone())
1137            }
1138            LogicalPlan::Unnest(Unnest {
1139                exec_columns: columns,
1140                options,
1141                ..
1142            }) => {
1143                self.assert_no_expressions(expr)?;
1144                let input = self.only_input(inputs)?;
1145                // Update schema with unnested column type.
1146                let new_plan =
1147                    unnest_with_options(input, columns.clone(), options.clone())?;
1148                Ok(new_plan)
1149            }
1150        }
1151    }
1152
1153    /// checks that the plan conforms to the listed invariant level, returning an Error if not
1154    pub fn check_invariants(&self, check: InvariantLevel) -> Result<()> {
1155        match check {
1156            InvariantLevel::Always => assert_always_invariants_at_current_node(self),
1157            InvariantLevel::Executable => assert_executable_invariants(self),
1158        }
1159    }
1160
1161    /// Helper for [Self::with_new_exprs] to use when no expressions are expected.
1162    #[inline]
1163    #[expect(clippy::needless_pass_by_value)] // expr is moved intentionally to ensure it's not used again
1164    fn assert_no_expressions(&self, expr: Vec<Expr>) -> Result<()> {
1165        assert_or_internal_err!(
1166            expr.is_empty(),
1167            "{self:?} should have no exprs, got {:?}",
1168            expr
1169        );
1170        Ok(())
1171    }
1172
1173    /// Helper for [Self::with_new_exprs] to use when no inputs are expected.
1174    #[inline]
1175    #[expect(clippy::needless_pass_by_value)] // inputs is moved intentionally to ensure it's not used again
1176    fn assert_no_inputs(&self, inputs: Vec<LogicalPlan>) -> Result<()> {
1177        assert_or_internal_err!(
1178            inputs.is_empty(),
1179            "{self:?} should have no inputs, got: {:?}",
1180            inputs
1181        );
1182        Ok(())
1183    }
1184
1185    /// Helper for [Self::with_new_exprs] to use when exactly one expression is expected.
1186    #[inline]
1187    fn only_expr(&self, mut expr: Vec<Expr>) -> Result<Expr> {
1188        assert_eq_or_internal_err!(
1189            expr.len(),
1190            1,
1191            "{self:?} should have exactly one expr, got {:?}",
1192            &expr
1193        );
1194        Ok(expr.remove(0))
1195    }
1196
1197    /// Helper for [Self::with_new_exprs] to use when exactly one input is expected.
1198    #[inline]
1199    fn only_input(&self, mut inputs: Vec<LogicalPlan>) -> Result<LogicalPlan> {
1200        assert_eq_or_internal_err!(
1201            inputs.len(),
1202            1,
1203            "{self:?} should have exactly one input, got {:?}",
1204            &inputs
1205        );
1206        Ok(inputs.remove(0))
1207    }
1208
1209    /// Helper for [Self::with_new_exprs] to use when exactly two inputs are expected.
1210    #[inline]
1211    fn only_two_inputs(
1212        &self,
1213        mut inputs: Vec<LogicalPlan>,
1214    ) -> Result<(LogicalPlan, LogicalPlan)> {
1215        assert_eq_or_internal_err!(
1216            inputs.len(),
1217            2,
1218            "{self:?} should have exactly two inputs, got {:?}",
1219            &inputs
1220        );
1221        let right = inputs.remove(1);
1222        let left = inputs.remove(0);
1223        Ok((left, right))
1224    }
1225
1226    /// Replaces placeholder param values (like `$1`, `$2`) in [`LogicalPlan`]
1227    /// with the specified `param_values`.
1228    ///
1229    /// [`Prepare`] statements are converted to
1230    /// their inner logical plan for execution.
1231    ///
1232    /// # Example
1233    /// ```
1234    /// # use arrow::datatypes::{Field, Schema, DataType};
1235    /// use datafusion_common::ScalarValue;
1236    /// # use datafusion_expr::{lit, col, LogicalPlanBuilder, logical_plan::table_scan, placeholder};
1237    /// # let schema = Schema::new(vec![
1238    /// #     Field::new("id", DataType::Int32, false),
1239    /// # ]);
1240    /// // Build SELECT * FROM t1 WHERE id = $1
1241    /// let plan = table_scan(Some("t1"), &schema, None).unwrap()
1242    ///     .filter(col("id").eq(placeholder("$1"))).unwrap()
1243    ///     .build().unwrap();
1244    ///
1245    /// assert_eq!(
1246    ///   "Filter: t1.id = $1\
1247    ///   \n  TableScan: t1",
1248    ///   plan.display_indent().to_string()
1249    /// );
1250    ///
1251    /// // Fill in the parameter $1 with a literal 3
1252    /// let plan = plan.with_param_values(vec![
1253    ///   ScalarValue::from(3i32) // value at index 0 --> $1
1254    /// ]).unwrap();
1255    ///
1256    /// assert_eq!(
1257    ///    "Filter: t1.id = Int32(3)\
1258    ///    \n  TableScan: t1",
1259    ///    plan.display_indent().to_string()
1260    ///  );
1261    ///
1262    /// // Note you can also used named parameters
1263    /// // Build SELECT * FROM t1 WHERE id = $my_param
1264    /// let plan = table_scan(Some("t1"), &schema, None).unwrap()
1265    ///     .filter(col("id").eq(placeholder("$my_param"))).unwrap()
1266    ///     .build().unwrap()
1267    ///     // Fill in the parameter $my_param with a literal 3
1268    ///     .with_param_values(vec![
1269    ///       ("my_param", ScalarValue::from(3i32)),
1270    ///     ]).unwrap();
1271    ///
1272    /// assert_eq!(
1273    ///    "Filter: t1.id = Int32(3)\
1274    ///    \n  TableScan: t1",
1275    ///    plan.display_indent().to_string()
1276    ///  );
1277    /// ```
1278    pub fn with_param_values(
1279        self,
1280        param_values: impl Into<ParamValues>,
1281    ) -> Result<LogicalPlan> {
1282        let param_values = param_values.into();
1283        let plan_with_values = self.replace_params_with_values(&param_values)?;
1284
1285        // unwrap Prepare
1286        Ok(
1287            if let LogicalPlan::Statement(Statement::Prepare(prepare_lp)) =
1288                plan_with_values
1289            {
1290                param_values.verify_fields(&prepare_lp.fields)?;
1291                // try and take ownership of the input if is not shared, clone otherwise
1292                Arc::unwrap_or_clone(prepare_lp.input)
1293            } else {
1294                plan_with_values
1295            },
1296        )
1297    }
1298
1299    /// Returns the maximum number of rows that this plan can output, if known.
1300    ///
1301    /// If `None`, the plan can return any number of rows.
1302    /// If `Some(n)` then the plan can return at most `n` rows but may return fewer.
1303    pub fn max_rows(self: &LogicalPlan) -> Option<usize> {
1304        match self {
1305            LogicalPlan::Projection(Projection { input, .. }) => input.max_rows(),
1306            LogicalPlan::Filter(filter) => {
1307                if filter.is_scalar() {
1308                    Some(1)
1309                } else {
1310                    filter.input.max_rows()
1311                }
1312            }
1313            LogicalPlan::Window(Window { input, .. }) => input.max_rows(),
1314            LogicalPlan::Aggregate(Aggregate {
1315                input, group_expr, ..
1316            }) => {
1317                // Empty group_expr will return Some(1)
1318                if group_expr
1319                    .iter()
1320                    .all(|expr| matches!(expr, Expr::Literal(_, _)))
1321                {
1322                    Some(1)
1323                } else {
1324                    input.max_rows()
1325                }
1326            }
1327            LogicalPlan::Sort(Sort { input, fetch, .. }) => {
1328                match (fetch, input.max_rows()) {
1329                    (Some(fetch_limit), Some(input_max)) => {
1330                        Some(input_max.min(*fetch_limit))
1331                    }
1332                    (Some(fetch_limit), None) => Some(*fetch_limit),
1333                    (None, Some(input_max)) => Some(input_max),
1334                    (None, None) => None,
1335                }
1336            }
1337            LogicalPlan::Join(Join {
1338                left,
1339                right,
1340                join_type,
1341                ..
1342            }) => match join_type {
1343                JoinType::Inner => Some(left.max_rows()? * right.max_rows()?),
1344                JoinType::Left | JoinType::Right | JoinType::Full => {
1345                    match (left.max_rows()?, right.max_rows()?, join_type) {
1346                        (0, 0, _) => Some(0),
1347                        (max_rows, 0, JoinType::Left | JoinType::Full) => Some(max_rows),
1348                        (0, max_rows, JoinType::Right | JoinType::Full) => Some(max_rows),
1349                        (left_max, right_max, _) => Some(left_max * right_max),
1350                    }
1351                }
1352                JoinType::LeftSemi | JoinType::LeftAnti | JoinType::LeftMark => {
1353                    left.max_rows()
1354                }
1355                JoinType::RightSemi | JoinType::RightAnti | JoinType::RightMark => {
1356                    right.max_rows()
1357                }
1358            },
1359            LogicalPlan::Repartition(Repartition { input, .. }) => input.max_rows(),
1360            LogicalPlan::Union(Union { inputs, .. }) => {
1361                inputs.iter().try_fold(0usize, |mut acc, plan| {
1362                    acc += plan.max_rows()?;
1363                    Some(acc)
1364                })
1365            }
1366            LogicalPlan::TableScan(TableScan { fetch, .. }) => *fetch,
1367            LogicalPlan::EmptyRelation(_) => Some(0),
1368            LogicalPlan::RecursiveQuery(_) => None,
1369            LogicalPlan::Subquery(_) => None,
1370            LogicalPlan::SubqueryAlias(SubqueryAlias { input, .. }) => input.max_rows(),
1371            LogicalPlan::Limit(limit) => match limit.get_fetch_type() {
1372                Ok(FetchType::Literal(s)) => s,
1373                _ => None,
1374            },
1375            LogicalPlan::Distinct(
1376                Distinct::All(input) | Distinct::On(DistinctOn { input, .. }),
1377            ) => input.max_rows(),
1378            LogicalPlan::Values(v) => Some(v.values.len()),
1379            LogicalPlan::Unnest(_) => None,
1380            LogicalPlan::Ddl(_)
1381            | LogicalPlan::Explain(_)
1382            | LogicalPlan::Analyze(_)
1383            | LogicalPlan::Dml(_)
1384            | LogicalPlan::Copy(_)
1385            | LogicalPlan::DescribeTable(_)
1386            | LogicalPlan::Statement(_)
1387            | LogicalPlan::Extension(_) => None,
1388        }
1389    }
1390
1391    /// Returns the skip (offset) of this plan node, if it has one.
1392    ///
1393    /// Only [`LogicalPlan::Limit`] carries a skip value; all other variants
1394    /// return `Ok(None)`. Returns `Ok(None)` for a zero skip.
1395    pub fn skip(&self) -> Result<Option<usize>> {
1396        match self {
1397            LogicalPlan::Limit(limit) => match limit.get_skip_type()? {
1398                SkipType::Literal(0) => Ok(None),
1399                SkipType::Literal(n) => Ok(Some(n)),
1400                SkipType::UnsupportedExpr => Ok(None),
1401            },
1402            LogicalPlan::Sort(_) => Ok(None),
1403            LogicalPlan::TableScan(_) => Ok(None),
1404            LogicalPlan::Projection(_) => Ok(None),
1405            LogicalPlan::Filter(_) => Ok(None),
1406            LogicalPlan::Window(_) => Ok(None),
1407            LogicalPlan::Aggregate(_) => Ok(None),
1408            LogicalPlan::Join(_) => Ok(None),
1409            LogicalPlan::Repartition(_) => Ok(None),
1410            LogicalPlan::Union(_) => Ok(None),
1411            LogicalPlan::EmptyRelation(_) => Ok(None),
1412            LogicalPlan::Subquery(_) => Ok(None),
1413            LogicalPlan::SubqueryAlias(_) => Ok(None),
1414            LogicalPlan::Statement(_) => Ok(None),
1415            LogicalPlan::Values(_) => Ok(None),
1416            LogicalPlan::Explain(_) => Ok(None),
1417            LogicalPlan::Analyze(_) => Ok(None),
1418            LogicalPlan::Extension(_) => Ok(None),
1419            LogicalPlan::Distinct(_) => Ok(None),
1420            LogicalPlan::Dml(_) => Ok(None),
1421            LogicalPlan::Ddl(_) => Ok(None),
1422            LogicalPlan::Copy(_) => Ok(None),
1423            LogicalPlan::DescribeTable(_) => Ok(None),
1424            LogicalPlan::Unnest(_) => Ok(None),
1425            LogicalPlan::RecursiveQuery(_) => Ok(None),
1426        }
1427    }
1428
1429    /// Returns the fetch (limit) of this plan node, if it has one.
1430    ///
1431    /// [`LogicalPlan::Sort`], [`LogicalPlan::TableScan`], and
1432    /// [`LogicalPlan::Limit`] may carry a fetch value; all other variants
1433    /// return `Ok(None)`.
1434    pub fn fetch(&self) -> Result<Option<usize>> {
1435        match self {
1436            LogicalPlan::Sort(Sort { fetch, .. }) => Ok(*fetch),
1437            LogicalPlan::TableScan(TableScan { fetch, .. }) => Ok(*fetch),
1438            LogicalPlan::Limit(limit) => match limit.get_fetch_type()? {
1439                FetchType::Literal(s) => Ok(s),
1440                FetchType::UnsupportedExpr => Ok(None),
1441            },
1442            LogicalPlan::Projection(_) => Ok(None),
1443            LogicalPlan::Filter(_) => Ok(None),
1444            LogicalPlan::Window(_) => Ok(None),
1445            LogicalPlan::Aggregate(_) => Ok(None),
1446            LogicalPlan::Join(_) => Ok(None),
1447            LogicalPlan::Repartition(_) => Ok(None),
1448            LogicalPlan::Union(_) => Ok(None),
1449            LogicalPlan::EmptyRelation(_) => Ok(None),
1450            LogicalPlan::Subquery(_) => Ok(None),
1451            LogicalPlan::SubqueryAlias(_) => Ok(None),
1452            LogicalPlan::Statement(_) => Ok(None),
1453            LogicalPlan::Values(_) => Ok(None),
1454            LogicalPlan::Explain(_) => Ok(None),
1455            LogicalPlan::Analyze(_) => Ok(None),
1456            LogicalPlan::Extension(_) => Ok(None),
1457            LogicalPlan::Distinct(_) => Ok(None),
1458            LogicalPlan::Dml(_) => Ok(None),
1459            LogicalPlan::Ddl(_) => Ok(None),
1460            LogicalPlan::Copy(_) => Ok(None),
1461            LogicalPlan::DescribeTable(_) => Ok(None),
1462            LogicalPlan::Unnest(_) => Ok(None),
1463            LogicalPlan::RecursiveQuery(_) => Ok(None),
1464        }
1465    }
1466
1467    /// If this node's expressions contains any references to an outer subquery
1468    pub fn contains_outer_reference(&self) -> bool {
1469        let mut contains = false;
1470        self.apply_expressions(|expr| {
1471            Ok(if expr.contains_outer() {
1472                contains = true;
1473                TreeNodeRecursion::Stop
1474            } else {
1475                TreeNodeRecursion::Continue
1476            })
1477        })
1478        .unwrap();
1479        contains
1480    }
1481
1482    /// Get the output expressions and their corresponding columns.
1483    ///
1484    /// The parent node may reference the output columns of the plan by expressions, such as
1485    /// projection over aggregate or window functions. This method helps to convert the
1486    /// referenced expressions into columns.
1487    ///
1488    /// See also: [`crate::utils::columnize_expr`]
1489    pub fn columnized_output_exprs(&self) -> Result<Vec<(&Expr, Column)>> {
1490        match self {
1491            LogicalPlan::Aggregate(aggregate) => Ok(aggregate
1492                .output_expressions()?
1493                .into_iter()
1494                .zip(self.schema().columns())
1495                .collect()),
1496            LogicalPlan::Window(Window {
1497                window_expr,
1498                input,
1499                schema,
1500            }) => {
1501                // The input could be another Window, so the result should also include the input's. For Example:
1502                // `EXPLAIN SELECT RANK() OVER (PARTITION BY a ORDER BY b), SUM(b) OVER (PARTITION BY a) FROM t`
1503                // Its plan is:
1504                // Projection: RANK() PARTITION BY [t.a] ORDER BY [t.b ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW, SUM(t.b) PARTITION BY [t.a] ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING
1505                //   WindowAggr: windowExpr=[[SUM(CAST(t.b AS Int64)) PARTITION BY [t.a] ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING]]
1506                //     WindowAggr: windowExpr=[[RANK() PARTITION BY [t.a] ORDER BY [t.b ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW]]/
1507                //       TableScan: t projection=[a, b]
1508                let mut output_exprs = input.columnized_output_exprs()?;
1509                let input_len = input.schema().fields().len();
1510                output_exprs.extend(
1511                    window_expr
1512                        .iter()
1513                        .zip(schema.columns().into_iter().skip(input_len)),
1514                );
1515                Ok(output_exprs)
1516            }
1517            _ => Ok(vec![]),
1518        }
1519    }
1520}
1521
1522impl LogicalPlan {
    /// Return a `LogicalPlan` with all placeholders (e.g $1 $2,
    /// ...) replaced with corresponding values provided in
    /// `params_values`
    ///
    /// See [`Self::with_param_values`] for examples and usage with an owned
    /// `ParamValues`
    pub fn replace_params_with_values(
        self,
        param_values: &ParamValues,
    ) -> Result<LogicalPlan> {
        // Walk the plan bottom-up, descending into subquery plans as well,
        // so children are rewritten before their parents.
        self.transform_up_with_subqueries(|plan| {
            let schema = Arc::clone(plan.schema());
            let name_preserver = NamePreserver::new(&plan);
            plan.map_expressions(|e| {
                // First pass: resolve each placeholder's type from the schema
                // and learn whether this expression contains any placeholders.
                let (e, has_placeholder) = e.infer_placeholder_types(&schema)?;
                if !has_placeholder {
                    // Performance optimization:
                    // avoid NamePreserver copy and second pass over expression
                    // if no placeholders.
                    Ok(Transformed::no(e))
                } else {
                    let original_name = name_preserver.save(&e);
                    // Second pass: substitute each placeholder with the
                    // literal value (and metadata) supplied for its id.
                    let transformed_expr = e.transform_up(|e| {
                        if let Expr::Placeholder(Placeholder { id, .. }) = e {
                            let (value, metadata) = param_values
                                .get_placeholders_with_values(&id)?
                                .into_inner();
                            Ok(Transformed::yes(Expr::Literal(value, metadata)))
                        } else {
                            Ok(Transformed::no(e))
                        }
                    })?;
                    // Preserve name to avoid breaking column references to this expression
                    Ok(transformed_expr.update_data(|expr| original_name.restore(expr)))
                }
            })?
            // Substitution can change expression types, so refresh this
            // node's schema afterwards.
            .map_data(|plan| plan.update_schema_data_type())
        })
        .map(|res| res.data)
    }
1563
    /// Recompute schema fields' data type after replacing params, ensuring fields data type can be
    /// updated according to the new parameters.
    ///
    /// Unlike `recompute_schema()`, this method rebuilds VALUES plans entirely to properly infer
    /// types from literal values after placeholder substitution.
    fn update_schema_data_type(self) -> Result<LogicalPlan> {
        match self {
            // Build `LogicalPlan::Values` from the values for type inference.
            // We can't use `recompute_schema` because it skips recomputing for
            // `LogicalPlan::Values`.
            LogicalPlan::Values(Values { values, schema: _ }) => {
                LogicalPlanBuilder::values(values)?.build()
            }
            // other plans can just use `recompute_schema` directly.
            plan => plan.recompute_schema(),
        }
    }
1581
1582    /// Walk the logical plan, find any `Placeholder` tokens, and return a set of their names.
1583    pub fn get_parameter_names(&self) -> Result<HashSet<String>> {
1584        let mut param_names = HashSet::new();
1585        self.apply_with_subqueries(|plan| {
1586            plan.apply_expressions(|expr| {
1587                expr.apply(|expr| {
1588                    if let Expr::Placeholder(Placeholder { id, .. }) = expr {
1589                        param_names.insert(id.clone());
1590                    }
1591                    Ok(TreeNodeRecursion::Continue)
1592                })
1593            })
1594        })
1595        .map(|_| param_names)
1596    }
1597
1598    /// Walk the logical plan, find any `Placeholder` tokens, and return a map of their IDs and DataTypes
1599    ///
1600    /// Note that this will drop any extension or field metadata attached to parameters. Use
1601    /// [`LogicalPlan::get_parameter_fields`] to keep extension metadata.
1602    pub fn get_parameter_types(
1603        &self,
1604    ) -> Result<HashMap<String, Option<DataType>>, DataFusionError> {
1605        let mut parameter_fields = self.get_parameter_fields()?;
1606        Ok(parameter_fields
1607            .drain()
1608            .map(|(name, maybe_field)| {
1609                (name, maybe_field.map(|field| field.data_type().clone()))
1610            })
1611            .collect())
1612    }
1613
1614    /// Walk the logical plan, find any `Placeholder` tokens, and return a map of their IDs and FieldRefs
1615    pub fn get_parameter_fields(
1616        &self,
1617    ) -> Result<HashMap<String, Option<FieldRef>>, DataFusionError> {
1618        let mut param_types: HashMap<String, Option<FieldRef>> = HashMap::new();
1619
1620        self.apply_with_subqueries(|plan| {
1621            plan.apply_expressions(|expr| {
1622                expr.apply(|expr| {
1623                    if let Expr::Placeholder(Placeholder { id, field }) = expr {
1624                        let prev = param_types.get(id);
1625                        match (prev, field) {
1626                            (Some(Some(prev)), Some(field)) => {
1627                                check_metadata_with_storage_equal(
1628                                    (field.data_type(), Some(field.metadata())),
1629                                    (prev.data_type(), Some(prev.metadata())),
1630                                    "parameter",
1631                                    &format!(": Conflicting types for id {id}"),
1632                                )?;
1633                            }
1634                            (_, Some(field)) => {
1635                                param_types.insert(id.clone(), Some(Arc::clone(field)));
1636                            }
1637                            _ => {
1638                                param_types.insert(id.clone(), None);
1639                            }
1640                        }
1641                    }
1642                    Ok(TreeNodeRecursion::Continue)
1643                })
1644            })
1645        })
1646        .map(|_| param_types)
1647    }
1648
1649    // ------------
1650    // Various implementations for printing out LogicalPlans
1651    // ------------
1652
1653    /// Return a `format`able structure that produces a single line
1654    /// per node.
1655    ///
1656    /// # Example
1657    ///
1658    /// ```text
1659    /// Projection: employee.id
1660    ///    Filter: employee.state Eq Utf8(\"CO\")\
1661    ///       CsvScan: employee projection=Some([0, 3])
1662    /// ```
1663    ///
1664    /// ```
1665    /// use arrow::datatypes::{DataType, Field, Schema};
1666    /// use datafusion_expr::{col, lit, logical_plan::table_scan, LogicalPlanBuilder};
1667    /// let schema = Schema::new(vec![Field::new("id", DataType::Int32, false)]);
1668    /// let plan = table_scan(Some("t1"), &schema, None)
1669    ///     .unwrap()
1670    ///     .filter(col("id").eq(lit(5)))
1671    ///     .unwrap()
1672    ///     .build()
1673    ///     .unwrap();
1674    ///
1675    /// // Format using display_indent
1676    /// let display_string = format!("{}", plan.display_indent());
1677    ///
1678    /// assert_eq!("Filter: t1.id = Int32(5)\n  TableScan: t1", display_string);
1679    /// ```
1680    pub fn display_indent(&self) -> impl Display + '_ {
1681        // Boilerplate structure to wrap LogicalPlan with something
1682        // that that can be formatted
1683        struct Wrapper<'a>(&'a LogicalPlan);
1684        impl Display for Wrapper<'_> {
1685            fn fmt(&self, f: &mut Formatter) -> fmt::Result {
1686                let with_schema = false;
1687                let mut visitor = IndentVisitor::new(f, with_schema);
1688                match self.0.visit_with_subqueries(&mut visitor) {
1689                    Ok(_) => Ok(()),
1690                    Err(_) => Err(fmt::Error),
1691                }
1692            }
1693        }
1694        Wrapper(self)
1695    }
1696
1697    /// Return a `format`able structure that produces a single line
1698    /// per node that includes the output schema. For example:
1699    ///
1700    /// ```text
1701    /// Projection: employee.id [id:Int32]\
1702    ///    Filter: employee.state = Utf8(\"CO\") [id:Int32, state:Utf8]\
1703    ///      TableScan: employee projection=[0, 3] [id:Int32, state:Utf8]";
1704    /// ```
1705    ///
1706    /// ```
1707    /// use arrow::datatypes::{DataType, Field, Schema};
1708    /// use datafusion_expr::{col, lit, logical_plan::table_scan, LogicalPlanBuilder};
1709    /// let schema = Schema::new(vec![Field::new("id", DataType::Int32, false)]);
1710    /// let plan = table_scan(Some("t1"), &schema, None)
1711    ///     .unwrap()
1712    ///     .filter(col("id").eq(lit(5)))
1713    ///     .unwrap()
1714    ///     .build()
1715    ///     .unwrap();
1716    ///
1717    /// // Format using display_indent_schema
1718    /// let display_string = format!("{}", plan.display_indent_schema());
1719    ///
1720    /// assert_eq!(
1721    ///     "Filter: t1.id = Int32(5) [id:Int32]\
1722    ///             \n  TableScan: t1 [id:Int32]",
1723    ///     display_string
1724    /// );
1725    /// ```
1726    pub fn display_indent_schema(&self) -> impl Display + '_ {
1727        // Boilerplate structure to wrap LogicalPlan with something
1728        // that that can be formatted
1729        struct Wrapper<'a>(&'a LogicalPlan);
1730        impl Display for Wrapper<'_> {
1731            fn fmt(&self, f: &mut Formatter) -> fmt::Result {
1732                let with_schema = true;
1733                let mut visitor = IndentVisitor::new(f, with_schema);
1734                match self.0.visit_with_subqueries(&mut visitor) {
1735                    Ok(_) => Ok(()),
1736                    Err(_) => Err(fmt::Error),
1737                }
1738            }
1739        }
1740        Wrapper(self)
1741    }
1742
1743    /// Return a displayable structure that produces plan in postgresql JSON format.
1744    ///
1745    /// Users can use this format to visualize the plan in existing plan visualization tools, for example [dalibo](https://explain.dalibo.com/)
1746    pub fn display_pg_json(&self) -> impl Display + '_ {
1747        // Boilerplate structure to wrap LogicalPlan with something
1748        // that that can be formatted
1749        struct Wrapper<'a>(&'a LogicalPlan);
1750        impl Display for Wrapper<'_> {
1751            fn fmt(&self, f: &mut Formatter) -> fmt::Result {
1752                let mut visitor = PgJsonVisitor::new(f);
1753                visitor.with_schema(true);
1754                match self.0.visit_with_subqueries(&mut visitor) {
1755                    Ok(_) => Ok(()),
1756                    Err(_) => Err(fmt::Error),
1757                }
1758            }
1759        }
1760        Wrapper(self)
1761    }
1762
    /// Return a `format`able structure that produces lines meant for
    /// graphical display using the `DOT` language. This format can be
    /// visualized using software from
    /// [`graphviz`](https://graphviz.org/)
    ///
    /// This currently produces two graphs -- one with the basic
    /// structure, and one with additional details such as schema.
    ///
    /// ```
    /// use arrow::datatypes::{DataType, Field, Schema};
    /// use datafusion_expr::{col, lit, logical_plan::table_scan, LogicalPlanBuilder};
    /// let schema = Schema::new(vec![Field::new("id", DataType::Int32, false)]);
    /// let plan = table_scan(Some("t1"), &schema, None)
    ///     .unwrap()
    ///     .filter(col("id").eq(lit(5)))
    ///     .unwrap()
    ///     .build()
    ///     .unwrap();
    ///
    /// // Format using display_graphviz
    /// let graphviz_string = format!("{}", plan.display_graphviz());
    /// ```
    ///
    /// If graphviz string is saved to a file such as `/tmp/example.dot`, the following
    /// commands can be used to render it as a pdf:
    ///
    /// ```bash
    ///   dot -Tpdf < /tmp/example.dot  > /tmp/example.pdf
    /// ```
    pub fn display_graphviz(&self) -> impl Display + '_ {
        // Boilerplate structure to wrap LogicalPlan with something
        // that can be formatted
        struct Wrapper<'a>(&'a LogicalPlan);
        impl Display for Wrapper<'_> {
            fn fmt(&self, f: &mut Formatter) -> fmt::Result {
                let mut visitor = GraphvizVisitor::new(f);

                visitor.start_graph()?;

                // First pass: emit the plan structure without schema details
                visitor.pre_visit_plan("LogicalPlan")?;
                self.0
                    .visit_with_subqueries(&mut visitor)
                    .map_err(|_| fmt::Error)?;
                visitor.post_visit_plan()?;

                // Second pass: emit the same plan annotated with schemas
                visitor.set_with_schema(true);
                visitor.pre_visit_plan("Detailed LogicalPlan")?;
                self.0
                    .visit_with_subqueries(&mut visitor)
                    .map_err(|_| fmt::Error)?;
                visitor.post_visit_plan()?;

                visitor.end_graph()?;
                Ok(())
            }
        }
        Wrapper(self)
    }
1821
1822    /// Return a `format`able structure with the a human readable
1823    /// description of this LogicalPlan node per node, not including
1824    /// children. For example:
1825    ///
1826    /// ```text
1827    /// Projection: id
1828    /// ```
1829    /// ```
1830    /// use arrow::datatypes::{DataType, Field, Schema};
1831    /// use datafusion_expr::{col, lit, logical_plan::table_scan, LogicalPlanBuilder};
1832    /// let schema = Schema::new(vec![Field::new("id", DataType::Int32, false)]);
1833    /// let plan = table_scan(Some("t1"), &schema, None)
1834    ///     .unwrap()
1835    ///     .build()
1836    ///     .unwrap();
1837    ///
1838    /// // Format using display
1839    /// let display_string = format!("{}", plan.display());
1840    ///
1841    /// assert_eq!("TableScan: t1", display_string);
1842    /// ```
1843    pub fn display(&self) -> impl Display + '_ {
1844        // Boilerplate structure to wrap LogicalPlan with something
1845        // that that can be formatted
1846        struct Wrapper<'a>(&'a LogicalPlan);
1847        impl Display for Wrapper<'_> {
1848            fn fmt(&self, f: &mut Formatter) -> fmt::Result {
1849                match self.0 {
1850                    LogicalPlan::EmptyRelation(EmptyRelation {
1851                        produce_one_row,
1852                        schema: _,
1853                    }) => {
1854                        let rows = if *produce_one_row { 1 } else { 0 };
1855                        write!(f, "EmptyRelation: rows={rows}")
1856                    }
1857                    LogicalPlan::RecursiveQuery(RecursiveQuery {
1858                        is_distinct, ..
1859                    }) => {
1860                        write!(f, "RecursiveQuery: is_distinct={is_distinct}")
1861                    }
1862                    LogicalPlan::Values(Values { values, .. }) => {
1863                        let str_values: Vec<_> = values
1864                            .iter()
1865                            // limit to only 5 values to avoid horrible display
1866                            .take(5)
1867                            .map(|row| {
1868                                let item = row
1869                                    .iter()
1870                                    .map(|expr| expr.to_string())
1871                                    .collect::<Vec<_>>()
1872                                    .join(", ");
1873                                format!("({item})")
1874                            })
1875                            .collect();
1876
1877                        let eclipse = if values.len() > 5 { "..." } else { "" };
1878                        write!(f, "Values: {}{}", str_values.join(", "), eclipse)
1879                    }
1880
1881                    LogicalPlan::TableScan(TableScan {
1882                        source,
1883                        table_name,
1884                        projection,
1885                        filters,
1886                        fetch,
1887                        ..
1888                    }) => {
1889                        let projected_fields = match projection {
1890                            Some(indices) => {
1891                                let schema = source.schema();
1892                                let names: Vec<&str> = indices
1893                                    .iter()
1894                                    .map(|i| schema.field(*i).name().as_str())
1895                                    .collect();
1896                                format!(" projection=[{}]", names.join(", "))
1897                            }
1898                            _ => "".to_string(),
1899                        };
1900
1901                        write!(f, "TableScan: {table_name}{projected_fields}")?;
1902
1903                        if !filters.is_empty() {
1904                            let mut full_filter = vec![];
1905                            let mut partial_filter = vec![];
1906                            let mut unsupported_filters = vec![];
1907                            let filters: Vec<&Expr> = filters.iter().collect();
1908
1909                            if let Ok(results) =
1910                                source.supports_filters_pushdown(&filters)
1911                            {
1912                                filters.iter().zip(results.iter()).for_each(
1913                                    |(x, res)| match res {
1914                                        TableProviderFilterPushDown::Exact => {
1915                                            full_filter.push(x)
1916                                        }
1917                                        TableProviderFilterPushDown::Inexact => {
1918                                            partial_filter.push(x)
1919                                        }
1920                                        TableProviderFilterPushDown::Unsupported => {
1921                                            unsupported_filters.push(x)
1922                                        }
1923                                    },
1924                                );
1925                            }
1926
1927                            if !full_filter.is_empty() {
1928                                write!(
1929                                    f,
1930                                    ", full_filters=[{}]",
1931                                    expr_vec_fmt!(full_filter)
1932                                )?;
1933                            };
1934                            if !partial_filter.is_empty() {
1935                                write!(
1936                                    f,
1937                                    ", partial_filters=[{}]",
1938                                    expr_vec_fmt!(partial_filter)
1939                                )?;
1940                            }
1941                            if !unsupported_filters.is_empty() {
1942                                write!(
1943                                    f,
1944                                    ", unsupported_filters=[{}]",
1945                                    expr_vec_fmt!(unsupported_filters)
1946                                )?;
1947                            }
1948                        }
1949
1950                        if let Some(n) = fetch {
1951                            write!(f, ", fetch={n}")?;
1952                        }
1953
1954                        Ok(())
1955                    }
1956                    LogicalPlan::Projection(Projection { expr, .. }) => {
1957                        write!(f, "Projection:")?;
1958                        for (i, expr_item) in expr.iter().enumerate() {
1959                            if i > 0 {
1960                                write!(f, ",")?;
1961                            }
1962                            write!(f, " {expr_item}")?;
1963                        }
1964                        Ok(())
1965                    }
1966                    LogicalPlan::Dml(DmlStatement { table_name, op, .. }) => {
1967                        write!(f, "Dml: op=[{op}] table=[{table_name}]")
1968                    }
1969                    LogicalPlan::Copy(CopyTo {
1970                        input: _,
1971                        output_url,
1972                        file_type,
1973                        options,
1974                        ..
1975                    }) => {
1976                        let op_str = options
1977                            .iter()
1978                            .map(|(k, v)| format!("{k} {v}"))
1979                            .collect::<Vec<String>>()
1980                            .join(", ");
1981
1982                        write!(
1983                            f,
1984                            "CopyTo: format={} output_url={output_url} options: ({op_str})",
1985                            file_type.get_ext()
1986                        )
1987                    }
1988                    LogicalPlan::Ddl(ddl) => {
1989                        write!(f, "{}", ddl.display())
1990                    }
1991                    LogicalPlan::Filter(Filter {
1992                        predicate: expr, ..
1993                    }) => write!(f, "Filter: {expr}"),
1994                    LogicalPlan::Window(Window { window_expr, .. }) => {
1995                        write!(
1996                            f,
1997                            "WindowAggr: windowExpr=[[{}]]",
1998                            expr_vec_fmt!(window_expr)
1999                        )
2000                    }
2001                    LogicalPlan::Aggregate(Aggregate {
2002                        group_expr,
2003                        aggr_expr,
2004                        ..
2005                    }) => write!(
2006                        f,
2007                        "Aggregate: groupBy=[[{}]], aggr=[[{}]]",
2008                        expr_vec_fmt!(group_expr),
2009                        expr_vec_fmt!(aggr_expr)
2010                    ),
2011                    LogicalPlan::Sort(Sort { expr, fetch, .. }) => {
2012                        write!(f, "Sort: ")?;
2013                        for (i, expr_item) in expr.iter().enumerate() {
2014                            if i > 0 {
2015                                write!(f, ", ")?;
2016                            }
2017                            write!(f, "{expr_item}")?;
2018                        }
2019                        if let Some(a) = fetch {
2020                            write!(f, ", fetch={a}")?;
2021                        }
2022
2023                        Ok(())
2024                    }
2025                    LogicalPlan::Join(Join {
2026                        on: keys,
2027                        filter,
2028                        join_constraint,
2029                        join_type,
2030                        ..
2031                    }) => {
2032                        let join_expr: Vec<String> =
2033                            keys.iter().map(|(l, r)| format!("{l} = {r}")).collect();
2034                        let filter_expr = filter
2035                            .as_ref()
2036                            .map(|expr| format!(" Filter: {expr}"))
2037                            .unwrap_or_else(|| "".to_string());
2038                        let join_type = if filter.is_none()
2039                            && keys.is_empty()
2040                            && matches!(join_type, JoinType::Inner)
2041                        {
2042                            "Cross".to_string()
2043                        } else {
2044                            join_type.to_string()
2045                        };
2046                        match join_constraint {
2047                            JoinConstraint::On => {
2048                                write!(
2049                                    f,
2050                                    "{} Join: {}{}",
2051                                    join_type,
2052                                    join_expr.join(", "),
2053                                    filter_expr
2054                                )
2055                            }
2056                            JoinConstraint::Using => {
2057                                write!(
2058                                    f,
2059                                    "{} Join: Using {}{}",
2060                                    join_type,
2061                                    join_expr.join(", "),
2062                                    filter_expr,
2063                                )
2064                            }
2065                        }
2066                    }
2067                    LogicalPlan::Repartition(Repartition {
2068                        partitioning_scheme,
2069                        ..
2070                    }) => match partitioning_scheme {
2071                        Partitioning::RoundRobinBatch(n) => {
2072                            write!(f, "Repartition: RoundRobinBatch partition_count={n}")
2073                        }
2074                        Partitioning::Hash(expr, n) => {
2075                            let hash_expr: Vec<String> =
2076                                expr.iter().map(|e| format!("{e}")).collect();
2077                            write!(
2078                                f,
2079                                "Repartition: Hash({}) partition_count={}",
2080                                hash_expr.join(", "),
2081                                n
2082                            )
2083                        }
2084                        Partitioning::DistributeBy(expr) => {
2085                            let dist_by_expr: Vec<String> =
2086                                expr.iter().map(|e| format!("{e}")).collect();
2087                            write!(
2088                                f,
2089                                "Repartition: DistributeBy({})",
2090                                dist_by_expr.join(", "),
2091                            )
2092                        }
2093                    },
2094                    LogicalPlan::Limit(limit) => {
2095                        // Attempt to display `skip` and `fetch` as literals if possible, otherwise as expressions.
2096                        let skip_str = match limit.get_skip_type() {
2097                            Ok(SkipType::Literal(n)) => n.to_string(),
2098                            _ => limit
2099                                .skip
2100                                .as_ref()
2101                                .map_or_else(|| "None".to_string(), |x| x.to_string()),
2102                        };
2103                        let fetch_str = match limit.get_fetch_type() {
2104                            Ok(FetchType::Literal(Some(n))) => n.to_string(),
2105                            Ok(FetchType::Literal(None)) => "None".to_string(),
2106                            _ => limit
2107                                .fetch
2108                                .as_ref()
2109                                .map_or_else(|| "None".to_string(), |x| x.to_string()),
2110                        };
2111                        write!(f, "Limit: skip={skip_str}, fetch={fetch_str}",)
2112                    }
2113                    LogicalPlan::Subquery(Subquery { .. }) => {
2114                        write!(f, "Subquery:")
2115                    }
2116                    LogicalPlan::SubqueryAlias(SubqueryAlias { alias, .. }) => {
2117                        write!(f, "SubqueryAlias: {alias}")
2118                    }
2119                    LogicalPlan::Statement(statement) => {
2120                        write!(f, "{}", statement.display())
2121                    }
2122                    LogicalPlan::Distinct(distinct) => match distinct {
2123                        Distinct::All(_) => write!(f, "Distinct:"),
2124                        Distinct::On(DistinctOn {
2125                            on_expr,
2126                            select_expr,
2127                            sort_expr,
2128                            ..
2129                        }) => write!(
2130                            f,
2131                            "DistinctOn: on_expr=[[{}]], select_expr=[[{}]], sort_expr=[[{}]]",
2132                            expr_vec_fmt!(on_expr),
2133                            expr_vec_fmt!(select_expr),
2134                            if let Some(sort_expr) = sort_expr {
2135                                expr_vec_fmt!(sort_expr)
2136                            } else {
2137                                "".to_string()
2138                            },
2139                        ),
2140                    },
2141                    LogicalPlan::Explain { .. } => write!(f, "Explain"),
2142                    LogicalPlan::Analyze { .. } => write!(f, "Analyze"),
2143                    LogicalPlan::Union(_) => write!(f, "Union"),
2144                    LogicalPlan::Extension(e) => e.node.fmt_for_explain(f),
2145                    LogicalPlan::DescribeTable(DescribeTable { .. }) => {
2146                        write!(f, "DescribeTable")
2147                    }
2148                    LogicalPlan::Unnest(Unnest {
2149                        input: plan,
2150                        list_type_columns: list_col_indices,
2151                        struct_type_columns: struct_col_indices,
2152                        ..
2153                    }) => {
2154                        let input_columns = plan.schema().columns();
2155                        let list_type_columns = list_col_indices
2156                            .iter()
2157                            .map(|(i, unnest_info)| {
2158                                format!(
2159                                    "{}|depth={}",
2160                                    &input_columns[*i].to_string(),
2161                                    unnest_info.depth
2162                                )
2163                            })
2164                            .collect::<Vec<String>>();
2165                        let struct_type_columns = struct_col_indices
2166                            .iter()
2167                            .map(|i| &input_columns[*i])
2168                            .collect::<Vec<&Column>>();
2169                        // get items from input_columns indexed by list_col_indices
2170                        write!(
2171                            f,
2172                            "Unnest: lists[{}] structs[{}]",
2173                            expr_vec_fmt!(list_type_columns),
2174                            expr_vec_fmt!(struct_type_columns)
2175                        )
2176                    }
2177                }
2178            }
2179        }
2180        Wrapper(self)
2181    }
2182}
2183
2184impl Display for LogicalPlan {
2185    fn fmt(&self, f: &mut Formatter) -> fmt::Result {
2186        self.display_indent().fmt(f)
2187    }
2188}
2189
2190impl ToStringifiedPlan for LogicalPlan {
2191    fn to_stringified(&self, plan_type: PlanType) -> StringifiedPlan {
2192        StringifiedPlan::new(plan_type, self.display_indent().to_string())
2193    }
2194}
2195
/// Relationship produces 0 or 1 placeholder rows with specified output schema
/// In most cases the output schema for `EmptyRelation` would be empty,
/// however, it can be non-empty typically for optimizer rules
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct EmptyRelation {
    /// Whether to produce a placeholder row (`true`) or no rows at all (`false`)
    pub produce_one_row: bool,
    /// The schema description of the output
    pub schema: DFSchemaRef,
}
2206
2207// Manual implementation needed because of `schema` field. Comparison excludes this field.
2208impl PartialOrd for EmptyRelation {
2209    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
2210        self.produce_one_row
2211            .partial_cmp(&other.produce_one_row)
2212            // TODO (https://github.com/apache/datafusion/issues/17477) avoid recomparing all fields
2213            .filter(|cmp| *cmp != Ordering::Equal || self == other)
2214    }
2215}
2216
/// A variadic query operation, Recursive CTE.
///
/// # Recursive Query Evaluation
///
/// From the [Postgres Docs]:
///
/// 1. Evaluate the non-recursive term. For `UNION` (but not `UNION ALL`),
///    discard duplicate rows. Include all remaining rows in the result of the
///    recursive query, and also place them in a temporary working table.
///
/// 2. So long as the working table is not empty, repeat these steps:
///
/// * Evaluate the recursive term, substituting the current contents of the
///   working table for the recursive self-reference. For `UNION` (but not `UNION
///   ALL`), discard duplicate rows and rows that duplicate any previous result
///   row. Include all remaining rows in the result of the recursive query, and
///   also place them in a temporary intermediate table.
///
/// * Replace the contents of the working table with the contents of the
///   intermediate table, then empty the intermediate table.
///
/// [Postgres Docs]: https://www.postgresql.org/docs/current/queries-with.html#QUERIES-WITH-RECURSIVE
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Hash)]
pub struct RecursiveQuery {
    /// Name of the query (the CTE name that the recursive term refers back to)
    pub name: String,
    /// The static term (initial contents of the working table)
    pub static_term: Arc<LogicalPlan>,
    /// The recursive term (evaluated on the contents of the working table until
    /// it returns an empty set)
    pub recursive_term: Arc<LogicalPlan>,
    /// Should the output of the recursive term be deduplicated (`UNION`) or
    /// not (`UNION ALL`).
    pub is_distinct: bool,
}
2252
/// Values expression. See
/// [Postgres VALUES](https://www.postgresql.org/docs/current/queries-values.html)
/// documentation for more details.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct Values {
    /// The table schema
    pub schema: DFSchemaRef,
    /// Values: one inner `Vec<Expr>` per row, one `Expr` per column
    pub values: Vec<Vec<Expr>>,
}
2263
2264// Manual implementation needed because of `schema` field. Comparison excludes this field.
2265impl PartialOrd for Values {
2266    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
2267        self.values
2268            .partial_cmp(&other.values)
2269            // TODO (https://github.com/apache/datafusion/issues/17477) avoid recomparing all fields
2270            .filter(|cmp| *cmp != Ordering::Equal || self == other)
2271    }
2272}
2273
/// Evaluates an arbitrary list of expressions (essentially a
/// SELECT with an expression list) on its input.
#[derive(Clone, PartialEq, Eq, Hash, Debug)]
// mark non_exhaustive to encourage use of try_new/new()
#[non_exhaustive]
pub struct Projection {
    /// The list of expressions
    pub expr: Vec<Expr>,
    /// The incoming logical plan
    pub input: Arc<LogicalPlan>,
    /// The schema description of the output (one field per expression)
    pub schema: DFSchemaRef,
}
2287
2288// Manual implementation needed because of `schema` field. Comparison excludes this field.
2289impl PartialOrd for Projection {
2290    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
2291        match self.expr.partial_cmp(&other.expr) {
2292            Some(Ordering::Equal) => self.input.partial_cmp(&other.input),
2293            cmp => cmp,
2294        }
2295        // TODO (https://github.com/apache/datafusion/issues/17477) avoid recomparing all fields
2296        .filter(|cmp| *cmp != Ordering::Equal || self == other)
2297    }
2298}
2299
impl Projection {
    /// Create a new Projection
    pub fn try_new(expr: Vec<Expr>, input: Arc<LogicalPlan>) -> Result<Self> {
        // Derive the output schema from the expressions and the input plan.
        let projection_schema = projection_schema(&input, &expr)?;
        Self::try_new_with_schema(expr, input, projection_schema)
    }

    /// Create a new Projection using the specified output schema
    ///
    /// # Errors
    ///
    /// Returns a plan error when the number of expressions does not match the
    /// number of fields in `schema`, unless a (deprecated) wildcard expression
    /// is present, since a wildcard expands to multiple fields.
    pub fn try_new_with_schema(
        expr: Vec<Expr>,
        input: Arc<LogicalPlan>,
        schema: DFSchemaRef,
    ) -> Result<Self> {
        #[expect(deprecated)]
        if !expr.iter().any(|e| matches!(e, Expr::Wildcard { .. }))
            && expr.len() != schema.fields().len()
        {
            return plan_err!(
                "Projection has mismatch between number of expressions ({}) and number of fields in schema ({})",
                expr.len(),
                schema.fields().len()
            );
        }
        Ok(Self {
            expr,
            input,
            schema,
        })
    }

    /// Create a new Projection using the specified output schema
    pub fn new_from_schema(input: Arc<LogicalPlan>, schema: DFSchemaRef) -> Self {
        // Project each field of `schema` as a plain column reference.
        let expr: Vec<Expr> = schema.columns().into_iter().map(Expr::Column).collect();
        Self {
            expr,
            input,
            schema,
        }
    }
}
2340
/// Computes the schema of the result produced by applying a projection to the input logical plan.
///
/// # Arguments
///
/// * `input`: A reference to the input `LogicalPlan` for which the projection schema
///   will be computed.
/// * `exprs`: A slice of `Expr` expressions representing the projection operation to apply.
///
/// # Metadata Handling
///
/// - **Schema-level metadata**: Passed through unchanged from the input schema
/// - **Field-level metadata**: Determined by each expression via [`exprlist_to_fields`], which
///   calls [`Expr::to_field`] to handle expression-specific metadata (literals, aliases, etc.)
///
/// # Returns
///
/// A `Result` containing an `Arc<DFSchema>` representing the schema of the result
/// produced by the projection operation. If the schema computation is successful,
/// the `Result` will contain the schema; otherwise, it will contain an error.
pub fn projection_schema(input: &LogicalPlan, exprs: &[Expr]) -> Result<Arc<DFSchema>> {
    // Preserve input schema metadata at the schema level
    let metadata = input.schema().metadata().clone();

    // Convert expressions to fields with Field properties determined by `Expr::to_field`,
    // then recompute the functional dependencies that survive the projection.
    let schema =
        DFSchema::new_with_metadata(exprlist_to_fields(exprs, input)?, metadata)?
            .with_functional_dependencies(calc_func_dependencies_for_project(
                exprs, input,
            )?)?;

    Ok(Arc::new(schema))
}
2373
/// Aliased subquery
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
// mark non_exhaustive to encourage use of try_new/new()
#[non_exhaustive]
pub struct SubqueryAlias {
    /// The incoming logical plan
    pub input: Arc<LogicalPlan>,
    /// The alias for the input relation; output fields are re-qualified with it
    pub alias: TableReference,
    /// The schema with qualified field names
    pub schema: DFSchemaRef,
}
2386
impl SubqueryAlias {
    /// Create a new `SubqueryAlias`, re-qualifying every output field of
    /// `plan` with `alias`.
    ///
    /// If the input schema contains duplicate field names, a projection is
    /// inserted first so each output field carries a unique alias; otherwise
    /// the re-qualification would make those columns ambiguous.
    pub fn try_new(
        plan: Arc<LogicalPlan>,
        alias: impl Into<TableReference>,
    ) -> Result<Self> {
        let alias = alias.into();

        // Since SubqueryAlias will replace all field qualification for the output schema of `plan`,
        // no field must share the same column name as this would lead to ambiguity when referencing
        // columns in parent logical nodes.

        // Compute unique aliases, if any, for each column of the input's schema.
        let aliases = unique_field_aliases(plan.schema().fields());
        let is_projection_needed = aliases.iter().any(Option::is_some);

        // Insert a projection node, if needed, to make sure aliases are applied.
        let plan = if is_projection_needed {
            let projection_expressions = aliases
                .iter()
                .zip(plan.schema().iter())
                .map(|(alias, (qualifier, field))| {
                    let column =
                        Expr::Column(Column::new(qualifier.cloned(), field.name()));
                    match alias {
                        // Field name already unique: keep the bare column.
                        None => column,
                        // Duplicate name: alias the column to its unique name.
                        Some(alias) => {
                            Expr::Alias(Alias::new(column, qualifier.cloned(), alias))
                        }
                    }
                })
                .collect();
            let projection = Projection::try_new(projection_expressions, plan)?;
            Arc::new(LogicalPlan::Projection(projection))
        } else {
            plan
        };

        // Requalify fields with the new `alias`.
        let fields = plan.schema().fields().clone();
        let meta_data = plan.schema().metadata().clone();
        let func_dependencies = plan.schema().functional_dependencies().clone();

        // Strip the existing qualifiers, then re-apply `alias` as the
        // qualifier for every field, preserving metadata and functional
        // dependencies.
        let schema = DFSchema::from_unqualified_fields(fields, meta_data)?;
        let schema = schema.as_arrow();

        let schema = DFSchemaRef::new(
            DFSchema::try_from_qualified_schema(alias.clone(), schema)?
                .with_functional_dependencies(func_dependencies)?,
        );
        Ok(SubqueryAlias {
            input: plan,
            alias,
            schema,
        })
    }
}
2443
2444// Manual implementation needed because of `schema` field. Comparison excludes this field.
2445impl PartialOrd for SubqueryAlias {
2446    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
2447        match self.input.partial_cmp(&other.input) {
2448            Some(Ordering::Equal) => self.alias.partial_cmp(&other.alias),
2449            cmp => cmp,
2450        }
2451        // TODO (https://github.com/apache/datafusion/issues/17477) avoid recomparing all fields
2452        .filter(|cmp| *cmp != Ordering::Equal || self == other)
2453    }
2454}
2455
/// Filters rows from its input that do not match an
/// expression (essentially a WHERE clause with a predicate
/// expression).
///
/// Semantically, `<predicate>` is evaluated for each row of the input;
/// If the value of `<predicate>` is true, the input row is passed to
/// the output. If the value of `<predicate>` is false, the row is
/// discarded.
///
/// Filter should not be created directly but instead use `try_new()`
/// and that these fields are only pub to support pattern matching
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Hash)]
#[non_exhaustive]
pub struct Filter {
    /// The predicate expression, which must have Boolean type.
    pub predicate: Expr,
    /// The incoming logical plan
    pub input: Arc<LogicalPlan>,
}
2475
impl Filter {
    /// Create a new filter operator.
    ///
    /// Notes: as Aliases have no effect on the output of a filter operator,
    /// they are removed from the predicate expression.
    pub fn try_new(predicate: Expr, input: Arc<LogicalPlan>) -> Result<Self> {
        Self::try_new_internal(predicate, input)
    }

    /// Create a new filter operator for a having clause.
    /// This is similar to a filter, but its having flag is set to true.
    #[deprecated(since = "48.0.0", note = "Use `try_new` instead")]
    pub fn try_new_with_having(predicate: Expr, input: Arc<LogicalPlan>) -> Result<Self> {
        Self::try_new_internal(predicate, input)
    }

    /// Returns `true` if `data_type` is acceptable as a filter predicate type.
    fn is_allowed_filter_type(data_type: &DataType) -> bool {
        match data_type {
            // Interpret NULL as a missing boolean value.
            DataType::Boolean | DataType::Null => true,
            // Dictionary-encoded values are allowed if their value type is.
            DataType::Dictionary(_, value_type) => {
                Filter::is_allowed_filter_type(value_type.as_ref())
            }
            _ => false,
        }
    }

    /// Shared constructor: best-effort type validation of the predicate,
    /// then alias stripping before storing it.
    fn try_new_internal(predicate: Expr, input: Arc<LogicalPlan>) -> Result<Self> {
        // Filter predicates must return a boolean value so we try and validate that here.
        // Note that it is not always possible to resolve the predicate expression during plan
        // construction (such as with correlated subqueries) so we make a best effort here and
        // ignore errors resolving the expression against the schema.
        if let Ok(predicate_type) = predicate.get_type(input.schema())
            && !Filter::is_allowed_filter_type(&predicate_type)
        {
            return plan_err!(
                "Cannot create filter with non-boolean predicate '{predicate}' returning {predicate_type}"
            );
        }

        Ok(Self {
            // Aliases have no effect on filtering; drop them from the predicate.
            predicate: predicate.unalias_nested().data,
            input,
        })
    }

    /// Is this filter guaranteed to return 0 or 1 row in a given instantiation?
    ///
    /// This function will return `true` if its predicate contains a conjunction of
    /// `col(a) = <expr>`, where its schema has a unique filter that is covered
    /// by this conjunction.
    ///
    /// For example, for the table:
    /// ```sql
    /// CREATE TABLE t (a INTEGER PRIMARY KEY, b INTEGER);
    /// ```
    /// `Filter(a = 2).is_scalar() == true`
    /// , whereas
    /// `Filter(b = 2).is_scalar() == false`
    /// and
    /// `Filter(a = 2 OR b = 2).is_scalar() == false`
    fn is_scalar(&self) -> bool {
        let schema = self.input.schema();

        // Candidate unique keys: single-mode dependencies that cover every
        // field and whose source columns are all non-nullable.
        let functional_dependencies = self.input.schema().functional_dependencies();
        let unique_keys = functional_dependencies.iter().filter(|dep| {
            let nullable = dep.nullable
                && dep
                    .source_indices
                    .iter()
                    .any(|&source| schema.field(source).is_nullable());
            !nullable
                && dep.mode == Dependency::Single
                && dep.target_indices.len() == schema.fields().len()
        });

        // Collect the indices of columns pinned by a `col = <expr>`
        // (or `<expr> = col`) conjunct of the predicate.
        let exprs = split_conjunction(&self.predicate);
        let eq_pred_cols: HashSet<_> = exprs
            .iter()
            .filter_map(|expr| {
                let Expr::BinaryExpr(BinaryExpr {
                    left,
                    op: Operator::Eq,
                    right,
                }) = expr
                else {
                    return None;
                };
                // This is a no-op filter expression
                if left == right {
                    return None;
                }

                match (left.as_ref(), right.as_ref()) {
                    // `col = col` does not pin either column to a value.
                    (Expr::Column(_), Expr::Column(_)) => None,
                    (Expr::Column(c), _) | (_, Expr::Column(c)) => {
                        Some(schema.index_of_column(c).unwrap())
                    }
                    _ => None,
                }
            })
            .collect();

        // If we have a functional dependence that is a subset of our predicate,
        // this filter is scalar
        for key in unique_keys {
            if key.source_indices.iter().all(|c| eq_pred_cols.contains(c)) {
                return true;
            }
        }
        false
    }
}
2589
/// Window its input based on a set of window spec and window function (e.g. SUM or RANK)
///
/// # Output Schema
///
/// The output schema is the input schema followed by the window function
/// expressions, in order.
///
/// For example, given the input schema `"A", "B", "C"` and the window function
/// `SUM(A) OVER (PARTITION BY B+1 ORDER BY C)`, the output schema will be `"A",
/// "B", "C", "SUM(A) OVER ..."` where `"SUM(A) OVER ..."` is the name of the
/// output column.
///
/// Note that the `PARTITION BY` expression "B+1" is not produced in the output
/// schema.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct Window {
    /// The incoming logical plan
    pub input: Arc<LogicalPlan>,
    /// The window function expressions (one output field per expression)
    pub window_expr: Vec<Expr>,
    /// The schema description of the window output
    pub schema: DFSchemaRef,
}
2613
impl Window {
    /// Create a new window operator.
    ///
    /// Builds the output schema (input fields followed by one field per
    /// window expression) and extends the input's functional dependencies.
    pub fn try_new(window_expr: Vec<Expr>, input: Arc<LogicalPlan>) -> Result<Self> {
        // Start from the input's (qualifier, field) pairs...
        let fields: Vec<(Option<TableReference>, Arc<Field>)> = input
            .schema()
            .iter()
            .map(|(q, f)| (q.cloned(), Arc::clone(f)))
            .collect();
        let input_len = fields.len();
        let mut window_fields = fields;
        // ...and append one output field per window expression.
        let expr_fields = exprlist_to_fields(window_expr.as_slice(), &input)?;
        window_fields.extend_from_slice(expr_fields.as_slice());
        let metadata = input.schema().metadata().clone();

        // Update functional dependencies for window:
        let mut window_func_dependencies =
            input.schema().functional_dependencies().clone();
        window_func_dependencies.extend_target_indices(window_fields.len());

        // Since we know that ROW_NUMBER outputs will be unique (i.e. it consists
        // of consecutive numbers per partition), we can represent this fact with
        // functional dependencies.
        let mut new_dependencies = window_expr
            .iter()
            .enumerate()
            .filter_map(|(idx, expr)| {
                let Expr::WindowFunction(window_fun) = expr else {
                    return None;
                };
                let WindowFunction {
                    fun: WindowFunctionDefinition::WindowUDF(udwf),
                    params: WindowFunctionParams { partition_by, .. },
                } = window_fun.as_ref()
                else {
                    return None;
                };
                // When there is no PARTITION BY, row number will be unique
                // across the entire table.
                if udwf.name() == "row_number" && partition_by.is_empty() {
                    Some(idx + input_len)
                } else {
                    None
                }
            })
            .map(|idx| {
                FunctionalDependence::new(vec![idx], vec![], false)
                    .with_mode(Dependency::Single)
            })
            .collect::<Vec<_>>();

        if !new_dependencies.is_empty() {
            // Each ROW_NUMBER column determines every output column.
            for dependence in new_dependencies.iter_mut() {
                dependence.target_indices = (0..window_fields.len()).collect();
            }
            // Add the dependency introduced because of ROW_NUMBER window function to the functional dependency
            let new_deps = FunctionalDependencies::new(new_dependencies);
            window_func_dependencies.extend(new_deps);
        }

        // Validate that FILTER clauses are only used with aggregate window functions
        if let Some(e) = window_expr.iter().find(|e| {
            matches!(
                e,
                Expr::WindowFunction(wf)
                    if !matches!(wf.fun, WindowFunctionDefinition::AggregateUDF(_))
                        && wf.params.filter.is_some()
            )
        }) {
            return plan_err!(
                "FILTER clause can only be used with aggregate window functions. Found in '{e}'"
            );
        }

        Self::try_new_with_schema(
            window_expr,
            input,
            Arc::new(
                DFSchema::new_with_metadata(window_fields, metadata)?
                    .with_functional_dependencies(window_func_dependencies)?,
            ),
        )
    }

    /// Create a new window function using the provided schema to avoid the overhead of
    /// building the schema again when the schema is already known.
    ///
    /// This method should only be called when you are absolutely sure that the schema being
    /// provided is correct for the window function. If in doubt, call [try_new](Self::try_new) instead.
    ///
    /// # Errors
    ///
    /// Returns a plan error if `schema` does not have exactly
    /// "input field count + `window_expr.len()`" fields.
    pub fn try_new_with_schema(
        window_expr: Vec<Expr>,
        input: Arc<LogicalPlan>,
        schema: DFSchemaRef,
    ) -> Result<Self> {
        let input_fields_count = input.schema().fields().len();
        if schema.fields().len() != input_fields_count + window_expr.len() {
            return plan_err!(
                "Window schema has wrong number of fields. Expected {} got {}",
                input_fields_count + window_expr.len(),
                schema.fields().len()
            );
        }

        Ok(Window {
            input,
            window_expr,
            schema,
        })
    }
}
2723
2724// Manual implementation needed because of `schema` field. Comparison excludes this field.
2725impl PartialOrd for Window {
2726    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
2727        match self.input.partial_cmp(&other.input)? {
2728            Ordering::Equal => {} // continue
2729            not_equal => return Some(not_equal),
2730        }
2731
2732        match self.window_expr.partial_cmp(&other.window_expr)? {
2733            Ordering::Equal => {} // continue
2734            not_equal => return Some(not_equal),
2735        }
2736
2737        // Contract for PartialOrd and PartialEq consistency requires that
2738        // a == b if and only if partial_cmp(a, b) == Some(Equal).
2739        if self == other {
2740            Some(Ordering::Equal)
2741        } else {
2742            None
2743        }
2744    }
2745}
2746
/// Produces rows from a table provider by reference or from the context
#[derive(Clone)]
pub struct TableScan {
    /// The name of the table
    pub table_name: TableReference,
    /// The source of the table
    pub source: Arc<dyn TableSource>,
    /// Optional column indices to use as a projection
    pub projection: Option<Vec<usize>>,
    /// The schema description of the output (after applying `projection`)
    pub projected_schema: DFSchemaRef,
    /// Optional expressions to be used as filters by the table provider
    pub filters: Vec<Expr>,
    /// Optional number of rows to read
    pub fetch: Option<usize>,
}
2763
impl Debug for TableScan {
    // `source` is a `dyn TableSource` trait object, so it is elided as
    // "..." here; `finish_non_exhaustive` marks the output as abridged.
    fn fmt(&self, f: &mut Formatter) -> fmt::Result {
        f.debug_struct("TableScan")
            .field("table_name", &self.table_name)
            .field("source", &"...")
            .field("projection", &self.projection)
            .field("projected_schema", &self.projected_schema)
            .field("filters", &self.filters)
            .field("fetch", &self.fetch)
            .finish_non_exhaustive()
    }
}
2776
2777impl PartialEq for TableScan {
2778    fn eq(&self, other: &Self) -> bool {
2779        self.table_name == other.table_name
2780            && self.projection == other.projection
2781            && self.projected_schema == other.projected_schema
2782            && self.filters == other.filters
2783            && self.fetch == other.fetch
2784    }
2785}
2786
2787impl Eq for TableScan {}
2788
2789// Manual implementation needed because of `source` and `projected_schema` fields.
2790// Comparison excludes these field.
2791impl PartialOrd for TableScan {
2792    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
2793        #[derive(PartialEq, PartialOrd)]
2794        struct ComparableTableScan<'a> {
2795            /// The name of the table
2796            pub table_name: &'a TableReference,
2797            /// Optional column indices to use as a projection
2798            pub projection: &'a Option<Vec<usize>>,
2799            /// Optional expressions to be used as filters by the table provider
2800            pub filters: &'a Vec<Expr>,
2801            /// Optional number of rows to read
2802            pub fetch: &'a Option<usize>,
2803        }
2804        let comparable_self = ComparableTableScan {
2805            table_name: &self.table_name,
2806            projection: &self.projection,
2807            filters: &self.filters,
2808            fetch: &self.fetch,
2809        };
2810        let comparable_other = ComparableTableScan {
2811            table_name: &other.table_name,
2812            projection: &other.projection,
2813            filters: &other.filters,
2814            fetch: &other.fetch,
2815        };
2816        comparable_self
2817            .partial_cmp(&comparable_other)
2818            // TODO (https://github.com/apache/datafusion/issues/17477) avoid recomparing all fields
2819            .filter(|cmp| *cmp != Ordering::Equal || self == other)
2820    }
2821}
2822
2823impl Hash for TableScan {
2824    fn hash<H: Hasher>(&self, state: &mut H) {
2825        self.table_name.hash(state);
2826        self.projection.hash(state);
2827        self.projected_schema.hash(state);
2828        self.filters.hash(state);
2829        self.fetch.hash(state);
2830    }
2831}
2832
impl TableScan {
    /// Initialize TableScan with appropriate schema from the given
    /// arguments.
    ///
    /// # Errors
    ///
    /// Returns a plan error when `table_name` is empty, or when building the
    /// projected schema or its functional dependencies fails.
    pub fn try_new(
        table_name: impl Into<TableReference>,
        table_source: Arc<dyn TableSource>,
        projection: Option<Vec<usize>>,
        filters: Vec<Expr>,
        fetch: Option<usize>,
    ) -> Result<Self> {
        let table_name = table_name.into();

        if table_name.table().is_empty() {
            return plan_err!("table_name cannot be empty");
        }
        let schema = table_source.schema();
        // Functional dependencies come from the source's constraints
        // (e.g. primary/unique keys) over the full, unprojected schema.
        let func_dependencies = FunctionalDependencies::new_from_constraints(
            table_source.constraints(),
            schema.fields.len(),
        );
        let projected_schema = projection
            .as_ref()
            .map(|p| {
                // Narrow the functional dependencies to the projected columns.
                let projected_func_dependencies =
                    func_dependencies.project_functional_dependencies(p, p.len());

                // Qualify each projected field with the table name.
                let df_schema = DFSchema::new_with_metadata(
                    p.iter()
                        .map(|i| {
                            (Some(table_name.clone()), Arc::clone(&schema.fields()[*i]))
                        })
                        .collect(),
                    schema.metadata.clone(),
                )?;
                df_schema.with_functional_dependencies(projected_func_dependencies)
            })
            .unwrap_or_else(|| {
                // No projection: qualify the full source schema.
                let df_schema =
                    DFSchema::try_from_qualified_schema(table_name.clone(), &schema)?;
                df_schema.with_functional_dependencies(func_dependencies)
            })?;
        let projected_schema = Arc::new(projected_schema);

        Ok(Self {
            table_name,
            source: table_source,
            projection,
            projected_schema,
            filters,
            fetch,
        })
    }
}
2886
/// Repartition the plan based on a partitioning scheme.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Hash)]
pub struct Repartition {
    /// The incoming logical plan
    pub input: Arc<LogicalPlan>,
    /// The partitioning scheme
    pub partitioning_scheme: Partitioning,
}
2895
/// Union multiple inputs
///
/// Built via [`Union::try_new`] (inputs must have exactly matching types),
/// [`Union::try_new_with_loose_types`] (types taken from the first input),
/// or [`Union::try_new_by_name`] (`UNION BY NAME` semantics).
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct Union {
    /// Inputs to merge
    pub inputs: Vec<Arc<LogicalPlan>>,
    /// Union schema. Should be the same for all inputs.
    pub schema: DFSchemaRef,
}
2904
impl Union {
    /// Constructs new Union instance deriving schema from inputs.
    /// Schema data types must match exactly.
    pub fn try_new(inputs: Vec<Arc<LogicalPlan>>) -> Result<Self> {
        let schema = Self::derive_schema_from_inputs(&inputs, false, false)?;
        Ok(Union { inputs, schema })
    }

    /// Constructs new Union instance deriving schema from inputs.
    /// Inputs do not have to have matching types and produced schema will
    /// take type from the first input.
    // TODO (https://github.com/apache/datafusion/issues/14380): Avoid creating uncoerced union at all.
    pub fn try_new_with_loose_types(inputs: Vec<Arc<LogicalPlan>>) -> Result<Self> {
        let schema = Self::derive_schema_from_inputs(&inputs, true, false)?;
        Ok(Union { inputs, schema })
    }

    /// Constructs a new Union instance that combines rows from different tables by name,
    /// instead of by position. This means that the specified inputs need not have schemas
    /// that are all the same width.
    pub fn try_new_by_name(inputs: Vec<Arc<LogicalPlan>>) -> Result<Self> {
        let schema = Self::derive_schema_from_inputs(&inputs, true, true)?;
        // Inputs may lack some columns of the derived schema, or list them in
        // a different order, so each input is wrapped in an aligning projection.
        let inputs = Self::rewrite_inputs_from_schema(&schema, inputs)?;

        Ok(Union { inputs, schema })
    }

    /// When constructing a `UNION BY NAME`, we need to wrap inputs
    /// in an additional `Projection` to account for absence of columns
    /// in input schemas or differing projection orders.
    fn rewrite_inputs_from_schema(
        schema: &Arc<DFSchema>,
        inputs: Vec<Arc<LogicalPlan>>,
    ) -> Result<Vec<Arc<LogicalPlan>>> {
        let schema_width = schema.iter().count();
        let mut wrapped_inputs = Vec::with_capacity(inputs.len());
        for input in inputs {
            // Any columns that exist within the derived schema but do not exist
            // within an input's schema should be replaced with `NULL` aliased
            // to the appropriate column in the derived schema.
            let mut expr = Vec::with_capacity(schema_width);
            for column in schema.columns() {
                if input
                    .schema()
                    .has_column_with_unqualified_name(column.name())
                {
                    expr.push(Expr::Column(column));
                } else {
                    expr.push(
                        Expr::Literal(ScalarValue::Null, None).alias(column.name()),
                    );
                }
            }
            // Use the derived schema directly so every wrapped input reports
            // the identical output schema.
            wrapped_inputs.push(Arc::new(LogicalPlan::Projection(
                Projection::try_new_with_schema(expr, input, Arc::clone(schema))?,
            )));
        }

        Ok(wrapped_inputs)
    }

    /// Constructs new Union instance deriving schema from inputs.
    ///
    /// If `loose_types` is true, inputs do not need to have matching types and
    /// the produced schema will use the type from the first input.
    /// TODO (<https://github.com/apache/datafusion/issues/14380>): This is not necessarily reasonable behavior.
    ///
    /// If `by_name` is `true`, input schemas need not be the same width. That is,
    /// the constructed schema follows `UNION BY NAME` semantics.
    ///
    /// # Errors
    /// Returns a plan error if fewer than two inputs are provided, or if the
    /// inputs' schemas are incompatible under the chosen mode.
    fn derive_schema_from_inputs(
        inputs: &[Arc<LogicalPlan>],
        loose_types: bool,
        by_name: bool,
    ) -> Result<DFSchemaRef> {
        if inputs.len() < 2 {
            return plan_err!("UNION requires at least two inputs");
        }

        if by_name {
            Self::derive_schema_from_inputs_by_name(inputs, loose_types)
        } else {
            Self::derive_schema_from_inputs_by_position(inputs, loose_types)
        }
    }

    /// Derives the `UNION BY NAME` schema: one output column per distinct
    /// field name across all inputs, in first-seen order.
    fn derive_schema_from_inputs_by_name(
        inputs: &[Arc<LogicalPlan>],
        loose_types: bool,
    ) -> Result<DFSchemaRef> {
        // (data type, nullability, metadata from each input that has the
        // field, number of inputs the field occurs in)
        type FieldData<'a> =
            (&'a DataType, bool, Vec<&'a HashMap<String, String>>, usize);
        // A Vec (searched linearly) rather than a map so the output columns
        // keep the order in which names are first encountered.
        let mut cols: Vec<(&str, FieldData)> = Vec::new();
        for input in inputs.iter() {
            for field in input.schema().fields() {
                if let Some((_, (data_type, is_nullable, metadata, occurrences))) =
                    cols.iter_mut().find(|(name, _)| name == field.name())
                {
                    if !loose_types && *data_type != field.data_type() {
                        return plan_err!(
                            "Found different types for field {}",
                            field.name()
                        );
                    }

                    metadata.push(field.metadata());
                    // If the field is nullable in any one of the inputs,
                    // then the field in the final schema is also nullable.
                    *is_nullable |= field.is_nullable();
                    *occurrences += 1;
                } else {
                    cols.push((
                        field.name(),
                        (
                            field.data_type(),
                            field.is_nullable(),
                            vec![field.metadata()],
                            1,
                        ),
                    ));
                }
            }
        }

        let union_fields = cols
            .into_iter()
            .map(
                |(name, (data_type, is_nullable, unmerged_metadata, occurrences))| {
                    // If the final number of occurrences of the field is less
                    // than the number of inputs (i.e. the field is missing from
                    // one or more inputs), then it must be treated as nullable.
                    let final_is_nullable = if occurrences == inputs.len() {
                        is_nullable
                    } else {
                        true
                    };

                    let mut field =
                        Field::new(name, data_type.clone(), final_is_nullable);
                    field.set_metadata(intersect_metadata_for_union(unmerged_metadata));

                    // Output fields are unqualified (no table reference).
                    (None, Arc::new(field))
                },
            )
            .collect::<Vec<(Option<TableReference>, _)>>();

        // Schema-level metadata is the intersection of all inputs' metadata.
        let union_schema_metadata = intersect_metadata_for_union(
            inputs.iter().map(|input| input.schema().metadata()),
        );

        // Functional Dependencies are not preserved after UNION operation
        let schema = DFSchema::new_with_metadata(union_fields, union_schema_metadata)?;
        let schema = Arc::new(schema);

        Ok(schema)
    }

    /// Derives the positional `UNION` schema: all inputs must have the same
    /// number of columns, and output column `i` merges column `i` of every
    /// input.
    fn derive_schema_from_inputs_by_position(
        inputs: &[Arc<LogicalPlan>],
        loose_types: bool,
    ) -> Result<DFSchemaRef> {
        let first_schema = inputs[0].schema();
        let fields_count = first_schema.fields().len();
        for input in inputs.iter().skip(1) {
            if fields_count != input.schema().fields().len() {
                return plan_err!(
                    "UNION queries have different number of columns: \
                    left has {} columns whereas right has {} columns",
                    fields_count,
                    input.schema().fields().len()
                );
            }
        }

        // Tracks how many times a base name has been used so duplicated names
        // can be disambiguated with a numeric suffix.
        let mut name_counts: HashMap<String, usize> = HashMap::new();
        let union_fields = (0..fields_count)
            .map(|i| {
                // The i-th field of every input.
                let fields = inputs
                    .iter()
                    .map(|input| input.schema().field(i))
                    .collect::<Vec<_>>();
                let first_field = fields[0];
                let base_name = first_field.name().to_string();

                let data_type = if loose_types {
                    // TODO apply type coercion here, or document why it's better to defer
                    // temporarily use the data type from the left input and later rely on the analyzer to
                    // coerce the two schemas into a common one.
                    first_field.data_type()
                } else {
                    // Strict mode: every input's column i must share one type.
                    fields.iter().skip(1).try_fold(
                        first_field.data_type(),
                        |acc, field| {
                            if acc != field.data_type() {
                                return plan_err!(
                                    "UNION field {i} have different type in inputs: \
                                    left has {} whereas right has {}",
                                    first_field.data_type(),
                                    field.data_type()
                                );
                            }
                            Ok(acc)
                        },
                    )?
                };
                // Nullable if the column is nullable in any input.
                let nullable = fields.iter().any(|field| field.is_nullable());

                // Generate unique field name
                let name = if let Some(count) = name_counts.get_mut(&base_name) {
                    *count += 1;
                    format!("{base_name}_{count}")
                } else {
                    name_counts.insert(base_name.clone(), 0);
                    base_name
                };

                let mut field = Field::new(&name, data_type.clone(), nullable);
                // Field metadata is the intersection of the inputs' metadata.
                let field_metadata = intersect_metadata_for_union(
                    fields.iter().map(|field| field.metadata()),
                );
                field.set_metadata(field_metadata);
                Ok((None, Arc::new(field)))
            })
            .collect::<Result<_>>()?;
        let union_schema_metadata = intersect_metadata_for_union(
            inputs.iter().map(|input| input.schema().metadata()),
        );

        // Functional Dependencies are not preserved after UNION operation
        let schema = DFSchema::new_with_metadata(union_fields, union_schema_metadata)?;
        let schema = Arc::new(schema);

        Ok(schema)
    }
}
3139
3140// Manual implementation needed because of `schema` field. Comparison excludes this field.
3141impl PartialOrd for Union {
3142    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
3143        self.inputs
3144            .partial_cmp(&other.inputs)
3145            // TODO (https://github.com/apache/datafusion/issues/17477) avoid recomparing all fields
3146            .filter(|cmp| *cmp != Ordering::Equal || self == other)
3147    }
3148}
3149
/// Describe the schema of table
///
/// # Example output:
///
/// ```sql
/// > describe traces;
/// +--------------------+-----------------------------+-------------+
/// | column_name        | data_type                   | is_nullable |
/// +--------------------+-----------------------------+-------------+
/// | attributes         | Utf8                        | YES         |
/// | duration_nano      | Int64                       | YES         |
/// | end_time_unix_nano | Int64                       | YES         |
/// | service.name       | Dictionary(Int32, Utf8)     | YES         |
/// | span.kind          | Utf8                        | YES         |
/// | span.name          | Utf8                        | YES         |
/// | span_id            | Dictionary(Int32, Utf8)     | YES         |
/// | time               | Timestamp(Nanosecond, None) | NO          |
/// | trace_id           | Dictionary(Int32, Utf8)     | YES         |
/// | otel.status_code   | Utf8                        | YES         |
/// | parent_span_id     | Utf8                        | YES         |
/// +--------------------+-----------------------------+-------------+
/// ```
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct DescribeTable {
    /// The Arrow schema of the table being described
    pub schema: Arc<Schema>,
    /// Schema of the `DESCRIBE` output itself
    /// (column_name / data_type / is_nullable, as in the example above)
    pub output_schema: DFSchemaRef,
}
3179
// Manual implementation of `PartialOrd`, returning none since there are no comparable types in
// `DescribeTable`. This allows `LogicalPlan` to derive `PartialOrd`.
impl PartialOrd for DescribeTable {
    fn partial_cmp(&self, _other: &Self) -> Option<Ordering> {
        // Both fields are schemas, for which no ordering is defined, so any
        // two `DescribeTable` nodes are incomparable.
        None
    }
}
3188
/// Options for EXPLAIN
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct ExplainOption {
    /// Include detailed debug info
    pub verbose: bool,
    /// Actually execute the plan and report metrics (`EXPLAIN ANALYZE`)
    pub analyze: bool,
    /// Output syntax/format
    pub format: ExplainFormat,
}
3199
3200impl Default for ExplainOption {
3201    fn default() -> Self {
3202        ExplainOption {
3203            verbose: false,
3204            analyze: false,
3205            format: ExplainFormat::Indent,
3206        }
3207    }
3208}
3209
3210impl ExplainOption {
3211    /// Builder‐style setter for `verbose`
3212    pub fn with_verbose(mut self, verbose: bool) -> Self {
3213        self.verbose = verbose;
3214        self
3215    }
3216
3217    /// Builder‐style setter for `analyze`
3218    pub fn with_analyze(mut self, analyze: bool) -> Self {
3219        self.analyze = analyze;
3220        self
3221    }
3222
3223    /// Builder‐style setter for `format`
3224    pub fn with_format(mut self, format: ExplainFormat) -> Self {
3225        self.format = format;
3226        self
3227    }
3228}
3229
/// Produces a relation with string representations of
/// various parts of the plan
///
/// See [the documentation] for more information
///
/// [the documentation]: https://datafusion.apache.org/user-guide/sql/explain.html
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct Explain {
    /// Should extra (detailed, intermediate plans) be included?
    pub verbose: bool,
    /// Output format for explain, if specified.
    /// If none, defaults to `text`
    pub explain_format: ExplainFormat,
    /// The logical plan that is being EXPLAIN'd
    pub plan: Arc<LogicalPlan>,
    /// Represent the various stages plans have gone through
    /// (one entry per recorded planning/optimization stage)
    pub stringified_plans: Vec<StringifiedPlan>,
    /// The output schema of the explain (2 columns of text)
    pub schema: DFSchemaRef,
    /// Used by physical planner to check if should proceed with planning
    pub logical_optimization_succeeded: bool,
}
3252
3253// Manual implementation needed because of `schema` field. Comparison excludes this field.
3254impl PartialOrd for Explain {
3255    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
3256        #[derive(PartialEq, PartialOrd)]
3257        struct ComparableExplain<'a> {
3258            /// Should extra (detailed, intermediate plans) be included?
3259            pub verbose: &'a bool,
3260            /// The logical plan that is being EXPLAIN'd
3261            pub plan: &'a Arc<LogicalPlan>,
3262            /// Represent the various stages plans have gone through
3263            pub stringified_plans: &'a Vec<StringifiedPlan>,
3264            /// Used by physical planner to check if should proceed with planning
3265            pub logical_optimization_succeeded: &'a bool,
3266        }
3267        let comparable_self = ComparableExplain {
3268            verbose: &self.verbose,
3269            plan: &self.plan,
3270            stringified_plans: &self.stringified_plans,
3271            logical_optimization_succeeded: &self.logical_optimization_succeeded,
3272        };
3273        let comparable_other = ComparableExplain {
3274            verbose: &other.verbose,
3275            plan: &other.plan,
3276            stringified_plans: &other.stringified_plans,
3277            logical_optimization_succeeded: &other.logical_optimization_succeeded,
3278        };
3279        comparable_self
3280            .partial_cmp(&comparable_other)
3281            // TODO (https://github.com/apache/datafusion/issues/17477) avoid recomparing all fields
3282            .filter(|cmp| *cmp != Ordering::Equal || self == other)
3283    }
3284}
3285
/// Runs the actual plan, and then prints the physical plan with
/// execution metrics.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct Analyze {
    /// Should extra detail be included?
    pub verbose: bool,
    /// The logical plan that is being EXPLAIN ANALYZE'd
    pub input: Arc<LogicalPlan>,
    /// The output schema of the explain (2 columns of text)
    pub schema: DFSchemaRef,
}
3297
3298// Manual implementation needed because of `schema` field. Comparison excludes this field.
3299impl PartialOrd for Analyze {
3300    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
3301        match self.verbose.partial_cmp(&other.verbose) {
3302            Some(Ordering::Equal) => self.input.partial_cmp(&other.input),
3303            cmp => cmp,
3304        }
3305        // TODO (https://github.com/apache/datafusion/issues/17477) avoid recomparing all fields
3306        .filter(|cmp| *cmp != Ordering::Equal || self == other)
3307    }
3308}
3309
/// Extension operator defined outside of DataFusion
// TODO(clippy): This clippy `allow` should be removed if
// the manual `PartialEq` is removed in favor of a derive.
// (see `PartialEq` the impl for details.)
#[allow(clippy::allow_attributes)]
#[allow(clippy::derived_hash_with_manual_eq)]
#[derive(Debug, Clone, Eq, Hash)]
pub struct Extension {
    /// The runtime extension operator (user-provided plan node implementation)
    pub node: Arc<dyn UserDefinedLogicalNode>,
}
3321
// `PartialEq` cannot be derived for types containing `Arc<dyn Trait>`.
// This manual implementation should be removed if
// https://github.com/rust-lang/rust/issues/39128 is fixed.
impl PartialEq for Extension {
    fn eq(&self, other: &Self) -> bool {
        // Delegates to the node's own equality; the concrete extension node
        // decides what makes two instances equal.
        self.node.eq(&other.node)
    }
}
3330
impl PartialOrd for Extension {
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        // Delegates ordering to the user-defined node itself.
        self.node.partial_cmp(&other.node)
    }
}
3336
/// Produces the first `n` tuples from its input and discards the rest.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Hash)]
pub struct Limit {
    /// Number of rows to skip before fetch
    /// (an expression; see [`Limit::get_skip_type`] for its interpretation)
    pub skip: Option<Box<Expr>>,
    /// Maximum number of rows to fetch,
    /// None means fetching all rows
    /// (an expression; see [`Limit::get_fetch_type`] for its interpretation)
    pub fetch: Option<Box<Expr>>,
    /// The logical plan
    pub input: Arc<LogicalPlan>,
}
3348
/// Different types of skip expression in Limit plan.
// Standard derives added so callers can log, copy and compare the
// classification; all variants hold only `Copy` data.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SkipType {
    /// The skip expression is a literal value.
    Literal(usize),
    /// Currently only supports expressions that can be folded into constants.
    UnsupportedExpr,
}
3356
/// Different types of fetch expression in Limit plan.
// Standard derives added so callers can log, copy and compare the
// classification; all variants hold only `Copy` data.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum FetchType {
    /// The fetch expression is a literal value.
    /// `Literal(None)` means the fetch expression is not provided.
    Literal(Option<usize>),
    /// Currently only supports expressions that can be folded into constants.
    UnsupportedExpr,
}
3365
3366impl Limit {
3367    /// Get the skip type from the limit plan.
3368    pub fn get_skip_type(&self) -> Result<SkipType> {
3369        match self.skip.as_deref() {
3370            Some(expr) => match *expr {
3371                Expr::Literal(ScalarValue::Int64(s), _) => {
3372                    // `skip = NULL` is equivalent to `skip = 0`
3373                    let s = s.unwrap_or(0);
3374                    if s >= 0 {
3375                        Ok(SkipType::Literal(s as usize))
3376                    } else {
3377                        plan_err!("OFFSET must be >=0, '{}' was provided", s)
3378                    }
3379                }
3380                _ => Ok(SkipType::UnsupportedExpr),
3381            },
3382            // `skip = None` is equivalent to `skip = 0`
3383            None => Ok(SkipType::Literal(0)),
3384        }
3385    }
3386
3387    /// Get the fetch type from the limit plan.
3388    pub fn get_fetch_type(&self) -> Result<FetchType> {
3389        match self.fetch.as_deref() {
3390            Some(expr) => match *expr {
3391                Expr::Literal(ScalarValue::Int64(Some(s)), _) => {
3392                    if s >= 0 {
3393                        Ok(FetchType::Literal(Some(s as usize)))
3394                    } else {
3395                        plan_err!("LIMIT must be >= 0, '{}' was provided", s)
3396                    }
3397                }
3398                Expr::Literal(ScalarValue::Int64(None), _) => {
3399                    Ok(FetchType::Literal(None))
3400                }
3401                _ => Ok(FetchType::UnsupportedExpr),
3402            },
3403            None => Ok(FetchType::Literal(None)),
3404        }
3405    }
3406}
3407
/// Removes duplicate rows from the input
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Hash)]
pub enum Distinct {
    /// Plain `DISTINCT` referencing all selection expressions
    All(Arc<LogicalPlan>),
    /// The `Postgres` addition, allowing separate control over DISTINCT'd and selected columns
    On(DistinctOn),
}
3416
3417impl Distinct {
3418    /// return a reference to the nodes input
3419    pub fn input(&self) -> &Arc<LogicalPlan> {
3420        match self {
3421            Distinct::All(input) => input,
3422            Distinct::On(DistinctOn { input, .. }) => input,
3423        }
3424    }
3425}
3426
/// Removes duplicate rows from the input (`SELECT DISTINCT ON (...)`)
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct DistinctOn {
    /// The `DISTINCT ON` clause expression list
    pub on_expr: Vec<Expr>,
    /// The selected projection expression list
    pub select_expr: Vec<Expr>,
    /// The `ORDER BY` clause, whose initial expressions must match those of the `ON` clause when
    /// present. Note that those matching expressions actually wrap the `ON` expressions with
    /// additional info pertaining to the sorting procedure (i.e. ASC/DESC, and NULLS FIRST/LAST).
    pub sort_expr: Option<Vec<SortExpr>>,
    /// The logical plan that is being DISTINCT'd
    pub input: Arc<LogicalPlan>,
    /// The schema description of the DISTINCT ON output
    /// (derived from `select_expr` only)
    pub schema: DFSchemaRef,
}
3443
3444impl DistinctOn {
3445    /// Create a new `DistinctOn` struct.
3446    pub fn try_new(
3447        on_expr: Vec<Expr>,
3448        select_expr: Vec<Expr>,
3449        sort_expr: Option<Vec<SortExpr>>,
3450        input: Arc<LogicalPlan>,
3451    ) -> Result<Self> {
3452        if on_expr.is_empty() {
3453            return plan_err!("No `ON` expressions provided");
3454        }
3455
3456        let on_expr = normalize_cols(on_expr, input.as_ref())?;
3457        let qualified_fields = exprlist_to_fields(select_expr.as_slice(), &input)?
3458            .into_iter()
3459            .collect();
3460
3461        let dfschema = DFSchema::new_with_metadata(
3462            qualified_fields,
3463            input.schema().metadata().clone(),
3464        )?;
3465
3466        let mut distinct_on = DistinctOn {
3467            on_expr,
3468            select_expr,
3469            sort_expr: None,
3470            input,
3471            schema: Arc::new(dfschema),
3472        };
3473
3474        if let Some(sort_expr) = sort_expr {
3475            distinct_on = distinct_on.with_sort_expr(sort_expr)?;
3476        }
3477
3478        Ok(distinct_on)
3479    }
3480
3481    /// Try to update `self` with a new sort expressions.
3482    ///
3483    /// Validates that the sort expressions are a super-set of the `ON` expressions.
3484    pub fn with_sort_expr(mut self, sort_expr: Vec<SortExpr>) -> Result<Self> {
3485        let sort_expr = normalize_sorts(sort_expr, self.input.as_ref())?;
3486
3487        // Check that the left-most sort expressions are the same as the `ON` expressions.
3488        let mut matched = true;
3489        for (on, sort) in self.on_expr.iter().zip(sort_expr.iter()) {
3490            if on != &sort.expr {
3491                matched = false;
3492                break;
3493            }
3494        }
3495
3496        if self.on_expr.len() > sort_expr.len() || !matched {
3497            return plan_err!(
3498                "SELECT DISTINCT ON expressions must match initial ORDER BY expressions"
3499            );
3500        }
3501
3502        self.sort_expr = Some(sort_expr);
3503        Ok(self)
3504    }
3505}
3506
3507// Manual implementation needed because of `schema` field. Comparison excludes this field.
3508impl PartialOrd for DistinctOn {
3509    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
3510        #[derive(PartialEq, PartialOrd)]
3511        struct ComparableDistinctOn<'a> {
3512            /// The `DISTINCT ON` clause expression list
3513            pub on_expr: &'a Vec<Expr>,
3514            /// The selected projection expression list
3515            pub select_expr: &'a Vec<Expr>,
3516            /// The `ORDER BY` clause, whose initial expressions must match those of the `ON` clause when
3517            /// present. Note that those matching expressions actually wrap the `ON` expressions with
3518            /// additional info pertaining to the sorting procedure (i.e. ASC/DESC, and NULLS FIRST/LAST).
3519            pub sort_expr: &'a Option<Vec<SortExpr>>,
3520            /// The logical plan that is being DISTINCT'd
3521            pub input: &'a Arc<LogicalPlan>,
3522        }
3523        let comparable_self = ComparableDistinctOn {
3524            on_expr: &self.on_expr,
3525            select_expr: &self.select_expr,
3526            sort_expr: &self.sort_expr,
3527            input: &self.input,
3528        };
3529        let comparable_other = ComparableDistinctOn {
3530            on_expr: &other.on_expr,
3531            select_expr: &other.select_expr,
3532            sort_expr: &other.sort_expr,
3533            input: &other.input,
3534        };
3535        comparable_self
3536            .partial_cmp(&comparable_other)
3537            // TODO (https://github.com/apache/datafusion/issues/17477) avoid recomparing all fields
3538            .filter(|cmp| *cmp != Ordering::Equal || self == other)
3539    }
3540}
3541
/// Aggregates its input based on a set of grouping and aggregate
/// expressions (e.g. SUM).
///
/// # Output Schema
///
/// The output schema is the group expressions followed by the aggregate
/// expressions in order.
///
/// For example, given the input schema `"A", "B", "C"` and the aggregate
/// `SUM(A) GROUP BY C+B`, the output schema will be `"C+B", "SUM(A)"` where
/// "C+B" and "SUM(A)" are the names of the output columns. Note that "C+B" is a
/// single new column
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
// mark non_exhaustive to encourage use of try_new/new()
#[non_exhaustive]
pub struct Aggregate {
    /// The incoming logical plan
    pub input: Arc<LogicalPlan>,
    /// Grouping expressions
    pub group_expr: Vec<Expr>,
    /// Aggregate expressions
    pub aggr_expr: Vec<Expr>,
    /// The schema description of the aggregate output
    /// (group expressions first, then aggregate expressions; see above)
    pub schema: DFSchemaRef,
}
3567
3568impl Aggregate {
3569    /// Create a new aggregate operator.
3570    pub fn try_new(
3571        input: Arc<LogicalPlan>,
3572        group_expr: Vec<Expr>,
3573        aggr_expr: Vec<Expr>,
3574    ) -> Result<Self> {
3575        let group_expr = enumerate_grouping_sets(group_expr)?;
3576
3577        let is_grouping_set = matches!(group_expr.as_slice(), [Expr::GroupingSet(_)]);
3578
3579        let grouping_expr: Vec<&Expr> = grouping_set_to_exprlist(group_expr.as_slice())?;
3580
3581        let mut qualified_fields = exprlist_to_fields(grouping_expr, &input)?;
3582
3583        // Even columns that cannot be null will become nullable when used in a grouping set.
3584        if is_grouping_set {
3585            qualified_fields = qualified_fields
3586                .into_iter()
3587                .map(|(q, f)| (q, f.as_ref().clone().with_nullable(true).into()))
3588                .collect::<Vec<_>>();
3589            qualified_fields.push((
3590                None,
3591                Field::new(
3592                    Self::INTERNAL_GROUPING_ID,
3593                    Self::grouping_id_type(qualified_fields.len()),
3594                    false,
3595                )
3596                .into(),
3597            ));
3598        }
3599
3600        qualified_fields.extend(exprlist_to_fields(aggr_expr.as_slice(), &input)?);
3601
3602        let schema = DFSchema::new_with_metadata(
3603            qualified_fields,
3604            input.schema().metadata().clone(),
3605        )?;
3606
3607        Self::try_new_with_schema(input, group_expr, aggr_expr, Arc::new(schema))
3608    }
3609
3610    /// Create a new aggregate operator using the provided schema to avoid the overhead of
3611    /// building the schema again when the schema is already known.
3612    ///
3613    /// This method should only be called when you are absolutely sure that the schema being
3614    /// provided is correct for the aggregate. If in doubt, call [try_new](Self::try_new) instead.
3615    #[expect(clippy::needless_pass_by_value)]
3616    pub fn try_new_with_schema(
3617        input: Arc<LogicalPlan>,
3618        group_expr: Vec<Expr>,
3619        aggr_expr: Vec<Expr>,
3620        schema: DFSchemaRef,
3621    ) -> Result<Self> {
3622        if group_expr.is_empty() && aggr_expr.is_empty() {
3623            return plan_err!(
3624                "Aggregate requires at least one grouping or aggregate expression. \
3625                Aggregate without grouping expressions nor aggregate expressions is \
3626                logically equivalent to, but less efficient than, VALUES producing \
3627                single row. Please use VALUES instead."
3628            );
3629        }
3630        let group_expr_count = grouping_set_expr_count(&group_expr)?;
3631        if schema.fields().len() != group_expr_count + aggr_expr.len() {
3632            return plan_err!(
3633                "Aggregate schema has wrong number of fields. Expected {} got {}",
3634                group_expr_count + aggr_expr.len(),
3635                schema.fields().len()
3636            );
3637        }
3638
3639        let aggregate_func_dependencies =
3640            calc_func_dependencies_for_aggregate(&group_expr, &input, &schema)?;
3641        let new_schema = schema.as_ref().clone();
3642        let schema = Arc::new(
3643            new_schema.with_functional_dependencies(aggregate_func_dependencies)?,
3644        );
3645        Ok(Self {
3646            input,
3647            group_expr,
3648            aggr_expr,
3649            schema,
3650        })
3651    }
3652
    /// Returns true when the entire GROUP BY is a single `Expr::GroupingSet`
    /// (e.g. GROUPING SETS / CUBE / ROLLUP), which adds an internal
    /// grouping-id column to the output.
    fn is_grouping_set(&self) -> bool {
        matches!(self.group_expr.as_slice(), [Expr::GroupingSet(_)])
    }
3656
3657    /// Get the output expressions.
3658    fn output_expressions(&self) -> Result<Vec<&Expr>> {
3659        static INTERNAL_ID_EXPR: LazyLock<Expr> = LazyLock::new(|| {
3660            Expr::Column(Column::from_name(Aggregate::INTERNAL_GROUPING_ID))
3661        });
3662        let mut exprs = grouping_set_to_exprlist(self.group_expr.as_slice())?;
3663        if self.is_grouping_set() {
3664            exprs.push(&INTERNAL_ID_EXPR);
3665        }
3666        exprs.extend(self.aggr_expr.iter());
3667        debug_assert!(exprs.len() == self.schema.fields().len());
3668        Ok(exprs)
3669    }
3670
    /// Get the length of the group by expression in the output schema
    /// This is not simply group by expression length. Expression may be
    /// GroupingSet, etc. In these cases we need to get inner expression lengths.
    pub fn group_expr_len(&self) -> Result<usize> {
        grouping_set_expr_count(&self.group_expr)
    }
3677
3678    /// Returns the data type of the grouping id.
3679    /// The grouping ID value is a bitmask where each set bit
3680    /// indicates that the corresponding grouping expression is
3681    /// null
3682    pub fn grouping_id_type(group_exprs: usize) -> DataType {
3683        if group_exprs <= 8 {
3684            DataType::UInt8
3685        } else if group_exprs <= 16 {
3686            DataType::UInt16
3687        } else if group_exprs <= 32 {
3688            DataType::UInt32
3689        } else {
3690            DataType::UInt64
3691        }
3692    }
3693
    /// Internal column used when the aggregation is a grouping set.
    ///
    /// This column contains a bitmask where each bit represents a grouping
    /// expression. The least significant bit corresponds to the rightmost
    /// grouping expression. A bit value of 0 indicates that the corresponding
    /// column is included in the grouping set, while a value of 1 means it is excluded.
    ///
    /// For example, for the grouping expressions CUBE(a, b), the grouping ID
    /// column will have the following values:
    ///     0b00: Both `a` and `b` are included
    ///     0b01: `b` is excluded
    ///     0b10: `a` is excluded
    ///     0b11: Both `a` and `b` are excluded
    ///
    /// This internal column is necessary because excluded columns are replaced
    /// with `NULL` values. To handle these cases correctly, we must distinguish
    /// between an actual `NULL` value in a column and a column being excluded from the set.
    ///
    /// The integer width of this column is chosen by [`Self::grouping_id_type`].
    pub const INTERNAL_GROUPING_ID: &'static str = "__grouping_id";
3712}
3713
3714// Manual implementation needed because of `schema` field. Comparison excludes this field.
3715impl PartialOrd for Aggregate {
3716    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
3717        match self.input.partial_cmp(&other.input) {
3718            Some(Ordering::Equal) => {
3719                match self.group_expr.partial_cmp(&other.group_expr) {
3720                    Some(Ordering::Equal) => self.aggr_expr.partial_cmp(&other.aggr_expr),
3721                    cmp => cmp,
3722                }
3723            }
3724            cmp => cmp,
3725        }
3726        // TODO (https://github.com/apache/datafusion/issues/17477) avoid recomparing all fields
3727        .filter(|cmp| *cmp != Ordering::Equal || self == other)
3728    }
3729}
3730
/// Checks whether any expression in `group_expr` contains `Expr::GroupingSet`.
/// Used below to decide whether functional dependencies can be propagated
/// through an aggregate.
fn contains_grouping_set(group_expr: &[Expr]) -> bool {
    group_expr
        .iter()
        .any(|expr| matches!(expr, Expr::GroupingSet(_)))
}
3737
3738/// Calculates functional dependencies for aggregate expressions.
3739fn calc_func_dependencies_for_aggregate(
3740    // Expressions in the GROUP BY clause:
3741    group_expr: &[Expr],
3742    // Input plan of the aggregate:
3743    input: &LogicalPlan,
3744    // Aggregate schema
3745    aggr_schema: &DFSchema,
3746) -> Result<FunctionalDependencies> {
3747    // We can do a case analysis on how to propagate functional dependencies based on
3748    // whether the GROUP BY in question contains a grouping set expression:
3749    // - If so, the functional dependencies will be empty because we cannot guarantee
3750    //   that GROUP BY expression results will be unique.
3751    // - Otherwise, it may be possible to propagate functional dependencies.
3752    if !contains_grouping_set(group_expr) {
3753        let group_by_expr_names = group_expr
3754            .iter()
3755            .map(|item| item.schema_name().to_string())
3756            .collect::<IndexSet<_>>()
3757            .into_iter()
3758            .collect::<Vec<_>>();
3759        let aggregate_func_dependencies = aggregate_functional_dependencies(
3760            input.schema(),
3761            &group_by_expr_names,
3762            aggr_schema,
3763        );
3764        Ok(aggregate_func_dependencies)
3765    } else {
3766        Ok(FunctionalDependencies::empty())
3767    }
3768}
3769
/// This function projects functional dependencies of the `input` plan according
/// to projection expressions `exprs`.
///
/// NOTE(review): expression-to-field matching below relies on exact string
/// equality between the expression's `Display` output and the input schema's
/// flat field names; expressions that do not round-trip through `Display`
/// simply map to no index (empty vec) rather than erroring.
fn calc_func_dependencies_for_project(
    exprs: &[Expr],
    input: &LogicalPlan,
) -> Result<FunctionalDependencies> {
    let input_fields = input.schema().field_names();
    // Calculate expression indices (if present) in the input schema.
    let proj_indices = exprs
        .iter()
        .map(|expr| match expr {
            // A wildcard expands to several input fields; collect the input
            // index of every field it covers.
            #[expect(deprecated)]
            Expr::Wildcard { qualifier, options } => {
                let wildcard_fields = exprlist_to_fields(
                    vec![&Expr::Wildcard {
                        qualifier: qualifier.clone(),
                        options: options.clone(),
                    }],
                    input,
                )?;
                Ok::<_, DataFusionError>(
                    wildcard_fields
                        .into_iter()
                        .filter_map(|(qualifier, f)| {
                            // Reconstruct the flat "qualifier.name" form used
                            // by `field_names` before searching for it.
                            let flat_name = qualifier
                                .map(|t| format!("{}.{}", t, f.name()))
                                .unwrap_or_else(|| f.name().clone());
                            input_fields.iter().position(|item| *item == flat_name)
                        })
                        .collect::<Vec<_>>(),
                )
            }
            // For aliases, look up the *inner* expression's display form, so
            // the dependency follows the underlying column.
            Expr::Alias(alias) => {
                let name = format!("{}", alias.expr);
                Ok(input_fields
                    .iter()
                    .position(|item| *item == name)
                    .map(|i| vec![i])
                    .unwrap_or(vec![]))
            }
            // Any other expression: match its display form directly; no match
            // means it contributes no dependency indices.
            _ => {
                let name = format!("{expr}");
                Ok(input_fields
                    .iter()
                    .position(|item| *item == name)
                    .map(|i| vec![i])
                    .unwrap_or(vec![]))
            }
        })
        .collect::<Result<Vec<_>>>()?
        .into_iter()
        .flatten()
        .collect::<Vec<_>>();

    Ok(input
        .schema()
        .functional_dependencies()
        .project_functional_dependencies(&proj_indices, exprs.len()))
}
3829
/// Sorts its input according to a list of sort expressions.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Hash)]
pub struct Sort {
    /// The sort expressions
    pub expr: Vec<SortExpr>,
    /// The incoming logical plan
    pub input: Arc<LogicalPlan>,
    /// Optional fetch limit; `None` means all sorted rows are produced
    pub fetch: Option<usize>,
}
3840
/// Join two logical plans on one or more join columns
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct Join {
    /// Left input
    pub left: Arc<LogicalPlan>,
    /// Right input
    pub right: Arc<LogicalPlan>,
    /// Equijoin clause expressed as pairs of (left, right) join expressions
    pub on: Vec<(Expr, Expr)>,
    /// Filters applied during join (non-equi conditions)
    pub filter: Option<Expr>,
    /// Join type
    pub join_type: JoinType,
    /// Join constraint
    pub join_constraint: JoinConstraint,
    /// The output schema, containing fields from the left and right inputs.
    /// This field is excluded from equality-adjacent ordering (see the manual
    /// `PartialOrd` impl for `Join`).
    pub schema: DFSchemaRef,
    /// Defines the null equality for the join.
    pub null_equality: NullEquality,
}
3861
3862impl Join {
3863    /// Creates a new Join operator with automatically computed schema.
3864    ///
3865    /// This constructor computes the schema based on the join type and inputs,
3866    /// removing the need to manually specify the schema or call `recompute_schema`.
3867    ///
3868    /// # Arguments
3869    ///
3870    /// * `left` - Left input plan
3871    /// * `right` - Right input plan
3872    /// * `on` - Join condition as a vector of (left_expr, right_expr) pairs
3873    /// * `filter` - Optional filter expression (for non-equijoin conditions)
3874    /// * `join_type` - Type of join (Inner, Left, Right, etc.)
3875    /// * `join_constraint` - Join constraint (On, Using)
3876    /// * `null_equality` - How to handle nulls in join comparisons
3877    ///
3878    /// # Returns
3879    ///
3880    /// A new Join operator with the computed schema
3881    pub fn try_new(
3882        left: Arc<LogicalPlan>,
3883        right: Arc<LogicalPlan>,
3884        on: Vec<(Expr, Expr)>,
3885        filter: Option<Expr>,
3886        join_type: JoinType,
3887        join_constraint: JoinConstraint,
3888        null_equality: NullEquality,
3889    ) -> Result<Self> {
3890        let join_schema = build_join_schema(left.schema(), right.schema(), &join_type)?;
3891
3892        Ok(Join {
3893            left,
3894            right,
3895            on,
3896            filter,
3897            join_type,
3898            join_constraint,
3899            schema: Arc::new(join_schema),
3900            null_equality,
3901        })
3902    }
3903
3904    /// Create Join with input which wrapped with projection, this method is used in physical planning only to help
3905    /// create the physical join.
3906    pub fn try_new_with_project_input(
3907        original: &LogicalPlan,
3908        left: Arc<LogicalPlan>,
3909        right: Arc<LogicalPlan>,
3910        column_on: (Vec<Column>, Vec<Column>),
3911    ) -> Result<(Self, bool)> {
3912        let original_join = match original {
3913            LogicalPlan::Join(join) => join,
3914            _ => return plan_err!("Could not create join with project input"),
3915        };
3916
3917        let mut left_sch = LogicalPlanBuilder::from(Arc::clone(&left));
3918        let mut right_sch = LogicalPlanBuilder::from(Arc::clone(&right));
3919
3920        let mut requalified = false;
3921
3922        // By definition, the resulting schema of an inner/left/right & full join will have first the left side fields and then the right,
3923        // potentially having duplicate field names. Note this will only qualify fields if they have not been qualified before.
3924        if original_join.join_type == JoinType::Inner
3925            || original_join.join_type == JoinType::Left
3926            || original_join.join_type == JoinType::Right
3927            || original_join.join_type == JoinType::Full
3928        {
3929            (left_sch, right_sch, requalified) =
3930                requalify_sides_if_needed(left_sch.clone(), right_sch.clone())?;
3931        }
3932
3933        let on: Vec<(Expr, Expr)> = column_on
3934            .0
3935            .into_iter()
3936            .zip(column_on.1)
3937            .map(|(l, r)| (Expr::Column(l), Expr::Column(r)))
3938            .collect();
3939
3940        let join_schema = build_join_schema(
3941            left_sch.schema(),
3942            right_sch.schema(),
3943            &original_join.join_type,
3944        )?;
3945
3946        Ok((
3947            Join {
3948                left,
3949                right,
3950                on,
3951                filter: original_join.filter.clone(),
3952                join_type: original_join.join_type,
3953                join_constraint: original_join.join_constraint,
3954                schema: Arc::new(join_schema),
3955                null_equality: original_join.null_equality,
3956            },
3957            requalified,
3958        ))
3959    }
3960}
3961
3962// Manual implementation needed because of `schema` field. Comparison excludes this field.
3963impl PartialOrd for Join {
3964    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
3965        #[derive(PartialEq, PartialOrd)]
3966        struct ComparableJoin<'a> {
3967            /// Left input
3968            pub left: &'a Arc<LogicalPlan>,
3969            /// Right input
3970            pub right: &'a Arc<LogicalPlan>,
3971            /// Equijoin clause expressed as pairs of (left, right) join expressions
3972            pub on: &'a Vec<(Expr, Expr)>,
3973            /// Filters applied during join (non-equi conditions)
3974            pub filter: &'a Option<Expr>,
3975            /// Join type
3976            pub join_type: &'a JoinType,
3977            /// Join constraint
3978            pub join_constraint: &'a JoinConstraint,
3979            /// The null handling behavior for equalities
3980            pub null_equality: &'a NullEquality,
3981        }
3982        let comparable_self = ComparableJoin {
3983            left: &self.left,
3984            right: &self.right,
3985            on: &self.on,
3986            filter: &self.filter,
3987            join_type: &self.join_type,
3988            join_constraint: &self.join_constraint,
3989            null_equality: &self.null_equality,
3990        };
3991        let comparable_other = ComparableJoin {
3992            left: &other.left,
3993            right: &other.right,
3994            on: &other.on,
3995            filter: &other.filter,
3996            join_type: &other.join_type,
3997            join_constraint: &other.join_constraint,
3998            null_equality: &other.null_equality,
3999        };
4000        comparable_self
4001            .partial_cmp(&comparable_other)
4002            // TODO (https://github.com/apache/datafusion/issues/17477) avoid recomparing all fields
4003            .filter(|cmp| *cmp != Ordering::Equal || self == other)
4004    }
4005}
4006
/// Subquery
#[derive(Clone, PartialEq, Eq, PartialOrd, Hash)]
pub struct Subquery {
    /// The subquery plan
    pub subquery: Arc<LogicalPlan>,
    /// The outer references used in the subquery
    pub outer_ref_columns: Vec<Expr>,
    /// Span information for subquery projection columns
    pub spans: Spans,
}
4017
impl Normalizeable for Subquery {
    fn can_normalize(&self) -> bool {
        // Subqueries are never normalized; equality is instead decided
        // structurally by the `NormalizeEq` impl below.
        false
    }
}
4023
4024impl NormalizeEq for Subquery {
4025    fn normalize_eq(&self, other: &Self) -> bool {
4026        // TODO: may be implement NormalizeEq for LogicalPlan?
4027        *self.subquery == *other.subquery
4028            && self.outer_ref_columns.len() == other.outer_ref_columns.len()
4029            && self
4030                .outer_ref_columns
4031                .iter()
4032                .zip(other.outer_ref_columns.iter())
4033                .all(|(a, b)| a.normalize_eq(b))
4034    }
4035}
4036
4037impl Subquery {
4038    pub fn try_from_expr(plan: &Expr) -> Result<&Subquery> {
4039        match plan {
4040            Expr::ScalarSubquery(it) => Ok(it),
4041            Expr::Cast(cast) => Subquery::try_from_expr(cast.expr.as_ref()),
4042            _ => plan_err!("Could not coerce into ScalarSubquery!"),
4043        }
4044    }
4045
4046    pub fn with_plan(&self, plan: Arc<LogicalPlan>) -> Subquery {
4047        Subquery {
4048            subquery: plan,
4049            outer_ref_columns: self.outer_ref_columns.clone(),
4050            spans: Spans::new(),
4051        }
4052    }
4053}
4054
4055impl Debug for Subquery {
4056    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
4057        write!(f, "<subquery>")
4058    }
4059}
4060
/// Logical partitioning schemes supported by [`LogicalPlan::Repartition`]
///
/// See [`Partitioning`] for more details on partitioning
///
/// [`Partitioning`]: https://docs.rs/datafusion/latest/datafusion/physical_expr/enum.Partitioning.html#
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Hash)]
pub enum Partitioning {
    /// Allocate batches using a round-robin algorithm and the specified number of partitions
    RoundRobinBatch(usize),
    /// Allocate rows based on a hash of one or more expressions and the specified number
    /// of partitions.
    Hash(Vec<Expr>, usize),
    /// The DISTRIBUTE BY clause is used to repartition the data based on the input expressions
    DistributeBy(Vec<Expr>),
}
4076
/// Represent the unnesting operation on a list column, such as the recursion depth and
/// the output column name after unnesting
///
/// Example: given `ColumnUnnestList { output_column: "output_name", depth: 2 }`
///
/// ```text
///   input             output_name
///  ┌─────────┐      ┌─────────┐
///  │{{1,2}}  │      │ 1       │
///  ├─────────┼─────►├─────────┤
///  │{{3}}    │      │ 2       │
///  ├─────────┤      ├─────────┤
///  │{{4},{5}}│      │ 3       │
///  └─────────┘      ├─────────┤
///                   │ 4       │
///                   ├─────────┤
///                   │ 5       │
///                   └─────────┘
/// ```
#[derive(Debug, Clone, PartialEq, Eq, Hash, PartialOrd)]
pub struct ColumnUnnestList {
    /// Name of the column holding the unnested values in the output schema
    pub output_column: Column,
    /// Number of levels of list nesting that are flattened
    pub depth: usize,
}
4101
4102impl Display for ColumnUnnestList {
4103    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
4104        write!(f, "{}|depth={}", self.output_column, self.depth)
4105    }
4106}
4107
/// Unnest a column that contains a nested list type. See
/// [`UnnestOptions`] for more details.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct Unnest {
    /// The incoming logical plan
    pub input: Arc<LogicalPlan>,
    /// Columns to run unnest on, can be a list of (List/Struct) columns
    pub exec_columns: Vec<Column>,
    /// refer to the indices(in the input schema) of columns
    /// that have type list to run unnest on
    pub list_type_columns: Vec<(usize, ColumnUnnestList)>,
    /// refer to the indices (in the input schema) of columns
    /// that have type struct to run unnest on
    pub struct_type_columns: Vec<usize>,
    /// Having items aligned with the output columns
    /// representing which column in the input schema each output column depends on
    pub dependency_indices: Vec<usize>,
    /// The output schema, containing the unnested field column.
    /// Excluded from comparison in the manual `PartialOrd` impl.
    pub schema: DFSchemaRef,
    /// Options
    pub options: UnnestOptions,
}
4130
4131// Manual implementation needed because of `schema` field. Comparison excludes this field.
4132impl PartialOrd for Unnest {
4133    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
4134        #[derive(PartialEq, PartialOrd)]
4135        struct ComparableUnnest<'a> {
4136            /// The incoming logical plan
4137            pub input: &'a Arc<LogicalPlan>,
4138            /// Columns to run unnest on, can be a list of (List/Struct) columns
4139            pub exec_columns: &'a Vec<Column>,
4140            /// refer to the indices(in the input schema) of columns
4141            /// that have type list to run unnest on
4142            pub list_type_columns: &'a Vec<(usize, ColumnUnnestList)>,
4143            /// refer to the indices (in the input schema) of columns
4144            /// that have type struct to run unnest on
4145            pub struct_type_columns: &'a Vec<usize>,
4146            /// Having items aligned with the output columns
4147            /// representing which column in the input schema each output column depends on
4148            pub dependency_indices: &'a Vec<usize>,
4149            /// Options
4150            pub options: &'a UnnestOptions,
4151        }
4152        let comparable_self = ComparableUnnest {
4153            input: &self.input,
4154            exec_columns: &self.exec_columns,
4155            list_type_columns: &self.list_type_columns,
4156            struct_type_columns: &self.struct_type_columns,
4157            dependency_indices: &self.dependency_indices,
4158            options: &self.options,
4159        };
4160        let comparable_other = ComparableUnnest {
4161            input: &other.input,
4162            exec_columns: &other.exec_columns,
4163            list_type_columns: &other.list_type_columns,
4164            struct_type_columns: &other.struct_type_columns,
4165            dependency_indices: &other.dependency_indices,
4166            options: &other.options,
4167        };
4168        comparable_self
4169            .partial_cmp(&comparable_other)
4170            // TODO (https://github.com/apache/datafusion/issues/17477) avoid recomparing all fields
4171            .filter(|cmp| *cmp != Ordering::Equal || self == other)
4172    }
4173}
4174
impl Unnest {
    /// Creates an [`Unnest`] node over `input`, unnesting each column listed
    /// in `exec_columns` and computing the resulting output schema.
    ///
    /// Errors if `exec_columns` is empty or a column cannot be resolved
    /// against the input schema.
    pub fn try_new(
        input: Arc<LogicalPlan>,
        exec_columns: Vec<Column>,
        options: UnnestOptions,
    ) -> Result<Self> {
        if exec_columns.is_empty() {
            return plan_err!("unnest plan requires at least 1 column to unnest");
        }

        // Filled in as a side effect while building the output schema below:
        // which input indices are list-typed (with recursion depth) and which
        // are struct-typed.
        let mut list_columns: Vec<(usize, ColumnUnnestList)> = vec![];
        let mut struct_columns = vec![];
        // Map input-schema index -> column to unnest at that index.
        let indices_to_unnest = exec_columns
            .iter()
            .map(|c| Ok((input.schema().index_of_column(c)?, c)))
            .collect::<Result<HashMap<usize, &Column>>>()?;

        let input_schema = input.schema();

        let mut dependency_indices = vec![];
        // Transform input schema into new schema
        // Given this comprehensive example
        //
        // input schema:
        // 1.col1_unnest_placeholder: list[list[int]],
        // 2.col1: list[list[int]]
        // 3.col2: list[int]
        // with unnest on unnest(col1,depth=2), unnest(col1,depth=1) and unnest(col2,depth=1)
        // output schema:
        // 1.unnest_col1_depth_2: int
        // 2.unnest_col1_depth_1: list[int]
        // 3.col1: list[list[int]]
        // 4.unnest_col2_depth_1: int
        // Meaning the placeholder column will be replaced by its unnested variation(s), note
        // the plural.
        let fields = input_schema
            .iter()
            .enumerate()
            .map(|(index, (original_qualifier, original_field))| {
                match indices_to_unnest.get(&index) {
                    Some(column_to_unnest) => {
                        // Explicit recursive unnests requested for this column
                        // via options (e.g. unnest(col, depth=2)).
                        let recursions_on_column = options
                            .recursions
                            .iter()
                            .filter(|p| -> bool { &p.input_column == *column_to_unnest })
                            .collect::<Vec<_>>();
                        let mut transformed_columns = recursions_on_column
                            .iter()
                            .map(|r| {
                                list_columns.push((
                                    index,
                                    ColumnUnnestList {
                                        output_column: r.output_column.clone(),
                                        depth: r.depth,
                                    },
                                ));
                                Ok(get_unnested_columns(
                                    &r.output_column.name,
                                    original_field.data_type(),
                                    r.depth,
                                )?
                                .into_iter()
                                .next()
                                .unwrap()) // because unnesting a list column always result into one result
                            })
                            .collect::<Result<Vec<(Column, Arc<Field>)>>>()?;
                        if transformed_columns.is_empty() {
                            // No explicit recursion configured: default to a
                            // single-level unnest and record the column kind.
                            transformed_columns = get_unnested_columns(
                                &column_to_unnest.name,
                                original_field.data_type(),
                                1,
                            )?;
                            match original_field.data_type() {
                                DataType::Struct(_) => {
                                    struct_columns.push(index);
                                }
                                DataType::List(_)
                                | DataType::FixedSizeList(_, _)
                                | DataType::LargeList(_) => {
                                    list_columns.push((
                                        index,
                                        ColumnUnnestList {
                                            output_column: Column::from_name(
                                                &column_to_unnest.name,
                                            ),
                                            depth: 1,
                                        },
                                    ));
                                }
                                _ => {}
                            };
                        }

                        // new columns dependent on the same original index
                        dependency_indices.extend(std::iter::repeat_n(
                            index,
                            transformed_columns.len(),
                        ));
                        Ok(transformed_columns
                            .iter()
                            .map(|(col, field)| {
                                (col.relation.to_owned(), field.to_owned())
                            })
                            .collect())
                    }
                    None => {
                        // Column is not being unnested: pass it through
                        // unchanged, depending only on itself.
                        dependency_indices.push(index);
                        Ok(vec![(
                            original_qualifier.cloned(),
                            Arc::clone(original_field),
                        )])
                    }
                }
            })
            .collect::<Result<Vec<_>>>()?
            .into_iter()
            .flatten()
            .collect::<Vec<_>>();

        let metadata = input_schema.metadata().clone();
        let df_schema = DFSchema::new_with_metadata(fields, metadata)?;
        // We can use the existing functional dependencies:
        let deps = input_schema.functional_dependencies().clone();
        let schema = Arc::new(df_schema.with_functional_dependencies(deps)?);

        Ok(Unnest {
            input,
            exec_columns,
            list_type_columns: list_columns,
            struct_type_columns: struct_columns,
            dependency_indices,
            schema,
            options,
        })
    }
}
4311
4312// Based on data type, either struct or a variant of list
4313// return a set of columns as the result of unnesting
4314// the input columns.
4315// For example, given a column with name "a",
4316// - List(Element) returns ["a"] with data type Element
4317// - Struct(field1, field2) returns ["a.field1","a.field2"]
4318// For list data type, an argument depth is used to specify
4319// the recursion level
4320fn get_unnested_columns(
4321    col_name: &String,
4322    data_type: &DataType,
4323    depth: usize,
4324) -> Result<Vec<(Column, Arc<Field>)>> {
4325    let mut qualified_columns = Vec::with_capacity(1);
4326
4327    match data_type {
4328        DataType::List(_) | DataType::FixedSizeList(_, _) | DataType::LargeList(_) => {
4329            let data_type = get_unnested_list_datatype_recursive(data_type, depth)?;
4330            let new_field = Arc::new(Field::new(
4331                col_name, data_type,
4332                // Unnesting may produce NULLs even if the list is not null.
4333                // For example: unnest([1], []) -> 1, null
4334                true,
4335            ));
4336            let column = Column::from_name(col_name);
4337            // let column = Column::from((None, &new_field));
4338            qualified_columns.push((column, new_field));
4339        }
4340        DataType::Struct(fields) => {
4341            qualified_columns.extend(fields.iter().map(|f| {
4342                let new_name = format!("{}.{}", col_name, f.name());
4343                let column = Column::from_name(&new_name);
4344                let new_field = f.as_ref().clone().with_name(new_name);
4345                // let column = Column::from((None, &f));
4346                (column, Arc::new(new_field))
4347            }))
4348        }
4349        _ => {
4350            return internal_err!("trying to unnest on invalid data type {data_type}");
4351        }
4352    };
4353    Ok(qualified_columns)
4354}
4355
4356// Get the data type of a multi-dimensional type after unnesting it
4357// with a given depth
4358fn get_unnested_list_datatype_recursive(
4359    data_type: &DataType,
4360    depth: usize,
4361) -> Result<DataType> {
4362    match data_type {
4363        DataType::List(field)
4364        | DataType::FixedSizeList(field, _)
4365        | DataType::LargeList(field) => {
4366            if depth == 1 {
4367                return Ok(field.data_type().clone());
4368            }
4369            return get_unnested_list_datatype_recursive(field.data_type(), depth - 1);
4370        }
4371        _ => {}
4372    };
4373
4374    internal_err!("trying to unnest on invalid data type {data_type}")
4375}
4376
4377#[cfg(test)]
4378mod tests {
4379    use super::*;
4380    use crate::builder::LogicalTableSource;
4381    use crate::logical_plan::table_scan;
4382    use crate::select_expr::SelectExpr;
4383    use crate::test::function_stub::{count, count_udaf};
4384    use crate::{
4385        GroupingSet, binary_expr, col, exists, in_subquery, lit, placeholder,
4386        scalar_subquery,
4387    };
4388    use datafusion_common::metadata::ScalarAndMetadata;
4389    use datafusion_common::tree_node::{
4390        TransformedResult, TreeNodeRewriter, TreeNodeVisitor,
4391    };
4392    use datafusion_common::{Constraint, ScalarValue, not_impl_err};
4393    use insta::{assert_debug_snapshot, assert_snapshot};
4394    use std::hash::DefaultHasher;
4395
4396    fn employee_schema() -> Schema {
4397        Schema::new(vec![
4398            Field::new("id", DataType::Int32, false),
4399            Field::new("first_name", DataType::Utf8, false),
4400            Field::new("last_name", DataType::Utf8, false),
4401            Field::new("state", DataType::Utf8, false),
4402            Field::new("salary", DataType::Int32, false),
4403        ])
4404    }
4405
4406    fn display_plan() -> Result<LogicalPlan> {
4407        let plan1 = table_scan(Some("employee_csv"), &employee_schema(), Some(vec![3]))?
4408            .build()?;
4409
4410        table_scan(Some("employee_csv"), &employee_schema(), Some(vec![0, 3]))?
4411            .filter(in_subquery(col("state"), Arc::new(plan1)))?
4412            .project(vec![col("id")])?
4413            .build()
4414    }
4415
    /// `display_indent` renders the plan as an indented tree; the subquery's
    /// plan is nested beneath the `Filter` node that references it.
    #[test]
    fn test_display_indent() -> Result<()> {
        let plan = display_plan()?;

        assert_snapshot!(plan.display_indent(), @r"
        Projection: employee_csv.id
          Filter: employee_csv.state IN (<subquery>)
            Subquery:
              TableScan: employee_csv projection=[state]
            TableScan: employee_csv projection=[id, state]
        ");
        Ok(())
    }
4429
    /// Like `test_display_indent`, but `display_indent_schema` additionally
    /// prints each node's output schema in `[name:Type]` form.
    #[test]
    fn test_display_indent_schema() -> Result<()> {
        let plan = display_plan()?;

        assert_snapshot!(plan.display_indent_schema(), @r"
        Projection: employee_csv.id [id:Int32]
          Filter: employee_csv.state IN (<subquery>) [id:Int32, state:Utf8]
            Subquery: [state:Utf8]
              TableScan: employee_csv projection=[state] [state:Utf8]
            TableScan: employee_csv projection=[id, state] [id:Int32, state:Utf8]
        ");
        Ok(())
    }
4443
    /// An aliased `EXISTS (<subquery>)` expression keeps its alias in the
    /// rendered plan, with the subquery plan shown beneath the projection.
    #[test]
    fn test_display_subquery_alias() -> Result<()> {
        let plan1 = table_scan(Some("employee_csv"), &employee_schema(), Some(vec![3]))?
            .build()?;
        let plan1 = Arc::new(plan1);

        let plan =
            table_scan(Some("employee_csv"), &employee_schema(), Some(vec![0, 3]))?
                .project(vec![col("id"), exists(plan1).alias("exists")])?
                .build();

        assert_snapshot!(plan?.display_indent(), @r"
        Projection: employee_csv.id, EXISTS (<subquery>) AS exists
          Subquery:
            TableScan: employee_csv projection=[state]
          TableScan: employee_csv projection=[id, state]
        ");
        Ok(())
    }
4463
    /// `display_graphviz` emits two subgraphs: a plain one and a "Detailed"
    /// one that includes each node's schema; subqueries become extra edges.
    #[test]
    fn test_display_graphviz() -> Result<()> {
        let plan = display_plan()?;

        // Snapshot the full GraphViz output so any formatting change is caught.
        assert_snapshot!(plan.display_graphviz(), @r#"
        // Begin DataFusion GraphViz Plan,
        // display it online here: https://dreampuf.github.io/GraphvizOnline

        digraph {
          subgraph cluster_1
          {
            graph[label="LogicalPlan"]
            2[shape=box label="Projection: employee_csv.id"]
            3[shape=box label="Filter: employee_csv.state IN (<subquery>)"]
            2 -> 3 [arrowhead=none, arrowtail=normal, dir=back]
            4[shape=box label="Subquery:"]
            3 -> 4 [arrowhead=none, arrowtail=normal, dir=back]
            5[shape=box label="TableScan: employee_csv projection=[state]"]
            4 -> 5 [arrowhead=none, arrowtail=normal, dir=back]
            6[shape=box label="TableScan: employee_csv projection=[id, state]"]
            3 -> 6 [arrowhead=none, arrowtail=normal, dir=back]
          }
          subgraph cluster_7
          {
            graph[label="Detailed LogicalPlan"]
            8[shape=box label="Projection: employee_csv.id\nSchema: [id:Int32]"]
            9[shape=box label="Filter: employee_csv.state IN (<subquery>)\nSchema: [id:Int32, state:Utf8]"]
            8 -> 9 [arrowhead=none, arrowtail=normal, dir=back]
            10[shape=box label="Subquery:\nSchema: [state:Utf8]"]
            9 -> 10 [arrowhead=none, arrowtail=normal, dir=back]
            11[shape=box label="TableScan: employee_csv projection=[state]\nSchema: [state:Utf8]"]
            10 -> 11 [arrowhead=none, arrowtail=normal, dir=back]
            12[shape=box label="TableScan: employee_csv projection=[id, state]\nSchema: [id:Int32, state:Utf8]"]
            9 -> 12 [arrowhead=none, arrowtail=normal, dir=back]
          }
        }
        // End DataFusion GraphViz Plan
        "#);
        Ok(())
    }
4506
    /// `display_pg_json` renders the plan as a Postgres `EXPLAIN (FORMAT JSON)`
    /// style document; child plans (including subqueries) nest under "Plans".
    #[test]
    fn test_display_pg_json() -> Result<()> {
        let plan = display_plan()?;

        assert_snapshot!(plan.display_pg_json(), @r#"
        [
          {
            "Plan": {
              "Expressions": [
                "employee_csv.id"
              ],
              "Node Type": "Projection",
              "Output": [
                "id"
              ],
              "Plans": [
                {
                  "Condition": "employee_csv.state IN (<subquery>)",
                  "Node Type": "Filter",
                  "Output": [
                    "id",
                    "state"
                  ],
                  "Plans": [
                    {
                      "Node Type": "Subquery",
                      "Output": [
                        "state"
                      ],
                      "Plans": [
                        {
                          "Node Type": "TableScan",
                          "Output": [
                            "state"
                          ],
                          "Plans": [],
                          "Relation Name": "employee_csv"
                        }
                      ]
                    },
                    {
                      "Node Type": "TableScan",
                      "Output": [
                        "id",
                        "state"
                      ],
                      "Plans": [],
                      "Relation Name": "employee_csv"
                    }
                  ]
                }
              ]
            }
          }
        ]
        "#);
        Ok(())
    }
4565
4566    /// Tests for the Visitor trait and walking logical plan nodes
4567    #[derive(Debug, Default)]
4568    struct OkVisitor {
4569        strings: Vec<String>,
4570    }
4571
4572    impl<'n> TreeNodeVisitor<'n> for OkVisitor {
4573        type Node = LogicalPlan;
4574
4575        fn f_down(&mut self, plan: &'n LogicalPlan) -> Result<TreeNodeRecursion> {
4576            let s = match plan {
4577                LogicalPlan::Projection { .. } => "pre_visit Projection",
4578                LogicalPlan::Filter { .. } => "pre_visit Filter",
4579                LogicalPlan::TableScan { .. } => "pre_visit TableScan",
4580                _ => {
4581                    return not_impl_err!("unknown plan type");
4582                }
4583            };
4584
4585            self.strings.push(s.into());
4586            Ok(TreeNodeRecursion::Continue)
4587        }
4588
4589        fn f_up(&mut self, plan: &'n LogicalPlan) -> Result<TreeNodeRecursion> {
4590            let s = match plan {
4591                LogicalPlan::Projection { .. } => "post_visit Projection",
4592                LogicalPlan::Filter { .. } => "post_visit Filter",
4593                LogicalPlan::TableScan { .. } => "post_visit TableScan",
4594                _ => {
4595                    return not_impl_err!("unknown plan type");
4596                }
4597            };
4598
4599            self.strings.push(s.into());
4600            Ok(TreeNodeRecursion::Continue)
4601        }
4602    }
4603
    /// `visit_with_subqueries` calls `f_down` before descending into a node's
    /// children and `f_up` after, yielding symmetric pre-/post-order entries.
    #[test]
    fn visit_order() {
        let mut visitor = OkVisitor::default();
        let plan = test_plan();
        let res = plan.visit_with_subqueries(&mut visitor);
        assert!(res.is_ok());

        assert_debug_snapshot!(visitor.strings, @r#"
        [
            "pre_visit Projection",
            "pre_visit Filter",
            "pre_visit TableScan",
            "post_visit TableScan",
            "post_visit Filter",
            "post_visit Projection",
        ]
        "#);
    }
4622
4623    #[derive(Debug, Default)]
4624    /// Counter than counts to zero and returns true when it gets there
4625    struct OptionalCounter {
4626        val: Option<usize>,
4627    }
4628
4629    impl OptionalCounter {
4630        fn new(val: usize) -> Self {
4631            Self { val: Some(val) }
4632        }
4633        // Decrements the counter by 1, if any, returning true if it hits zero
4634        fn dec(&mut self) -> bool {
4635            if Some(0) == self.val {
4636                true
4637            } else {
4638                self.val = self.val.take().map(|i| i - 1);
4639                false
4640            }
4641        }
4642    }
4643
4644    #[derive(Debug, Default)]
4645    /// Visitor that returns false after some number of visits
4646    struct StoppingVisitor {
4647        inner: OkVisitor,
4648        /// When Some(0) returns false from pre_visit
4649        return_false_from_pre_in: OptionalCounter,
4650        /// When Some(0) returns false from post_visit
4651        return_false_from_post_in: OptionalCounter,
4652    }
4653
4654    impl<'n> TreeNodeVisitor<'n> for StoppingVisitor {
4655        type Node = LogicalPlan;
4656
4657        fn f_down(&mut self, plan: &'n LogicalPlan) -> Result<TreeNodeRecursion> {
4658            if self.return_false_from_pre_in.dec() {
4659                return Ok(TreeNodeRecursion::Stop);
4660            }
4661            self.inner.f_down(plan)?;
4662
4663            Ok(TreeNodeRecursion::Continue)
4664        }
4665
4666        fn f_up(&mut self, plan: &'n LogicalPlan) -> Result<TreeNodeRecursion> {
4667            if self.return_false_from_post_in.dec() {
4668                return Ok(TreeNodeRecursion::Stop);
4669            }
4670
4671            self.inner.f_up(plan)
4672        }
4673    }
4674
    /// Returning `TreeNodeRecursion::Stop` from the third `f_down` call halts
    /// the walk: only the two earlier pre-visits are recorded, no post-visits.
    #[test]
    fn early_stopping_pre_visit() {
        let mut visitor = StoppingVisitor {
            return_false_from_pre_in: OptionalCounter::new(2),
            ..Default::default()
        };
        let plan = test_plan();
        let res = plan.visit_with_subqueries(&mut visitor);
        assert!(res.is_ok());

        assert_debug_snapshot!(
            visitor.inner.strings,
            @r#"
        [
            "pre_visit Projection",
            "pre_visit Filter",
        ]
        "#
        );
    }
4696
    /// Returning `TreeNodeRecursion::Stop` from the second `f_up` call halts
    /// the walk after the TableScan's post-visit; the Filter's and
    /// Projection's post-visits are never recorded.
    #[test]
    fn early_stopping_post_visit() {
        let mut visitor = StoppingVisitor {
            return_false_from_post_in: OptionalCounter::new(1),
            ..Default::default()
        };
        let plan = test_plan();
        let res = plan.visit_with_subqueries(&mut visitor);
        assert!(res.is_ok());

        assert_debug_snapshot!(
            visitor.inner.strings,
            @r#"
        [
            "pre_visit Projection",
            "pre_visit Filter",
            "pre_visit TableScan",
            "post_visit TableScan",
        ]
        "#
        );
    }
4719
4720    #[derive(Debug, Default)]
4721    /// Visitor that returns an error after some number of visits
4722    struct ErrorVisitor {
4723        inner: OkVisitor,
4724        /// When Some(0) returns false from pre_visit
4725        return_error_from_pre_in: OptionalCounter,
4726        /// When Some(0) returns false from post_visit
4727        return_error_from_post_in: OptionalCounter,
4728    }
4729
4730    impl<'n> TreeNodeVisitor<'n> for ErrorVisitor {
4731        type Node = LogicalPlan;
4732
4733        fn f_down(&mut self, plan: &'n LogicalPlan) -> Result<TreeNodeRecursion> {
4734            if self.return_error_from_pre_in.dec() {
4735                return not_impl_err!("Error in pre_visit");
4736            }
4737
4738            self.inner.f_down(plan)
4739        }
4740
4741        fn f_up(&mut self, plan: &'n LogicalPlan) -> Result<TreeNodeRecursion> {
4742            if self.return_error_from_post_in.dec() {
4743                return not_impl_err!("Error in post_visit");
4744            }
4745
4746            self.inner.f_up(plan)
4747        }
4748    }
4749
    /// An error returned from the third `f_down` call aborts the traversal and
    /// propagates to the caller; only the two earlier pre-visits are recorded.
    #[test]
    fn error_pre_visit() {
        let mut visitor = ErrorVisitor {
            return_error_from_pre_in: OptionalCounter::new(2),
            ..Default::default()
        };
        let plan = test_plan();
        let res = plan.visit_with_subqueries(&mut visitor).unwrap_err();
        assert_snapshot!(
            res.strip_backtrace(),
            @"This feature is not implemented: Error in pre_visit"
        );
        assert_debug_snapshot!(
            visitor.inner.strings,
            @r#"
        [
            "pre_visit Projection",
            "pre_visit Filter",
        ]
        "#
        );
    }
4772
    /// An error returned from the second `f_up` call aborts the traversal
    /// right after the TableScan's post-visit and propagates to the caller.
    #[test]
    fn error_post_visit() {
        let mut visitor = ErrorVisitor {
            return_error_from_post_in: OptionalCounter::new(1),
            ..Default::default()
        };
        let plan = test_plan();
        let res = plan.visit_with_subqueries(&mut visitor).unwrap_err();
        assert_snapshot!(
            res.strip_backtrace(),
            @"This feature is not implemented: Error in post_visit"
        );
        assert_debug_snapshot!(
            visitor.inner.strings,
            @r#"
        [
            "pre_visit Projection",
            "pre_visit Filter",
            "pre_visit TableScan",
            "post_visit TableScan",
        ]
        "#
        );
    }
4797
    /// Two `Window` nodes that differ only in schema metadata must be unequal,
    /// hash differently, and be incomparable (`partial_cmp` == None).
    #[test]
    fn test_partial_eq_hash_and_partial_ord() {
        let empty_values = Arc::new(LogicalPlan::EmptyRelation(EmptyRelation {
            produce_one_row: true,
            schema: Arc::new(DFSchema::empty()),
        }));

        // Builds a Window node computing `count()` over the empty relation,
        // with the caller-supplied output schema.
        let count_window_function = |schema| {
            Window::try_new_with_schema(
                vec![Expr::WindowFunction(Box::new(WindowFunction::new(
                    WindowFunctionDefinition::AggregateUDF(count_udaf()),
                    vec![],
                )))],
                Arc::clone(&empty_values),
                Arc::new(schema),
            )
            .unwrap()
        };

        let schema_without_metadata = || {
            DFSchema::from_unqualified_fields(
                vec![Field::new("count", DataType::Int64, false)].into(),
                HashMap::new(),
            )
            .unwrap()
        };

        let schema_with_metadata = || {
            DFSchema::from_unqualified_fields(
                vec![Field::new("count", DataType::Int64, false)].into(),
                [("key".to_string(), "value".to_string())].into(),
            )
            .unwrap()
        };

        // A Window
        let f = count_window_function(schema_without_metadata());

        // Same as `f`, different instance: must compare fully equal.
        let f2 = count_window_function(schema_without_metadata());
        assert_eq!(f, f2);
        assert_eq!(hash(&f), hash(&f2));
        assert_eq!(f.partial_cmp(&f2), Some(Ordering::Equal));

        // Same as `f`, except for schema metadata
        let o = count_window_function(schema_with_metadata());
        assert_ne!(f, o);
        assert_ne!(hash(&f), hash(&o)); // hash can collide for different values but does not collide in this test
        assert_eq!(f.partial_cmp(&o), None);
    }
4848
4849    fn hash<T: Hash>(value: &T) -> u64 {
4850        let hasher = &mut DefaultHasher::new();
4851        value.hash(hasher);
4852        hasher.finish()
4853    }
4854
    /// `Projection::try_new_with_schema` must reject a schema whose field
    /// count (0 here) differs from the number of expressions (1 here).
    #[test]
    fn projection_expr_schema_mismatch() -> Result<()> {
        let empty_schema = Arc::new(DFSchema::empty());
        let p = Projection::try_new_with_schema(
            vec![col("a")],
            Arc::new(LogicalPlan::EmptyRelation(EmptyRelation {
                produce_one_row: false,
                schema: Arc::clone(&empty_schema),
            })),
            empty_schema,
        );
        assert_snapshot!(p.unwrap_err().strip_backtrace(), @"Error during planning: Projection has mismatch between number of expressions (1) and number of fields in schema (0)");
        Ok(())
    }
4869
4870    fn test_plan() -> LogicalPlan {
4871        let schema = Schema::new(vec![
4872            Field::new("id", DataType::Int32, false),
4873            Field::new("state", DataType::Utf8, false),
4874        ]);
4875
4876        table_scan(TableReference::none(), &schema, Some(vec![0, 1]))
4877            .unwrap()
4878            .filter(col("state").eq(lit("CO")))
4879            .unwrap()
4880            .project(vec![col("id")])
4881            .unwrap()
4882            .build()
4883            .unwrap()
4884    }
4885
4886    #[test]
4887    fn test_replace_invalid_placeholder() {
4888        // test empty placeholder
4889        let schema = Schema::new(vec![Field::new("id", DataType::Int32, false)]);
4890
4891        let plan = table_scan(TableReference::none(), &schema, None)
4892            .unwrap()
4893            .filter(col("id").eq(placeholder("")))
4894            .unwrap()
4895            .build()
4896            .unwrap();
4897
4898        let param_values = vec![ScalarValue::Int32(Some(42))];
4899        plan.replace_params_with_values(&param_values.clone().into())
4900            .expect_err("unexpectedly succeeded to replace an invalid placeholder");
4901
4902        // test $0 placeholder
4903        let schema = Schema::new(vec![Field::new("id", DataType::Int32, false)]);
4904
4905        let plan = table_scan(TableReference::none(), &schema, None)
4906            .unwrap()
4907            .filter(col("id").eq(placeholder("$0")))
4908            .unwrap()
4909            .build()
4910            .unwrap();
4911
4912        plan.replace_params_with_values(&param_values.clone().into())
4913            .expect_err("unexpectedly succeeded to replace an invalid placeholder");
4914
4915        // test $00 placeholder
4916        let schema = Schema::new(vec![Field::new("id", DataType::Int32, false)]);
4917
4918        let plan = table_scan(TableReference::none(), &schema, None)
4919            .unwrap()
4920            .filter(col("id").eq(placeholder("$00")))
4921            .unwrap()
4922            .build()
4923            .unwrap();
4924
4925        plan.replace_params_with_values(&param_values.into())
4926            .expect_err("unexpectedly succeeded to replace an invalid placeholder");
4927    }
4928
    /// Binding a parameter that carries metadata against a prepared statement
    /// whose declared field has no metadata must fail.
    #[test]
    fn test_replace_placeholder_mismatched_metadata() {
        let schema = Schema::new(vec![Field::new("id", DataType::Int32, false)]);

        // Create a prepared statement with explicit fields that do not have metadata
        let plan = table_scan(TableReference::none(), &schema, None)
            .unwrap()
            .filter(col("id").eq(placeholder("$1")))
            .unwrap()
            .build()
            .unwrap();
        let prepared_builder = LogicalPlanBuilder::new(plan)
            .prepare(
                "".to_string(),
                vec![Field::new("", DataType::Int32, true).into()],
            )
            .unwrap();

        // Attempt to bind a parameter with metadata
        let mut scalar_meta = HashMap::new();
        scalar_meta.insert("some_key".to_string(), "some_value".to_string());
        let param_values = ParamValues::List(vec![ScalarAndMetadata::new(
            ScalarValue::Int32(Some(42)),
            Some(scalar_meta.into()),
        )]);
        prepared_builder
            .plan()
            .clone()
            .with_param_values(param_values)
            .expect_err("prepared field metadata mismatch unexpectedly succeeded");
    }
4960
    /// Replacing placeholders over an empty relation rewrites `$n` to aliased
    /// literals and upgrades the schema from `Null;N` to the bound types.
    #[test]
    fn test_replace_placeholder_empty_relation_valid_schema() {
        // SELECT $1, $2;
        let plan = LogicalPlanBuilder::empty(false)
            .project(vec![
                SelectExpr::from(placeholder("$1")),
                SelectExpr::from(placeholder("$2")),
            ])
            .unwrap()
            .build()
            .unwrap();

        // Before binding: placeholders have unknown (Null, nullable) types.
        assert_snapshot!(plan.display_indent_schema(), @r"
        Projection: $1, $2 [$1:Null;N, $2:Null;N]
          EmptyRelation: rows=0 []
        ");

        let plan = plan
            .with_param_values(vec![ScalarValue::from(1i32), ScalarValue::from("s")])
            .unwrap();

        // After binding: literals aliased to the placeholder names, typed schema.
        assert_snapshot!(plan.display_indent_schema(), @r#"
        Projection: Int32(1) AS $1, Utf8("s") AS $2 [$1:Int32, $2:Utf8]
          EmptyRelation: rows=0 []
        "#);
    }
4989
4990    #[test]
4991    fn test_nullable_schema_after_grouping_set() {
4992        let schema = Schema::new(vec![
4993            Field::new("foo", DataType::Int32, false),
4994            Field::new("bar", DataType::Int32, false),
4995        ]);
4996
4997        let plan = table_scan(TableReference::none(), &schema, None)
4998            .unwrap()
4999            .aggregate(
5000                vec![Expr::GroupingSet(GroupingSet::GroupingSets(vec![
5001                    vec![col("foo")],
5002                    vec![col("bar")],
5003                ]))],
5004                vec![count(lit(true))],
5005            )
5006            .unwrap()
5007            .build()
5008            .unwrap();
5009
5010        let output_schema = plan.schema();
5011
5012        assert!(
5013            output_schema
5014                .field_with_name(None, "foo")
5015                .unwrap()
5016                .is_nullable(),
5017        );
5018        assert!(
5019            output_schema
5020                .field_with_name(None, "bar")
5021                .unwrap()
5022                .is_nullable()
5023        );
5024    }
5025
    /// `Filter::is_scalar` is false for a plain scan, but true when the
    /// scanned schema carries a unique constraint on the filtered column
    /// (an equality filter on a unique key yields at most one row).
    #[test]
    fn test_filter_is_scalar() {
        // Scan over a single-column table, no constraints.
        let schema =
            Arc::new(Schema::new(vec![Field::new("id", DataType::Int32, false)]));

        let source = Arc::new(LogicalTableSource::new(schema));
        let schema = Arc::new(
            DFSchema::try_from_qualified_schema(
                TableReference::bare("tab"),
                &source.schema(),
            )
            .unwrap(),
        );
        let scan = Arc::new(LogicalPlan::TableScan(TableScan {
            table_name: TableReference::bare("tab"),
            source: Arc::clone(&source) as Arc<dyn TableSource>,
            projection: None,
            projected_schema: Arc::clone(&schema),
            filters: vec![],
            fetch: None,
        }));
        let col = schema.field_names()[0].clone();

        // Without a uniqueness guarantee, `id = 1` is not scalar.
        let filter = Filter::try_new(
            Expr::Column(col.into()).eq(Expr::Literal(ScalarValue::Int32(Some(1)), None)),
            scan,
        )
        .unwrap();
        assert!(!filter.is_scalar());
        // Same schema, now annotated with a Unique constraint on column 0.
        let unique_schema = Arc::new(
            schema
                .as_ref()
                .clone()
                .with_functional_dependencies(
                    FunctionalDependencies::new_from_constraints(
                        Some(&Constraints::new_unverified(vec![Constraint::Unique(
                            vec![0],
                        )])),
                        1,
                    ),
                )
                .unwrap(),
        );
        let scan = Arc::new(LogicalPlan::TableScan(TableScan {
            table_name: TableReference::bare("tab"),
            source,
            projection: None,
            projected_schema: Arc::clone(&unique_schema),
            filters: vec![],
            fetch: None,
        }));
        let col = schema.field_names()[0].clone();

        // With the unique constraint, the same equality filter is scalar.
        let filter =
            Filter::try_new(Expr::Column(col.into()).eq(lit(1i32)), scan).unwrap();
        assert!(filter.is_scalar());
    }
5084
    /// Transforming a child of an `Explain` node (wrapping its TableScan in a
    /// Filter) must rebuild the Explain parent around the new child.
    #[test]
    fn test_transform_explain() {
        let schema = Schema::new(vec![
            Field::new("foo", DataType::Int32, false),
            Field::new("bar", DataType::Int32, false),
        ]);

        let plan = table_scan(TableReference::none(), &schema, None)
            .unwrap()
            .explain(false, false)
            .unwrap()
            .build()
            .unwrap();

        let external_filter = col("foo").eq(lit(true));

        // after transformation, because plan is not the same anymore,
        // the parent plan is built again with call to LogicalPlan::with_new_inputs -> with_new_exprs
        let plan = plan
            .transform(|plan| match plan {
                LogicalPlan::TableScan(table) => {
                    let filter = Filter::try_new(
                        external_filter.clone(),
                        Arc::new(LogicalPlan::TableScan(table)),
                    )
                    .unwrap();
                    Ok(Transformed::yes(LogicalPlan::Filter(filter)))
                }
                x => Ok(Transformed::no(x)),
            })
            .data()
            .unwrap();

        let actual = format!("{}", plan.display_indent());
        assert_snapshot!(actual, @r"
        Explain
          Filter: foo = Boolean(true)
            TableScan: ?table?
        ")
    }
5125
5126    #[test]
5127    fn test_plan_partial_ord() {
5128        let empty_relation = LogicalPlan::EmptyRelation(EmptyRelation {
5129            produce_one_row: false,
5130            schema: Arc::new(DFSchema::empty()),
5131        });
5132
5133        let describe_table = LogicalPlan::DescribeTable(DescribeTable {
5134            schema: Arc::new(Schema::new(vec![Field::new(
5135                "foo",
5136                DataType::Int32,
5137                false,
5138            )])),
5139            output_schema: DFSchemaRef::new(DFSchema::empty()),
5140        });
5141
5142        let describe_table_clone = LogicalPlan::DescribeTable(DescribeTable {
5143            schema: Arc::new(Schema::new(vec![Field::new(
5144                "foo",
5145                DataType::Int32,
5146                false,
5147            )])),
5148            output_schema: DFSchemaRef::new(DFSchema::empty()),
5149        });
5150
5151        assert_eq!(
5152            empty_relation.partial_cmp(&describe_table),
5153            Some(Ordering::Less)
5154        );
5155        assert_eq!(
5156            describe_table.partial_cmp(&empty_relation),
5157            Some(Ordering::Greater)
5158        );
5159        assert_eq!(describe_table.partial_cmp(&describe_table_clone), None);
5160    }
5161
5162    #[test]
5163    fn test_limit_with_new_children() {
5164        let input = Arc::new(LogicalPlan::Values(Values {
5165            schema: Arc::new(DFSchema::empty()),
5166            values: vec![vec![]],
5167        }));
5168        let cases = [
5169            LogicalPlan::Limit(Limit {
5170                skip: None,
5171                fetch: None,
5172                input: Arc::clone(&input),
5173            }),
5174            LogicalPlan::Limit(Limit {
5175                skip: None,
5176                fetch: Some(Box::new(Expr::Literal(
5177                    ScalarValue::new_ten(&DataType::UInt32).unwrap(),
5178                    None,
5179                ))),
5180                input: Arc::clone(&input),
5181            }),
5182            LogicalPlan::Limit(Limit {
5183                skip: Some(Box::new(Expr::Literal(
5184                    ScalarValue::new_ten(&DataType::UInt32).unwrap(),
5185                    None,
5186                ))),
5187                fetch: None,
5188                input: Arc::clone(&input),
5189            }),
5190            LogicalPlan::Limit(Limit {
5191                skip: Some(Box::new(Expr::Literal(
5192                    ScalarValue::new_one(&DataType::UInt32).unwrap(),
5193                    None,
5194                ))),
5195                fetch: Some(Box::new(Expr::Literal(
5196                    ScalarValue::new_ten(&DataType::UInt32).unwrap(),
5197                    None,
5198                ))),
5199                input,
5200            }),
5201        ];
5202
5203        for limit in cases {
5204            let new_limit = limit
5205                .with_new_exprs(
5206                    limit.expressions(),
5207                    limit.inputs().into_iter().cloned().collect(),
5208                )
5209                .unwrap();
5210            assert_eq!(limit, new_limit);
5211        }
5212    }
5213
    #[test]
    fn test_with_subqueries_jump() {
        // The test plan contains a `Project` node above a `Filter` node, and the
        // `Project` node contains a subquery plan with a `Filter` root node, so returning
        // `TreeNodeRecursion::Jump` on `Project` should cause not visiting any of the
        // `Filter`s.
        let subquery_schema =
            Schema::new(vec![Field::new("sub_id", DataType::Int32, false)]);

        // Subquery: TableScan -> Filter(sub_id = 0). Its `Filter` is only
        // reachable through the outer `Projection`'s scalar subquery expression.
        let subquery_plan =
            table_scan(TableReference::none(), &subquery_schema, Some(vec![0]))
                .unwrap()
                .filter(col("sub_id").eq(lit(0)))
                .unwrap()
                .build()
                .unwrap();

        let schema = Schema::new(vec![Field::new("id", DataType::Int32, false)]);

        // Outer plan: Projection(id, scalar_subquery) -> Filter(id = 0) -> TableScan.
        let plan = table_scan(TableReference::none(), &schema, Some(vec![0]))
            .unwrap()
            .filter(col("id").eq(lit(0)))
            .unwrap()
            .project(vec![col("id"), scalar_subquery(Arc::new(subquery_plan))])
            .unwrap()
            .build()
            .unwrap();

        // Case 1: closure-based `apply_with_subqueries`. Returning `Jump` at the
        // `Projection` must skip both the subquery's `Filter` and the child `Filter`.
        let mut filter_found = false;
        plan.apply_with_subqueries(|plan| {
            match plan {
                LogicalPlan::Projection(..) => return Ok(TreeNodeRecursion::Jump),
                LogicalPlan::Filter(..) => filter_found = true,
                _ => {}
            }
            Ok(TreeNodeRecursion::Continue)
        })
        .unwrap();
        assert!(!filter_found);

        // Case 2: visitor-based `visit_with_subqueries`; `Jump` is issued from the
        // top-down (`f_down`) phase, which must prevent descending into children
        // and subqueries alike.
        struct ProjectJumpVisitor {
            filter_found: bool,
        }

        impl ProjectJumpVisitor {
            fn new() -> Self {
                Self {
                    filter_found: false,
                }
            }
        }

        impl<'n> TreeNodeVisitor<'n> for ProjectJumpVisitor {
            type Node = LogicalPlan;

            fn f_down(&mut self, node: &'n Self::Node) -> Result<TreeNodeRecursion> {
                match node {
                    LogicalPlan::Projection(..) => return Ok(TreeNodeRecursion::Jump),
                    LogicalPlan::Filter(..) => self.filter_found = true,
                    _ => {}
                }
                Ok(TreeNodeRecursion::Continue)
            }
        }

        let mut visitor = ProjectJumpVisitor::new();
        plan.visit_with_subqueries(&mut visitor).unwrap();
        assert!(!visitor.filter_found);

        // Case 3: transforming traversal (`transform_down_with_subqueries`);
        // a `Transformed` carrying `Jump` must have the same skipping effect.
        let mut filter_found = false;
        plan.clone()
            .transform_down_with_subqueries(|plan| {
                match plan {
                    LogicalPlan::Projection(..) => {
                        return Ok(Transformed::new(
                            plan,
                            false,
                            TreeNodeRecursion::Jump,
                        ));
                    }
                    LogicalPlan::Filter(..) => filter_found = true,
                    _ => {}
                }
                Ok(Transformed::no(plan))
            })
            .unwrap();
        assert!(!filter_found);

        // Case 4: combined down/up transform; `Jump` in the down-closure must
        // skip the `Filter`s even though an up-closure is also supplied.
        let mut filter_found = false;
        plan.clone()
            .transform_down_up_with_subqueries(
                |plan| {
                    match plan {
                        LogicalPlan::Projection(..) => {
                            return Ok(Transformed::new(
                                plan,
                                false,
                                TreeNodeRecursion::Jump,
                            ));
                        }
                        LogicalPlan::Filter(..) => filter_found = true,
                        _ => {}
                    }
                    Ok(Transformed::no(plan))
                },
                |plan| Ok(Transformed::no(plan)),
            )
            .unwrap();
        assert!(!filter_found);

        // Case 5: rewriter-based `rewrite_with_subqueries`; `Jump` from the
        // rewriter's `f_down` must behave like the visitor case above.
        struct ProjectJumpRewriter {
            filter_found: bool,
        }

        impl ProjectJumpRewriter {
            fn new() -> Self {
                Self {
                    filter_found: false,
                }
            }
        }

        impl TreeNodeRewriter for ProjectJumpRewriter {
            type Node = LogicalPlan;

            fn f_down(&mut self, node: Self::Node) -> Result<Transformed<Self::Node>> {
                match node {
                    LogicalPlan::Projection(..) => {
                        return Ok(Transformed::new(
                            node,
                            false,
                            TreeNodeRecursion::Jump,
                        ));
                    }
                    LogicalPlan::Filter(..) => self.filter_found = true,
                    _ => {}
                }
                Ok(Transformed::no(node))
            }
        }

        let mut rewriter = ProjectJumpRewriter::new();
        plan.rewrite_with_subqueries(&mut rewriter).unwrap();
        assert!(!rewriter.filter_found);
    }
5359
5360    #[test]
5361    fn test_with_unresolved_placeholders() {
5362        let field_name = "id";
5363        let placeholder_value = "$1";
5364        let schema = Schema::new(vec![Field::new(field_name, DataType::Int32, false)]);
5365
5366        let plan = table_scan(TableReference::none(), &schema, None)
5367            .unwrap()
5368            .filter(col(field_name).eq(placeholder(placeholder_value)))
5369            .unwrap()
5370            .build()
5371            .unwrap();
5372
5373        // Check that the placeholder parameters have not received a DataType.
5374        let params = plan.get_parameter_fields().unwrap();
5375        assert_eq!(params.len(), 1);
5376
5377        let parameter_type = params.clone().get(placeholder_value).unwrap().clone();
5378        assert_eq!(parameter_type, None);
5379    }
5380
5381    #[test]
5382    fn test_join_with_new_exprs() -> Result<()> {
5383        fn create_test_join(
5384            on: Vec<(Expr, Expr)>,
5385            filter: Option<Expr>,
5386        ) -> Result<LogicalPlan> {
5387            let schema = Schema::new(vec![
5388                Field::new("a", DataType::Int32, false),
5389                Field::new("b", DataType::Int32, false),
5390            ]);
5391
5392            let left_schema = DFSchema::try_from_qualified_schema("t1", &schema)?;
5393            let right_schema = DFSchema::try_from_qualified_schema("t2", &schema)?;
5394
5395            Ok(LogicalPlan::Join(Join {
5396                left: Arc::new(
5397                    table_scan(Some("t1"), left_schema.as_arrow(), None)?.build()?,
5398                ),
5399                right: Arc::new(
5400                    table_scan(Some("t2"), right_schema.as_arrow(), None)?.build()?,
5401                ),
5402                on,
5403                filter,
5404                join_type: JoinType::Inner,
5405                join_constraint: JoinConstraint::On,
5406                schema: Arc::new(left_schema.join(&right_schema)?),
5407                null_equality: NullEquality::NullEqualsNothing,
5408            }))
5409        }
5410
5411        {
5412            let join = create_test_join(vec![(col("t1.a"), (col("t2.a")))], None)?;
5413            let LogicalPlan::Join(join) = join.with_new_exprs(
5414                join.expressions(),
5415                join.inputs().into_iter().cloned().collect(),
5416            )?
5417            else {
5418                unreachable!()
5419            };
5420            assert_eq!(join.on, vec![(col("t1.a"), (col("t2.a")))]);
5421            assert_eq!(join.filter, None);
5422        }
5423
5424        {
5425            let join = create_test_join(vec![], Some(col("t1.a").gt(col("t2.a"))))?;
5426            let LogicalPlan::Join(join) = join.with_new_exprs(
5427                join.expressions(),
5428                join.inputs().into_iter().cloned().collect(),
5429            )?
5430            else {
5431                unreachable!()
5432            };
5433            assert_eq!(join.on, vec![]);
5434            assert_eq!(join.filter, Some(col("t1.a").gt(col("t2.a"))));
5435        }
5436
5437        {
5438            let join = create_test_join(
5439                vec![(col("t1.a"), (col("t2.a")))],
5440                Some(col("t1.b").gt(col("t2.b"))),
5441            )?;
5442            let LogicalPlan::Join(join) = join.with_new_exprs(
5443                join.expressions(),
5444                join.inputs().into_iter().cloned().collect(),
5445            )?
5446            else {
5447                unreachable!()
5448            };
5449            assert_eq!(join.on, vec![(col("t1.a"), (col("t2.a")))]);
5450            assert_eq!(join.filter, Some(col("t1.b").gt(col("t2.b"))));
5451        }
5452
5453        {
5454            let join = create_test_join(
5455                vec![(col("t1.a"), (col("t2.a"))), (col("t1.b"), (col("t2.b")))],
5456                None,
5457            )?;
5458            let LogicalPlan::Join(join) = join.with_new_exprs(
5459                vec![
5460                    binary_expr(col("t1.a"), Operator::Plus, lit(1)),
5461                    binary_expr(col("t2.a"), Operator::Plus, lit(2)),
5462                    col("t1.b"),
5463                    col("t2.b"),
5464                    lit(true),
5465                ],
5466                join.inputs().into_iter().cloned().collect(),
5467            )?
5468            else {
5469                unreachable!()
5470            };
5471            assert_eq!(
5472                join.on,
5473                vec![
5474                    (
5475                        binary_expr(col("t1.a"), Operator::Plus, lit(1)),
5476                        binary_expr(col("t2.a"), Operator::Plus, lit(2))
5477                    ),
5478                    (col("t1.b"), (col("t2.b")))
5479                ]
5480            );
5481            assert_eq!(join.filter, Some(lit(true)));
5482        }
5483
5484        Ok(())
5485    }
5486
    #[test]
    fn test_join_try_new() -> Result<()> {
        // Both inputs share an (a: Int32 NOT NULL, b: Int32 NOT NULL) schema;
        // every join type below uses key t1.a = t2.a and filter t1.b > t2.b.
        let schema = Schema::new(vec![
            Field::new("a", DataType::Int32, false),
            Field::new("b", DataType::Int32, false),
        ]);

        let left_scan = table_scan(Some("t1"), &schema, None)?.build()?;

        let right_scan = table_scan(Some("t2"), &schema, None)?.build()?;

        let join_types = vec![
            JoinType::Inner,
            JoinType::Left,
            JoinType::Right,
            JoinType::Full,
            JoinType::LeftSemi,
            JoinType::LeftAnti,
            JoinType::RightSemi,
            JoinType::RightAnti,
            JoinType::LeftMark,
        ];

        for join_type in join_types {
            let join = Join::try_new(
                Arc::new(left_scan.clone()),
                Arc::new(right_scan.clone()),
                vec![(col("t1.a"), col("t2.a"))],
                Some(col("t1.b").gt(col("t2.b"))),
                join_type,
                JoinConstraint::On,
                NullEquality::NullEqualsNothing,
            )?;

            match join_type {
                // Semi/anti joins only emit one input's columns.
                JoinType::LeftSemi | JoinType::LeftAnti => {
                    assert_eq!(join.schema.fields().len(), 2);

                    let fields = join.schema.fields();
                    assert_eq!(
                        fields[0].name(),
                        "a",
                        "First field should be 'a' from left table"
                    );
                    assert_eq!(
                        fields[1].name(),
                        "b",
                        "Second field should be 'b' from left table"
                    );
                }
                JoinType::RightSemi | JoinType::RightAnti => {
                    assert_eq!(join.schema.fields().len(), 2);

                    let fields = join.schema.fields();
                    assert_eq!(
                        fields[0].name(),
                        "a",
                        "First field should be 'a' from right table"
                    );
                    assert_eq!(
                        fields[1].name(),
                        "b",
                        "Second field should be 'b' from right table"
                    );
                }
                // Mark joins keep the left columns and append a synthetic,
                // non-nullable boolean "mark" column.
                JoinType::LeftMark => {
                    assert_eq!(join.schema.fields().len(), 3);

                    let fields = join.schema.fields();
                    assert_eq!(
                        fields[0].name(),
                        "a",
                        "First field should be 'a' from left table"
                    );
                    assert_eq!(
                        fields[1].name(),
                        "b",
                        "Second field should be 'b' from left table"
                    );
                    assert_eq!(
                        fields[2].name(),
                        "mark",
                        "Third field should be the mark column"
                    );

                    assert!(!fields[0].is_nullable());
                    assert!(!fields[1].is_nullable());
                    assert!(!fields[2].is_nullable());
                }
                // Inner/Left/Right/Full: the output schema is the concatenation
                // of both inputs, with nullability widened on the outer side.
                _ => {
                    assert_eq!(join.schema.fields().len(), 4);

                    let fields = join.schema.fields();
                    assert_eq!(
                        fields[0].name(),
                        "a",
                        "First field should be 'a' from left table"
                    );
                    assert_eq!(
                        fields[1].name(),
                        "b",
                        "Second field should be 'b' from left table"
                    );
                    assert_eq!(
                        fields[2].name(),
                        "a",
                        "Third field should be 'a' from right table"
                    );
                    assert_eq!(
                        fields[3].name(),
                        "b",
                        "Fourth field should be 'b' from right table"
                    );

                    if join_type == JoinType::Left {
                        // Left side fields (first two) shouldn't be nullable
                        assert!(!fields[0].is_nullable());
                        assert!(!fields[1].is_nullable());
                        // Right side fields (third and fourth) should be nullable
                        assert!(fields[2].is_nullable());
                        assert!(fields[3].is_nullable());
                    } else if join_type == JoinType::Right {
                        // Left side fields (first two) should be nullable
                        assert!(fields[0].is_nullable());
                        assert!(fields[1].is_nullable());
                        // Right side fields (third and fourth) shouldn't be nullable
                        assert!(!fields[2].is_nullable());
                        assert!(!fields[3].is_nullable());
                    } else if join_type == JoinType::Full {
                        assert!(fields[0].is_nullable());
                        assert!(fields[1].is_nullable());
                        assert!(fields[2].is_nullable());
                        assert!(fields[3].is_nullable());
                    }
                }
            }

            // Regardless of join type, `try_new` must preserve the join
            // parameters it was given.
            assert_eq!(join.on, vec![(col("t1.a"), col("t2.a"))]);
            assert_eq!(join.filter, Some(col("t1.b").gt(col("t2.b"))));
            assert_eq!(join.join_type, join_type);
            assert_eq!(join.join_constraint, JoinConstraint::On);
            assert_eq!(join.null_equality, NullEquality::NullEqualsNothing);
        }

        Ok(())
    }
5633
    #[test]
    fn test_join_try_new_with_using_constraint_and_overlapping_columns() -> Result<()> {
        // Three scenarios are covered below: a USING join over a shared column,
        // an ON join with an extra non-equi filter, and NULL-equals-NULL joins.
        let left_schema = Schema::new(vec![
            Field::new("id", DataType::Int32, false), // Common column in both tables
            Field::new("name", DataType::Utf8, false), // Unique to left
            Field::new("value", DataType::Int32, false), // Common column, different meaning
        ]);

        let right_schema = Schema::new(vec![
            Field::new("id", DataType::Int32, false), // Common column in both tables
            Field::new("category", DataType::Utf8, false), // Unique to right
            Field::new("value", DataType::Float64, true), // Common column, different meaning
        ]);

        let left_plan = table_scan(Some("t1"), &left_schema, None)?.build()?;

        let right_plan = table_scan(Some("t2"), &right_schema, None)?.build()?;

        // Test 1: USING constraint with a common column
        {
            // In the logical plan, both copies of the `id` column are preserved
            // The USING constraint is handled later during physical execution, where the common column appears once
            let join = Join::try_new(
                Arc::new(left_plan.clone()),
                Arc::new(right_plan.clone()),
                vec![(col("t1.id"), col("t2.id"))],
                None,
                JoinType::Inner,
                JoinConstraint::Using,
                NullEquality::NullEqualsNothing,
            )?;

            let fields = join.schema.fields();

            // 3 left fields + 3 right fields — nothing is deduplicated here.
            assert_eq!(fields.len(), 6);

            assert_eq!(
                fields[0].name(),
                "id",
                "First field should be 'id' from left table"
            );
            assert_eq!(
                fields[1].name(),
                "name",
                "Second field should be 'name' from left table"
            );
            assert_eq!(
                fields[2].name(),
                "value",
                "Third field should be 'value' from left table"
            );
            assert_eq!(
                fields[3].name(),
                "id",
                "Fourth field should be 'id' from right table"
            );
            assert_eq!(
                fields[4].name(),
                "category",
                "Fifth field should be 'category' from right table"
            );
            assert_eq!(
                fields[5].name(),
                "value",
                "Sixth field should be 'value' from right table"
            );

            assert_eq!(join.join_constraint, JoinConstraint::Using);
        }

        // Test 2: Complex join condition with expressions
        {
            // Complex condition: join on id equality AND where left.value < right.value
            let join = Join::try_new(
                Arc::new(left_plan.clone()),
                Arc::new(right_plan.clone()),
                vec![(col("t1.id"), col("t2.id"))], // Equijoin condition
                Some(col("t1.value").lt(col("t2.value"))), // Non-equi filter condition
                JoinType::Inner,
                JoinConstraint::On,
                NullEquality::NullEqualsNothing,
            )?;

            // The schema is the same concatenation as Test 1; the filter must
            // be carried through unchanged.
            let fields = join.schema.fields();
            assert_eq!(fields.len(), 6);

            assert_eq!(
                fields[0].name(),
                "id",
                "First field should be 'id' from left table"
            );
            assert_eq!(
                fields[1].name(),
                "name",
                "Second field should be 'name' from left table"
            );
            assert_eq!(
                fields[2].name(),
                "value",
                "Third field should be 'value' from left table"
            );
            assert_eq!(
                fields[3].name(),
                "id",
                "Fourth field should be 'id' from right table"
            );
            assert_eq!(
                fields[4].name(),
                "category",
                "Fifth field should be 'category' from right table"
            );
            assert_eq!(
                fields[5].name(),
                "value",
                "Sixth field should be 'value' from right table"
            );

            assert_eq!(join.filter, Some(col("t1.value").lt(col("t2.value"))));
        }

        // Test 3: Join with null equality behavior set to true
        {
            let join = Join::try_new(
                Arc::new(left_plan.clone()),
                Arc::new(right_plan.clone()),
                vec![(col("t1.id"), col("t2.id"))],
                None,
                JoinType::Inner,
                JoinConstraint::On,
                NullEquality::NullEqualsNull,
            )?;

            // The requested null-equality mode must be stored verbatim.
            assert_eq!(join.null_equality, NullEquality::NullEqualsNull);
        }

        Ok(())
    }
5771
    #[test]
    fn test_join_try_new_schema_validation() -> Result<()> {
        // Left input: id/name are NOT NULL, value is nullable.
        let left_schema = Schema::new(vec![
            Field::new("id", DataType::Int32, false),
            Field::new("name", DataType::Utf8, false),
            Field::new("value", DataType::Float64, true),
        ]);

        // Right input: id/code are NOT NULL, category is nullable.
        let right_schema = Schema::new(vec![
            Field::new("id", DataType::Int32, false),
            Field::new("category", DataType::Utf8, true),
            Field::new("code", DataType::Int16, false),
        ]);

        let left_plan = table_scan(Some("t1"), &left_schema, None)?.build()?;

        let right_plan = table_scan(Some("t2"), &right_schema, None)?.build()?;

        let join_types = vec![
            JoinType::Inner,
            JoinType::Left,
            JoinType::Right,
            JoinType::Full,
        ];

        for join_type in join_types {
            let join = Join::try_new(
                Arc::new(left_plan.clone()),
                Arc::new(right_plan.clone()),
                vec![(col("t1.id"), col("t2.id"))],
                Some(col("t1.value").gt(lit(5.0))),
                join_type,
                JoinConstraint::On,
                NullEquality::NullEqualsNothing,
            )?;

            let fields = join.schema.fields();
            assert_eq!(fields.len(), 6, "Expected 6 fields for {join_type} join");

            // Expected nullability: a field is nullable if it was already
            // nullable in its input (indices 2 and 4), or if its side is the
            // "optional" side of an outer join — left side for RIGHT/FULL,
            // right side for LEFT/FULL. The `(2, _)` / `(4, _)` arms must stay
            // before the fall-through `_ => false` arm.
            for (i, field) in fields.iter().enumerate() {
                let expected_nullable = match (i, &join_type) {
                    // Left table fields (indices 0, 1, 2)
                    (0, JoinType::Right | JoinType::Full) => true, // id becomes nullable in RIGHT/FULL
                    (1, JoinType::Right | JoinType::Full) => true, // name becomes nullable in RIGHT/FULL
                    (2, _) => true, // value is already nullable

                    // Right table fields (indices 3, 4, 5)
                    (3, JoinType::Left | JoinType::Full) => true, // id becomes nullable in LEFT/FULL
                    (4, _) => true, // category is already nullable
                    (5, JoinType::Left | JoinType::Full) => true, // code becomes nullable in LEFT/FULL

                    _ => false,
                };

                assert_eq!(
                    field.is_nullable(),
                    expected_nullable,
                    "Field {} ({}) nullability incorrect for {:?} join",
                    i,
                    field.name(),
                    join_type
                );
            }
        }

        // A USING join still exposes both copies of the shared `id` column in
        // the logical schema (deduplication happens later in planning).
        let using_join = Join::try_new(
            Arc::new(left_plan.clone()),
            Arc::new(right_plan.clone()),
            vec![(col("t1.id"), col("t2.id"))],
            None,
            JoinType::Inner,
            JoinConstraint::Using,
            NullEquality::NullEqualsNothing,
        )?;

        assert_eq!(
            using_join.schema.fields().len(),
            6,
            "USING join should have all fields"
        );
        assert_eq!(using_join.join_constraint, JoinConstraint::Using);

        Ok(())
    }
5856}