// datafusion_expr/logical_plan/plan.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! Logical plan types
19
20use std::cmp::Ordering;
21use std::collections::{HashMap, HashSet};
22use std::fmt::{self, Debug, Display, Formatter};
23use std::hash::{Hash, Hasher};
24use std::sync::{Arc, LazyLock};
25
26use super::DdlStatement;
27use super::dml::CopyTo;
28use super::invariants::{
29    InvariantLevel, assert_always_invariants_at_current_node,
30    assert_executable_invariants,
31};
32use crate::builder::{unique_field_aliases, unnest_with_options};
33use crate::expr::{
34    Alias, Placeholder, Sort as SortExpr, WindowFunction, WindowFunctionParams,
35    intersect_metadata_for_union,
36};
37use crate::expr_rewriter::{
38    NamePreserver, create_col_from_scalar_expr, normalize_cols, normalize_sorts,
39};
40use crate::logical_plan::display::{GraphvizVisitor, IndentVisitor};
41use crate::logical_plan::extension::UserDefinedLogicalNode;
42use crate::logical_plan::{DmlStatement, Statement};
43use crate::utils::{
44    enumerate_grouping_sets, exprlist_to_fields, find_out_reference_exprs,
45    grouping_set_expr_count, grouping_set_to_exprlist, split_conjunction,
46};
47use crate::{
48    BinaryExpr, CreateMemoryTable, CreateView, Execute, Expr, ExprSchemable,
49    LogicalPlanBuilder, Operator, Prepare, TableProviderFilterPushDown, TableSource,
50    WindowFunctionDefinition, build_join_schema, expr_vec_fmt, requalify_sides_if_needed,
51};
52
53use arrow::datatypes::{DataType, Field, FieldRef, Schema, SchemaRef};
54use datafusion_common::cse::{NormalizeEq, Normalizeable};
55use datafusion_common::format::ExplainFormat;
56use datafusion_common::metadata::check_metadata_with_storage_equal;
57use datafusion_common::tree_node::{
58    Transformed, TreeNode, TreeNodeContainer, TreeNodeRecursion,
59};
60use datafusion_common::{
61    Column, Constraints, DFSchema, DFSchemaRef, DataFusionError, Dependency,
62    FunctionalDependence, FunctionalDependencies, NullEquality, ParamValues, Result,
63    ScalarValue, Spans, TableReference, UnnestOptions, aggregate_functional_dependencies,
64    assert_eq_or_internal_err, assert_or_internal_err, internal_err, plan_err,
65};
66use indexmap::IndexSet;
67
68// backwards compatibility
69use crate::display::PgJsonVisitor;
70pub use datafusion_common::display::{PlanType, StringifiedPlan, ToStringifiedPlan};
71pub use datafusion_common::{JoinConstraint, JoinType};
72
73/// A `LogicalPlan` is a node in a tree of relational operators (such as
74/// Projection or Filter).
75///
76/// Represents transforming an input relation (table) to an output relation
77/// (table) with a potentially different schema. Plans form a dataflow tree
78/// where data flows from leaves up to the root to produce the query result.
79///
80/// `LogicalPlan`s can be created by the SQL query planner, the DataFrame API,
81/// or programmatically (for example custom query languages).
82///
83/// # See also:
84/// * [`Expr`]: For the expressions that are evaluated by the plan
85/// * [`LogicalPlanBuilder`]: For building `LogicalPlan`s
86/// * [`tree_node`]: To inspect and rewrite `LogicalPlan`s
87///
88/// [`tree_node`]: crate::logical_plan::tree_node
89///
90/// # Examples
91///
92/// ## Creating a LogicalPlan from SQL:
93///
94/// See [`SessionContext::sql`](https://docs.rs/datafusion/latest/datafusion/execution/context/struct.SessionContext.html#method.sql)
95///
96/// ## Creating a LogicalPlan from the DataFrame API:
97///
98/// See [`DataFrame::logical_plan`](https://docs.rs/datafusion/latest/datafusion/dataframe/struct.DataFrame.html#method.logical_plan)
99///
100/// ## Creating a LogicalPlan programmatically:
101///
102/// See [`LogicalPlanBuilder`]
103///
104/// # Visiting and Rewriting `LogicalPlan`s
105///
106/// Using the [`tree_node`] API, you can recursively walk all nodes in a
107/// `LogicalPlan`. For example, to find all column references in a plan:
108///
109/// ```
110/// # use std::collections::HashSet;
111/// # use arrow::datatypes::{DataType, Field, Schema};
112/// # use datafusion_expr::{Expr, col, lit, LogicalPlan, LogicalPlanBuilder, table_scan};
113/// # use datafusion_common::tree_node::{TreeNodeRecursion, TreeNode};
114/// # use datafusion_common::{Column, Result};
115/// # fn employee_schema() -> Schema {
116/// #    Schema::new(vec![
117/// #           Field::new("name", DataType::Utf8, false),
118/// #           Field::new("salary", DataType::Int32, false),
119/// #       ])
120/// #   }
121/// // Projection(name, salary)
122/// //   Filter(salary > 1000)
123/// //     TableScan(employee)
124/// # fn main() -> Result<()> {
125/// let plan = table_scan(Some("employee"), &employee_schema(), None)?
126///  .filter(col("salary").gt(lit(1000)))?
127///  .project(vec![col("name")])?
128///  .build()?;
129///
130/// // use apply to walk the plan and collect all expressions
131/// let mut expressions = HashSet::new();
132/// plan.apply(|node| {
133///   // collect all expressions in the plan
134///   node.apply_expressions(|expr| {
135///    expressions.insert(expr.clone());
136///    Ok(TreeNodeRecursion::Continue) // control walk of expressions
137///   })?;
138///   Ok(TreeNodeRecursion::Continue) // control walk of plan nodes
139/// }).unwrap();
140///
141/// // we found the expression in projection and filter
142/// assert_eq!(expressions.len(), 2);
143/// println!("Found expressions: {:?}", expressions);
144/// // found predicate in the Filter: employee.salary > 1000
145/// let salary = Expr::Column(Column::new(Some("employee"), "salary"));
146/// assert!(expressions.contains(&salary.gt(lit(1000))));
147/// // found projection in the Projection: employee.name
148/// let name = Expr::Column(Column::new(Some("employee"), "name"));
149/// assert!(expressions.contains(&name));
150/// # Ok(())
151/// # }
152/// ```
153///
154/// You can also rewrite plans using the [`tree_node`] API. For example, to
155/// replace the filter predicate in a plan:
156///
157/// ```
158/// # use std::collections::HashSet;
159/// # use arrow::datatypes::{DataType, Field, Schema};
160/// # use datafusion_expr::{Expr, col, lit, LogicalPlan, LogicalPlanBuilder, table_scan};
161/// # use datafusion_common::tree_node::{TreeNodeRecursion, TreeNode};
162/// # use datafusion_common::{Column, Result};
163/// # fn employee_schema() -> Schema {
164/// #    Schema::new(vec![
165/// #           Field::new("name", DataType::Utf8, false),
166/// #           Field::new("salary", DataType::Int32, false),
167/// #       ])
168/// #   }
169/// // Projection(name, salary)
170/// //   Filter(salary > 1000)
171/// //     TableScan(employee)
172/// # fn main() -> Result<()> {
173/// use datafusion_common::tree_node::Transformed;
174/// let plan = table_scan(Some("employee"), &employee_schema(), None)?
175///  .filter(col("salary").gt(lit(1000)))?
176///  .project(vec![col("name")])?
177///  .build()?;
178///
179/// // use transform to rewrite the plan
180/// let transformed_result = plan.transform(|node| {
181///   // when we see the filter node
182///   if let LogicalPlan::Filter(mut filter) = node {
183///     // replace predicate with salary < 2000
184///     filter.predicate = Expr::Column(Column::new(Some("employee"), "salary")).lt(lit(2000));
185///     let new_plan = LogicalPlan::Filter(filter);
186///     return Ok(Transformed::yes(new_plan)); // communicate the node was changed
187///   }
188///   // return the node unchanged
189///   Ok(Transformed::no(node))
190/// }).unwrap();
191///
192/// // Transformed result contains rewritten plan and information about
193/// // whether the plan was changed
194/// assert!(transformed_result.transformed);
195/// let rewritten_plan = transformed_result.data;
196///
197/// // we found the filter
198/// assert_eq!(rewritten_plan.display_indent().to_string(),
199/// "Projection: employee.name\
200/// \n  Filter: employee.salary < Int32(2000)\
201/// \n    TableScan: employee");
202/// # Ok(())
203/// # }
204/// ```
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Hash)]
pub enum LogicalPlan {
    /// Evaluates an arbitrary list of expressions (essentially a
    /// SELECT with an expression list) on its input.
    Projection(Projection),
    /// Filters rows from its input that do not match an
    /// expression (essentially a WHERE clause with a predicate
    /// expression).
    ///
    /// Semantically, `<predicate>` is evaluated for each row of the
    /// input; If the value of `<predicate>` is true, the input row is
    /// passed to the output. If the value of `<predicate>` is false
    /// (or null), the row is discarded.
    Filter(Filter),
    /// Windows input based on a set of window spec and window
    /// function (e.g. SUM or RANK).  This is used to implement SQL
    /// window functions, and the `OVER` clause.
    ///
    /// See [`Window`] for more details
    Window(Window),
    /// Aggregates its input based on a set of grouping and aggregate
    /// expressions (e.g. SUM). This is used to implement SQL aggregates
    /// and `GROUP BY`.
    ///
    /// See [`Aggregate`] for more details
    Aggregate(Aggregate),
    /// Sorts its input according to a list of sort expressions. This
    /// is used to implement SQL `ORDER BY`
    Sort(Sort),
    /// Join two logical plans on one or more join columns.
    /// This is used to implement SQL `JOIN`
    Join(Join),
    /// Repartitions the input based on a partitioning scheme. This is
    /// used to add parallelism and is sometimes referred to as an
    /// "exchange" operator in other systems
    Repartition(Repartition),
    /// Union multiple inputs with the same schema into a single
    /// output stream. This is used to implement SQL `UNION [ALL]` and
    /// `INTERSECT [ALL]`.
    Union(Union),
    /// Produces rows from a [`TableSource`], used to implement SQL
    /// `FROM` tables or views.
    TableScan(TableScan),
    /// Produces no rows: An empty relation with an empty schema that
    /// produces 0 or 1 row. This is used to implement SQL `SELECT`
    /// that has no values in the `FROM` clause.
    EmptyRelation(EmptyRelation),
    /// Produces the output of running another query.  This is used to
    /// implement SQL subqueries
    Subquery(Subquery),
    /// Aliased relation provides, or changes, the name of a relation.
    SubqueryAlias(SubqueryAlias),
    /// Skip some number of rows, and then fetch some number of rows.
    Limit(Limit),
    /// A DataFusion [`Statement`] such as `SET VARIABLE` or `START TRANSACTION`
    Statement(Statement),
    /// Values expression. See
    /// [Postgres VALUES](https://www.postgresql.org/docs/current/queries-values.html)
    /// documentation for more details. This is used to implement SQL such as
    /// `VALUES (1, 2), (3, 4)`
    Values(Values),
    /// Produces a relation with string representations of
    /// various parts of the plan. This is used to implement SQL `EXPLAIN`.
    Explain(Explain),
    /// Runs the input, and prints annotated physical plan as a string
    /// with execution metric. This is used to implement SQL
    /// `EXPLAIN ANALYZE`.
    Analyze(Analyze),
    /// Extension operator defined outside of DataFusion. This is used
    /// to extend DataFusion with custom relational operations that are
    /// not built in to DataFusion itself.
    Extension(Extension),
    /// Remove duplicate rows from the input. This is used to
    /// implement SQL `SELECT DISTINCT ...`.
    Distinct(Distinct),
    /// Data Manipulation Language (DML): Insert / Update / Delete
    Dml(DmlStatement),
    /// Data Definition Language (DDL): CREATE / DROP TABLES / VIEWS / SCHEMAS
    Ddl(DdlStatement),
    /// `COPY TO` for writing plan results to files
    Copy(CopyTo),
    /// Describe the schema of the table. This is used to implement the
    /// SQL `DESCRIBE` command from MySQL.
    DescribeTable(DescribeTable),
    /// Unnest a column that contains a nested list type such as an
    /// ARRAY. This is used to implement SQL `UNNEST`
    Unnest(Unnest),
    /// A variadic query (e.g. "Recursive CTEs")
    RecursiveQuery(RecursiveQuery),
}
294
295impl Default for LogicalPlan {
296    fn default() -> Self {
297        LogicalPlan::EmptyRelation(EmptyRelation {
298            produce_one_row: false,
299            schema: Arc::new(DFSchema::empty()),
300        })
301    }
302}
303
// Treats a `LogicalPlan` as a single-element container: applying or mapping
// visits exactly the plan value itself.
impl<'a> TreeNodeContainer<'a, Self> for LogicalPlan {
    fn apply_elements<F: FnMut(&'a Self) -> Result<TreeNodeRecursion>>(
        &'a self,
        mut f: F,
    ) -> Result<TreeNodeRecursion> {
        // The plan itself is the only element.
        f(self)
    }

    fn map_elements<F: FnMut(Self) -> Result<Transformed<Self>>>(
        self,
        mut f: F,
    ) -> Result<Transformed<Self>> {
        // Consumes `self` and returns the (possibly transformed) plan.
        f(self)
    }
}
319
320impl LogicalPlan {
    /// Get a reference to the logical plan's schema.
    ///
    /// Nodes that carry their own (possibly re-derived) schema return it
    /// directly; nodes whose output shape matches their input's
    /// (e.g. `Filter`, `Sort`, `Limit`, `Repartition`) delegate to the child.
    pub fn schema(&self) -> &DFSchemaRef {
        match self {
            LogicalPlan::EmptyRelation(EmptyRelation { schema, .. }) => schema,
            LogicalPlan::Values(Values { schema, .. }) => schema,
            // TableScan exposes the schema *after* projection pushdown
            LogicalPlan::TableScan(TableScan {
                projected_schema, ..
            }) => projected_schema,
            LogicalPlan::Projection(Projection { schema, .. }) => schema,
            LogicalPlan::Filter(Filter { input, .. }) => input.schema(),
            LogicalPlan::Distinct(Distinct::All(input)) => input.schema(),
            LogicalPlan::Distinct(Distinct::On(DistinctOn { schema, .. })) => schema,
            LogicalPlan::Window(Window { schema, .. }) => schema,
            LogicalPlan::Aggregate(Aggregate { schema, .. }) => schema,
            LogicalPlan::Sort(Sort { input, .. }) => input.schema(),
            LogicalPlan::Join(Join { schema, .. }) => schema,
            LogicalPlan::Repartition(Repartition { input, .. }) => input.schema(),
            LogicalPlan::Limit(Limit { input, .. }) => input.schema(),
            LogicalPlan::Statement(statement) => statement.schema(),
            LogicalPlan::Subquery(Subquery { subquery, .. }) => subquery.schema(),
            LogicalPlan::SubqueryAlias(SubqueryAlias { schema, .. }) => schema,
            LogicalPlan::Explain(explain) => &explain.schema,
            LogicalPlan::Analyze(analyze) => &analyze.schema,
            LogicalPlan::Extension(extension) => extension.node.schema(),
            LogicalPlan::Union(Union { schema, .. }) => schema,
            LogicalPlan::DescribeTable(DescribeTable { output_schema, .. }) => {
                output_schema
            }
            LogicalPlan::Dml(DmlStatement { output_schema, .. }) => output_schema,
            LogicalPlan::Copy(CopyTo { output_schema, .. }) => output_schema,
            LogicalPlan::Ddl(ddl) => ddl.schema(),
            LogicalPlan::Unnest(Unnest { schema, .. }) => schema,
            LogicalPlan::RecursiveQuery(RecursiveQuery { static_term, .. }) => {
                // we take the schema of the static term as the schema of the entire recursive query
                static_term.schema()
            }
        }
    }
359
360    /// Used for normalizing columns, as the fallback schemas to the main schema
361    /// of the plan.
362    pub fn fallback_normalize_schemas(&self) -> Vec<&DFSchema> {
363        match self {
364            LogicalPlan::Window(_)
365            | LogicalPlan::Projection(_)
366            | LogicalPlan::Aggregate(_)
367            | LogicalPlan::Unnest(_)
368            | LogicalPlan::Join(_) => self
369                .inputs()
370                .iter()
371                .map(|input| input.schema().as_ref())
372                .collect(),
373            _ => vec![],
374        }
375    }
376
377    /// Returns the (fixed) output schema for explain plans
378    pub fn explain_schema() -> SchemaRef {
379        SchemaRef::new(Schema::new(vec![
380            Field::new("plan_type", DataType::Utf8, false),
381            Field::new("plan", DataType::Utf8, false),
382        ]))
383    }
384
385    /// Returns the (fixed) output schema for `DESCRIBE` plans
386    pub fn describe_schema() -> Schema {
387        Schema::new(vec![
388            Field::new("column_name", DataType::Utf8, false),
389            Field::new("data_type", DataType::Utf8, false),
390            Field::new("is_nullable", DataType::Utf8, false),
391        ])
392    }
393
394    /// Returns all expressions (non-recursively) evaluated by the current
395    /// logical plan node. This does not include expressions in any children.
396    ///
397    /// Note this method `clone`s all the expressions. When possible, the
398    /// [`tree_node`] API should be used instead of this API.
399    ///
400    /// The returned expressions do not necessarily represent or even
401    /// contributed to the output schema of this node. For example,
402    /// `LogicalPlan::Filter` returns the filter expression even though the
403    /// output of a Filter has the same columns as the input.
404    ///
405    /// The expressions do contain all the columns that are used by this plan,
406    /// so if there are columns not referenced by these expressions then
407    /// DataFusion's optimizer attempts to optimize them away.
408    ///
409    /// [`tree_node`]: crate::logical_plan::tree_node
410    pub fn expressions(self: &LogicalPlan) -> Vec<Expr> {
411        let mut exprs = vec![];
412        self.apply_expressions(|e| {
413            exprs.push(e.clone());
414            Ok(TreeNodeRecursion::Continue)
415        })
416        // closure always returns OK
417        .unwrap();
418        exprs
419    }
420
421    /// Returns all the out reference(correlated) expressions (recursively) in the current
422    /// logical plan nodes and all its descendant nodes.
423    pub fn all_out_ref_exprs(self: &LogicalPlan) -> Vec<Expr> {
424        let mut exprs = vec![];
425        self.apply_expressions(|e| {
426            find_out_reference_exprs(e).into_iter().for_each(|e| {
427                if !exprs.contains(&e) {
428                    exprs.push(e)
429                }
430            });
431            Ok(TreeNodeRecursion::Continue)
432        })
433        // closure always returns OK
434        .unwrap();
435        self.inputs()
436            .into_iter()
437            .flat_map(|child| child.all_out_ref_exprs())
438            .for_each(|e| {
439                if !exprs.contains(&e) {
440                    exprs.push(e)
441                }
442            });
443        exprs
444    }
445
    /// Returns all inputs / children of this `LogicalPlan` node.
    ///
    /// Note does not include inputs to inputs, or subqueries.
    pub fn inputs(&self) -> Vec<&LogicalPlan> {
        match self {
            // Single-input nodes
            LogicalPlan::Projection(Projection { input, .. }) => vec![input],
            LogicalPlan::Filter(Filter { input, .. }) => vec![input],
            LogicalPlan::Repartition(Repartition { input, .. }) => vec![input],
            LogicalPlan::Window(Window { input, .. }) => vec![input],
            LogicalPlan::Aggregate(Aggregate { input, .. }) => vec![input],
            LogicalPlan::Sort(Sort { input, .. }) => vec![input],
            // Joins have exactly two inputs
            LogicalPlan::Join(Join { left, right, .. }) => vec![left, right],
            LogicalPlan::Limit(Limit { input, .. }) => vec![input],
            LogicalPlan::Subquery(Subquery { subquery, .. }) => vec![subquery],
            LogicalPlan::SubqueryAlias(SubqueryAlias { input, .. }) => vec![input],
            // Extension / DDL / Statement nodes report their own inputs
            LogicalPlan::Extension(extension) => extension.node.inputs(),
            LogicalPlan::Union(Union { inputs, .. }) => {
                inputs.iter().map(|arc| arc.as_ref()).collect()
            }
            LogicalPlan::Distinct(
                Distinct::All(input) | Distinct::On(DistinctOn { input, .. }),
            ) => vec![input],
            LogicalPlan::Explain(explain) => vec![&explain.plan],
            LogicalPlan::Analyze(analyze) => vec![&analyze.input],
            LogicalPlan::Dml(write) => vec![&write.input],
            LogicalPlan::Copy(copy) => vec![&copy.input],
            LogicalPlan::Ddl(ddl) => ddl.inputs(),
            LogicalPlan::Unnest(Unnest { input, .. }) => vec![input],
            LogicalPlan::RecursiveQuery(RecursiveQuery {
                static_term,
                recursive_term,
                ..
            }) => vec![static_term, recursive_term],
            LogicalPlan::Statement(stmt) => stmt.inputs(),
            // plans without inputs
            LogicalPlan::TableScan { .. }
            | LogicalPlan::EmptyRelation { .. }
            | LogicalPlan::Values { .. }
            | LogicalPlan::DescribeTable(_) => vec![],
        }
    }
487
488    /// returns all `Using` join columns in a logical plan
489    pub fn using_columns(&self) -> Result<Vec<HashSet<Column>>, DataFusionError> {
490        let mut using_columns: Vec<HashSet<Column>> = vec![];
491
492        self.apply_with_subqueries(|plan| {
493            if let LogicalPlan::Join(Join {
494                join_constraint: JoinConstraint::Using,
495                on,
496                ..
497            }) = plan
498            {
499                // The join keys in using-join must be columns.
500                let columns =
501                    on.iter().try_fold(HashSet::new(), |mut accumu, (l, r)| {
502                        let Some(l) = l.get_as_join_column() else {
503                            return internal_err!(
504                                "Invalid join key. Expected column, found {l:?}"
505                            );
506                        };
507                        let Some(r) = r.get_as_join_column() else {
508                            return internal_err!(
509                                "Invalid join key. Expected column, found {r:?}"
510                            );
511                        };
512                        accumu.insert(l.to_owned());
513                        accumu.insert(r.to_owned());
514                        Result::<_, DataFusionError>::Ok(accumu)
515                    })?;
516                using_columns.push(columns);
517            }
518            Ok(TreeNodeRecursion::Continue)
519        })?;
520
521        Ok(using_columns)
522    }
523
    /// returns the first output expression of this `LogicalPlan` node.
    ///
    /// Returns `Ok(None)` for nodes that have no meaningful "first column"
    /// (e.g. `Subquery`, DDL/DML, `Explain`).
    ///
    /// NOTE(review): the `[0]` indexing below panics if a `Projection`,
    /// `Aggregate`, or `DistinctOn` has an empty expression list — presumably
    /// such plans cannot be constructed; verify against the constructors.
    pub fn head_output_expr(&self) -> Result<Option<Expr>> {
        match self {
            LogicalPlan::Projection(projection) => {
                Ok(Some(projection.expr.as_slice()[0].clone()))
            }
            LogicalPlan::Aggregate(agg) => {
                // Group-by expressions come first in an Aggregate's output;
                // without any, the first aggregate expression is the head.
                if agg.group_expr.is_empty() {
                    Ok(Some(agg.aggr_expr.as_slice()[0].clone()))
                } else {
                    Ok(Some(agg.group_expr.as_slice()[0].clone()))
                }
            }
            LogicalPlan::Distinct(Distinct::On(DistinctOn { select_expr, .. })) => {
                Ok(Some(select_expr[0].clone()))
            }
            // Pass-through nodes: delegate to the input.
            LogicalPlan::Filter(Filter { input, .. })
            | LogicalPlan::Distinct(Distinct::All(input))
            | LogicalPlan::Sort(Sort { input, .. })
            | LogicalPlan::Limit(Limit { input, .. })
            | LogicalPlan::Repartition(Repartition { input, .. })
            | LogicalPlan::Window(Window { input, .. }) => input.head_output_expr(),
            LogicalPlan::Join(Join {
                left,
                right,
                join_type,
                ..
            }) => match join_type {
                JoinType::Inner | JoinType::Left | JoinType::Right | JoinType::Full => {
                    // Left side's columns come first unless the left schema
                    // is empty, in which case fall back to the right side.
                    if left.schema().fields().is_empty() {
                        right.head_output_expr()
                    } else {
                        left.head_output_expr()
                    }
                }
                // Semi/anti/mark joins only output one side's columns.
                JoinType::LeftSemi | JoinType::LeftAnti | JoinType::LeftMark => {
                    left.head_output_expr()
                }
                JoinType::RightSemi | JoinType::RightAnti | JoinType::RightMark => {
                    right.head_output_expr()
                }
            },
            LogicalPlan::RecursiveQuery(RecursiveQuery { static_term, .. }) => {
                // The static term defines the recursive query's schema.
                static_term.head_output_expr()
            }
            LogicalPlan::Union(union) => Ok(Some(Expr::Column(Column::from(
                union.schema.qualified_field(0),
            )))),
            LogicalPlan::TableScan(table) => Ok(Some(Expr::Column(Column::from(
                table.projected_schema.qualified_field(0),
            )))),
            LogicalPlan::SubqueryAlias(subquery_alias) => {
                // Re-qualify the input's head expression with the alias.
                let expr_opt = subquery_alias.input.head_output_expr()?;
                expr_opt
                    .map(|expr| {
                        Ok(Expr::Column(create_col_from_scalar_expr(
                            &expr,
                            subquery_alias.alias.to_string(),
                        )?))
                    })
                    .map_or(Ok(None), |v| v.map(Some))
            }
            LogicalPlan::Subquery(_) => Ok(None),
            LogicalPlan::EmptyRelation(_)
            | LogicalPlan::Statement(_)
            | LogicalPlan::Values(_)
            | LogicalPlan::Explain(_)
            | LogicalPlan::Analyze(_)
            | LogicalPlan::Extension(_)
            | LogicalPlan::Dml(_)
            | LogicalPlan::Copy(_)
            | LogicalPlan::Ddl(_)
            | LogicalPlan::DescribeTable(_)
            | LogicalPlan::Unnest(_) => Ok(None),
        }
    }
600
601    /// Recomputes schema and type information for this LogicalPlan if needed.
602    ///
603    /// Some `LogicalPlan`s may need to recompute their schema if the number or
604    /// type of expressions have been changed (for example due to type
605    /// coercion). For example [`LogicalPlan::Projection`]s schema depends on
606    /// its expressions.
607    ///
608    /// Some `LogicalPlan`s schema is unaffected by any changes to their
609    /// expressions. For example [`LogicalPlan::Filter`] schema is always the
610    /// same as its input schema.
611    ///
612    /// This is useful after modifying a plans `Expr`s (or input plans) via
613    /// methods such as [Self::map_children] and [Self::map_expressions]. Unlike
614    /// [Self::with_new_exprs], this method does not require a new set of
615    /// expressions or inputs plans.
616    ///
617    /// # Return value
618    /// Returns an error if there is some issue recomputing the schema.
619    ///
620    /// # Notes
621    ///
622    /// * Does not recursively recompute schema for input (child) plans.
623    pub fn recompute_schema(self) -> Result<Self> {
624        match self {
625            // Since expr may be different than the previous expr, schema of the projection
626            // may change. We need to use try_new method instead of try_new_with_schema method.
627            LogicalPlan::Projection(Projection {
628                expr,
629                input,
630                schema: _,
631            }) => Projection::try_new(expr, input).map(LogicalPlan::Projection),
632            LogicalPlan::Dml(_) => Ok(self),
633            LogicalPlan::Copy(_) => Ok(self),
634            LogicalPlan::Values(Values { schema, values }) => {
635                // todo it isn't clear why the schema is not recomputed here
636                Ok(LogicalPlan::Values(Values { schema, values }))
637            }
638            LogicalPlan::Filter(Filter { predicate, input }) => {
639                Filter::try_new(predicate, input).map(LogicalPlan::Filter)
640            }
641            LogicalPlan::Repartition(_) => Ok(self),
642            LogicalPlan::Window(Window {
643                input,
644                window_expr,
645                schema: _,
646            }) => Window::try_new(window_expr, input).map(LogicalPlan::Window),
647            LogicalPlan::Aggregate(Aggregate {
648                input,
649                group_expr,
650                aggr_expr,
651                schema: _,
652            }) => Aggregate::try_new(input, group_expr, aggr_expr)
653                .map(LogicalPlan::Aggregate),
654            LogicalPlan::Sort(_) => Ok(self),
655            LogicalPlan::Join(Join {
656                left,
657                right,
658                filter,
659                join_type,
660                join_constraint,
661                on,
662                schema: _,
663                null_equality,
664                null_aware,
665            }) => {
666                let schema =
667                    build_join_schema(left.schema(), right.schema(), &join_type)?;
668
669                let new_on: Vec<_> = on
670                    .into_iter()
671                    .map(|equi_expr| {
672                        // SimplifyExpression rule may add alias to the equi_expr.
673                        (equi_expr.0.unalias(), equi_expr.1.unalias())
674                    })
675                    .collect();
676
677                Ok(LogicalPlan::Join(Join {
678                    left,
679                    right,
680                    join_type,
681                    join_constraint,
682                    on: new_on,
683                    filter,
684                    schema: DFSchemaRef::new(schema),
685                    null_equality,
686                    null_aware,
687                }))
688            }
689            LogicalPlan::Subquery(_) => Ok(self),
690            LogicalPlan::SubqueryAlias(SubqueryAlias {
691                input,
692                alias,
693                schema: _,
694            }) => SubqueryAlias::try_new(input, alias).map(LogicalPlan::SubqueryAlias),
695            LogicalPlan::Limit(_) => Ok(self),
696            LogicalPlan::Ddl(_) => Ok(self),
697            LogicalPlan::Extension(Extension { node }) => {
698                // todo make an API that does not require cloning
699                // This requires a copy of the extension nodes expressions and inputs
700                let expr = node.expressions();
701                let inputs: Vec<_> = node.inputs().into_iter().cloned().collect();
702                Ok(LogicalPlan::Extension(Extension {
703                    node: node.with_exprs_and_inputs(expr, inputs)?,
704                }))
705            }
706            LogicalPlan::Union(Union { inputs, schema }) => {
707                let first_input_schema = inputs[0].schema();
708                if schema.fields().len() == first_input_schema.fields().len() {
709                    // If inputs are not pruned do not change schema
710                    Ok(LogicalPlan::Union(Union { inputs, schema }))
711                } else {
712                    // A note on `Union`s constructed via `try_new_by_name`:
713                    //
714                    // At this point, the schema for each input should have
715                    // the same width. Thus, we do not need to save whether a
716                    // `Union` was created `BY NAME`, and can safely rely on the
717                    // `try_new` initializer to derive the new schema based on
718                    // column positions.
719                    Ok(LogicalPlan::Union(Union::try_new(inputs)?))
720                }
721            }
722            LogicalPlan::Distinct(distinct) => {
723                let distinct = match distinct {
724                    Distinct::All(input) => Distinct::All(input),
725                    Distinct::On(DistinctOn {
726                        on_expr,
727                        select_expr,
728                        sort_expr,
729                        input,
730                        schema: _,
731                    }) => Distinct::On(DistinctOn::try_new(
732                        on_expr,
733                        select_expr,
734                        sort_expr,
735                        input,
736                    )?),
737                };
738                Ok(LogicalPlan::Distinct(distinct))
739            }
740            LogicalPlan::RecursiveQuery(_) => Ok(self),
741            LogicalPlan::Analyze(_) => Ok(self),
742            LogicalPlan::Explain(_) => Ok(self),
743            LogicalPlan::TableScan(_) => Ok(self),
744            LogicalPlan::EmptyRelation(_) => Ok(self),
745            LogicalPlan::Statement(_) => Ok(self),
746            LogicalPlan::DescribeTable(_) => Ok(self),
747            LogicalPlan::Unnest(Unnest {
748                input,
749                exec_columns,
750                options,
751                ..
752            }) => {
753                // Update schema with unnested column type.
754                unnest_with_options(Arc::unwrap_or_clone(input), exec_columns, options)
755            }
756        }
757    }
758
    /// Returns a new `LogicalPlan` based on `self` with inputs and
    /// expressions replaced.
    ///
    /// Note this method creates an entirely new node, which requires a large
    /// amount of clone'ing. When possible, the [`tree_node`] API should be used
    /// instead of this API.
    ///
    /// The exprs correspond to the same order of expressions returned
    /// by [`Self::expressions`]. This function is used by optimizers
    /// to rewrite plans using the following pattern:
    ///
    /// [`tree_node`]: crate::logical_plan::tree_node
    ///
    /// ```text
    /// let new_inputs = optimize_children(..., plan, props);
    ///
    /// // get the plans expressions to optimize
    /// let exprs = plan.expressions();
    ///
    /// // potentially rewrite plan expressions
    /// let rewritten_exprs = rewrite_exprs(exprs);
    ///
    /// // create new plan using rewritten_exprs in same position
    /// let new_plan = plan.new_with_exprs(rewritten_exprs, new_inputs);
    /// ```
    pub fn with_new_exprs(
        &self,
        mut expr: Vec<Expr>,
        inputs: Vec<LogicalPlan>,
    ) -> Result<LogicalPlan> {
        // Each arm must consume `expr` and `inputs` in exactly the layout
        // produced for that variant by `Self::expressions` / plan inputs.
        match self {
            // Since expr may be different than the previous expr, schema of the projection
            // may change. We need to use try_new method instead of try_new_with_schema method.
            LogicalPlan::Projection(Projection { .. }) => {
                let input = self.only_input(inputs)?;
                Projection::try_new(expr, Arc::new(input)).map(LogicalPlan::Projection)
            }
            LogicalPlan::Dml(DmlStatement {
                table_name,
                target,
                op,
                ..
            }) => {
                self.assert_no_expressions(expr)?;
                let input = self.only_input(inputs)?;
                Ok(LogicalPlan::Dml(DmlStatement::new(
                    table_name.clone(),
                    Arc::clone(target),
                    op.clone(),
                    Arc::new(input),
                )))
            }
            LogicalPlan::Copy(CopyTo {
                input: _,
                output_url,
                file_type,
                options,
                partition_by,
                output_schema: _,
            }) => {
                self.assert_no_expressions(expr)?;
                let input = self.only_input(inputs)?;
                Ok(LogicalPlan::Copy(CopyTo::new(
                    Arc::new(input),
                    output_url.clone(),
                    partition_by.clone(),
                    Arc::clone(file_type),
                    options.clone(),
                )))
            }
            LogicalPlan::Values(Values { schema, .. }) => {
                self.assert_no_inputs(inputs)?;
                Ok(LogicalPlan::Values(Values {
                    schema: Arc::clone(schema),
                    // The flat expr list is re-chunked into rows of
                    // `schema.fields().len()` values each.
                    values: expr
                        .chunks_exact(schema.fields().len())
                        .map(|s| s.to_vec())
                        .collect(),
                }))
            }
            LogicalPlan::Filter { .. } => {
                let predicate = self.only_expr(expr)?;
                let input = self.only_input(inputs)?;

                Filter::try_new(predicate, Arc::new(input)).map(LogicalPlan::Filter)
            }
            LogicalPlan::Repartition(Repartition {
                partitioning_scheme,
                ..
            }) => match partitioning_scheme {
                // Round-robin carries no expressions; only the input changes.
                Partitioning::RoundRobinBatch(n) => {
                    self.assert_no_expressions(expr)?;
                    let input = self.only_input(inputs)?;
                    Ok(LogicalPlan::Repartition(Repartition {
                        partitioning_scheme: Partitioning::RoundRobinBatch(*n),
                        input: Arc::new(input),
                    }))
                }
                // Hash/DistributeBy replace their key expressions wholesale.
                Partitioning::Hash(_, n) => {
                    let input = self.only_input(inputs)?;
                    Ok(LogicalPlan::Repartition(Repartition {
                        partitioning_scheme: Partitioning::Hash(expr, *n),
                        input: Arc::new(input),
                    }))
                }
                Partitioning::DistributeBy(_) => {
                    let input = self.only_input(inputs)?;
                    Ok(LogicalPlan::Repartition(Repartition {
                        partitioning_scheme: Partitioning::DistributeBy(expr),
                        input: Arc::new(input),
                    }))
                }
            },
            LogicalPlan::Window(Window { window_expr, .. }) => {
                // Window expressions are replaced one-for-one; a length
                // mismatch is a caller bug (panics, internal invariant).
                assert_eq!(window_expr.len(), expr.len());
                let input = self.only_input(inputs)?;
                Window::try_new(expr, Arc::new(input)).map(LogicalPlan::Window)
            }
            LogicalPlan::Aggregate(Aggregate { group_expr, .. }) => {
                let input = self.only_input(inputs)?;
                // group exprs are the first expressions
                let agg_expr = expr.split_off(group_expr.len());

                Aggregate::try_new(Arc::new(input), expr, agg_expr)
                    .map(LogicalPlan::Aggregate)
            }
            LogicalPlan::Sort(Sort {
                expr: sort_expr,
                fetch,
                ..
            }) => {
                let input = self.only_input(inputs)?;
                Ok(LogicalPlan::Sort(Sort {
                    // Keep each original SortExpr's options (asc/nulls
                    // ordering), swapping in the rewritten child expression.
                    expr: expr
                        .into_iter()
                        .zip(sort_expr.iter())
                        .map(|(expr, sort)| sort.with_expr(expr))
                        .collect(),
                    input: Arc::new(input),
                    fetch: *fetch,
                }))
            }
            LogicalPlan::Join(Join {
                join_type,
                join_constraint,
                on,
                null_equality,
                null_aware,
                ..
            }) => {
                let (left, right) = self.only_two_inputs(inputs)?;
                let schema = build_join_schema(left.schema(), right.schema(), join_type)?;

                // Expression layout: [l1, r1, l2, r2, ..., optional filter].
                let equi_expr_count = on.len() * 2;
                assert!(expr.len() >= equi_expr_count);

                // Assume that the last expr, if any,
                // is the filter_expr (non equality predicate from ON clause)
                let filter_expr = if expr.len() > equi_expr_count {
                    expr.pop()
                } else {
                    None
                };

                // The first part of expr is equi-exprs,
                // and the struct of each equi-expr is like `left-expr = right-expr`.
                assert_eq!(expr.len(), equi_expr_count);
                let mut new_on = Vec::with_capacity(on.len());
                let mut iter = expr.into_iter();
                while let Some(left) = iter.next() {
                    let Some(right) = iter.next() else {
                        internal_err!(
                            "Expected a pair of expressions to construct the join on expression"
                        )?
                    };

                    // SimplifyExpression rule may add alias to the equi_expr.
                    new_on.push((left.unalias(), right.unalias()));
                }

                Ok(LogicalPlan::Join(Join {
                    left: Arc::new(left),
                    right: Arc::new(right),
                    join_type: *join_type,
                    join_constraint: *join_constraint,
                    on: new_on,
                    filter: filter_expr,
                    schema: DFSchemaRef::new(schema),
                    null_equality: *null_equality,
                    null_aware: *null_aware,
                }))
            }
            LogicalPlan::Subquery(Subquery {
                outer_ref_columns,
                spans,
                ..
            }) => {
                self.assert_no_expressions(expr)?;
                let input = self.only_input(inputs)?;
                let subquery = LogicalPlanBuilder::from(input).build()?;
                Ok(LogicalPlan::Subquery(Subquery {
                    subquery: Arc::new(subquery),
                    outer_ref_columns: outer_ref_columns.clone(),
                    spans: spans.clone(),
                }))
            }
            LogicalPlan::SubqueryAlias(SubqueryAlias { alias, .. }) => {
                self.assert_no_expressions(expr)?;
                let input = self.only_input(inputs)?;
                SubqueryAlias::try_new(Arc::new(input), alias.clone())
                    .map(LogicalPlan::SubqueryAlias)
            }
            LogicalPlan::Limit(Limit { skip, fetch, .. }) => {
                let old_expr_len = skip.iter().chain(fetch.iter()).count();
                assert_eq_or_internal_err!(
                    old_expr_len,
                    expr.len(),
                    "Invalid number of new Limit expressions: expected {}, got {}",
                    old_expr_len,
                    expr.len()
                );
                // `LogicalPlan::expressions()` returns in [skip, fetch] order, so we can pop from the end.
                let new_fetch = fetch.as_ref().and_then(|_| expr.pop());
                let new_skip = skip.as_ref().and_then(|_| expr.pop());
                let input = self.only_input(inputs)?;
                Ok(LogicalPlan::Limit(Limit {
                    skip: new_skip.map(Box::new),
                    fetch: new_fetch.map(Box::new),
                    input: Arc::new(input),
                }))
            }
            LogicalPlan::Ddl(DdlStatement::CreateMemoryTable(CreateMemoryTable {
                name,
                if_not_exists,
                or_replace,
                column_defaults,
                temporary,
                ..
            })) => {
                self.assert_no_expressions(expr)?;
                let input = self.only_input(inputs)?;
                Ok(LogicalPlan::Ddl(DdlStatement::CreateMemoryTable(
                    CreateMemoryTable {
                        input: Arc::new(input),
                        // NOTE(review): the original constraints are dropped
                        // and reset to the default here — confirm intentional.
                        constraints: Constraints::default(),
                        name: name.clone(),
                        if_not_exists: *if_not_exists,
                        or_replace: *or_replace,
                        column_defaults: column_defaults.clone(),
                        temporary: *temporary,
                    },
                )))
            }
            LogicalPlan::Ddl(DdlStatement::CreateView(CreateView {
                name,
                or_replace,
                definition,
                temporary,
                ..
            })) => {
                self.assert_no_expressions(expr)?;
                let input = self.only_input(inputs)?;
                Ok(LogicalPlan::Ddl(DdlStatement::CreateView(CreateView {
                    input: Arc::new(input),
                    name: name.clone(),
                    or_replace: *or_replace,
                    temporary: *temporary,
                    definition: definition.clone(),
                })))
            }
            // User-defined nodes rebuild themselves from the new exprs/inputs.
            LogicalPlan::Extension(e) => Ok(LogicalPlan::Extension(Extension {
                node: e.node.with_exprs_and_inputs(expr, inputs)?,
            })),
            LogicalPlan::Union(Union { schema, .. }) => {
                self.assert_no_expressions(expr)?;
                let input_schema = inputs[0].schema();
                // If inputs are not pruned do not change schema.
                let schema = if schema.fields().len() == input_schema.fields().len() {
                    Arc::clone(schema)
                } else {
                    Arc::clone(input_schema)
                };
                Ok(LogicalPlan::Union(Union {
                    inputs: inputs.into_iter().map(Arc::new).collect(),
                    schema,
                }))
            }
            LogicalPlan::Distinct(distinct) => {
                let distinct = match distinct {
                    Distinct::All(_) => {
                        self.assert_no_expressions(expr)?;
                        let input = self.only_input(inputs)?;
                        Distinct::All(Arc::new(input))
                    }
                    Distinct::On(DistinctOn {
                        on_expr,
                        select_expr,
                        ..
                    }) => {
                        let input = self.only_input(inputs)?;
                        // Expression layout: [on_expr..., select_expr..., sort_expr...];
                        // split from the back so indices stay valid.
                        let sort_expr = expr.split_off(on_expr.len() + select_expr.len());
                        let select_expr = expr.split_off(on_expr.len());
                        assert!(
                            sort_expr.is_empty(),
                            "with_new_exprs for Distinct does not support sort expressions"
                        );
                        Distinct::On(DistinctOn::try_new(
                            expr,
                            select_expr,
                            None, // no sort expressions accepted
                            Arc::new(input),
                        )?)
                    }
                };
                Ok(LogicalPlan::Distinct(distinct))
            }
            LogicalPlan::RecursiveQuery(RecursiveQuery {
                name, is_distinct, ..
            }) => {
                self.assert_no_expressions(expr)?;
                let (static_term, recursive_term) = self.only_two_inputs(inputs)?;
                Ok(LogicalPlan::RecursiveQuery(RecursiveQuery {
                    name: name.clone(),
                    static_term: Arc::new(static_term),
                    recursive_term: Arc::new(recursive_term),
                    is_distinct: *is_distinct,
                }))
            }
            LogicalPlan::Analyze(a) => {
                self.assert_no_expressions(expr)?;
                let input = self.only_input(inputs)?;
                Ok(LogicalPlan::Analyze(Analyze {
                    verbose: a.verbose,
                    schema: Arc::clone(&a.schema),
                    input: Arc::new(input),
                }))
            }
            LogicalPlan::Explain(e) => {
                self.assert_no_expressions(expr)?;
                let input = self.only_input(inputs)?;
                Ok(LogicalPlan::Explain(Explain {
                    verbose: e.verbose,
                    plan: Arc::new(input),
                    explain_format: e.explain_format.clone(),
                    stringified_plans: e.stringified_plans.clone(),
                    schema: Arc::clone(&e.schema),
                    logical_optimization_succeeded: e.logical_optimization_succeeded,
                }))
            }
            LogicalPlan::Statement(Statement::Prepare(Prepare {
                name, fields, ..
            })) => {
                self.assert_no_expressions(expr)?;
                let input = self.only_input(inputs)?;
                Ok(LogicalPlan::Statement(Statement::Prepare(Prepare {
                    name: name.clone(),
                    fields: fields.clone(),
                    input: Arc::new(input),
                })))
            }
            LogicalPlan::Statement(Statement::Execute(Execute { name, .. })) => {
                self.assert_no_inputs(inputs)?;
                Ok(LogicalPlan::Statement(Statement::Execute(Execute {
                    name: name.clone(),
                    parameters: expr,
                })))
            }
            LogicalPlan::TableScan(ts) => {
                self.assert_no_inputs(inputs)?;
                // A scan's only expressions are its pushed-down filters.
                Ok(LogicalPlan::TableScan(TableScan {
                    filters: expr,
                    ..ts.clone()
                }))
            }
            LogicalPlan::EmptyRelation(_)
            | LogicalPlan::Ddl(_)
            | LogicalPlan::Statement(_)
            | LogicalPlan::DescribeTable(_) => {
                // All of these plan types have no inputs / exprs so should not be called
                self.assert_no_expressions(expr)?;
                self.assert_no_inputs(inputs)?;
                Ok(self.clone())
            }
            LogicalPlan::Unnest(Unnest {
                exec_columns: columns,
                options,
                ..
            }) => {
                self.assert_no_expressions(expr)?;
                let input = self.only_input(inputs)?;
                // Update schema with unnested column type.
                let new_plan =
                    unnest_with_options(input, columns.clone(), options.clone())?;
                Ok(new_plan)
            }
        }
    }
1156
1157    /// checks that the plan conforms to the listed invariant level, returning an Error if not
1158    pub fn check_invariants(&self, check: InvariantLevel) -> Result<()> {
1159        match check {
1160            InvariantLevel::Always => assert_always_invariants_at_current_node(self),
1161            InvariantLevel::Executable => assert_executable_invariants(self),
1162        }
1163    }
1164
1165    /// Helper for [Self::with_new_exprs] to use when no expressions are expected.
1166    #[inline]
1167    #[expect(clippy::needless_pass_by_value)] // expr is moved intentionally to ensure it's not used again
1168    fn assert_no_expressions(&self, expr: Vec<Expr>) -> Result<()> {
1169        assert_or_internal_err!(
1170            expr.is_empty(),
1171            "{self:?} should have no exprs, got {:?}",
1172            expr
1173        );
1174        Ok(())
1175    }
1176
1177    /// Helper for [Self::with_new_exprs] to use when no inputs are expected.
1178    #[inline]
1179    #[expect(clippy::needless_pass_by_value)] // inputs is moved intentionally to ensure it's not used again
1180    fn assert_no_inputs(&self, inputs: Vec<LogicalPlan>) -> Result<()> {
1181        assert_or_internal_err!(
1182            inputs.is_empty(),
1183            "{self:?} should have no inputs, got: {:?}",
1184            inputs
1185        );
1186        Ok(())
1187    }
1188
1189    /// Helper for [Self::with_new_exprs] to use when exactly one expression is expected.
1190    #[inline]
1191    fn only_expr(&self, mut expr: Vec<Expr>) -> Result<Expr> {
1192        assert_eq_or_internal_err!(
1193            expr.len(),
1194            1,
1195            "{self:?} should have exactly one expr, got {:?}",
1196            &expr
1197        );
1198        Ok(expr.remove(0))
1199    }
1200
1201    /// Helper for [Self::with_new_exprs] to use when exactly one input is expected.
1202    #[inline]
1203    fn only_input(&self, mut inputs: Vec<LogicalPlan>) -> Result<LogicalPlan> {
1204        assert_eq_or_internal_err!(
1205            inputs.len(),
1206            1,
1207            "{self:?} should have exactly one input, got {:?}",
1208            &inputs
1209        );
1210        Ok(inputs.remove(0))
1211    }
1212
1213    /// Helper for [Self::with_new_exprs] to use when exactly two inputs are expected.
1214    #[inline]
1215    fn only_two_inputs(
1216        &self,
1217        mut inputs: Vec<LogicalPlan>,
1218    ) -> Result<(LogicalPlan, LogicalPlan)> {
1219        assert_eq_or_internal_err!(
1220            inputs.len(),
1221            2,
1222            "{self:?} should have exactly two inputs, got {:?}",
1223            &inputs
1224        );
1225        let right = inputs.remove(1);
1226        let left = inputs.remove(0);
1227        Ok((left, right))
1228    }
1229
1230    /// Replaces placeholder param values (like `$1`, `$2`) in [`LogicalPlan`]
1231    /// with the specified `param_values`.
1232    ///
1233    /// [`Prepare`] statements are converted to
1234    /// their inner logical plan for execution.
1235    ///
1236    /// # Example
1237    /// ```
1238    /// # use arrow::datatypes::{Field, Schema, DataType};
1239    /// use datafusion_common::ScalarValue;
1240    /// # use datafusion_expr::{lit, col, LogicalPlanBuilder, logical_plan::table_scan, placeholder};
1241    /// # let schema = Schema::new(vec![
1242    /// #     Field::new("id", DataType::Int32, false),
1243    /// # ]);
1244    /// // Build SELECT * FROM t1 WHERE id = $1
1245    /// let plan = table_scan(Some("t1"), &schema, None).unwrap()
1246    ///     .filter(col("id").eq(placeholder("$1"))).unwrap()
1247    ///     .build().unwrap();
1248    ///
1249    /// assert_eq!(
1250    ///   "Filter: t1.id = $1\
1251    ///   \n  TableScan: t1",
1252    ///   plan.display_indent().to_string()
1253    /// );
1254    ///
1255    /// // Fill in the parameter $1 with a literal 3
1256    /// let plan = plan.with_param_values(vec![
1257    ///   ScalarValue::from(3i32) // value at index 0 --> $1
1258    /// ]).unwrap();
1259    ///
1260    /// assert_eq!(
1261    ///    "Filter: t1.id = Int32(3)\
1262    ///    \n  TableScan: t1",
1263    ///    plan.display_indent().to_string()
1264    ///  );
1265    ///
1266    /// // Note you can also used named parameters
1267    /// // Build SELECT * FROM t1 WHERE id = $my_param
1268    /// let plan = table_scan(Some("t1"), &schema, None).unwrap()
1269    ///     .filter(col("id").eq(placeholder("$my_param"))).unwrap()
1270    ///     .build().unwrap()
1271    ///     // Fill in the parameter $my_param with a literal 3
1272    ///     .with_param_values(vec![
1273    ///       ("my_param", ScalarValue::from(3i32)),
1274    ///     ]).unwrap();
1275    ///
1276    /// assert_eq!(
1277    ///    "Filter: t1.id = Int32(3)\
1278    ///    \n  TableScan: t1",
1279    ///    plan.display_indent().to_string()
1280    ///  );
1281    /// ```
1282    pub fn with_param_values(
1283        self,
1284        param_values: impl Into<ParamValues>,
1285    ) -> Result<LogicalPlan> {
1286        let param_values = param_values.into();
1287        let plan_with_values = self.replace_params_with_values(&param_values)?;
1288
1289        // unwrap Prepare
1290        Ok(
1291            if let LogicalPlan::Statement(Statement::Prepare(prepare_lp)) =
1292                plan_with_values
1293            {
1294                param_values.verify_fields(&prepare_lp.fields)?;
1295                // try and take ownership of the input if is not shared, clone otherwise
1296                Arc::unwrap_or_clone(prepare_lp.input)
1297            } else {
1298                plan_with_values
1299            },
1300        )
1301    }
1302
1303    /// Returns the maximum number of rows that this plan can output, if known.
1304    ///
1305    /// If `None`, the plan can return any number of rows.
1306    /// If `Some(n)` then the plan can return at most `n` rows but may return fewer.
1307    pub fn max_rows(self: &LogicalPlan) -> Option<usize> {
1308        match self {
1309            LogicalPlan::Projection(Projection { input, .. }) => input.max_rows(),
1310            LogicalPlan::Filter(filter) => {
1311                if filter.is_scalar() {
1312                    Some(1)
1313                } else {
1314                    filter.input.max_rows()
1315                }
1316            }
1317            LogicalPlan::Window(Window { input, .. }) => input.max_rows(),
1318            LogicalPlan::Aggregate(Aggregate {
1319                input, group_expr, ..
1320            }) => {
1321                // Empty group_expr will return Some(1)
1322                if group_expr
1323                    .iter()
1324                    .all(|expr| matches!(expr, Expr::Literal(_, _)))
1325                {
1326                    Some(1)
1327                } else {
1328                    input.max_rows()
1329                }
1330            }
1331            LogicalPlan::Sort(Sort { input, fetch, .. }) => {
1332                match (fetch, input.max_rows()) {
1333                    (Some(fetch_limit), Some(input_max)) => {
1334                        Some(input_max.min(*fetch_limit))
1335                    }
1336                    (Some(fetch_limit), None) => Some(*fetch_limit),
1337                    (None, Some(input_max)) => Some(input_max),
1338                    (None, None) => None,
1339                }
1340            }
1341            LogicalPlan::Join(Join {
1342                left,
1343                right,
1344                join_type,
1345                ..
1346            }) => match join_type {
1347                JoinType::Inner => Some(left.max_rows()? * right.max_rows()?),
1348                JoinType::Left | JoinType::Right | JoinType::Full => {
1349                    match (left.max_rows()?, right.max_rows()?, join_type) {
1350                        (0, 0, _) => Some(0),
1351                        (max_rows, 0, JoinType::Left | JoinType::Full) => Some(max_rows),
1352                        (0, max_rows, JoinType::Right | JoinType::Full) => Some(max_rows),
1353                        (left_max, right_max, _) => Some(left_max * right_max),
1354                    }
1355                }
1356                JoinType::LeftSemi | JoinType::LeftAnti | JoinType::LeftMark => {
1357                    left.max_rows()
1358                }
1359                JoinType::RightSemi | JoinType::RightAnti | JoinType::RightMark => {
1360                    right.max_rows()
1361                }
1362            },
1363            LogicalPlan::Repartition(Repartition { input, .. }) => input.max_rows(),
1364            LogicalPlan::Union(Union { inputs, .. }) => {
1365                inputs.iter().try_fold(0usize, |mut acc, plan| {
1366                    acc += plan.max_rows()?;
1367                    Some(acc)
1368                })
1369            }
1370            LogicalPlan::TableScan(TableScan { fetch, .. }) => *fetch,
1371            LogicalPlan::EmptyRelation(_) => Some(0),
1372            LogicalPlan::RecursiveQuery(_) => None,
1373            LogicalPlan::Subquery(_) => None,
1374            LogicalPlan::SubqueryAlias(SubqueryAlias { input, .. }) => input.max_rows(),
1375            LogicalPlan::Limit(limit) => match limit.get_fetch_type() {
1376                Ok(FetchType::Literal(s)) => s,
1377                _ => None,
1378            },
1379            LogicalPlan::Distinct(
1380                Distinct::All(input) | Distinct::On(DistinctOn { input, .. }),
1381            ) => input.max_rows(),
1382            LogicalPlan::Values(v) => Some(v.values.len()),
1383            LogicalPlan::Unnest(_) => None,
1384            LogicalPlan::Ddl(_)
1385            | LogicalPlan::Explain(_)
1386            | LogicalPlan::Analyze(_)
1387            | LogicalPlan::Dml(_)
1388            | LogicalPlan::Copy(_)
1389            | LogicalPlan::DescribeTable(_)
1390            | LogicalPlan::Statement(_)
1391            | LogicalPlan::Extension(_) => None,
1392        }
1393    }
1394
1395    /// Returns the skip (offset) of this plan node, if it has one.
1396    ///
1397    /// Only [`LogicalPlan::Limit`] carries a skip value; all other variants
1398    /// return `Ok(None)`. Returns `Ok(None)` for a zero skip.
1399    pub fn skip(&self) -> Result<Option<usize>> {
1400        match self {
1401            LogicalPlan::Limit(limit) => match limit.get_skip_type()? {
1402                SkipType::Literal(0) => Ok(None),
1403                SkipType::Literal(n) => Ok(Some(n)),
1404                SkipType::UnsupportedExpr => Ok(None),
1405            },
1406            LogicalPlan::Sort(_) => Ok(None),
1407            LogicalPlan::TableScan(_) => Ok(None),
1408            LogicalPlan::Projection(_) => Ok(None),
1409            LogicalPlan::Filter(_) => Ok(None),
1410            LogicalPlan::Window(_) => Ok(None),
1411            LogicalPlan::Aggregate(_) => Ok(None),
1412            LogicalPlan::Join(_) => Ok(None),
1413            LogicalPlan::Repartition(_) => Ok(None),
1414            LogicalPlan::Union(_) => Ok(None),
1415            LogicalPlan::EmptyRelation(_) => Ok(None),
1416            LogicalPlan::Subquery(_) => Ok(None),
1417            LogicalPlan::SubqueryAlias(_) => Ok(None),
1418            LogicalPlan::Statement(_) => Ok(None),
1419            LogicalPlan::Values(_) => Ok(None),
1420            LogicalPlan::Explain(_) => Ok(None),
1421            LogicalPlan::Analyze(_) => Ok(None),
1422            LogicalPlan::Extension(_) => Ok(None),
1423            LogicalPlan::Distinct(_) => Ok(None),
1424            LogicalPlan::Dml(_) => Ok(None),
1425            LogicalPlan::Ddl(_) => Ok(None),
1426            LogicalPlan::Copy(_) => Ok(None),
1427            LogicalPlan::DescribeTable(_) => Ok(None),
1428            LogicalPlan::Unnest(_) => Ok(None),
1429            LogicalPlan::RecursiveQuery(_) => Ok(None),
1430        }
1431    }
1432
1433    /// Returns the fetch (limit) of this plan node, if it has one.
1434    ///
1435    /// [`LogicalPlan::Sort`], [`LogicalPlan::TableScan`], and
1436    /// [`LogicalPlan::Limit`] may carry a fetch value; all other variants
1437    /// return `Ok(None)`.
1438    pub fn fetch(&self) -> Result<Option<usize>> {
1439        match self {
1440            LogicalPlan::Sort(Sort { fetch, .. }) => Ok(*fetch),
1441            LogicalPlan::TableScan(TableScan { fetch, .. }) => Ok(*fetch),
1442            LogicalPlan::Limit(limit) => match limit.get_fetch_type()? {
1443                FetchType::Literal(s) => Ok(s),
1444                FetchType::UnsupportedExpr => Ok(None),
1445            },
1446            LogicalPlan::Projection(_) => Ok(None),
1447            LogicalPlan::Filter(_) => Ok(None),
1448            LogicalPlan::Window(_) => Ok(None),
1449            LogicalPlan::Aggregate(_) => Ok(None),
1450            LogicalPlan::Join(_) => Ok(None),
1451            LogicalPlan::Repartition(_) => Ok(None),
1452            LogicalPlan::Union(_) => Ok(None),
1453            LogicalPlan::EmptyRelation(_) => Ok(None),
1454            LogicalPlan::Subquery(_) => Ok(None),
1455            LogicalPlan::SubqueryAlias(_) => Ok(None),
1456            LogicalPlan::Statement(_) => Ok(None),
1457            LogicalPlan::Values(_) => Ok(None),
1458            LogicalPlan::Explain(_) => Ok(None),
1459            LogicalPlan::Analyze(_) => Ok(None),
1460            LogicalPlan::Extension(_) => Ok(None),
1461            LogicalPlan::Distinct(_) => Ok(None),
1462            LogicalPlan::Dml(_) => Ok(None),
1463            LogicalPlan::Ddl(_) => Ok(None),
1464            LogicalPlan::Copy(_) => Ok(None),
1465            LogicalPlan::DescribeTable(_) => Ok(None),
1466            LogicalPlan::Unnest(_) => Ok(None),
1467            LogicalPlan::RecursiveQuery(_) => Ok(None),
1468        }
1469    }
1470
1471    /// If this node's expressions contains any references to an outer subquery
1472    pub fn contains_outer_reference(&self) -> bool {
1473        let mut contains = false;
1474        self.apply_expressions(|expr| {
1475            Ok(if expr.contains_outer() {
1476                contains = true;
1477                TreeNodeRecursion::Stop
1478            } else {
1479                TreeNodeRecursion::Continue
1480            })
1481        })
1482        .unwrap();
1483        contains
1484    }
1485
    /// Get the output expressions and their corresponding columns.
    ///
    /// The parent node may reference the output columns of the plan by expressions, such as
    /// projection over aggregate or window functions. This method helps to convert the
    /// referenced expressions into columns.
    ///
    /// Only `Aggregate` and `Window` nodes produce mappings here; every other
    /// plan variant returns an empty vector.
    ///
    /// See also: [`crate::utils::columnize_expr`]
    pub fn columnized_output_exprs(&self) -> Result<Vec<(&Expr, Column)>> {
        match self {
            // Pair each aggregate output expression with the schema column at
            // the same position.
            LogicalPlan::Aggregate(aggregate) => Ok(aggregate
                .output_expressions()?
                .into_iter()
                .zip(self.schema().columns())
                .collect()),
            LogicalPlan::Window(Window {
                window_expr,
                input,
                schema,
            }) => {
                // The input could be another Window, so the result should also include the input's. For Example:
                // `EXPLAIN SELECT RANK() OVER (PARTITION BY a ORDER BY b), SUM(b) OVER (PARTITION BY a) FROM t`
                // Its plan is:
                // Projection: RANK() PARTITION BY [t.a] ORDER BY [t.b ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW, SUM(t.b) PARTITION BY [t.a] ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING
                //   WindowAggr: windowExpr=[[SUM(CAST(t.b AS Int64)) PARTITION BY [t.a] ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING]]
                //     WindowAggr: windowExpr=[[RANK() PARTITION BY [t.a] ORDER BY [t.b ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW]]/
                //       TableScan: t projection=[a, b]
                let mut output_exprs = input.columnized_output_exprs()?;
                let input_len = input.schema().fields().len();
                // Window output columns are appended after the input's columns,
                // so skip the first `input_len` schema columns to line them up
                // with `window_expr`.
                output_exprs.extend(
                    window_expr
                        .iter()
                        .zip(schema.columns().into_iter().skip(input_len)),
                );
                Ok(output_exprs)
            }
            _ => Ok(vec![]),
        }
    }
1524}
1525
1526impl LogicalPlan {
    /// Return a `LogicalPlan` with all placeholders (e.g $1 $2,
    /// ...) replaced with corresponding values provided in
    /// `params_values`
    ///
    /// See [`Self::with_param_values`] for examples and usage with an owned
    /// `ParamValues`
    pub fn replace_params_with_values(
        self,
        param_values: &ParamValues,
    ) -> Result<LogicalPlan> {
        // Walk the plan bottom-up (including subquery plans) so children are
        // rewritten before their parents.
        self.transform_up_with_subqueries(|plan| {
            let schema = Arc::clone(plan.schema());
            let name_preserver = NamePreserver::new(&plan);
            plan.map_expressions(|e| {
                // Infer each placeholder's type from the surrounding
                // expression and input schema before substituting its value.
                let (e, has_placeholder) = e.infer_placeholder_types(&schema)?;
                if !has_placeholder {
                    // Performance optimization:
                    // avoid NamePreserver copy and second pass over expression
                    // if no placeholders.
                    Ok(Transformed::no(e))
                } else {
                    let original_name = name_preserver.save(&e);
                    let transformed_expr = e.transform_up(|e| {
                        if let Expr::Placeholder(Placeholder { id, .. }) = e {
                            // Look up the bound value (and any metadata) for
                            // this placeholder id and inline it as a literal.
                            let (value, metadata) = param_values
                                .get_placeholders_with_values(&id)?
                                .into_inner();
                            Ok(Transformed::yes(Expr::Literal(value, metadata)))
                        } else {
                            Ok(Transformed::no(e))
                        }
                    })?;
                    // Preserve name to avoid breaking column references to this expression
                    Ok(transformed_expr.update_data(|expr| original_name.restore(expr)))
                }
            })?
            // Literal substitution can change expression types, so recompute
            // this node's schema afterwards.
            .map_data(|plan| plan.update_schema_data_type())
        })
        .map(|res| res.data)
    }
1567
    /// Recompute schema fields' data type after replacing params, ensuring fields data type can be
    /// updated according to the new parameters.
    ///
    /// Unlike `recompute_schema()`, this method rebuilds VALUES plans entirely to properly infer
    /// types from literal values after placeholder substitution.
    fn update_schema_data_type(self) -> Result<LogicalPlan> {
        match self {
            // Build `LogicalPlan::Values` from the values for type inference.
            // We can't use `recompute_schema` because it skips recomputing for
            // `LogicalPlan::Values`.
            LogicalPlan::Values(Values { values, schema: _ }) => {
                LogicalPlanBuilder::values(values)?.build()
            }
            // other plans can just use `recompute_schema` directly.
            plan => plan.recompute_schema(),
        }
    }
1585
1586    /// Walk the logical plan, find any `Placeholder` tokens, and return a set of their names.
1587    pub fn get_parameter_names(&self) -> Result<HashSet<String>> {
1588        let mut param_names = HashSet::new();
1589        self.apply_with_subqueries(|plan| {
1590            plan.apply_expressions(|expr| {
1591                expr.apply(|expr| {
1592                    if let Expr::Placeholder(Placeholder { id, .. }) = expr {
1593                        param_names.insert(id.clone());
1594                    }
1595                    Ok(TreeNodeRecursion::Continue)
1596                })
1597            })
1598        })
1599        .map(|_| param_names)
1600    }
1601
1602    /// Walk the logical plan, find any `Placeholder` tokens, and return a map of their IDs and DataTypes
1603    ///
1604    /// Note that this will drop any extension or field metadata attached to parameters. Use
1605    /// [`LogicalPlan::get_parameter_fields`] to keep extension metadata.
1606    pub fn get_parameter_types(
1607        &self,
1608    ) -> Result<HashMap<String, Option<DataType>>, DataFusionError> {
1609        let mut parameter_fields = self.get_parameter_fields()?;
1610        Ok(parameter_fields
1611            .drain()
1612            .map(|(name, maybe_field)| {
1613                (name, maybe_field.map(|field| field.data_type().clone()))
1614            })
1615            .collect())
1616    }
1617
    /// Walk the logical plan, find any `Placeholder` tokens, and return a map of their IDs and FieldRefs
    ///
    /// A placeholder whose type could not be inferred anywhere in the plan is
    /// mapped to `None`. Returns an error if two typed occurrences of the same
    /// id have conflicting storage types or metadata.
    pub fn get_parameter_fields(
        &self,
    ) -> Result<HashMap<String, Option<FieldRef>>, DataFusionError> {
        // placeholder id -> inferred field (`None` when untyped)
        let mut param_types: HashMap<String, Option<FieldRef>> = HashMap::new();

        self.apply_with_subqueries(|plan| {
            plan.apply_expressions(|expr| {
                expr.apply(|expr| {
                    if let Expr::Placeholder(Placeholder { id, field }) = expr {
                        let prev = param_types.get(id);
                        match (prev, field) {
                            // Seen before with a known type: both occurrences
                            // must agree on storage type and metadata.
                            (Some(Some(prev)), Some(field)) => {
                                check_metadata_with_storage_equal(
                                    (field.data_type(), Some(field.metadata())),
                                    (prev.data_type(), Some(prev.metadata())),
                                    "parameter",
                                    &format!(": Conflicting types for id {id}"),
                                )?;
                            }
                            // Typed occurrence: record (or re-record) the field.
                            (_, Some(field)) => {
                                param_types.insert(id.clone(), Some(Arc::clone(field)));
                            }
                            // Untyped occurrence: record the id with no type.
                            // NOTE(review): this also overwrites a previously
                            // recorded typed entry with `None` — confirm that
                            // is intended.
                            _ => {
                                param_types.insert(id.clone(), None);
                            }
                        }
                    }
                    Ok(TreeNodeRecursion::Continue)
                })
            })
        })
        .map(|_| param_types)
    }
1652
1653    // ------------
1654    // Various implementations for printing out LogicalPlans
1655    // ------------
1656
1657    /// Return a `format`able structure that produces a single line
1658    /// per node.
1659    ///
1660    /// # Example
1661    ///
1662    /// ```text
1663    /// Projection: employee.id
1664    ///    Filter: employee.state Eq Utf8(\"CO\")\
1665    ///       CsvScan: employee projection=Some([0, 3])
1666    /// ```
1667    ///
1668    /// ```
1669    /// use arrow::datatypes::{DataType, Field, Schema};
1670    /// use datafusion_expr::{col, lit, logical_plan::table_scan, LogicalPlanBuilder};
1671    /// let schema = Schema::new(vec![Field::new("id", DataType::Int32, false)]);
1672    /// let plan = table_scan(Some("t1"), &schema, None)
1673    ///     .unwrap()
1674    ///     .filter(col("id").eq(lit(5)))
1675    ///     .unwrap()
1676    ///     .build()
1677    ///     .unwrap();
1678    ///
1679    /// // Format using display_indent
1680    /// let display_string = format!("{}", plan.display_indent());
1681    ///
1682    /// assert_eq!("Filter: t1.id = Int32(5)\n  TableScan: t1", display_string);
1683    /// ```
1684    pub fn display_indent(&self) -> impl Display + '_ {
1685        // Boilerplate structure to wrap LogicalPlan with something
1686        // that that can be formatted
1687        struct Wrapper<'a>(&'a LogicalPlan);
1688        impl Display for Wrapper<'_> {
1689            fn fmt(&self, f: &mut Formatter) -> fmt::Result {
1690                let with_schema = false;
1691                let mut visitor = IndentVisitor::new(f, with_schema);
1692                match self.0.visit_with_subqueries(&mut visitor) {
1693                    Ok(_) => Ok(()),
1694                    Err(_) => Err(fmt::Error),
1695                }
1696            }
1697        }
1698        Wrapper(self)
1699    }
1700
1701    /// Return a `format`able structure that produces a single line
1702    /// per node that includes the output schema. For example:
1703    ///
1704    /// ```text
1705    /// Projection: employee.id [id:Int32]\
1706    ///    Filter: employee.state = Utf8(\"CO\") [id:Int32, state:Utf8]\
1707    ///      TableScan: employee projection=[0, 3] [id:Int32, state:Utf8]";
1708    /// ```
1709    ///
1710    /// ```
1711    /// use arrow::datatypes::{DataType, Field, Schema};
1712    /// use datafusion_expr::{col, lit, logical_plan::table_scan, LogicalPlanBuilder};
1713    /// let schema = Schema::new(vec![Field::new("id", DataType::Int32, false)]);
1714    /// let plan = table_scan(Some("t1"), &schema, None)
1715    ///     .unwrap()
1716    ///     .filter(col("id").eq(lit(5)))
1717    ///     .unwrap()
1718    ///     .build()
1719    ///     .unwrap();
1720    ///
1721    /// // Format using display_indent_schema
1722    /// let display_string = format!("{}", plan.display_indent_schema());
1723    ///
1724    /// assert_eq!(
1725    ///     "Filter: t1.id = Int32(5) [id:Int32]\
1726    ///             \n  TableScan: t1 [id:Int32]",
1727    ///     display_string
1728    /// );
1729    /// ```
1730    pub fn display_indent_schema(&self) -> impl Display + '_ {
1731        // Boilerplate structure to wrap LogicalPlan with something
1732        // that that can be formatted
1733        struct Wrapper<'a>(&'a LogicalPlan);
1734        impl Display for Wrapper<'_> {
1735            fn fmt(&self, f: &mut Formatter) -> fmt::Result {
1736                let with_schema = true;
1737                let mut visitor = IndentVisitor::new(f, with_schema);
1738                match self.0.visit_with_subqueries(&mut visitor) {
1739                    Ok(_) => Ok(()),
1740                    Err(_) => Err(fmt::Error),
1741                }
1742            }
1743        }
1744        Wrapper(self)
1745    }
1746
1747    /// Return a displayable structure that produces plan in postgresql JSON format.
1748    ///
1749    /// Users can use this format to visualize the plan in existing plan visualization tools, for example [dalibo](https://explain.dalibo.com/)
1750    pub fn display_pg_json(&self) -> impl Display + '_ {
1751        // Boilerplate structure to wrap LogicalPlan with something
1752        // that that can be formatted
1753        struct Wrapper<'a>(&'a LogicalPlan);
1754        impl Display for Wrapper<'_> {
1755            fn fmt(&self, f: &mut Formatter) -> fmt::Result {
1756                let mut visitor = PgJsonVisitor::new(f);
1757                visitor.with_schema(true);
1758                match self.0.visit_with_subqueries(&mut visitor) {
1759                    Ok(_) => Ok(()),
1760                    Err(_) => Err(fmt::Error),
1761                }
1762            }
1763        }
1764        Wrapper(self)
1765    }
1766
1767    /// Return a `format`able structure that produces lines meant for
1768    /// graphical display using the `DOT` language. This format can be
1769    /// visualized using software from
1770    /// [`graphviz`](https://graphviz.org/)
1771    ///
1772    /// This currently produces two graphs -- one with the basic
1773    /// structure, and one with additional details such as schema.
1774    ///
1775    /// ```
1776    /// use arrow::datatypes::{DataType, Field, Schema};
1777    /// use datafusion_expr::{col, lit, logical_plan::table_scan, LogicalPlanBuilder};
1778    /// let schema = Schema::new(vec![Field::new("id", DataType::Int32, false)]);
1779    /// let plan = table_scan(Some("t1"), &schema, None)
1780    ///     .unwrap()
1781    ///     .filter(col("id").eq(lit(5)))
1782    ///     .unwrap()
1783    ///     .build()
1784    ///     .unwrap();
1785    ///
1786    /// // Format using display_graphviz
1787    /// let graphviz_string = format!("{}", plan.display_graphviz());
1788    /// ```
1789    ///
1790    /// If graphviz string is saved to a file such as `/tmp/example.dot`, the following
1791    /// commands can be used to render it as a pdf:
1792    ///
1793    /// ```bash
1794    ///   dot -Tpdf < /tmp/example.dot  > /tmp/example.pdf
1795    /// ```
1796    pub fn display_graphviz(&self) -> impl Display + '_ {
1797        // Boilerplate structure to wrap LogicalPlan with something
1798        // that that can be formatted
1799        struct Wrapper<'a>(&'a LogicalPlan);
1800        impl Display for Wrapper<'_> {
1801            fn fmt(&self, f: &mut Formatter) -> fmt::Result {
1802                let mut visitor = GraphvizVisitor::new(f);
1803
1804                visitor.start_graph()?;
1805
1806                visitor.pre_visit_plan("LogicalPlan")?;
1807                self.0
1808                    .visit_with_subqueries(&mut visitor)
1809                    .map_err(|_| fmt::Error)?;
1810                visitor.post_visit_plan()?;
1811
1812                visitor.set_with_schema(true);
1813                visitor.pre_visit_plan("Detailed LogicalPlan")?;
1814                self.0
1815                    .visit_with_subqueries(&mut visitor)
1816                    .map_err(|_| fmt::Error)?;
1817                visitor.post_visit_plan()?;
1818
1819                visitor.end_graph()?;
1820                Ok(())
1821            }
1822        }
1823        Wrapper(self)
1824    }
1825
1826    /// Return a `format`able structure with the a human readable
1827    /// description of this LogicalPlan node per node, not including
1828    /// children. For example:
1829    ///
1830    /// ```text
1831    /// Projection: id
1832    /// ```
1833    /// ```
1834    /// use arrow::datatypes::{DataType, Field, Schema};
1835    /// use datafusion_expr::{col, lit, logical_plan::table_scan, LogicalPlanBuilder};
1836    /// let schema = Schema::new(vec![Field::new("id", DataType::Int32, false)]);
1837    /// let plan = table_scan(Some("t1"), &schema, None)
1838    ///     .unwrap()
1839    ///     .build()
1840    ///     .unwrap();
1841    ///
1842    /// // Format using display
1843    /// let display_string = format!("{}", plan.display());
1844    ///
1845    /// assert_eq!("TableScan: t1", display_string);
1846    /// ```
1847    pub fn display(&self) -> impl Display + '_ {
1848        // Boilerplate structure to wrap LogicalPlan with something
1849        // that that can be formatted
1850        struct Wrapper<'a>(&'a LogicalPlan);
1851        impl Display for Wrapper<'_> {
1852            fn fmt(&self, f: &mut Formatter) -> fmt::Result {
1853                match self.0 {
1854                    LogicalPlan::EmptyRelation(EmptyRelation {
1855                        produce_one_row,
1856                        schema: _,
1857                    }) => {
1858                        let rows = if *produce_one_row { 1 } else { 0 };
1859                        write!(f, "EmptyRelation: rows={rows}")
1860                    }
1861                    LogicalPlan::RecursiveQuery(RecursiveQuery {
1862                        is_distinct, ..
1863                    }) => {
1864                        write!(f, "RecursiveQuery: is_distinct={is_distinct}")
1865                    }
1866                    LogicalPlan::Values(Values { values, .. }) => {
1867                        let str_values: Vec<_> = values
1868                            .iter()
1869                            // limit to only 5 values to avoid horrible display
1870                            .take(5)
1871                            .map(|row| {
1872                                let item = row
1873                                    .iter()
1874                                    .map(|expr| expr.to_string())
1875                                    .collect::<Vec<_>>()
1876                                    .join(", ");
1877                                format!("({item})")
1878                            })
1879                            .collect();
1880
1881                        let eclipse = if values.len() > 5 { "..." } else { "" };
1882                        write!(f, "Values: {}{}", str_values.join(", "), eclipse)
1883                    }
1884
1885                    LogicalPlan::TableScan(TableScan {
1886                        source,
1887                        table_name,
1888                        projection,
1889                        filters,
1890                        fetch,
1891                        ..
1892                    }) => {
1893                        let projected_fields = match projection {
1894                            Some(indices) => {
1895                                let schema = source.schema();
1896                                let names: Vec<&str> = indices
1897                                    .iter()
1898                                    .map(|i| schema.field(*i).name().as_str())
1899                                    .collect();
1900                                format!(" projection=[{}]", names.join(", "))
1901                            }
1902                            _ => "".to_string(),
1903                        };
1904
1905                        write!(f, "TableScan: {table_name}{projected_fields}")?;
1906
1907                        if !filters.is_empty() {
1908                            let mut full_filter = vec![];
1909                            let mut partial_filter = vec![];
1910                            let mut unsupported_filters = vec![];
1911                            let filters: Vec<&Expr> = filters.iter().collect();
1912
1913                            if let Ok(results) =
1914                                source.supports_filters_pushdown(&filters)
1915                            {
1916                                filters.iter().zip(results.iter()).for_each(
1917                                    |(x, res)| match res {
1918                                        TableProviderFilterPushDown::Exact => {
1919                                            full_filter.push(x)
1920                                        }
1921                                        TableProviderFilterPushDown::Inexact => {
1922                                            partial_filter.push(x)
1923                                        }
1924                                        TableProviderFilterPushDown::Unsupported => {
1925                                            unsupported_filters.push(x)
1926                                        }
1927                                    },
1928                                );
1929                            }
1930
1931                            if !full_filter.is_empty() {
1932                                write!(
1933                                    f,
1934                                    ", full_filters=[{}]",
1935                                    expr_vec_fmt!(full_filter)
1936                                )?;
1937                            };
1938                            if !partial_filter.is_empty() {
1939                                write!(
1940                                    f,
1941                                    ", partial_filters=[{}]",
1942                                    expr_vec_fmt!(partial_filter)
1943                                )?;
1944                            }
1945                            if !unsupported_filters.is_empty() {
1946                                write!(
1947                                    f,
1948                                    ", unsupported_filters=[{}]",
1949                                    expr_vec_fmt!(unsupported_filters)
1950                                )?;
1951                            }
1952                        }
1953
1954                        if let Some(n) = fetch {
1955                            write!(f, ", fetch={n}")?;
1956                        }
1957
1958                        Ok(())
1959                    }
1960                    LogicalPlan::Projection(Projection { expr, .. }) => {
1961                        write!(f, "Projection:")?;
1962                        for (i, expr_item) in expr.iter().enumerate() {
1963                            if i > 0 {
1964                                write!(f, ",")?;
1965                            }
1966                            write!(f, " {expr_item}")?;
1967                        }
1968                        Ok(())
1969                    }
1970                    LogicalPlan::Dml(DmlStatement { table_name, op, .. }) => {
1971                        write!(f, "Dml: op=[{op}] table=[{table_name}]")
1972                    }
1973                    LogicalPlan::Copy(CopyTo {
1974                        input: _,
1975                        output_url,
1976                        file_type,
1977                        options,
1978                        ..
1979                    }) => {
1980                        let op_str = options
1981                            .iter()
1982                            .map(|(k, v)| format!("{k} {v}"))
1983                            .collect::<Vec<String>>()
1984                            .join(", ");
1985
1986                        write!(
1987                            f,
1988                            "CopyTo: format={} output_url={output_url} options: ({op_str})",
1989                            file_type.get_ext()
1990                        )
1991                    }
1992                    LogicalPlan::Ddl(ddl) => {
1993                        write!(f, "{}", ddl.display())
1994                    }
1995                    LogicalPlan::Filter(Filter {
1996                        predicate: expr, ..
1997                    }) => write!(f, "Filter: {expr}"),
1998                    LogicalPlan::Window(Window { window_expr, .. }) => {
1999                        write!(
2000                            f,
2001                            "WindowAggr: windowExpr=[[{}]]",
2002                            expr_vec_fmt!(window_expr)
2003                        )
2004                    }
2005                    LogicalPlan::Aggregate(Aggregate {
2006                        group_expr,
2007                        aggr_expr,
2008                        ..
2009                    }) => write!(
2010                        f,
2011                        "Aggregate: groupBy=[[{}]], aggr=[[{}]]",
2012                        expr_vec_fmt!(group_expr),
2013                        expr_vec_fmt!(aggr_expr)
2014                    ),
2015                    LogicalPlan::Sort(Sort { expr, fetch, .. }) => {
2016                        write!(f, "Sort: ")?;
2017                        for (i, expr_item) in expr.iter().enumerate() {
2018                            if i > 0 {
2019                                write!(f, ", ")?;
2020                            }
2021                            write!(f, "{expr_item}")?;
2022                        }
2023                        if let Some(a) = fetch {
2024                            write!(f, ", fetch={a}")?;
2025                        }
2026
2027                        Ok(())
2028                    }
2029                    LogicalPlan::Join(Join {
2030                        on: keys,
2031                        filter,
2032                        join_constraint,
2033                        join_type,
2034                        ..
2035                    }) => {
2036                        let join_expr: Vec<String> =
2037                            keys.iter().map(|(l, r)| format!("{l} = {r}")).collect();
2038                        let filter_expr = filter
2039                            .as_ref()
2040                            .map(|expr| format!(" Filter: {expr}"))
2041                            .unwrap_or_else(|| "".to_string());
2042                        let join_type = if filter.is_none()
2043                            && keys.is_empty()
2044                            && *join_type == JoinType::Inner
2045                        {
2046                            "Cross".to_string()
2047                        } else {
2048                            join_type.to_string()
2049                        };
2050                        match join_constraint {
2051                            JoinConstraint::On => {
2052                                write!(f, "{join_type} Join:",)?;
2053                                if !join_expr.is_empty() || !filter_expr.is_empty() {
2054                                    write!(
2055                                        f,
2056                                        " {}{}",
2057                                        join_expr.join(", "),
2058                                        filter_expr
2059                                    )?;
2060                                }
2061                                Ok(())
2062                            }
2063                            JoinConstraint::Using => {
2064                                write!(
2065                                    f,
2066                                    "{} Join: Using {}{}",
2067                                    join_type,
2068                                    join_expr.join(", "),
2069                                    filter_expr,
2070                                )
2071                            }
2072                        }
2073                    }
2074                    LogicalPlan::Repartition(Repartition {
2075                        partitioning_scheme,
2076                        ..
2077                    }) => match partitioning_scheme {
2078                        Partitioning::RoundRobinBatch(n) => {
2079                            write!(f, "Repartition: RoundRobinBatch partition_count={n}")
2080                        }
2081                        Partitioning::Hash(expr, n) => {
2082                            let hash_expr: Vec<String> =
2083                                expr.iter().map(|e| format!("{e}")).collect();
2084                            write!(
2085                                f,
2086                                "Repartition: Hash({}) partition_count={}",
2087                                hash_expr.join(", "),
2088                                n
2089                            )
2090                        }
2091                        Partitioning::DistributeBy(expr) => {
2092                            let dist_by_expr: Vec<String> =
2093                                expr.iter().map(|e| format!("{e}")).collect();
2094                            write!(
2095                                f,
2096                                "Repartition: DistributeBy({})",
2097                                dist_by_expr.join(", "),
2098                            )
2099                        }
2100                    },
2101                    LogicalPlan::Limit(limit) => {
2102                        // Attempt to display `skip` and `fetch` as literals if possible, otherwise as expressions.
2103                        let skip_str = match limit.get_skip_type() {
2104                            Ok(SkipType::Literal(n)) => n.to_string(),
2105                            _ => limit
2106                                .skip
2107                                .as_ref()
2108                                .map_or_else(|| "None".to_string(), |x| x.to_string()),
2109                        };
2110                        let fetch_str = match limit.get_fetch_type() {
2111                            Ok(FetchType::Literal(Some(n))) => n.to_string(),
2112                            Ok(FetchType::Literal(None)) => "None".to_string(),
2113                            _ => limit
2114                                .fetch
2115                                .as_ref()
2116                                .map_or_else(|| "None".to_string(), |x| x.to_string()),
2117                        };
2118                        write!(f, "Limit: skip={skip_str}, fetch={fetch_str}",)
2119                    }
2120                    LogicalPlan::Subquery(Subquery { .. }) => {
2121                        write!(f, "Subquery:")
2122                    }
2123                    LogicalPlan::SubqueryAlias(SubqueryAlias { alias, .. }) => {
2124                        write!(f, "SubqueryAlias: {alias}")
2125                    }
2126                    LogicalPlan::Statement(statement) => {
2127                        write!(f, "{}", statement.display())
2128                    }
2129                    LogicalPlan::Distinct(distinct) => match distinct {
2130                        Distinct::All(_) => write!(f, "Distinct:"),
2131                        Distinct::On(DistinctOn {
2132                            on_expr,
2133                            select_expr,
2134                            sort_expr,
2135                            ..
2136                        }) => write!(
2137                            f,
2138                            "DistinctOn: on_expr=[[{}]], select_expr=[[{}]], sort_expr=[[{}]]",
2139                            expr_vec_fmt!(on_expr),
2140                            expr_vec_fmt!(select_expr),
2141                            if let Some(sort_expr) = sort_expr {
2142                                expr_vec_fmt!(sort_expr)
2143                            } else {
2144                                "".to_string()
2145                            },
2146                        ),
2147                    },
2148                    LogicalPlan::Explain { .. } => write!(f, "Explain"),
2149                    LogicalPlan::Analyze { .. } => write!(f, "Analyze"),
2150                    LogicalPlan::Union(_) => write!(f, "Union"),
2151                    LogicalPlan::Extension(e) => e.node.fmt_for_explain(f),
2152                    LogicalPlan::DescribeTable(DescribeTable { .. }) => {
2153                        write!(f, "DescribeTable")
2154                    }
2155                    LogicalPlan::Unnest(Unnest {
2156                        input: plan,
2157                        list_type_columns: list_col_indices,
2158                        struct_type_columns: struct_col_indices,
2159                        ..
2160                    }) => {
2161                        let input_columns = plan.schema().columns();
2162                        let list_type_columns = list_col_indices
2163                            .iter()
2164                            .map(|(i, unnest_info)| {
2165                                format!(
2166                                    "{}|depth={}",
2167                                    &input_columns[*i].to_string(),
2168                                    unnest_info.depth
2169                                )
2170                            })
2171                            .collect::<Vec<String>>();
2172                        let struct_type_columns = struct_col_indices
2173                            .iter()
2174                            .map(|i| &input_columns[*i])
2175                            .collect::<Vec<&Column>>();
2176                        // get items from input_columns indexed by list_col_indices
2177                        write!(
2178                            f,
2179                            "Unnest: lists[{}] structs[{}]",
2180                            expr_vec_fmt!(list_type_columns),
2181                            expr_vec_fmt!(struct_type_columns)
2182                        )
2183                    }
2184                }
2185            }
2186        }
2187        Wrapper(self)
2188    }
2189}
2190
2191impl Display for LogicalPlan {
2192    fn fmt(&self, f: &mut Formatter) -> fmt::Result {
2193        self.display_indent().fmt(f)
2194    }
2195}
2196
2197impl ToStringifiedPlan for LogicalPlan {
2198    fn to_stringified(&self, plan_type: PlanType) -> StringifiedPlan {
2199        StringifiedPlan::new(plan_type, self.display_indent().to_string())
2200    }
2201}
2202
/// Relation that produces 0 or 1 placeholder rows with a specified output schema.
///
/// In most cases the output schema for an `EmptyRelation` would be empty;
/// however, it can be non-empty, typically when created by optimizer rules.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct EmptyRelation {
    /// Whether to produce a single placeholder row (`true`) or no rows at all (`false`)
    pub produce_one_row: bool,
    /// The schema description of the output
    pub schema: DFSchemaRef,
}
2213
2214// Manual implementation needed because of `schema` field. Comparison excludes this field.
2215impl PartialOrd for EmptyRelation {
2216    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
2217        self.produce_one_row
2218            .partial_cmp(&other.produce_one_row)
2219            // TODO (https://github.com/apache/datafusion/issues/17477) avoid recomparing all fields
2220            .filter(|cmp| *cmp != Ordering::Equal || self == other)
2221    }
2222}
2223
/// A variadic query operation, Recursive CTE.
///
/// # Recursive Query Evaluation
///
/// From the [Postgres Docs]:
///
/// 1. Evaluate the non-recursive term. For `UNION` (but not `UNION ALL`),
///    discard duplicate rows. Include all remaining rows in the result of the
///    recursive query, and also place them in a temporary working table.
///
/// 2. So long as the working table is not empty, repeat these steps:
///
/// * Evaluate the recursive term, substituting the current contents of the
///   working table for the recursive self-reference. For `UNION` (but not `UNION
///   ALL`), discard duplicate rows and rows that duplicate any previous result
///   row. Include all remaining rows in the result of the recursive query, and
///   also place them in a temporary intermediate table.
///
/// * Replace the contents of the working table with the contents of the
///   intermediate table, then empty the intermediate table.
///
/// [Postgres Docs]: https://www.postgresql.org/docs/current/queries-with.html#QUERIES-WITH-RECURSIVE
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Hash)]
pub struct RecursiveQuery {
    /// Name of the recursive CTE
    pub name: String,
    /// The static term (initial contents of the working table)
    pub static_term: Arc<LogicalPlan>,
    /// The recursive term (evaluated on the contents of the working table until
    /// it returns an empty set)
    pub recursive_term: Arc<LogicalPlan>,
    /// Should the output of the recursive term be deduplicated (`UNION`) or
    /// not (`UNION ALL`).
    pub is_distinct: bool,
}
2259
/// Values expression. See
/// [Postgres VALUES](https://www.postgresql.org/docs/current/queries-values.html)
/// documentation for more details.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct Values {
    /// The table schema
    pub schema: DFSchemaRef,
    /// Values: one inner `Vec<Expr>` per output row
    pub values: Vec<Vec<Expr>>,
}
2270
2271// Manual implementation needed because of `schema` field. Comparison excludes this field.
2272impl PartialOrd for Values {
2273    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
2274        self.values
2275            .partial_cmp(&other.values)
2276            // TODO (https://github.com/apache/datafusion/issues/17477) avoid recomparing all fields
2277            .filter(|cmp| *cmp != Ordering::Equal || self == other)
2278    }
2279}
2280
/// Evaluates an arbitrary list of expressions (essentially a
/// SELECT with an expression list) on its input.
#[derive(Clone, PartialEq, Eq, Hash, Debug)]
// mark non_exhaustive to encourage use of try_new/new()
#[non_exhaustive]
pub struct Projection {
    /// The list of expressions, one per output field
    pub expr: Vec<Expr>,
    /// The incoming logical plan
    pub input: Arc<LogicalPlan>,
    /// The schema description of the output
    pub schema: DFSchemaRef,
}
2294
2295// Manual implementation needed because of `schema` field. Comparison excludes this field.
2296impl PartialOrd for Projection {
2297    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
2298        match self.expr.partial_cmp(&other.expr) {
2299            Some(Ordering::Equal) => self.input.partial_cmp(&other.input),
2300            cmp => cmp,
2301        }
2302        // TODO (https://github.com/apache/datafusion/issues/17477) avoid recomparing all fields
2303        .filter(|cmp| *cmp != Ordering::Equal || self == other)
2304    }
2305}
2306
impl Projection {
    /// Create a new Projection, computing the output schema from `expr` and
    /// the input plan via [`projection_schema`].
    pub fn try_new(expr: Vec<Expr>, input: Arc<LogicalPlan>) -> Result<Self> {
        let projection_schema = projection_schema(&input, &expr)?;
        Self::try_new_with_schema(expr, input, projection_schema)
    }

    /// Create a new Projection using the specified output schema
    ///
    /// # Errors
    ///
    /// Returns a plan error if the number of expressions does not match the
    /// number of fields in `schema` (the check is skipped when a wildcard
    /// expression is present, as a wildcard may expand to multiple fields).
    pub fn try_new_with_schema(
        expr: Vec<Expr>,
        input: Arc<LogicalPlan>,
        schema: DFSchemaRef,
    ) -> Result<Self> {
        // `Expr::Wildcard` is deprecated but must still be tolerated here.
        #[expect(deprecated)]
        if !expr.iter().any(|e| matches!(e, Expr::Wildcard { .. }))
            && expr.len() != schema.fields().len()
        {
            return plan_err!(
                "Projection has mismatch between number of expressions ({}) and number of fields in schema ({})",
                expr.len(),
                schema.fields().len()
            );
        }
        Ok(Self {
            expr,
            input,
            schema,
        })
    }

    /// Create a new Projection using the specified output schema; the
    /// expression list is a plain column reference per schema field.
    pub fn new_from_schema(input: Arc<LogicalPlan>, schema: DFSchemaRef) -> Self {
        let expr: Vec<Expr> = schema.columns().into_iter().map(Expr::Column).collect();
        Self {
            expr,
            input,
            schema,
        }
    }
}
2347
2348/// Computes the schema of the result produced by applying a projection to the input logical plan.
2349///
2350/// # Arguments
2351///
2352/// * `input`: A reference to the input `LogicalPlan` for which the projection schema
2353///   will be computed.
2354/// * `exprs`: A slice of `Expr` expressions representing the projection operation to apply.
2355///
2356/// # Metadata Handling
2357///
2358/// - **Schema-level metadata**: Passed through unchanged from the input schema
2359/// - **Field-level metadata**: Determined by each expression via [`exprlist_to_fields`], which
2360///   calls [`Expr::to_field`] to handle expression-specific metadata (literals, aliases, etc.)
2361///
2362/// # Returns
2363///
2364/// A `Result` containing an `Arc<DFSchema>` representing the schema of the result
2365/// produced by the projection operation. If the schema computation is successful,
2366/// the `Result` will contain the schema; otherwise, it will contain an error.
2367pub fn projection_schema(input: &LogicalPlan, exprs: &[Expr]) -> Result<Arc<DFSchema>> {
2368    // Preserve input schema metadata at the schema level
2369    let metadata = input.schema().metadata().clone();
2370
2371    // Convert expressions to fields with Field properties determined by `Expr::to_field`
2372    let schema =
2373        DFSchema::new_with_metadata(exprlist_to_fields(exprs, input)?, metadata)?
2374            .with_functional_dependencies(calc_func_dependencies_for_project(
2375                exprs, input,
2376            )?)?;
2377
2378    Ok(Arc::new(schema))
2379}
2380
/// Aliased subquery: wraps an input plan and re-qualifies every output
/// column with a single alias.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
// mark non_exhaustive to encourage use of try_new/new()
#[non_exhaustive]
pub struct SubqueryAlias {
    /// The incoming logical plan
    pub input: Arc<LogicalPlan>,
    /// The alias for the input relation
    pub alias: TableReference,
    /// The schema with qualified field names (every field qualified by `alias`)
    pub schema: DFSchemaRef,
}
2393
impl SubqueryAlias {
    /// Create a new `SubqueryAlias` over `plan`, re-qualifying all output
    /// columns with `alias`.
    ///
    /// If the input schema contains duplicate field names, a projection is
    /// inserted first to give each field a unique name, because after
    /// aliasing every field shares the single qualifier `alias`.
    pub fn try_new(
        plan: Arc<LogicalPlan>,
        alias: impl Into<TableReference>,
    ) -> Result<Self> {
        let alias = alias.into();

        // Since SubqueryAlias will replace all field qualification for the output schema of `plan`,
        // no field must share the same column name as this would lead to ambiguity when referencing
        // columns in parent logical nodes.

        // Compute unique aliases, if any, for each column of the input's schema.
        let aliases = unique_field_aliases(plan.schema().fields());
        let is_projection_needed = aliases.iter().any(Option::is_some);

        // Insert a projection node, if needed, to make sure aliases are applied.
        let plan = if is_projection_needed {
            let projection_expressions = aliases
                .iter()
                .zip(plan.schema().iter())
                .map(|(alias, (qualifier, field))| {
                    let column =
                        Expr::Column(Column::new(qualifier.cloned(), field.name()));
                    // Columns without a computed alias pass through unchanged;
                    // duplicated names are wrapped in an `Alias` expression.
                    match alias {
                        None => column,
                        Some(alias) => {
                            Expr::Alias(Alias::new(column, qualifier.cloned(), alias))
                        }
                    }
                })
                .collect();
            let projection = Projection::try_new(projection_expressions, plan)?;
            Arc::new(LogicalPlan::Projection(projection))
        } else {
            plan
        };

        // Requalify fields with the new `alias`.
        let fields = plan.schema().fields().clone();
        let meta_data = plan.schema().metadata().clone();
        let func_dependencies = plan.schema().functional_dependencies().clone();

        // Strip any existing qualifiers first, then re-qualify every field
        // with `alias`, carrying metadata and functional dependencies over.
        let schema = DFSchema::from_unqualified_fields(fields, meta_data)?;
        let schema = schema.as_arrow();

        let schema = DFSchemaRef::new(
            DFSchema::try_from_qualified_schema(alias.clone(), schema)?
                .with_functional_dependencies(func_dependencies)?,
        );
        Ok(SubqueryAlias {
            input: plan,
            alias,
            schema,
        })
    }
}
2450
2451// Manual implementation needed because of `schema` field. Comparison excludes this field.
2452impl PartialOrd for SubqueryAlias {
2453    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
2454        match self.input.partial_cmp(&other.input) {
2455            Some(Ordering::Equal) => self.alias.partial_cmp(&other.alias),
2456            cmp => cmp,
2457        }
2458        // TODO (https://github.com/apache/datafusion/issues/17477) avoid recomparing all fields
2459        .filter(|cmp| *cmp != Ordering::Equal || self == other)
2460    }
2461}
2462
/// Filters rows from its input that do not match an
/// expression (essentially a WHERE clause with a predicate
/// expression).
///
/// Semantically, `<predicate>` is evaluated for each row of the input;
/// If the value of `<predicate>` is true, the input row is passed to
/// the output. If the value of `<predicate>` is false, the row is
/// discarded.
///
/// Filter should not be created directly but instead use `try_new()`
/// and that these fields are only pub to support pattern matching
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Hash)]
#[non_exhaustive]
pub struct Filter {
    /// The predicate expression, which must have Boolean type.
    /// Stored with aliases stripped (see [`Filter::try_new`]).
    pub predicate: Expr,
    /// The incoming logical plan
    pub input: Arc<LogicalPlan>,
}
2482
impl Filter {
    /// Create a new filter operator.
    ///
    /// Notes: as Aliases have no effect on the output of a filter operator,
    /// they are removed from the predicate expression.
    ///
    /// # Errors
    ///
    /// Returns a plan error if the predicate can be resolved against the
    /// input schema and does not have an allowed (boolean-like) type.
    pub fn try_new(predicate: Expr, input: Arc<LogicalPlan>) -> Result<Self> {
        Self::try_new_internal(predicate, input)
    }

    /// Create a new filter operator for a having clause.
    /// This is similar to a filter, but its having flag is set to true.
    #[deprecated(since = "48.0.0", note = "Use `try_new` instead")]
    pub fn try_new_with_having(predicate: Expr, input: Arc<LogicalPlan>) -> Result<Self> {
        Self::try_new_internal(predicate, input)
    }

    /// Returns true if `data_type` is acceptable for a filter predicate:
    /// boolean, null (interpreted as a missing boolean), or a dictionary
    /// whose value type is itself allowed (checked recursively).
    fn is_allowed_filter_type(data_type: &DataType) -> bool {
        match data_type {
            // Interpret NULL as a missing boolean value.
            DataType::Boolean | DataType::Null => true,
            DataType::Dictionary(_, value_type) => {
                Filter::is_allowed_filter_type(value_type.as_ref())
            }
            _ => false,
        }
    }

    /// Shared constructor: validates the predicate type on a best-effort
    /// basis, then stores the predicate with aliases stripped.
    fn try_new_internal(predicate: Expr, input: Arc<LogicalPlan>) -> Result<Self> {
        // Filter predicates must return a boolean value so we try and validate that here.
        // Note that it is not always possible to resolve the predicate expression during plan
        // construction (such as with correlated subqueries) so we make a best effort here and
        // ignore errors resolving the expression against the schema.
        if let Ok(predicate_type) = predicate.get_type(input.schema())
            && !Filter::is_allowed_filter_type(&predicate_type)
        {
            return plan_err!(
                "Cannot create filter with non-boolean predicate '{predicate}' returning {predicate_type}"
            );
        }

        Ok(Self {
            // Aliases have no effect on filtering, so strip them (including nested ones).
            predicate: predicate.unalias_nested().data,
            input,
        })
    }

    /// Is this filter guaranteed to return 0 or 1 row in a given instantiation?
    ///
    /// This function will return `true` if its predicate contains a conjunction of
    /// `col(a) = <expr>`, where its schema has a unique filter that is covered
    /// by this conjunction.
    ///
    /// For example, for the table:
    /// ```sql
    /// CREATE TABLE t (a INTEGER PRIMARY KEY, b INTEGER);
    /// ```
    /// `Filter(a = 2).is_scalar() == true`
    /// , whereas
    /// `Filter(b = 2).is_scalar() == false`
    /// and
    /// `Filter(a = 2 OR b = 2).is_scalar() == false`
    fn is_scalar(&self) -> bool {
        let schema = self.input.schema();

        // Candidate unique keys: single-mode functional dependencies that
        // determine every field of the schema and whose source columns are
        // not nullable.
        let functional_dependencies = self.input.schema().functional_dependencies();
        let unique_keys = functional_dependencies.iter().filter(|dep| {
            let nullable = dep.nullable
                && dep
                    .source_indices
                    .iter()
                    .any(|&source| schema.field(source).is_nullable());
            !nullable
                && dep.mode == Dependency::Single
                && dep.target_indices.len() == schema.fields().len()
        });

        // Collect the schema indices of columns pinned by a top-level
        // `col = <expr>` (or `<expr> = col`) equality conjunct.
        let exprs = split_conjunction(&self.predicate);
        let eq_pred_cols: HashSet<_> = exprs
            .iter()
            .filter_map(|expr| {
                let Expr::BinaryExpr(BinaryExpr {
                    left,
                    op: Operator::Eq,
                    right,
                }) = expr
                else {
                    return None;
                };
                // This is a no-op filter expression
                if left == right {
                    return None;
                }

                match (left.as_ref(), right.as_ref()) {
                    // `col = col` does not pin either column to a single value.
                    (Expr::Column(_), Expr::Column(_)) => None,
                    (Expr::Column(c), _) | (_, Expr::Column(c)) => {
                        Some(schema.index_of_column(c).unwrap())
                    }
                    _ => None,
                }
            })
            .collect();

        // If we have a functional dependence that is a subset of our predicate,
        // this filter is scalar
        for key in unique_keys {
            if key.source_indices.iter().all(|c| eq_pred_cols.contains(c)) {
                return true;
            }
        }
        false
    }
}
2596
/// Window its input based on a set of window spec and window function (e.g. SUM or RANK)
///
/// # Output Schema
///
/// The output schema is the input schema followed by the window function
/// expressions, in order.
///
/// For example, given the input schema `"A", "B", "C"` and the window function
/// `SUM(A) OVER (PARTITION BY B+1 ORDER BY C)`, the output schema will be `"A",
/// "B", "C", "SUM(A) OVER ..."` where `"SUM(A) OVER ..."` is the name of the
/// output column.
///
/// Note that the `PARTITION BY` expression "B+1" is not produced in the output
/// schema.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct Window {
    /// The incoming logical plan
    pub input: Arc<LogicalPlan>,
    /// The window function expressions; each produces one output column,
    /// appended after the input's columns
    pub window_expr: Vec<Expr>,
    /// The schema description of the window output
    pub schema: DFSchemaRef,
}
2620
impl Window {
    /// Create a new window operator.
    ///
    /// The output schema is the input's fields followed by one field per
    /// window expression; functional dependencies are extended accordingly.
    ///
    /// # Errors
    ///
    /// Returns a plan error if a non-aggregate window function carries a
    /// FILTER clause, or if schema construction fails.
    pub fn try_new(window_expr: Vec<Expr>, input: Arc<LogicalPlan>) -> Result<Self> {
        // Output fields start as a copy of the input's (qualifier, field) pairs.
        let fields: Vec<(Option<TableReference>, Arc<Field>)> = input
            .schema()
            .iter()
            .map(|(q, f)| (q.cloned(), Arc::clone(f)))
            .collect();
        let input_len = fields.len();
        let mut window_fields = fields;
        // Append one field per window expression, in order.
        let expr_fields = exprlist_to_fields(window_expr.as_slice(), &input)?;
        window_fields.extend_from_slice(expr_fields.as_slice());
        let metadata = input.schema().metadata().clone();

        // Update functional dependencies for window:
        let mut window_func_dependencies =
            input.schema().functional_dependencies().clone();
        window_func_dependencies.extend_target_indices(window_fields.len());

        // Since we know that ROW_NUMBER outputs will be unique (i.e. it consists
        // of consecutive numbers per partition), we can represent this fact with
        // functional dependencies.
        let mut new_dependencies = window_expr
            .iter()
            .enumerate()
            .filter_map(|(idx, expr)| {
                let Expr::WindowFunction(window_fun) = expr else {
                    return None;
                };
                let WindowFunction {
                    fun: WindowFunctionDefinition::WindowUDF(udwf),
                    params: WindowFunctionParams { partition_by, .. },
                } = window_fun.as_ref()
                else {
                    return None;
                };
                // When there is no PARTITION BY, row number will be unique
                // across the entire table.
                if udwf.name() == "row_number" && partition_by.is_empty() {
                    // The unique column is at the expression's position after
                    // the input fields.
                    Some(idx + input_len)
                } else {
                    None
                }
            })
            .map(|idx| {
                FunctionalDependence::new(vec![idx], vec![], false)
                    .with_mode(Dependency::Single)
            })
            .collect::<Vec<_>>();

        if !new_dependencies.is_empty() {
            for dependence in new_dependencies.iter_mut() {
                dependence.target_indices = (0..window_fields.len()).collect();
            }
            // Add the dependency introduced because of ROW_NUMBER window function to the functional dependency
            let new_deps = FunctionalDependencies::new(new_dependencies);
            window_func_dependencies.extend(new_deps);
        }

        // Validate that FILTER clauses are only used with aggregate window functions
        if let Some(e) = window_expr.iter().find(|e| {
            matches!(
                e,
                Expr::WindowFunction(wf)
                    if !matches!(wf.fun, WindowFunctionDefinition::AggregateUDF(_))
                        && wf.params.filter.is_some()
            )
        }) {
            return plan_err!(
                "FILTER clause can only be used with aggregate window functions. Found in '{e}'"
            );
        }

        Self::try_new_with_schema(
            window_expr,
            input,
            Arc::new(
                DFSchema::new_with_metadata(window_fields, metadata)?
                    .with_functional_dependencies(window_func_dependencies)?,
            ),
        )
    }

    /// Create a new window function using the provided schema to avoid the overhead of
    /// building the schema again when the schema is already known.
    ///
    /// This method should only be called when you are absolutely sure that the schema being
    /// provided is correct for the window function. If in doubt, call [try_new](Self::try_new) instead.
    ///
    /// # Errors
    ///
    /// Returns a plan error if `schema` does not have exactly one field per
    /// input field plus one per window expression.
    pub fn try_new_with_schema(
        window_expr: Vec<Expr>,
        input: Arc<LogicalPlan>,
        schema: DFSchemaRef,
    ) -> Result<Self> {
        let input_fields_count = input.schema().fields().len();
        if schema.fields().len() != input_fields_count + window_expr.len() {
            return plan_err!(
                "Window schema has wrong number of fields. Expected {} got {}",
                input_fields_count + window_expr.len(),
                schema.fields().len()
            );
        }

        Ok(Window {
            input,
            window_expr,
            schema,
        })
    }
}
2730
2731// Manual implementation needed because of `schema` field. Comparison excludes this field.
2732impl PartialOrd for Window {
2733    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
2734        match self.input.partial_cmp(&other.input)? {
2735            Ordering::Equal => {} // continue
2736            not_equal => return Some(not_equal),
2737        }
2738
2739        match self.window_expr.partial_cmp(&other.window_expr)? {
2740            Ordering::Equal => {} // continue
2741            not_equal => return Some(not_equal),
2742        }
2743
2744        // Contract for PartialOrd and PartialEq consistency requires that
2745        // a == b if and only if partial_cmp(a, b) == Some(Equal).
2746        if self == other {
2747            Some(Ordering::Equal)
2748        } else {
2749            None
2750        }
2751    }
2752}
2753
/// Produces rows from a table provider by reference or from the context
#[derive(Clone)]
pub struct TableScan {
    /// The name of the table
    pub table_name: TableReference,
    /// The source of the table
    pub source: Arc<dyn TableSource>,
    /// Optional column indices to use as a projection
    pub projection: Option<Vec<usize>>,
    /// The schema description of the output (after `projection` is applied)
    pub projected_schema: DFSchemaRef,
    /// Optional expressions to be used as filters by the table provider
    pub filters: Vec<Expr>,
    /// Optional number of rows to read
    pub fetch: Option<usize>,
}
2770
// Manual `Debug` because `source` is a trait object without a `Debug` bound;
// its contents are elided as "...".
impl Debug for TableScan {
    fn fmt(&self, f: &mut Formatter) -> fmt::Result {
        f.debug_struct("TableScan")
            .field("table_name", &self.table_name)
            // `dyn TableSource` cannot be printed; show a placeholder instead.
            .field("source", &"...")
            .field("projection", &self.projection)
            .field("projected_schema", &self.projected_schema)
            .field("filters", &self.filters)
            .field("fetch", &self.fetch)
            .finish_non_exhaustive()
    }
}
2783
2784impl PartialEq for TableScan {
2785    fn eq(&self, other: &Self) -> bool {
2786        self.table_name == other.table_name
2787            && self.projection == other.projection
2788            && self.projected_schema == other.projected_schema
2789            && self.filters == other.filters
2790            && self.fetch == other.fetch
2791    }
2792}
2793
2794impl Eq for TableScan {}
2795
2796// Manual implementation needed because of `source` and `projected_schema` fields.
2797// Comparison excludes these field.
2798impl PartialOrd for TableScan {
2799    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
2800        #[derive(PartialEq, PartialOrd)]
2801        struct ComparableTableScan<'a> {
2802            /// The name of the table
2803            pub table_name: &'a TableReference,
2804            /// Optional column indices to use as a projection
2805            pub projection: &'a Option<Vec<usize>>,
2806            /// Optional expressions to be used as filters by the table provider
2807            pub filters: &'a Vec<Expr>,
2808            /// Optional number of rows to read
2809            pub fetch: &'a Option<usize>,
2810        }
2811        let comparable_self = ComparableTableScan {
2812            table_name: &self.table_name,
2813            projection: &self.projection,
2814            filters: &self.filters,
2815            fetch: &self.fetch,
2816        };
2817        let comparable_other = ComparableTableScan {
2818            table_name: &other.table_name,
2819            projection: &other.projection,
2820            filters: &other.filters,
2821            fetch: &other.fetch,
2822        };
2823        comparable_self
2824            .partial_cmp(&comparable_other)
2825            // TODO (https://github.com/apache/datafusion/issues/17477) avoid recomparing all fields
2826            .filter(|cmp| *cmp != Ordering::Equal || self == other)
2827    }
2828}
2829
2830impl Hash for TableScan {
2831    fn hash<H: Hasher>(&self, state: &mut H) {
2832        self.table_name.hash(state);
2833        self.projection.hash(state);
2834        self.projected_schema.hash(state);
2835        self.filters.hash(state);
2836        self.fetch.hash(state);
2837    }
2838}
2839
impl TableScan {
    /// Initialize TableScan with appropriate schema from the given
    /// arguments.
    ///
    /// # Errors
    ///
    /// Returns a plan error if `table_name` is empty, or if schema
    /// construction fails.
    pub fn try_new(
        table_name: impl Into<TableReference>,
        table_source: Arc<dyn TableSource>,
        projection: Option<Vec<usize>>,
        filters: Vec<Expr>,
        fetch: Option<usize>,
    ) -> Result<Self> {
        let table_name = table_name.into();

        if table_name.table().is_empty() {
            return plan_err!("table_name cannot be empty");
        }
        let schema = table_source.schema();
        // Derive functional dependencies from the source's declared constraints.
        let func_dependencies = FunctionalDependencies::new_from_constraints(
            table_source.constraints(),
            schema.fields.len(),
        );
        let projected_schema = projection
            .as_ref()
            .map(|p| {
                // Project both the functional dependencies and the fields down
                // to the selected column indices, qualifying each field with
                // the table name.
                let projected_func_dependencies =
                    func_dependencies.project_functional_dependencies(p, p.len());

                let df_schema = DFSchema::new_with_metadata(
                    p.iter()
                        .map(|i| {
                            (Some(table_name.clone()), Arc::clone(&schema.fields()[*i]))
                        })
                        .collect(),
                    schema.metadata.clone(),
                )?;
                df_schema.with_functional_dependencies(projected_func_dependencies)
            })
            .unwrap_or_else(|| {
                // No projection: qualify the full source schema with the table name.
                let df_schema =
                    DFSchema::try_from_qualified_schema(table_name.clone(), &schema)?;
                df_schema.with_functional_dependencies(func_dependencies)
            })?;
        let projected_schema = Arc::new(projected_schema);

        Ok(Self {
            table_name,
            source: table_source,
            projection,
            projected_schema,
            filters,
            fetch,
        })
    }
}
2893
/// Repartitions the input plan according to a partitioning scheme.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Hash)]
pub struct Repartition {
    /// The incoming logical plan
    pub input: Arc<LogicalPlan>,
    /// The partitioning scheme to apply to the input
    pub partitioning_scheme: Partitioning,
}
2902
/// Union multiple inputs into a single output
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct Union {
    /// Inputs to merge; constructors require at least two
    pub inputs: Vec<Arc<LogicalPlan>>,
    /// Union schema. Should be the same for all inputs.
    pub schema: DFSchemaRef,
}
2911
impl Union {
    /// Constructs new Union instance deriving schema from inputs.
    /// Schema data types must match exactly.
    pub fn try_new(inputs: Vec<Arc<LogicalPlan>>) -> Result<Self> {
        let schema = Self::derive_schema_from_inputs(&inputs, false, false)?;
        Ok(Union { inputs, schema })
    }

    /// Constructs new Union instance deriving schema from inputs.
    /// Inputs do not have to have matching types and produced schema will
    /// take type from the first input.
    // TODO (https://github.com/apache/datafusion/issues/14380): Avoid creating uncoerced union at all.
    pub fn try_new_with_loose_types(inputs: Vec<Arc<LogicalPlan>>) -> Result<Self> {
        let schema = Self::derive_schema_from_inputs(&inputs, true, false)?;
        Ok(Union { inputs, schema })
    }

    /// Constructs a new Union instance that combines rows from different tables by name,
    /// instead of by position. This means that the specified inputs need not have schemas
    /// that are all the same width.
    pub fn try_new_by_name(inputs: Vec<Arc<LogicalPlan>>) -> Result<Self> {
        let schema = Self::derive_schema_from_inputs(&inputs, true, true)?;
        let inputs = Self::rewrite_inputs_from_schema(&schema, inputs)?;

        Ok(Union { inputs, schema })
    }

    /// When constructing a `UNION BY NAME`, we need to wrap inputs
    /// in an additional `Projection` to account for absence of columns
    /// in input schemas or differing projection orders.
    fn rewrite_inputs_from_schema(
        schema: &Arc<DFSchema>,
        inputs: Vec<Arc<LogicalPlan>>,
    ) -> Result<Vec<Arc<LogicalPlan>>> {
        let schema_width = schema.iter().count();
        let mut wrapped_inputs = Vec::with_capacity(inputs.len());
        for input in inputs {
            // Any columns that exist within the derived schema but do not exist
            // within an input's schema should be replaced with `NULL` aliased
            // to the appropriate column in the derived schema.
            let mut expr = Vec::with_capacity(schema_width);
            for column in schema.columns() {
                if input
                    .schema()
                    .has_column_with_unqualified_name(column.name())
                {
                    expr.push(Expr::Column(column));
                } else {
                    expr.push(
                        Expr::Literal(ScalarValue::Null, None).alias(column.name()),
                    );
                }
            }
            wrapped_inputs.push(Arc::new(LogicalPlan::Projection(
                Projection::try_new_with_schema(expr, input, Arc::clone(schema))?,
            )));
        }

        Ok(wrapped_inputs)
    }

    /// Constructs new Union instance deriving schema from inputs.
    ///
    /// If `loose_types` is true, inputs do not need to have matching types and
    /// the produced schema will use the type from the first input.
    /// TODO (<https://github.com/apache/datafusion/issues/14380>): This is not necessarily reasonable behavior.
    ///
    /// If `by_name` is `true`, input schemas need not be the same width. That is,
    /// the constructed schema follows `UNION BY NAME` semantics.
    fn derive_schema_from_inputs(
        inputs: &[Arc<LogicalPlan>],
        loose_types: bool,
        by_name: bool,
    ) -> Result<DFSchemaRef> {
        if inputs.len() < 2 {
            return plan_err!("UNION requires at least two inputs");
        }

        if by_name {
            Self::derive_schema_from_inputs_by_name(inputs, loose_types)
        } else {
            Self::derive_schema_from_inputs_by_position(inputs, loose_types)
        }
    }

    fn derive_schema_from_inputs_by_name(
        inputs: &[Arc<LogicalPlan>],
        loose_types: bool,
    ) -> Result<DFSchemaRef> {
        // Per-column accumulator: (data type, nullable-in-any-input,
        // metadata from each input containing the column, number of inputs
        // that contain the column).
        type FieldData<'a> =
            (&'a DataType, bool, Vec<&'a HashMap<String, String>>, usize);
        // A `Vec` (not a map) preserves first-seen column order across inputs.
        let mut cols: Vec<(&str, FieldData)> = Vec::new();
        for input in inputs.iter() {
            for field in input.schema().fields() {
                if let Some((_, (data_type, is_nullable, metadata, occurrences))) =
                    cols.iter_mut().find(|(name, _)| name == field.name())
                {
                    if !loose_types && *data_type != field.data_type() {
                        return plan_err!(
                            "Found different types for field {}",
                            field.name()
                        );
                    }

                    metadata.push(field.metadata());
                    // If the field is nullable in any one of the inputs,
                    // then the field in the final schema is also nullable.
                    *is_nullable |= field.is_nullable();
                    *occurrences += 1;
                } else {
                    cols.push((
                        field.name(),
                        (
                            field.data_type(),
                            field.is_nullable(),
                            vec![field.metadata()],
                            1,
                        ),
                    ));
                }
            }
        }

        // Materialize the accumulated per-column data into unqualified fields.
        let union_fields = cols
            .into_iter()
            .map(
                |(name, (data_type, is_nullable, unmerged_metadata, occurrences))| {
                    // If the final number of occurrences of the field is less
                    // than the number of inputs (i.e. the field is missing from
                    // one or more inputs), then it must be treated as nullable.
                    let final_is_nullable = if occurrences == inputs.len() {
                        is_nullable
                    } else {
                        true
                    };

                    let mut field =
                        Field::new(name, data_type.clone(), final_is_nullable);
                    field.set_metadata(intersect_metadata_for_union(unmerged_metadata));

                    (None, Arc::new(field))
                },
            )
            .collect::<Vec<(Option<TableReference>, _)>>();

        let union_schema_metadata = intersect_metadata_for_union(
            inputs.iter().map(|input| input.schema().metadata()),
        );

        // Functional Dependencies are not preserved after UNION operation
        let schema = DFSchema::new_with_metadata(union_fields, union_schema_metadata)?;
        let schema = Arc::new(schema);

        Ok(schema)
    }

    fn derive_schema_from_inputs_by_position(
        inputs: &[Arc<LogicalPlan>],
        loose_types: bool,
    ) -> Result<DFSchemaRef> {
        let first_schema = inputs[0].schema();
        let fields_count = first_schema.fields().len();
        for input in inputs.iter().skip(1) {
            if fields_count != input.schema().fields().len() {
                return plan_err!(
                    "UNION queries have different number of columns: \
                    left has {} columns whereas right has {} columns",
                    fields_count,
                    input.schema().fields().len()
                );
            }
        }

        let mut name_counts: HashMap<String, usize> = HashMap::new();
        let union_fields = (0..fields_count)
            .map(|i| {
                // Gather the i-th field from every input.
                let fields = inputs
                    .iter()
                    .map(|input| input.schema().field(i))
                    .collect::<Vec<_>>();
                let first_field = fields[0];
                let base_name = first_field.name().to_string();

                let data_type = if loose_types {
                    // TODO apply type coercion here, or document why it's better to defer
                    // temporarily use the data type from the left input and later rely on the analyzer to
                    // coerce the two schemas into a common one.
                    first_field.data_type()
                } else {
                    fields.iter().skip(1).try_fold(
                        first_field.data_type(),
                        |acc, field| {
                            if acc != field.data_type() {
                                return plan_err!(
                                    "UNION field {i} have different type in inputs: \
                                    left has {} whereas right has {}",
                                    first_field.data_type(),
                                    field.data_type()
                                );
                            }
                            Ok(acc)
                        },
                    )?
                };
                let nullable = fields.iter().any(|field| field.is_nullable());

                // Generate unique field name
                let name = if let Some(count) = name_counts.get_mut(&base_name) {
                    *count += 1;
                    format!("{base_name}_{count}")
                } else {
                    name_counts.insert(base_name.clone(), 0);
                    base_name
                };

                let mut field = Field::new(&name, data_type.clone(), nullable);
                let field_metadata = intersect_metadata_for_union(
                    fields.iter().map(|field| field.metadata()),
                );
                field.set_metadata(field_metadata);
                Ok((None, Arc::new(field)))
            })
            .collect::<Result<_>>()?;
        let union_schema_metadata = intersect_metadata_for_union(
            inputs.iter().map(|input| input.schema().metadata()),
        );

        // Functional Dependencies are not preserved after UNION operation
        let schema = DFSchema::new_with_metadata(union_fields, union_schema_metadata)?;
        let schema = Arc::new(schema);

        Ok(schema)
    }
}
3146
3147// Manual implementation needed because of `schema` field. Comparison excludes this field.
3148impl PartialOrd for Union {
3149    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
3150        self.inputs
3151            .partial_cmp(&other.inputs)
3152            // TODO (https://github.com/apache/datafusion/issues/17477) avoid recomparing all fields
3153            .filter(|cmp| *cmp != Ordering::Equal || self == other)
3154    }
3155}
3156
/// Describe the schema of table
///
/// # Example output:
///
/// ```sql
/// > describe traces;
/// +--------------------+-----------------------------+-------------+
/// | column_name        | data_type                   | is_nullable |
/// +--------------------+-----------------------------+-------------+
/// | attributes         | Utf8                        | YES         |
/// | duration_nano      | Int64                       | YES         |
/// | end_time_unix_nano | Int64                       | YES         |
/// | service.name       | Dictionary(Int32, Utf8)     | YES         |
/// | span.kind          | Utf8                        | YES         |
/// | span.name          | Utf8                        | YES         |
/// | span_id            | Dictionary(Int32, Utf8)     | YES         |
/// | time               | Timestamp(Nanosecond, None) | NO          |
/// | trace_id           | Dictionary(Int32, Utf8)     | YES         |
/// | otel.status_code   | Utf8                        | YES         |
/// | parent_span_id     | Utf8                        | YES         |
/// +--------------------+-----------------------------+-------------+
/// ```
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct DescribeTable {
    /// Schema of the table being described
    pub schema: Arc<Schema>,
    /// Schema of the DESCRIBE output itself (column_name, data_type, is_nullable)
    pub output_schema: DFSchemaRef,
}
3186
// Manual implementation of `PartialOrd`, returning `None` since there are no comparable types in
// `DescribeTable`. This allows `LogicalPlan` to derive `PartialOrd`.
impl PartialOrd for DescribeTable {
    fn partial_cmp(&self, _other: &Self) -> Option<Ordering> {
        // There is no relevant comparison for schemas
        None
    }
}
3195
/// Options for EXPLAIN
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct ExplainOption {
    /// Include detailed debug info
    pub verbose: bool,
    /// Actually execute the plan and report metrics (EXPLAIN ANALYZE)
    pub analyze: bool,
    /// Output syntax/format
    pub format: ExplainFormat,
}
3206
3207impl Default for ExplainOption {
3208    fn default() -> Self {
3209        ExplainOption {
3210            verbose: false,
3211            analyze: false,
3212            format: ExplainFormat::Indent,
3213        }
3214    }
3215}
3216
3217impl ExplainOption {
3218    /// Builder‐style setter for `verbose`
3219    pub fn with_verbose(mut self, verbose: bool) -> Self {
3220        self.verbose = verbose;
3221        self
3222    }
3223
3224    /// Builder‐style setter for `analyze`
3225    pub fn with_analyze(mut self, analyze: bool) -> Self {
3226        self.analyze = analyze;
3227        self
3228    }
3229
3230    /// Builder‐style setter for `format`
3231    pub fn with_format(mut self, format: ExplainFormat) -> Self {
3232        self.format = format;
3233        self
3234    }
3235}
3236
/// Produces a relation with string representations of
/// various parts of the plan
///
/// See [the documentation] for more information
///
/// [the documentation]: https://datafusion.apache.org/user-guide/sql/explain.html
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct Explain {
    /// Should extra (detailed, intermediate plans) be included?
    pub verbose: bool,
    /// Output format for explain, if specified.
    /// If none, defaults to `text`
    pub explain_format: ExplainFormat,
    /// The logical plan that is being EXPLAIN'd
    pub plan: Arc<LogicalPlan>,
    /// Represent the various stages plans have gone through
    pub stringified_plans: Vec<StringifiedPlan>,
    /// The output schema of the explain (2 columns of text)
    pub schema: DFSchemaRef,
    /// Used by physical planner to check if should proceed with planning
    pub logical_optimization_succeeded: bool,
}
3259
// Manual implementation needed because of `schema` field. Comparison excludes
// this field (note: `explain_format` is not part of the ordering either).
impl PartialOrd for Explain {
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        // Proxy struct whose derived `PartialOrd` compares the listed fields
        // lexicographically, in declaration order.
        #[derive(PartialEq, PartialOrd)]
        struct ComparableExplain<'a> {
            /// Should extra (detailed, intermediate plans) be included?
            pub verbose: &'a bool,
            /// The logical plan that is being EXPLAIN'd
            pub plan: &'a Arc<LogicalPlan>,
            /// Represent the various stages plans have gone through
            pub stringified_plans: &'a Vec<StringifiedPlan>,
            /// Used by physical planner to check if should proceed with planning
            pub logical_optimization_succeeded: &'a bool,
        }
        let comparable_self = ComparableExplain {
            verbose: &self.verbose,
            plan: &self.plan,
            stringified_plans: &self.stringified_plans,
            logical_optimization_succeeded: &self.logical_optimization_succeeded,
        };
        let comparable_other = ComparableExplain {
            verbose: &other.verbose,
            plan: &other.plan,
            stringified_plans: &other.stringified_plans,
            logical_optimization_succeeded: &other.logical_optimization_succeeded,
        };
        // An `Equal` result on the subset is only kept when the whole values
        // are equal; otherwise the two plans are unordered.
        comparable_self
            .partial_cmp(&comparable_other)
            // TODO (https://github.com/apache/datafusion/issues/17477) avoid recomparing all fields
            .filter(|cmp| *cmp != Ordering::Equal || self == other)
    }
}
3292
/// Runs the actual plan, and then prints the physical plan with
/// execution metrics.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct Analyze {
    /// Should extra detail be included?
    pub verbose: bool,
    /// The logical plan that is being EXPLAIN ANALYZE'd
    pub input: Arc<LogicalPlan>,
    /// The output schema of the explain (2 columns of text)
    pub schema: DFSchemaRef,
}
3304
3305// Manual implementation needed because of `schema` field. Comparison excludes this field.
3306impl PartialOrd for Analyze {
3307    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
3308        match self.verbose.partial_cmp(&other.verbose) {
3309            Some(Ordering::Equal) => self.input.partial_cmp(&other.input),
3310            cmp => cmp,
3311        }
3312        // TODO (https://github.com/apache/datafusion/issues/17477) avoid recomparing all fields
3313        .filter(|cmp| *cmp != Ordering::Equal || self == other)
3314    }
3315}
3316
/// Extension operator defined outside of DataFusion
// TODO(clippy): This clippy `allow` should be removed if
// the manual `PartialEq` is removed in favor of a derive.
// (see the `PartialEq` impl below for details.)
#[allow(clippy::allow_attributes)]
#[allow(clippy::derived_hash_with_manual_eq)]
#[derive(Debug, Clone, Eq, Hash)]
pub struct Extension {
    /// The runtime extension operator
    pub node: Arc<dyn UserDefinedLogicalNode>,
}
3328
// `PartialEq` cannot be derived for types containing `Arc<dyn Trait>`.
// This manual implementation should be removed if
// https://github.com/rust-lang/rust/issues/39128 is fixed.
impl PartialEq for Extension {
    fn eq(&self, other: &Self) -> bool {
        // NOTE(review): resolution presumably goes through the `Arc` deref to
        // the trait object, i.e. `UserDefinedLogicalNode`'s own comparison
        // rather than pointer identity — confirm against the trait definition.
        self.node.eq(&other.node)
    }
}
3337
// Manual implementation needed for the same reason as `PartialEq` above:
// ordering is delegated to the wrapped node.
impl PartialOrd for Extension {
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        self.node.partial_cmp(&other.node)
    }
}
3343
/// Produces the first `n` tuples from its input and discards the rest.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Hash)]
pub struct Limit {
    /// Number of rows to skip before fetch.
    /// Classified at planning time via [`Limit::get_skip_type`].
    pub skip: Option<Box<Expr>>,
    /// Maximum number of rows to fetch,
    /// None means fetching all rows.
    /// Classified at planning time via [`Limit::get_fetch_type`].
    pub fetch: Option<Box<Expr>>,
    /// The logical plan
    pub input: Arc<LogicalPlan>,
}
3355
/// Different types of skip expression in Limit plan.
// Derives added for consistency with the other public types in this module;
// both variants hold only `Copy` data.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SkipType {
    /// The skip expression is a literal value.
    Literal(usize),
    /// Currently only supports expressions that can be folded into constants.
    UnsupportedExpr,
}
3363
/// Different types of fetch expression in Limit plan.
// Derives added for consistency with the other public types in this module;
// both variants hold only `Copy` data.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum FetchType {
    /// The fetch expression is a literal value.
    /// `Literal(None)` means the fetch expression is not provided.
    Literal(Option<usize>),
    /// Currently only supports expressions that can be folded into constants.
    UnsupportedExpr,
}
3372
3373impl Limit {
3374    /// Get the skip type from the limit plan.
3375    pub fn get_skip_type(&self) -> Result<SkipType> {
3376        match self.skip.as_deref() {
3377            Some(expr) => match *expr {
3378                Expr::Literal(ScalarValue::Int64(s), _) => {
3379                    // `skip = NULL` is equivalent to `skip = 0`
3380                    let s = s.unwrap_or(0);
3381                    if s >= 0 {
3382                        Ok(SkipType::Literal(s as usize))
3383                    } else {
3384                        plan_err!("OFFSET must be >=0, '{}' was provided", s)
3385                    }
3386                }
3387                _ => Ok(SkipType::UnsupportedExpr),
3388            },
3389            // `skip = None` is equivalent to `skip = 0`
3390            None => Ok(SkipType::Literal(0)),
3391        }
3392    }
3393
3394    /// Get the fetch type from the limit plan.
3395    pub fn get_fetch_type(&self) -> Result<FetchType> {
3396        match self.fetch.as_deref() {
3397            Some(expr) => match *expr {
3398                Expr::Literal(ScalarValue::Int64(Some(s)), _) => {
3399                    if s >= 0 {
3400                        Ok(FetchType::Literal(Some(s as usize)))
3401                    } else {
3402                        plan_err!("LIMIT must be >= 0, '{}' was provided", s)
3403                    }
3404                }
3405                Expr::Literal(ScalarValue::Int64(None), _) => {
3406                    Ok(FetchType::Literal(None))
3407                }
3408                _ => Ok(FetchType::UnsupportedExpr),
3409            },
3410            None => Ok(FetchType::Literal(None)),
3411        }
3412    }
3413}
3414
/// Removes duplicate rows from the input
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Hash)]
pub enum Distinct {
    /// Plain `DISTINCT` referencing all selection expressions
    All(Arc<LogicalPlan>),
    /// The `Postgres` addition, allowing separate control over DISTINCT'd and selected columns
    On(DistinctOn),
}
3423
3424impl Distinct {
3425    /// return a reference to the nodes input
3426    pub fn input(&self) -> &Arc<LogicalPlan> {
3427        match self {
3428            Distinct::All(input) => input,
3429            Distinct::On(DistinctOn { input, .. }) => input,
3430        }
3431    }
3432}
3433
/// Removes duplicate rows from the input, deduplicating on the `DISTINCT ON`
/// expressions rather than the full selection list (the `Postgres` extension).
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct DistinctOn {
    /// The `DISTINCT ON` clause expression list
    pub on_expr: Vec<Expr>,
    /// The selected projection expression list
    pub select_expr: Vec<Expr>,
    /// The `ORDER BY` clause, whose initial expressions must match those of the `ON` clause when
    /// present. Note that those matching expressions actually wrap the `ON` expressions with
    /// additional info pertaining to the sorting procedure (i.e. ASC/DESC, and NULLS FIRST/LAST).
    pub sort_expr: Option<Vec<SortExpr>>,
    /// The logical plan that is being DISTINCT'd
    pub input: Arc<LogicalPlan>,
    /// The schema description of the DISTINCT ON output
    pub schema: DFSchemaRef,
}
3450
3451impl DistinctOn {
3452    /// Create a new `DistinctOn` struct.
3453    pub fn try_new(
3454        on_expr: Vec<Expr>,
3455        select_expr: Vec<Expr>,
3456        sort_expr: Option<Vec<SortExpr>>,
3457        input: Arc<LogicalPlan>,
3458    ) -> Result<Self> {
3459        if on_expr.is_empty() {
3460            return plan_err!("No `ON` expressions provided");
3461        }
3462
3463        let on_expr = normalize_cols(on_expr, input.as_ref())?;
3464        let qualified_fields = exprlist_to_fields(select_expr.as_slice(), &input)?
3465            .into_iter()
3466            .collect();
3467
3468        let dfschema = DFSchema::new_with_metadata(
3469            qualified_fields,
3470            input.schema().metadata().clone(),
3471        )?;
3472
3473        let mut distinct_on = DistinctOn {
3474            on_expr,
3475            select_expr,
3476            sort_expr: None,
3477            input,
3478            schema: Arc::new(dfschema),
3479        };
3480
3481        if let Some(sort_expr) = sort_expr {
3482            distinct_on = distinct_on.with_sort_expr(sort_expr)?;
3483        }
3484
3485        Ok(distinct_on)
3486    }
3487
3488    /// Try to update `self` with a new sort expressions.
3489    ///
3490    /// Validates that the sort expressions are a super-set of the `ON` expressions.
3491    pub fn with_sort_expr(mut self, sort_expr: Vec<SortExpr>) -> Result<Self> {
3492        let sort_expr = normalize_sorts(sort_expr, self.input.as_ref())?;
3493
3494        // Check that the left-most sort expressions are the same as the `ON` expressions.
3495        let mut matched = true;
3496        for (on, sort) in self.on_expr.iter().zip(sort_expr.iter()) {
3497            if on != &sort.expr {
3498                matched = false;
3499                break;
3500            }
3501        }
3502
3503        if self.on_expr.len() > sort_expr.len() || !matched {
3504            return plan_err!(
3505                "SELECT DISTINCT ON expressions must match initial ORDER BY expressions"
3506            );
3507        }
3508
3509        self.sort_expr = Some(sort_expr);
3510        Ok(self)
3511    }
3512}
3513
// Manual implementation needed because of `schema` field. Comparison excludes this field.
impl PartialOrd for DistinctOn {
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        // Proxy struct whose derived `PartialOrd` compares the listed fields
        // lexicographically, in declaration order.
        #[derive(PartialEq, PartialOrd)]
        struct ComparableDistinctOn<'a> {
            /// The `DISTINCT ON` clause expression list
            pub on_expr: &'a Vec<Expr>,
            /// The selected projection expression list
            pub select_expr: &'a Vec<Expr>,
            /// The `ORDER BY` clause, whose initial expressions must match those of the `ON` clause when
            /// present. Note that those matching expressions actually wrap the `ON` expressions with
            /// additional info pertaining to the sorting procedure (i.e. ASC/DESC, and NULLS FIRST/LAST).
            pub sort_expr: &'a Option<Vec<SortExpr>>,
            /// The logical plan that is being DISTINCT'd
            pub input: &'a Arc<LogicalPlan>,
        }
        let comparable_self = ComparableDistinctOn {
            on_expr: &self.on_expr,
            select_expr: &self.select_expr,
            sort_expr: &self.sort_expr,
            input: &self.input,
        };
        let comparable_other = ComparableDistinctOn {
            on_expr: &other.on_expr,
            select_expr: &other.select_expr,
            sort_expr: &other.sort_expr,
            input: &other.input,
        };
        // An `Equal` result on the subset is only kept when the whole values
        // are equal; otherwise the two nodes are unordered.
        comparable_self
            .partial_cmp(&comparable_other)
            // TODO (https://github.com/apache/datafusion/issues/17477) avoid recomparing all fields
            .filter(|cmp| *cmp != Ordering::Equal || self == other)
    }
}
3548
/// Aggregates its input based on a set of grouping and aggregate
/// expressions (e.g. SUM).
///
/// # Output Schema
///
/// The output schema is the group expressions followed by the aggregate
/// expressions in order.
///
/// For example, given the input schema `"A", "B", "C"` and the aggregate
/// `SUM(A) GROUP BY C+B`, the output schema will be `"C+B", "SUM(A)"` where
/// "C+B" and "SUM(A)" are the names of the output columns. Note that "C+B" is a
/// single new column.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
// mark non_exhaustive to encourage use of try_new/new()
#[non_exhaustive]
pub struct Aggregate {
    /// The incoming logical plan
    pub input: Arc<LogicalPlan>,
    /// Grouping expressions
    pub group_expr: Vec<Expr>,
    /// Aggregate expressions
    pub aggr_expr: Vec<Expr>,
    /// The schema description of the aggregate output
    pub schema: DFSchemaRef,
}
3574
3575impl Aggregate {
    /// Create a new aggregate operator.
    ///
    /// Derives the output schema (group expressions, then — for grouping
    /// sets — an internal grouping-id column, then aggregate expressions)
    /// and delegates validation to [`Self::try_new_with_schema`].
    ///
    /// # Errors
    /// Returns a plan error if the expressions cannot be resolved against
    /// the input schema or fail the validation in `try_new_with_schema`.
    pub fn try_new(
        input: Arc<LogicalPlan>,
        group_expr: Vec<Expr>,
        aggr_expr: Vec<Expr>,
    ) -> Result<Self> {
        let group_expr = enumerate_grouping_sets(group_expr)?;

        let is_grouping_set = matches!(group_expr.as_slice(), [Expr::GroupingSet(_)]);

        let grouping_expr: Vec<&Expr> = grouping_set_to_exprlist(group_expr.as_slice())?;

        let mut qualified_fields = exprlist_to_fields(grouping_expr, &input)?;

        // Even columns that cannot be null will become nullable when used in a grouping set.
        if is_grouping_set {
            qualified_fields = qualified_fields
                .into_iter()
                .map(|(q, f)| (q, f.as_ref().clone().with_nullable(true).into()))
                .collect::<Vec<_>>();
            // Append the internal grouping-id column; its integer width is
            // chosen from the number of grouping columns collected so far.
            qualified_fields.push((
                None,
                Field::new(
                    Self::INTERNAL_GROUPING_ID,
                    Self::grouping_id_type(qualified_fields.len()),
                    false,
                )
                .into(),
            ));
        }

        // Aggregate expression fields follow the grouping fields.
        qualified_fields.extend(exprlist_to_fields(aggr_expr.as_slice(), &input)?);

        let schema = DFSchema::new_with_metadata(
            qualified_fields,
            input.schema().metadata().clone(),
        )?;

        Self::try_new_with_schema(input, group_expr, aggr_expr, Arc::new(schema))
    }
3616
    /// Create a new aggregate operator using the provided schema to avoid the overhead of
    /// building the schema again when the schema is already known.
    ///
    /// This method should only be called when you are absolutely sure that the schema being
    /// provided is correct for the aggregate. If in doubt, call [try_new](Self::try_new) instead.
    ///
    /// # Errors
    /// Returns a plan error when both expression lists are empty, or when the
    /// schema's field count does not match the expression counts.
    #[expect(clippy::needless_pass_by_value)]
    pub fn try_new_with_schema(
        input: Arc<LogicalPlan>,
        group_expr: Vec<Expr>,
        aggr_expr: Vec<Expr>,
        schema: DFSchemaRef,
    ) -> Result<Self> {
        if group_expr.is_empty() && aggr_expr.is_empty() {
            return plan_err!(
                "Aggregate requires at least one grouping or aggregate expression. \
                Aggregate without grouping expressions nor aggregate expressions is \
                logically equivalent to, but less efficient than, VALUES producing \
                single row. Please use VALUES instead."
            );
        }
        // Sanity check: one schema field per group expression (expanding
        // grouping sets) plus one per aggregate expression.
        let group_expr_count = grouping_set_expr_count(&group_expr)?;
        if schema.fields().len() != group_expr_count + aggr_expr.len() {
            return plan_err!(
                "Aggregate schema has wrong number of fields. Expected {} got {}",
                group_expr_count + aggr_expr.len(),
                schema.fields().len()
            );
        }

        // Re-attach functional dependencies derived from the group expressions
        // to the (cloned) output schema.
        let aggregate_func_dependencies =
            calc_func_dependencies_for_aggregate(&group_expr, &input, &schema)?;
        let new_schema = schema.as_ref().clone();
        let schema = Arc::new(
            new_schema.with_functional_dependencies(aggregate_func_dependencies)?,
        );
        Ok(Self {
            input,
            group_expr,
            aggr_expr,
            schema,
        })
    }
3659
3660    fn is_grouping_set(&self) -> bool {
3661        matches!(self.group_expr.as_slice(), [Expr::GroupingSet(_)])
3662    }
3663
3664    /// Get the output expressions.
3665    fn output_expressions(&self) -> Result<Vec<&Expr>> {
3666        static INTERNAL_ID_EXPR: LazyLock<Expr> = LazyLock::new(|| {
3667            Expr::Column(Column::from_name(Aggregate::INTERNAL_GROUPING_ID))
3668        });
3669        let mut exprs = grouping_set_to_exprlist(self.group_expr.as_slice())?;
3670        if self.is_grouping_set() {
3671            exprs.push(&INTERNAL_ID_EXPR);
3672        }
3673        exprs.extend(self.aggr_expr.iter());
3674        debug_assert!(exprs.len() == self.schema.fields().len());
3675        Ok(exprs)
3676    }
3677
3678    /// Get the length of the group by expression in the output schema
3679    /// This is not simply group by expression length. Expression may be
3680    /// GroupingSet, etc. In these case we need to get inner expression lengths.
3681    pub fn group_expr_len(&self) -> Result<usize> {
3682        grouping_set_expr_count(&self.group_expr)
3683    }
3684
3685    /// Returns the data type of the grouping id.
3686    /// The grouping ID value is a bitmask where each set bit
3687    /// indicates that the corresponding grouping expression is
3688    /// null
3689    pub fn grouping_id_type(group_exprs: usize) -> DataType {
3690        if group_exprs <= 8 {
3691            DataType::UInt8
3692        } else if group_exprs <= 16 {
3693            DataType::UInt16
3694        } else if group_exprs <= 32 {
3695            DataType::UInt32
3696        } else {
3697            DataType::UInt64
3698        }
3699    }
3700
3701    /// Internal column used when the aggregation is a grouping set.
3702    ///
3703    /// This column contains a bitmask where each bit represents a grouping
3704    /// expression. The least significant bit corresponds to the rightmost
3705    /// grouping expression. A bit value of 0 indicates that the corresponding
3706    /// column is included in the grouping set, while a value of 1 means it is excluded.
3707    ///
3708    /// For example, for the grouping expressions CUBE(a, b), the grouping ID
3709    /// column will have the following values:
3710    ///     0b00: Both `a` and `b` are included
3711    ///     0b01: `b` is excluded
3712    ///     0b10: `a` is excluded
3713    ///     0b11: Both `a` and `b` are excluded
3714    ///
3715    /// This internal column is necessary because excluded columns are replaced
3716    /// with `NULL` values. To handle these cases correctly, we must distinguish
3717    /// between an actual `NULL` value in a column and a column being excluded from the set.
3718    pub const INTERNAL_GROUPING_ID: &'static str = "__grouping_id";
3719}
3720
3721// Manual implementation needed because of `schema` field. Comparison excludes this field.
3722impl PartialOrd for Aggregate {
3723    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
3724        match self.input.partial_cmp(&other.input) {
3725            Some(Ordering::Equal) => {
3726                match self.group_expr.partial_cmp(&other.group_expr) {
3727                    Some(Ordering::Equal) => self.aggr_expr.partial_cmp(&other.aggr_expr),
3728                    cmp => cmp,
3729                }
3730            }
3731            cmp => cmp,
3732        }
3733        // TODO (https://github.com/apache/datafusion/issues/17477) avoid recomparing all fields
3734        .filter(|cmp| *cmp != Ordering::Equal || self == other)
3735    }
3736}
3737
3738/// Checks whether any expression in `group_expr` contains `Expr::GroupingSet`.
3739fn contains_grouping_set(group_expr: &[Expr]) -> bool {
3740    group_expr
3741        .iter()
3742        .any(|expr| matches!(expr, Expr::GroupingSet(_)))
3743}
3744
3745/// Calculates functional dependencies for aggregate expressions.
3746fn calc_func_dependencies_for_aggregate(
3747    // Expressions in the GROUP BY clause:
3748    group_expr: &[Expr],
3749    // Input plan of the aggregate:
3750    input: &LogicalPlan,
3751    // Aggregate schema
3752    aggr_schema: &DFSchema,
3753) -> Result<FunctionalDependencies> {
3754    // We can do a case analysis on how to propagate functional dependencies based on
3755    // whether the GROUP BY in question contains a grouping set expression:
3756    // - If so, the functional dependencies will be empty because we cannot guarantee
3757    //   that GROUP BY expression results will be unique.
3758    // - Otherwise, it may be possible to propagate functional dependencies.
3759    if !contains_grouping_set(group_expr) {
3760        let group_by_expr_names = group_expr
3761            .iter()
3762            .map(|item| item.schema_name().to_string())
3763            .collect::<IndexSet<_>>()
3764            .into_iter()
3765            .collect::<Vec<_>>();
3766        let aggregate_func_dependencies = aggregate_functional_dependencies(
3767            input.schema(),
3768            &group_by_expr_names,
3769            aggr_schema,
3770        );
3771        Ok(aggregate_func_dependencies)
3772    } else {
3773        Ok(FunctionalDependencies::empty())
3774    }
3775}
3776
/// This function projects functional dependencies of the `input` plan according
/// to projection expressions `exprs`.
///
/// Each projection expression is mapped back to the input-schema index it
/// reads from (matching by display name); dependencies are then projected
/// onto those indices.
fn calc_func_dependencies_for_project(
    exprs: &[Expr],
    input: &LogicalPlan,
) -> Result<FunctionalDependencies> {
    let input_fields = input.schema().field_names();
    // Calculate expression indices (if present) in the input schema.
    let proj_indices = exprs
        .iter()
        .map(|expr| match expr {
            #[expect(deprecated)]
            Expr::Wildcard { qualifier, options } => {
                // A wildcard expands to many fields; map each expanded
                // field's flat name back to its input-schema position.
                let wildcard_fields = exprlist_to_fields(
                    vec![&Expr::Wildcard {
                        qualifier: qualifier.clone(),
                        options: options.clone(),
                    }],
                    input,
                )?;
                Ok::<_, DataFusionError>(
                    wildcard_fields
                        .into_iter()
                        .filter_map(|(qualifier, f)| {
                            let flat_name = qualifier
                                .map(|t| format!("{}.{}", t, f.name()))
                                .unwrap_or_else(|| f.name().clone());
                            input_fields.iter().position(|item| *item == flat_name)
                        })
                        .collect::<Vec<_>>(),
                )
            }
            Expr::Alias(alias) => {
                // For aliases, match on the *inner* expression's display
                // name — the input schema knows nothing about the alias.
                let name = format!("{}", alias.expr);
                Ok(input_fields
                    .iter()
                    .position(|item| *item == name)
                    .map(|i| vec![i])
                    .unwrap_or(vec![]))
            }
            _ => {
                // Other expressions contribute only when their display name
                // exactly matches an input field name; otherwise they are
                // treated as producing no dependency-carrying index.
                let name = format!("{expr}");
                Ok(input_fields
                    .iter()
                    .position(|item| *item == name)
                    .map(|i| vec![i])
                    .unwrap_or(vec![]))
            }
        })
        .collect::<Result<Vec<_>>>()?
        .into_iter()
        .flatten()
        .collect::<Vec<_>>();

    Ok(input
        .schema()
        .functional_dependencies()
        .project_functional_dependencies(&proj_indices, exprs.len()))
}
3836
/// Sorts its input according to a list of sort expressions.
///
/// Note: `Sort` carries no schema of its own; its output columns are those
/// of `input`.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Hash)]
pub struct Sort {
    /// The sort expressions
    pub expr: Vec<SortExpr>,
    /// The incoming logical plan
    pub input: Arc<LogicalPlan>,
    /// Optional fetch limit: maximum number of sorted rows to produce
    pub fetch: Option<usize>,
}
3847
/// Join two logical plans on one or more join columns
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct Join {
    /// Left input
    pub left: Arc<LogicalPlan>,
    /// Right input
    pub right: Arc<LogicalPlan>,
    /// Equijoin clause expressed as pairs of (left, right) join expressions.
    /// Pairs may be arbitrary expressions, not only column references.
    pub on: Vec<(Expr, Expr)>,
    /// Filters applied during join (non-equi conditions)
    pub filter: Option<Expr>,
    /// Join type
    pub join_type: JoinType,
    /// Join constraint
    pub join_constraint: JoinConstraint,
    /// The output schema, containing fields from the left and right inputs
    pub schema: DFSchemaRef,
    /// Defines the null equality for the join.
    pub null_equality: NullEquality,
    /// Whether this is a null-aware anti join (for NOT IN semantics).
    ///
    /// Only applies to LeftAnti joins. When true, implements SQL NOT IN semantics where:
    /// - If the right side (subquery) contains any NULL in join keys, no rows are output
    /// - Left side rows with NULL in join keys are not output
    ///
    /// This is required for correct NOT IN subquery behavior with three-valued logic.
    pub null_aware: bool,
}
3876
3877impl Join {
3878    /// Creates a new Join operator with automatically computed schema.
3879    ///
3880    /// This constructor computes the schema based on the join type and inputs,
3881    /// removing the need to manually specify the schema or call `recompute_schema`.
3882    ///
3883    /// # Arguments
3884    ///
3885    /// * `left` - Left input plan
3886    /// * `right` - Right input plan
3887    /// * `on` - Join condition as a vector of (left_expr, right_expr) pairs
3888    /// * `filter` - Optional filter expression (for non-equijoin conditions)
3889    /// * `join_type` - Type of join (Inner, Left, Right, etc.)
3890    /// * `join_constraint` - Join constraint (On, Using)
3891    /// * `null_equality` - How to handle nulls in join comparisons
3892    /// * `null_aware` - Whether this is a null-aware anti join (for NOT IN semantics)
3893    ///
3894    /// # Returns
3895    ///
3896    /// A new Join operator with the computed schema
3897    #[expect(clippy::too_many_arguments)]
3898    pub fn try_new(
3899        left: Arc<LogicalPlan>,
3900        right: Arc<LogicalPlan>,
3901        on: Vec<(Expr, Expr)>,
3902        filter: Option<Expr>,
3903        join_type: JoinType,
3904        join_constraint: JoinConstraint,
3905        null_equality: NullEquality,
3906        null_aware: bool,
3907    ) -> Result<Self> {
3908        let join_schema = build_join_schema(left.schema(), right.schema(), &join_type)?;
3909
3910        Ok(Join {
3911            left,
3912            right,
3913            on,
3914            filter,
3915            join_type,
3916            join_constraint,
3917            schema: Arc::new(join_schema),
3918            null_equality,
3919            null_aware,
3920        })
3921    }
3922
3923    /// Create Join with input which wrapped with projection, this method is used in physical planning only to help
3924    /// create the physical join.
3925    pub fn try_new_with_project_input(
3926        original: &LogicalPlan,
3927        left: Arc<LogicalPlan>,
3928        right: Arc<LogicalPlan>,
3929        column_on: (Vec<Column>, Vec<Column>),
3930    ) -> Result<(Self, bool)> {
3931        let original_join = match original {
3932            LogicalPlan::Join(join) => join,
3933            _ => return plan_err!("Could not create join with project input"),
3934        };
3935
3936        let mut left_sch = LogicalPlanBuilder::from(Arc::clone(&left));
3937        let mut right_sch = LogicalPlanBuilder::from(Arc::clone(&right));
3938
3939        let mut requalified = false;
3940
3941        // By definition, the resulting schema of an inner/left/right & full join will have first the left side fields and then the right,
3942        // potentially having duplicate field names. Note this will only qualify fields if they have not been qualified before.
3943        if original_join.join_type == JoinType::Inner
3944            || original_join.join_type == JoinType::Left
3945            || original_join.join_type == JoinType::Right
3946            || original_join.join_type == JoinType::Full
3947        {
3948            (left_sch, right_sch, requalified) =
3949                requalify_sides_if_needed(left_sch.clone(), right_sch.clone())?;
3950        }
3951
3952        let on: Vec<(Expr, Expr)> = column_on
3953            .0
3954            .into_iter()
3955            .zip(column_on.1)
3956            .map(|(l, r)| (Expr::Column(l), Expr::Column(r)))
3957            .collect();
3958
3959        let join_schema = build_join_schema(
3960            left_sch.schema(),
3961            right_sch.schema(),
3962            &original_join.join_type,
3963        )?;
3964
3965        Ok((
3966            Join {
3967                left,
3968                right,
3969                on,
3970                filter: original_join.filter.clone(),
3971                join_type: original_join.join_type,
3972                join_constraint: original_join.join_constraint,
3973                schema: Arc::new(join_schema),
3974                null_equality: original_join.null_equality,
3975                null_aware: original_join.null_aware,
3976            },
3977            requalified,
3978        ))
3979    }
3980}
3981
3982// Manual implementation needed because of `schema` field. Comparison excludes this field.
3983impl PartialOrd for Join {
3984    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
3985        #[derive(PartialEq, PartialOrd)]
3986        struct ComparableJoin<'a> {
3987            /// Left input
3988            pub left: &'a Arc<LogicalPlan>,
3989            /// Right input
3990            pub right: &'a Arc<LogicalPlan>,
3991            /// Equijoin clause expressed as pairs of (left, right) join expressions
3992            pub on: &'a Vec<(Expr, Expr)>,
3993            /// Filters applied during join (non-equi conditions)
3994            pub filter: &'a Option<Expr>,
3995            /// Join type
3996            pub join_type: &'a JoinType,
3997            /// Join constraint
3998            pub join_constraint: &'a JoinConstraint,
3999            /// The null handling behavior for equalities
4000            pub null_equality: &'a NullEquality,
4001        }
4002        let comparable_self = ComparableJoin {
4003            left: &self.left,
4004            right: &self.right,
4005            on: &self.on,
4006            filter: &self.filter,
4007            join_type: &self.join_type,
4008            join_constraint: &self.join_constraint,
4009            null_equality: &self.null_equality,
4010        };
4011        let comparable_other = ComparableJoin {
4012            left: &other.left,
4013            right: &other.right,
4014            on: &other.on,
4015            filter: &other.filter,
4016            join_type: &other.join_type,
4017            join_constraint: &other.join_constraint,
4018            null_equality: &other.null_equality,
4019        };
4020        comparable_self
4021            .partial_cmp(&comparable_other)
4022            // TODO (https://github.com/apache/datafusion/issues/17477) avoid recomparing all fields
4023            .filter(|cmp| *cmp != Ordering::Equal || self == other)
4024    }
4025}
4026
/// Subquery
///
/// A query nested inside an expression (e.g. a scalar or EXISTS subquery),
/// possibly referencing columns from the outer query.
#[derive(Clone, PartialEq, Eq, PartialOrd, Hash)]
pub struct Subquery {
    /// The subquery
    pub subquery: Arc<LogicalPlan>,
    /// The outer references used in the subquery
    pub outer_ref_columns: Vec<Expr>,
    /// Span information for subquery projection columns
    pub spans: Spans,
}
4037
impl Normalizeable for Subquery {
    fn can_normalize(&self) -> bool {
        // Subqueries never participate in normalization; equality is instead
        // handled by `NormalizeEq` below.
        false
    }
}
4043
4044impl NormalizeEq for Subquery {
4045    fn normalize_eq(&self, other: &Self) -> bool {
4046        // TODO: may be implement NormalizeEq for LogicalPlan?
4047        *self.subquery == *other.subquery
4048            && self.outer_ref_columns.len() == other.outer_ref_columns.len()
4049            && self
4050                .outer_ref_columns
4051                .iter()
4052                .zip(other.outer_ref_columns.iter())
4053                .all(|(a, b)| a.normalize_eq(b))
4054    }
4055}
4056
4057impl Subquery {
4058    pub fn try_from_expr(plan: &Expr) -> Result<&Subquery> {
4059        match plan {
4060            Expr::ScalarSubquery(it) => Ok(it),
4061            Expr::Cast(cast) => Subquery::try_from_expr(cast.expr.as_ref()),
4062            _ => plan_err!("Could not coerce into ScalarSubquery!"),
4063        }
4064    }
4065
4066    pub fn with_plan(&self, plan: Arc<LogicalPlan>) -> Subquery {
4067        Subquery {
4068            subquery: plan,
4069            outer_ref_columns: self.outer_ref_columns.clone(),
4070            spans: Spans::new(),
4071        }
4072    }
4073}
4074
4075impl Debug for Subquery {
4076    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
4077        write!(f, "<subquery>")
4078    }
4079}
4080
/// Logical partitioning schemes supported by [`LogicalPlan::Repartition`]
///
/// See [`Partitioning`] for more details on partitioning
///
/// [`Partitioning`]: https://docs.rs/datafusion/latest/datafusion/physical_expr/enum.Partitioning.html#
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Hash)]
pub enum Partitioning {
    /// Allocate batches using a round-robin algorithm and the specified number of partitions
    RoundRobinBatch(usize),
    /// Allocate rows based on a hash of one or more expressions and the specified number
    /// of partitions.
    Hash(Vec<Expr>, usize),
    /// The DISTRIBUTE BY clause is used to repartition the data based on the input expressions
    DistributeBy(Vec<Expr>),
}
4096
/// Represent the unnesting operation on a list column, such as the recursion depth and
/// the output column name after unnesting
///
/// Example: given `ColumnUnnestList { output_column: "output_name", depth: 2 }`
///
/// ```text
///   input             output_name
///  ┌─────────┐      ┌─────────┐
///  │{{1,2}}  │      │ 1       │
///  ├─────────┼─────►├─────────┤
///  │{{3}}    │      │ 2       │
///  ├─────────┤      ├─────────┤
///  │{{4},{5}}│      │ 3       │
///  └─────────┘      ├─────────┤
///                   │ 4       │
///                   ├─────────┤
///                   │ 5       │
///                   └─────────┘
/// ```
#[derive(Debug, Clone, PartialEq, Eq, Hash, PartialOrd)]
pub struct ColumnUnnestList {
    /// Name of the column holding the unnested values in the output schema
    pub output_column: Column,
    /// Number of list-nesting levels flattened by this unnest
    pub depth: usize,
}
4121
4122impl Display for ColumnUnnestList {
4123    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
4124        write!(f, "{}|depth={}", self.output_column, self.depth)
4125    }
4126}
4127
/// Unnest a column that contains a nested list type. See
/// [`UnnestOptions`] for more details.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct Unnest {
    /// The incoming logical plan
    pub input: Arc<LogicalPlan>,
    /// Columns to run unnest on, can be a list of (List/Struct) columns
    pub exec_columns: Vec<Column>,
    /// refer to the indices(in the input schema) of columns
    /// that have type list to run unnest on
    pub list_type_columns: Vec<(usize, ColumnUnnestList)>,
    /// refer to the indices (in the input schema) of columns
    /// that have type struct to run unnest on
    pub struct_type_columns: Vec<usize>,
    /// Having items aligned with the output columns
    /// representing which column in the input schema each output column depends on
    pub dependency_indices: Vec<usize>,
    /// The output schema, containing the unnested field column.
    pub schema: DFSchemaRef,
    /// Options controlling the unnest behavior (e.g. recursion depths)
    pub options: UnnestOptions,
}
4150
4151// Manual implementation needed because of `schema` field. Comparison excludes this field.
4152impl PartialOrd for Unnest {
4153    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
4154        #[derive(PartialEq, PartialOrd)]
4155        struct ComparableUnnest<'a> {
4156            /// The incoming logical plan
4157            pub input: &'a Arc<LogicalPlan>,
4158            /// Columns to run unnest on, can be a list of (List/Struct) columns
4159            pub exec_columns: &'a Vec<Column>,
4160            /// refer to the indices(in the input schema) of columns
4161            /// that have type list to run unnest on
4162            pub list_type_columns: &'a Vec<(usize, ColumnUnnestList)>,
4163            /// refer to the indices (in the input schema) of columns
4164            /// that have type struct to run unnest on
4165            pub struct_type_columns: &'a Vec<usize>,
4166            /// Having items aligned with the output columns
4167            /// representing which column in the input schema each output column depends on
4168            pub dependency_indices: &'a Vec<usize>,
4169            /// Options
4170            pub options: &'a UnnestOptions,
4171        }
4172        let comparable_self = ComparableUnnest {
4173            input: &self.input,
4174            exec_columns: &self.exec_columns,
4175            list_type_columns: &self.list_type_columns,
4176            struct_type_columns: &self.struct_type_columns,
4177            dependency_indices: &self.dependency_indices,
4178            options: &self.options,
4179        };
4180        let comparable_other = ComparableUnnest {
4181            input: &other.input,
4182            exec_columns: &other.exec_columns,
4183            list_type_columns: &other.list_type_columns,
4184            struct_type_columns: &other.struct_type_columns,
4185            dependency_indices: &other.dependency_indices,
4186            options: &other.options,
4187        };
4188        comparable_self
4189            .partial_cmp(&comparable_other)
4190            // TODO (https://github.com/apache/datafusion/issues/17477) avoid recomparing all fields
4191            .filter(|cmp| *cmp != Ordering::Equal || self == other)
4192    }
4193}
4194
impl Unnest {
    /// Create a new [`Unnest`] over `exec_columns` of `input`, computing the
    /// unnested output schema along with the list/struct column index
    /// bookkeeping consumed by the physical plan.
    ///
    /// # Errors
    ///
    /// Returns a plan error when `exec_columns` is empty or a column cannot
    /// be resolved against the input schema, and an internal error when a
    /// column has a non-unnestable data type.
    pub fn try_new(
        input: Arc<LogicalPlan>,
        exec_columns: Vec<Column>,
        options: UnnestOptions,
    ) -> Result<Self> {
        if exec_columns.is_empty() {
            return plan_err!("unnest plan requires at least 1 column to unnest");
        }

        let mut list_columns: Vec<(usize, ColumnUnnestList)> = vec![];
        let mut struct_columns = vec![];
        // Map each column to unnest to its index in the input schema.
        let indices_to_unnest = exec_columns
            .iter()
            .map(|c| Ok((input.schema().index_of_column(c)?, c)))
            .collect::<Result<HashMap<usize, &Column>>>()?;

        let input_schema = input.schema();

        let mut dependency_indices = vec![];
        // Transform input schema into new schema
        // Given this comprehensive example
        //
        // input schema:
        // 1.col1_unnest_placeholder: list[list[int]],
        // 2.col1: list[list[int]]
        // 3.col2: list[int]
        // with unnest on unnest(col1,depth=2), unnest(col1,depth=1) and unnest(col2,depth=1)
        // output schema:
        // 1.unnest_col1_depth_2: int
        // 2.unnest_col1_depth_1: list[int]
        // 3.col1: list[list[int]]
        // 4.unnest_col2_depth_1: int
        // Meaning the placeholder column will be replaced by its unnested variation(s), note
        // the plural.
        let fields = input_schema
            .iter()
            .enumerate()
            .map(|(index, (original_qualifier, original_field))| {
                match indices_to_unnest.get(&index) {
                    Some(column_to_unnest) => {
                        // Explicit per-column recursion settings (depth > 1)
                        // requested via `options`.
                        let recursions_on_column = options
                            .recursions
                            .iter()
                            .filter(|p| -> bool { &p.input_column == *column_to_unnest })
                            .collect::<Vec<_>>();
                        let mut transformed_columns = recursions_on_column
                            .iter()
                            .map(|r| {
                                // Side effect: record this (index, depth) pair
                                // for the physical list-unnest bookkeeping.
                                list_columns.push((
                                    index,
                                    ColumnUnnestList {
                                        output_column: r.output_column.clone(),
                                        depth: r.depth,
                                    },
                                ));
                                Ok(get_unnested_columns(
                                    &r.output_column.name,
                                    original_field.data_type(),
                                    r.depth,
                                )?
                                .into_iter()
                                .next()
                                .unwrap()) // because unnesting a list column always result into one result
                            })
                            .collect::<Result<Vec<(Column, Arc<Field>)>>>()?;
                        // No explicit recursion requested: default to depth 1
                        // and classify the column as struct or list.
                        if transformed_columns.is_empty() {
                            transformed_columns = get_unnested_columns(
                                &column_to_unnest.name,
                                original_field.data_type(),
                                1,
                            )?;
                            match original_field.data_type() {
                                DataType::Struct(_) => {
                                    struct_columns.push(index);
                                }
                                DataType::List(_)
                                | DataType::FixedSizeList(_, _)
                                | DataType::LargeList(_) => {
                                    list_columns.push((
                                        index,
                                        ColumnUnnestList {
                                            output_column: Column::from_name(
                                                &column_to_unnest.name,
                                            ),
                                            depth: 1,
                                        },
                                    ));
                                }
                                _ => {}
                            };
                        }

                        // new columns dependent on the same original index
                        dependency_indices.extend(std::iter::repeat_n(
                            index,
                            transformed_columns.len(),
                        ));
                        Ok(transformed_columns
                            .iter()
                            .map(|(col, field)| {
                                (col.relation.to_owned(), field.to_owned())
                            })
                            .collect())
                    }
                    None => {
                        // Column not unnested: passes through unchanged and
                        // depends on itself.
                        dependency_indices.push(index);
                        Ok(vec![(
                            original_qualifier.cloned(),
                            Arc::clone(original_field),
                        )])
                    }
                }
            })
            .collect::<Result<Vec<_>>>()?
            .into_iter()
            .flatten()
            .collect::<Vec<_>>();

        let metadata = input_schema.metadata().clone();
        let df_schema = DFSchema::new_with_metadata(fields, metadata)?;
        // We can use the existing functional dependencies:
        let deps = input_schema.functional_dependencies().clone();
        let schema = Arc::new(df_schema.with_functional_dependencies(deps)?);

        Ok(Unnest {
            input,
            exec_columns,
            list_type_columns: list_columns,
            struct_type_columns: struct_columns,
            dependency_indices,
            schema,
            options,
        })
    }
}
4331
4332// Based on data type, either struct or a variant of list
4333// return a set of columns as the result of unnesting
4334// the input columns.
4335// For example, given a column with name "a",
4336// - List(Element) returns ["a"] with data type Element
4337// - Struct(field1, field2) returns ["a.field1","a.field2"]
4338// For list data type, an argument depth is used to specify
4339// the recursion level
4340fn get_unnested_columns(
4341    col_name: &String,
4342    data_type: &DataType,
4343    depth: usize,
4344) -> Result<Vec<(Column, Arc<Field>)>> {
4345    let mut qualified_columns = Vec::with_capacity(1);
4346
4347    match data_type {
4348        DataType::List(_) | DataType::FixedSizeList(_, _) | DataType::LargeList(_) => {
4349            let data_type = get_unnested_list_datatype_recursive(data_type, depth)?;
4350            let new_field = Arc::new(Field::new(
4351                col_name, data_type,
4352                // Unnesting may produce NULLs even if the list is not null.
4353                // For example: unnest([1], []) -> 1, null
4354                true,
4355            ));
4356            let column = Column::from_name(col_name);
4357            // let column = Column::from((None, &new_field));
4358            qualified_columns.push((column, new_field));
4359        }
4360        DataType::Struct(fields) => {
4361            qualified_columns.extend(fields.iter().map(|f| {
4362                let new_name = format!("{}.{}", col_name, f.name());
4363                let column = Column::from_name(&new_name);
4364                let new_field = f.as_ref().clone().with_name(new_name);
4365                // let column = Column::from((None, &f));
4366                (column, Arc::new(new_field))
4367            }))
4368        }
4369        _ => {
4370            return internal_err!("trying to unnest on invalid data type {data_type}");
4371        }
4372    };
4373    Ok(qualified_columns)
4374}
4375
4376// Get the data type of a multi-dimensional type after unnesting it
4377// with a given depth
4378fn get_unnested_list_datatype_recursive(
4379    data_type: &DataType,
4380    depth: usize,
4381) -> Result<DataType> {
4382    match data_type {
4383        DataType::List(field)
4384        | DataType::FixedSizeList(field, _)
4385        | DataType::LargeList(field) => {
4386            if depth == 1 {
4387                return Ok(field.data_type().clone());
4388            }
4389            return get_unnested_list_datatype_recursive(field.data_type(), depth - 1);
4390        }
4391        _ => {}
4392    };
4393
4394    internal_err!("trying to unnest on invalid data type {data_type}")
4395}
4396
4397#[cfg(test)]
4398mod tests {
4399    use super::*;
4400    use crate::builder::LogicalTableSource;
4401    use crate::logical_plan::table_scan;
4402    use crate::select_expr::SelectExpr;
4403    use crate::test::function_stub::{count, count_udaf};
4404    use crate::{
4405        GroupingSet, binary_expr, col, exists, in_subquery, lit, placeholder,
4406        scalar_subquery,
4407    };
4408    use datafusion_common::metadata::ScalarAndMetadata;
4409    use datafusion_common::tree_node::{
4410        TransformedResult, TreeNodeRewriter, TreeNodeVisitor,
4411    };
4412    use datafusion_common::{Constraint, ScalarValue, not_impl_err};
4413    use insta::{assert_debug_snapshot, assert_snapshot};
4414    use std::hash::DefaultHasher;
4415
4416    fn employee_schema() -> Schema {
4417        Schema::new(vec![
4418            Field::new("id", DataType::Int32, false),
4419            Field::new("first_name", DataType::Utf8, false),
4420            Field::new("last_name", DataType::Utf8, false),
4421            Field::new("state", DataType::Utf8, false),
4422            Field::new("salary", DataType::Int32, false),
4423        ])
4424    }
4425
4426    fn display_plan() -> Result<LogicalPlan> {
4427        let plan1 = table_scan(Some("employee_csv"), &employee_schema(), Some(vec![3]))?
4428            .build()?;
4429
4430        table_scan(Some("employee_csv"), &employee_schema(), Some(vec![0, 3]))?
4431            .filter(in_subquery(col("state"), Arc::new(plan1)))?
4432            .project(vec![col("id")])?
4433            .build()
4434    }
4435
    #[test]
    fn test_display_indent() -> Result<()> {
        // Pin the indented text rendering of the shared `display_plan`
        // fixture, including the nested subquery section.
        let plan = display_plan()?;

        assert_snapshot!(plan.display_indent(), @r"
        Projection: employee_csv.id
          Filter: employee_csv.state IN (<subquery>)
            Subquery:
              TableScan: employee_csv projection=[state]
            TableScan: employee_csv projection=[id, state]
        ");
        Ok(())
    }
4449
    #[test]
    fn test_display_indent_schema() -> Result<()> {
        // Same fixture as `test_display_indent`, but each line also carries
        // the node's output schema in brackets.
        let plan = display_plan()?;

        assert_snapshot!(plan.display_indent_schema(), @r"
        Projection: employee_csv.id [id:Int32]
          Filter: employee_csv.state IN (<subquery>) [id:Int32, state:Utf8]
            Subquery: [state:Utf8]
              TableScan: employee_csv projection=[state] [state:Utf8]
            TableScan: employee_csv projection=[id, state] [id:Int32, state:Utf8]
        ");
        Ok(())
    }
4463
    #[test]
    fn test_display_subquery_alias() -> Result<()> {
        // An aliased EXISTS subquery renders as `EXISTS (<subquery>) AS exists`
        // in the projection line, with the subquery plan nested below it.
        let plan1 = table_scan(Some("employee_csv"), &employee_schema(), Some(vec![3]))?
            .build()?;
        let plan1 = Arc::new(plan1);

        let plan =
            table_scan(Some("employee_csv"), &employee_schema(), Some(vec![0, 3]))?
                .project(vec![col("id"), exists(plan1).alias("exists")])?
                .build();

        assert_snapshot!(plan?.display_indent(), @r"
        Projection: employee_csv.id, EXISTS (<subquery>) AS exists
          Subquery:
            TableScan: employee_csv projection=[state]
          TableScan: employee_csv projection=[id, state]
        ");
        Ok(())
    }
4483
    #[test]
    fn test_display_graphviz() -> Result<()> {
        let plan = display_plan()?;

        // Snapshot the full GraphViz rendering: a compact "LogicalPlan"
        // subgraph plus a schema-annotated "Detailed LogicalPlan" subgraph,
        // with parent -> child edges drawn back-to-front (dir=back).
        assert_snapshot!(plan.display_graphviz(), @r#"
        // Begin DataFusion GraphViz Plan,
        // display it online here: https://dreampuf.github.io/GraphvizOnline

        digraph {
          subgraph cluster_1
          {
            graph[label="LogicalPlan"]
            2[shape=box label="Projection: employee_csv.id"]
            3[shape=box label="Filter: employee_csv.state IN (<subquery>)"]
            2 -> 3 [arrowhead=none, arrowtail=normal, dir=back]
            4[shape=box label="Subquery:"]
            3 -> 4 [arrowhead=none, arrowtail=normal, dir=back]
            5[shape=box label="TableScan: employee_csv projection=[state]"]
            4 -> 5 [arrowhead=none, arrowtail=normal, dir=back]
            6[shape=box label="TableScan: employee_csv projection=[id, state]"]
            3 -> 6 [arrowhead=none, arrowtail=normal, dir=back]
          }
          subgraph cluster_7
          {
            graph[label="Detailed LogicalPlan"]
            8[shape=box label="Projection: employee_csv.id\nSchema: [id:Int32]"]
            9[shape=box label="Filter: employee_csv.state IN (<subquery>)\nSchema: [id:Int32, state:Utf8]"]
            8 -> 9 [arrowhead=none, arrowtail=normal, dir=back]
            10[shape=box label="Subquery:\nSchema: [state:Utf8]"]
            9 -> 10 [arrowhead=none, arrowtail=normal, dir=back]
            11[shape=box label="TableScan: employee_csv projection=[state]\nSchema: [state:Utf8]"]
            10 -> 11 [arrowhead=none, arrowtail=normal, dir=back]
            12[shape=box label="TableScan: employee_csv projection=[id, state]\nSchema: [id:Int32, state:Utf8]"]
            9 -> 12 [arrowhead=none, arrowtail=normal, dir=back]
          }
        }
        // End DataFusion GraphViz Plan
        "#);
        Ok(())
    }
4526
    #[test]
    fn test_display_pg_json() -> Result<()> {
        // Pin the pgAdmin/EXPLAIN-style JSON rendering of the fixture plan.
        let plan = display_plan()?;

        assert_snapshot!(plan.display_pg_json(), @r#"
        [
          {
            "Plan": {
              "Expressions": [
                "employee_csv.id"
              ],
              "Node Type": "Projection",
              "Output": [
                "id"
              ],
              "Plans": [
                {
                  "Condition": "employee_csv.state IN (<subquery>)",
                  "Node Type": "Filter",
                  "Output": [
                    "id",
                    "state"
                  ],
                  "Plans": [
                    {
                      "Node Type": "Subquery",
                      "Output": [
                        "state"
                      ],
                      "Plans": [
                        {
                          "Node Type": "TableScan",
                          "Output": [
                            "state"
                          ],
                          "Plans": [],
                          "Relation Name": "employee_csv"
                        }
                      ]
                    },
                    {
                      "Node Type": "TableScan",
                      "Output": [
                        "id",
                        "state"
                      ],
                      "Plans": [],
                      "Relation Name": "employee_csv"
                    }
                  ]
                }
              ]
            }
          }
        ]
        "#);
        Ok(())
    }
4585
4586    /// Tests for the Visitor trait and walking logical plan nodes
4587    #[derive(Debug, Default)]
4588    struct OkVisitor {
4589        strings: Vec<String>,
4590    }
4591
4592    impl<'n> TreeNodeVisitor<'n> for OkVisitor {
4593        type Node = LogicalPlan;
4594
4595        fn f_down(&mut self, plan: &'n LogicalPlan) -> Result<TreeNodeRecursion> {
4596            let s = match plan {
4597                LogicalPlan::Projection { .. } => "pre_visit Projection",
4598                LogicalPlan::Filter { .. } => "pre_visit Filter",
4599                LogicalPlan::TableScan { .. } => "pre_visit TableScan",
4600                _ => {
4601                    return not_impl_err!("unknown plan type");
4602                }
4603            };
4604
4605            self.strings.push(s.into());
4606            Ok(TreeNodeRecursion::Continue)
4607        }
4608
4609        fn f_up(&mut self, plan: &'n LogicalPlan) -> Result<TreeNodeRecursion> {
4610            let s = match plan {
4611                LogicalPlan::Projection { .. } => "post_visit Projection",
4612                LogicalPlan::Filter { .. } => "post_visit Filter",
4613                LogicalPlan::TableScan { .. } => "post_visit TableScan",
4614                _ => {
4615                    return not_impl_err!("unknown plan type");
4616                }
4617            };
4618
4619            self.strings.push(s.into());
4620            Ok(TreeNodeRecursion::Continue)
4621        }
4622    }
4623
    #[test]
    fn visit_order() {
        // A full traversal visits nodes top-down via f_down and then unwinds
        // bottom-up via f_up, in mirror order.
        let mut visitor = OkVisitor::default();
        let plan = test_plan();
        let res = plan.visit_with_subqueries(&mut visitor);
        assert!(res.is_ok());

        assert_debug_snapshot!(visitor.strings, @r#"
        [
            "pre_visit Projection",
            "pre_visit Filter",
            "pre_visit TableScan",
            "post_visit TableScan",
            "post_visit Filter",
            "post_visit Projection",
        ]
        "#);
    }
4642
4643    #[derive(Debug, Default)]
4644    /// Counter than counts to zero and returns true when it gets there
4645    struct OptionalCounter {
4646        val: Option<usize>,
4647    }
4648
4649    impl OptionalCounter {
4650        fn new(val: usize) -> Self {
4651            Self { val: Some(val) }
4652        }
4653        // Decrements the counter by 1, if any, returning true if it hits zero
4654        fn dec(&mut self) -> bool {
4655            if Some(0) == self.val {
4656                true
4657            } else {
4658                self.val = self.val.take().map(|i| i - 1);
4659                false
4660            }
4661        }
4662    }
4663
4664    #[derive(Debug, Default)]
4665    /// Visitor that returns false after some number of visits
4666    struct StoppingVisitor {
4667        inner: OkVisitor,
4668        /// When Some(0) returns false from pre_visit
4669        return_false_from_pre_in: OptionalCounter,
4670        /// When Some(0) returns false from post_visit
4671        return_false_from_post_in: OptionalCounter,
4672    }
4673
4674    impl<'n> TreeNodeVisitor<'n> for StoppingVisitor {
4675        type Node = LogicalPlan;
4676
4677        fn f_down(&mut self, plan: &'n LogicalPlan) -> Result<TreeNodeRecursion> {
4678            if self.return_false_from_pre_in.dec() {
4679                return Ok(TreeNodeRecursion::Stop);
4680            }
4681            self.inner.f_down(plan)?;
4682
4683            Ok(TreeNodeRecursion::Continue)
4684        }
4685
4686        fn f_up(&mut self, plan: &'n LogicalPlan) -> Result<TreeNodeRecursion> {
4687            if self.return_false_from_post_in.dec() {
4688                return Ok(TreeNodeRecursion::Stop);
4689            }
4690
4691            self.inner.f_up(plan)
4692        }
4693    }
4694
    /// test early stopping in pre-visit
    #[test]
    fn early_stopping_pre_visit() {
        // The counter fires on the third f_down (the TableScan), so only the
        // two nodes above it are recorded and no f_up runs at all.
        let mut visitor = StoppingVisitor {
            return_false_from_pre_in: OptionalCounter::new(2),
            ..Default::default()
        };
        let plan = test_plan();
        let res = plan.visit_with_subqueries(&mut visitor);
        assert!(res.is_ok());

        assert_debug_snapshot!(
            visitor.inner.strings,
            @r#"
        [
            "pre_visit Projection",
            "pre_visit Filter",
        ]
        "#
        );
    }
4716
    #[test]
    fn early_stopping_post_visit() {
        // The counter fires on the second f_up, so the whole pre-visit path
        // plus one post-visit entry are recorded before traversal stops.
        let mut visitor = StoppingVisitor {
            return_false_from_post_in: OptionalCounter::new(1),
            ..Default::default()
        };
        let plan = test_plan();
        let res = plan.visit_with_subqueries(&mut visitor);
        assert!(res.is_ok());

        assert_debug_snapshot!(
            visitor.inner.strings,
            @r#"
        [
            "pre_visit Projection",
            "pre_visit Filter",
            "pre_visit TableScan",
            "post_visit TableScan",
        ]
        "#
        );
    }
4739
    #[derive(Debug, Default)]
    /// Visitor that returns an error after some number of visits
    struct ErrorVisitor {
        inner: OkVisitor,
        /// When Some(0) returns an error from pre_visit (f_down)
        return_error_from_pre_in: OptionalCounter,
        /// When Some(0) returns an error from post_visit (f_up)
        return_error_from_post_in: OptionalCounter,
    }

    impl<'n> TreeNodeVisitor<'n> for ErrorVisitor {
        type Node = LogicalPlan;

        fn f_down(&mut self, plan: &'n LogicalPlan) -> Result<TreeNodeRecursion> {
            // Fail with a not-implemented error once the pre-visit counter fires.
            if self.return_error_from_pre_in.dec() {
                return not_impl_err!("Error in pre_visit");
            }

            self.inner.f_down(plan)
        }

        fn f_up(&mut self, plan: &'n LogicalPlan) -> Result<TreeNodeRecursion> {
            // Fail with a not-implemented error once the post-visit counter fires.
            if self.return_error_from_post_in.dec() {
                return not_impl_err!("Error in post_visit");
            }

            self.inner.f_up(plan)
        }
    }
4769
    #[test]
    fn error_pre_visit() {
        // The counter fires on the third f_down, so the error surfaces after
        // two nodes were recorded and the traversal aborts.
        let mut visitor = ErrorVisitor {
            return_error_from_pre_in: OptionalCounter::new(2),
            ..Default::default()
        };
        let plan = test_plan();
        let res = plan.visit_with_subqueries(&mut visitor).unwrap_err();
        assert_snapshot!(
            res.strip_backtrace(),
            @"This feature is not implemented: Error in pre_visit"
        );
        assert_debug_snapshot!(
            visitor.inner.strings,
            @r#"
        [
            "pre_visit Projection",
            "pre_visit Filter",
        ]
        "#
        );
    }
4792
    #[test]
    fn error_post_visit() {
        // The counter fires on the second f_up, so the full pre-visit path and
        // one post-visit entry are recorded before the error aborts traversal.
        let mut visitor = ErrorVisitor {
            return_error_from_post_in: OptionalCounter::new(1),
            ..Default::default()
        };
        let plan = test_plan();
        let res = plan.visit_with_subqueries(&mut visitor).unwrap_err();
        assert_snapshot!(
            res.strip_backtrace(),
            @"This feature is not implemented: Error in post_visit"
        );
        assert_debug_snapshot!(
            visitor.inner.strings,
            @r#"
        [
            "pre_visit Projection",
            "pre_visit Filter",
            "pre_visit TableScan",
            "post_visit TableScan",
        ]
        "#
        );
    }
4817
    #[test]
    fn test_partial_eq_hash_and_partial_ord() {
        // Window plans built from identical parts must be ==, hash-equal and
        // compare `Ordering::Equal`; plans differing only in schema *metadata*
        // must be !=, hash differently, and be unordered (None).
        let empty_values = Arc::new(LogicalPlan::EmptyRelation(EmptyRelation {
            produce_one_row: true,
            schema: Arc::new(DFSchema::empty()),
        }));

        // Builds a Window node computing `count()` over the empty relation,
        // using the supplied output schema.
        let count_window_function = |schema| {
            Window::try_new_with_schema(
                vec![Expr::WindowFunction(Box::new(WindowFunction::new(
                    WindowFunctionDefinition::AggregateUDF(count_udaf()),
                    vec![],
                )))],
                Arc::clone(&empty_values),
                Arc::new(schema),
            )
            .unwrap()
        };

        let schema_without_metadata = || {
            DFSchema::from_unqualified_fields(
                vec![Field::new("count", DataType::Int64, false)].into(),
                HashMap::new(),
            )
            .unwrap()
        };

        let schema_with_metadata = || {
            DFSchema::from_unqualified_fields(
                vec![Field::new("count", DataType::Int64, false)].into(),
                [("key".to_string(), "value".to_string())].into(),
            )
            .unwrap()
        };

        // A Window
        let f = count_window_function(schema_without_metadata());

        // Same like `f`, different instance
        let f2 = count_window_function(schema_without_metadata());
        assert_eq!(f, f2);
        assert_eq!(hash(&f), hash(&f2));
        assert_eq!(f.partial_cmp(&f2), Some(Ordering::Equal));

        // Same like `f`, except for schema metadata
        let o = count_window_function(schema_with_metadata());
        assert_ne!(f, o);
        assert_ne!(hash(&f), hash(&o)); // hash can collide for different values but does not collide in this test
        assert_eq!(f.partial_cmp(&o), None);
    }
4868
4869    fn hash<T: Hash>(value: &T) -> u64 {
4870        let hasher = &mut DefaultHasher::new();
4871        value.hash(hasher);
4872        hasher.finish()
4873    }
4874
    #[test]
    fn projection_expr_schema_mismatch() -> Result<()> {
        // One projection expression paired with an empty schema must be
        // rejected by `try_new_with_schema` with a planning error.
        let empty_schema = Arc::new(DFSchema::empty());
        let p = Projection::try_new_with_schema(
            vec![col("a")],
            Arc::new(LogicalPlan::EmptyRelation(EmptyRelation {
                produce_one_row: false,
                schema: Arc::clone(&empty_schema),
            })),
            empty_schema,
        );
        assert_snapshot!(p.unwrap_err().strip_backtrace(), @"Error during planning: Projection has mismatch between number of expressions (1) and number of fields in schema (0)");
        Ok(())
    }
4889
4890    fn test_plan() -> LogicalPlan {
4891        let schema = Schema::new(vec![
4892            Field::new("id", DataType::Int32, false),
4893            Field::new("state", DataType::Utf8, false),
4894        ]);
4895
4896        table_scan(TableReference::none(), &schema, Some(vec![0, 1]))
4897            .unwrap()
4898            .filter(col("state").eq(lit("CO")))
4899            .unwrap()
4900            .project(vec![col("id")])
4901            .unwrap()
4902            .build()
4903            .unwrap()
4904    }
4905
4906    #[test]
4907    fn test_replace_invalid_placeholder() {
4908        // test empty placeholder
4909        let schema = Schema::new(vec![Field::new("id", DataType::Int32, false)]);
4910
4911        let plan = table_scan(TableReference::none(), &schema, None)
4912            .unwrap()
4913            .filter(col("id").eq(placeholder("")))
4914            .unwrap()
4915            .build()
4916            .unwrap();
4917
4918        let param_values = vec![ScalarValue::Int32(Some(42))];
4919        plan.replace_params_with_values(&param_values.clone().into())
4920            .expect_err("unexpectedly succeeded to replace an invalid placeholder");
4921
4922        // test $0 placeholder
4923        let schema = Schema::new(vec![Field::new("id", DataType::Int32, false)]);
4924
4925        let plan = table_scan(TableReference::none(), &schema, None)
4926            .unwrap()
4927            .filter(col("id").eq(placeholder("$0")))
4928            .unwrap()
4929            .build()
4930            .unwrap();
4931
4932        plan.replace_params_with_values(&param_values.clone().into())
4933            .expect_err("unexpectedly succeeded to replace an invalid placeholder");
4934
4935        // test $00 placeholder
4936        let schema = Schema::new(vec![Field::new("id", DataType::Int32, false)]);
4937
4938        let plan = table_scan(TableReference::none(), &schema, None)
4939            .unwrap()
4940            .filter(col("id").eq(placeholder("$00")))
4941            .unwrap()
4942            .build()
4943            .unwrap();
4944
4945        plan.replace_params_with_values(&param_values.into())
4946            .expect_err("unexpectedly succeeded to replace an invalid placeholder");
4947    }
4948
    #[test]
    fn test_replace_placeholder_mismatched_metadata() {
        // Binding a parameter value that carries metadata against a prepared
        // statement whose declared field has no metadata must fail.
        let schema = Schema::new(vec![Field::new("id", DataType::Int32, false)]);

        // Create a prepared statement with explicit fields that do not have metadata
        let plan = table_scan(TableReference::none(), &schema, None)
            .unwrap()
            .filter(col("id").eq(placeholder("$1")))
            .unwrap()
            .build()
            .unwrap();
        let prepared_builder = LogicalPlanBuilder::new(plan)
            .prepare(
                "".to_string(),
                vec![Field::new("", DataType::Int32, true).into()],
            )
            .unwrap();

        // Attempt to bind a parameter with metadata
        let mut scalar_meta = HashMap::new();
        scalar_meta.insert("some_key".to_string(), "some_value".to_string());
        let param_values = ParamValues::List(vec![ScalarAndMetadata::new(
            ScalarValue::Int32(Some(42)),
            Some(scalar_meta.into()),
        )]);
        prepared_builder
            .plan()
            .clone()
            .with_param_values(param_values)
            .expect_err("prepared field metadata mismatch unexpectedly succeeded");
    }
4980
    #[test]
    fn test_replace_placeholder_empty_relation_valid_schema() {
        // SELECT $1, $2;
        let plan = LogicalPlanBuilder::empty(false)
            .project(vec![
                SelectExpr::from(placeholder("$1")),
                SelectExpr::from(placeholder("$2")),
            ])
            .unwrap()
            .build()
            .unwrap();

        // original: unbound placeholders carry unknown (Null) types
        assert_snapshot!(plan.display_indent_schema(), @r"
        Projection: $1, $2 [$1:Null;N, $2:Null;N]
          EmptyRelation: rows=0 []
        ");

        let plan = plan
            .with_param_values(vec![ScalarValue::from(1i32), ScalarValue::from("s")])
            .unwrap();

        // replaced: the output field types now come from the bound values
        assert_snapshot!(plan.display_indent_schema(), @r#"
        Projection: Int32(1) AS $1, Utf8("s") AS $2 [$1:Int32, $2:Utf8]
          EmptyRelation: rows=0 []
        "#);
    }
5009
5010    #[test]
5011    fn test_nullable_schema_after_grouping_set() {
5012        let schema = Schema::new(vec![
5013            Field::new("foo", DataType::Int32, false),
5014            Field::new("bar", DataType::Int32, false),
5015        ]);
5016
5017        let plan = table_scan(TableReference::none(), &schema, None)
5018            .unwrap()
5019            .aggregate(
5020                vec![Expr::GroupingSet(GroupingSet::GroupingSets(vec![
5021                    vec![col("foo")],
5022                    vec![col("bar")],
5023                ]))],
5024                vec![count(lit(true))],
5025            )
5026            .unwrap()
5027            .build()
5028            .unwrap();
5029
5030        let output_schema = plan.schema();
5031
5032        assert!(
5033            output_schema
5034                .field_with_name(None, "foo")
5035                .unwrap()
5036                .is_nullable(),
5037        );
5038        assert!(
5039            output_schema
5040                .field_with_name(None, "bar")
5041                .unwrap()
5042                .is_nullable()
5043        );
5044    }
5045
    #[test]
    fn test_filter_is_scalar() {
        // An equality filter over a scan with no uniqueness guarantee may
        // match many rows, so it is not scalar.
        let schema =
            Arc::new(Schema::new(vec![Field::new("id", DataType::Int32, false)]));

        let source = Arc::new(LogicalTableSource::new(schema));
        let schema = Arc::new(
            DFSchema::try_from_qualified_schema(
                TableReference::bare("tab"),
                &source.schema(),
            )
            .unwrap(),
        );
        let scan = Arc::new(LogicalPlan::TableScan(TableScan {
            table_name: TableReference::bare("tab"),
            source: Arc::clone(&source) as Arc<dyn TableSource>,
            projection: None,
            projected_schema: Arc::clone(&schema),
            filters: vec![],
            fetch: None,
        }));
        let col = schema.field_names()[0].clone();

        let filter = Filter::try_new(
            Expr::Column(col.into()).eq(Expr::Literal(ScalarValue::Int32(Some(1)), None)),
            scan,
        )
        .unwrap();
        assert!(!filter.is_scalar());
        // With a unique constraint on the filtered column, `id = 1` can match
        // at most one row, so the same filter becomes scalar.
        let unique_schema = Arc::new(
            schema
                .as_ref()
                .clone()
                .with_functional_dependencies(
                    FunctionalDependencies::new_from_constraints(
                        Some(&Constraints::new_unverified(vec![Constraint::Unique(
                            vec![0],
                        )])),
                        1,
                    ),
                )
                .unwrap(),
        );
        let scan = Arc::new(LogicalPlan::TableScan(TableScan {
            table_name: TableReference::bare("tab"),
            source,
            projection: None,
            projected_schema: Arc::clone(&unique_schema),
            filters: vec![],
            fetch: None,
        }));
        let col = schema.field_names()[0].clone();

        let filter =
            Filter::try_new(Expr::Column(col.into()).eq(lit(1i32)), scan).unwrap();
        assert!(filter.is_scalar());
    }
5104
    #[test]
    fn test_transform_explain() {
        // Rewriting a node underneath an Explain must rebuild the Explain
        // parent and show the rewritten child in the rendered plan.
        let schema = Schema::new(vec![
            Field::new("foo", DataType::Int32, false),
            Field::new("bar", DataType::Int32, false),
        ]);

        let plan = table_scan(TableReference::none(), &schema, None)
            .unwrap()
            .explain(false, false)
            .unwrap()
            .build()
            .unwrap();

        let external_filter = col("foo").eq(lit(true));

        // after transformation, because plan is not the same anymore,
        // the parent plan is built again with call to LogicalPlan::with_new_inputs -> with_new_exprs
        let plan = plan
            .transform(|plan| match plan {
                LogicalPlan::TableScan(table) => {
                    // Wrap every TableScan in a Filter with the external predicate.
                    let filter = Filter::try_new(
                        external_filter.clone(),
                        Arc::new(LogicalPlan::TableScan(table)),
                    )
                    .unwrap();
                    Ok(Transformed::yes(LogicalPlan::Filter(filter)))
                }
                x => Ok(Transformed::no(x)),
            })
            .data()
            .unwrap();

        let actual = format!("{}", plan.display_indent());
        assert_snapshot!(actual, @r"
        Explain
          Filter: foo = Boolean(true)
            TableScan: ?table?
        ")
    }
5145
5146    #[test]
5147    fn test_plan_partial_ord() {
5148        let empty_relation = LogicalPlan::EmptyRelation(EmptyRelation {
5149            produce_one_row: false,
5150            schema: Arc::new(DFSchema::empty()),
5151        });
5152
5153        let describe_table = LogicalPlan::DescribeTable(DescribeTable {
5154            schema: Arc::new(Schema::new(vec![Field::new(
5155                "foo",
5156                DataType::Int32,
5157                false,
5158            )])),
5159            output_schema: DFSchemaRef::new(DFSchema::empty()),
5160        });
5161
5162        let describe_table_clone = LogicalPlan::DescribeTable(DescribeTable {
5163            schema: Arc::new(Schema::new(vec![Field::new(
5164                "foo",
5165                DataType::Int32,
5166                false,
5167            )])),
5168            output_schema: DFSchemaRef::new(DFSchema::empty()),
5169        });
5170
5171        assert_eq!(
5172            empty_relation.partial_cmp(&describe_table),
5173            Some(Ordering::Less)
5174        );
5175        assert_eq!(
5176            describe_table.partial_cmp(&empty_relation),
5177            Some(Ordering::Greater)
5178        );
5179        assert_eq!(describe_table.partial_cmp(&describe_table_clone), None);
5180    }
5181
5182    #[test]
5183    fn test_limit_with_new_children() {
5184        let input = Arc::new(LogicalPlan::Values(Values {
5185            schema: Arc::new(DFSchema::empty()),
5186            values: vec![vec![]],
5187        }));
5188        let cases = [
5189            LogicalPlan::Limit(Limit {
5190                skip: None,
5191                fetch: None,
5192                input: Arc::clone(&input),
5193            }),
5194            LogicalPlan::Limit(Limit {
5195                skip: None,
5196                fetch: Some(Box::new(Expr::Literal(
5197                    ScalarValue::new_ten(&DataType::UInt32).unwrap(),
5198                    None,
5199                ))),
5200                input: Arc::clone(&input),
5201            }),
5202            LogicalPlan::Limit(Limit {
5203                skip: Some(Box::new(Expr::Literal(
5204                    ScalarValue::new_ten(&DataType::UInt32).unwrap(),
5205                    None,
5206                ))),
5207                fetch: None,
5208                input: Arc::clone(&input),
5209            }),
5210            LogicalPlan::Limit(Limit {
5211                skip: Some(Box::new(Expr::Literal(
5212                    ScalarValue::new_one(&DataType::UInt32).unwrap(),
5213                    None,
5214                ))),
5215                fetch: Some(Box::new(Expr::Literal(
5216                    ScalarValue::new_ten(&DataType::UInt32).unwrap(),
5217                    None,
5218                ))),
5219                input,
5220            }),
5221        ];
5222
5223        for limit in cases {
5224            let new_limit = limit
5225                .with_new_exprs(
5226                    limit.expressions(),
5227                    limit.inputs().into_iter().cloned().collect(),
5228                )
5229                .unwrap();
5230            assert_eq!(limit, new_limit);
5231        }
5232    }
5233
    #[test]
    fn test_with_subqueries_jump() {
        // The test plan contains a `Project` node above a `Filter` node, and the
        // `Project` node contains a subquery plan with a `Filter` root node, so returning
        // `TreeNodeRecursion::Jump` on `Project` should cause not visiting any of the
        // `Filter`s.
        //
        // The same expectation is exercised against all five subquery-aware
        // traversal APIs: `apply_with_subqueries`, `visit_with_subqueries`,
        // `transform_down_with_subqueries`, `transform_down_up_with_subqueries`
        // and `rewrite_with_subqueries`.
        let subquery_schema =
            Schema::new(vec![Field::new("sub_id", DataType::Int32, false)]);

        // Subquery plan whose root is a `Filter` — must NOT be visited once the
        // enclosing `Projection` returns `Jump`.
        let subquery_plan =
            table_scan(TableReference::none(), &subquery_schema, Some(vec![0]))
                .unwrap()
                .filter(col("sub_id").eq(lit(0)))
                .unwrap()
                .build()
                .unwrap();

        let schema = Schema::new(vec![Field::new("id", DataType::Int32, false)]);

        // Outer plan: Projection (carrying the scalar subquery) -> Filter -> TableScan.
        let plan = table_scan(TableReference::none(), &schema, Some(vec![0]))
            .unwrap()
            .filter(col("id").eq(lit(0)))
            .unwrap()
            .project(vec![col("id"), scalar_subquery(Arc::new(subquery_plan))])
            .unwrap()
            .build()
            .unwrap();

        // 1) Closure-based visiting via `apply_with_subqueries`.
        let mut filter_found = false;
        plan.apply_with_subqueries(|plan| {
            match plan {
                LogicalPlan::Projection(..) => return Ok(TreeNodeRecursion::Jump),
                LogicalPlan::Filter(..) => filter_found = true,
                _ => {}
            }
            Ok(TreeNodeRecursion::Continue)
        })
        .unwrap();
        assert!(!filter_found);

        // 2) Trait-based visiting via `visit_with_subqueries` with a
        //    `TreeNodeVisitor` that jumps on `Projection` in its pre-order hook.
        struct ProjectJumpVisitor {
            filter_found: bool,
        }

        impl ProjectJumpVisitor {
            fn new() -> Self {
                Self {
                    filter_found: false,
                }
            }
        }

        impl<'n> TreeNodeVisitor<'n> for ProjectJumpVisitor {
            type Node = LogicalPlan;

            fn f_down(&mut self, node: &'n Self::Node) -> Result<TreeNodeRecursion> {
                match node {
                    LogicalPlan::Projection(..) => return Ok(TreeNodeRecursion::Jump),
                    LogicalPlan::Filter(..) => self.filter_found = true,
                    _ => {}
                }
                Ok(TreeNodeRecursion::Continue)
            }
        }

        let mut visitor = ProjectJumpVisitor::new();
        plan.visit_with_subqueries(&mut visitor).unwrap();
        assert!(!visitor.filter_found);

        // 3) Closure-based transformation via `transform_down_with_subqueries`;
        //    `Transformed::new(.., Jump)` returns the node unchanged but skips
        //    its children (including subquery plans).
        let mut filter_found = false;
        plan.clone()
            .transform_down_with_subqueries(|plan| {
                match plan {
                    LogicalPlan::Projection(..) => {
                        return Ok(Transformed::new(
                            plan,
                            false,
                            TreeNodeRecursion::Jump,
                        ));
                    }
                    LogicalPlan::Filter(..) => filter_found = true,
                    _ => {}
                }
                Ok(Transformed::no(plan))
            })
            .unwrap();
        assert!(!filter_found);

        // 4) Combined down/up transformation; jumping in the "down" closure
        //    must prevent the `Filter`s from being seen, the "up" closure is a
        //    no-op here.
        let mut filter_found = false;
        plan.clone()
            .transform_down_up_with_subqueries(
                |plan| {
                    match plan {
                        LogicalPlan::Projection(..) => {
                            return Ok(Transformed::new(
                                plan,
                                false,
                                TreeNodeRecursion::Jump,
                            ));
                        }
                        LogicalPlan::Filter(..) => filter_found = true,
                        _ => {}
                    }
                    Ok(Transformed::no(plan))
                },
                |plan| Ok(Transformed::no(plan)),
            )
            .unwrap();
        assert!(!filter_found);

        // 5) Trait-based rewriting via `rewrite_with_subqueries` with a
        //    `TreeNodeRewriter` that jumps on `Projection` in its pre-order hook.
        struct ProjectJumpRewriter {
            filter_found: bool,
        }

        impl ProjectJumpRewriter {
            fn new() -> Self {
                Self {
                    filter_found: false,
                }
            }
        }

        impl TreeNodeRewriter for ProjectJumpRewriter {
            type Node = LogicalPlan;

            fn f_down(&mut self, node: Self::Node) -> Result<Transformed<Self::Node>> {
                match node {
                    LogicalPlan::Projection(..) => {
                        return Ok(Transformed::new(
                            node,
                            false,
                            TreeNodeRecursion::Jump,
                        ));
                    }
                    LogicalPlan::Filter(..) => self.filter_found = true,
                    _ => {}
                }
                Ok(Transformed::no(node))
            }
        }

        let mut rewriter = ProjectJumpRewriter::new();
        plan.rewrite_with_subqueries(&mut rewriter).unwrap();
        assert!(!rewriter.filter_found);
    }
5379
5380    #[test]
5381    fn test_with_unresolved_placeholders() {
5382        let field_name = "id";
5383        let placeholder_value = "$1";
5384        let schema = Schema::new(vec![Field::new(field_name, DataType::Int32, false)]);
5385
5386        let plan = table_scan(TableReference::none(), &schema, None)
5387            .unwrap()
5388            .filter(col(field_name).eq(placeholder(placeholder_value)))
5389            .unwrap()
5390            .build()
5391            .unwrap();
5392
5393        // Check that the placeholder parameters have not received a DataType.
5394        let params = plan.get_parameter_fields().unwrap();
5395        assert_eq!(params.len(), 1);
5396
5397        let parameter_type = params.clone().get(placeholder_value).unwrap().clone();
5398        assert_eq!(parameter_type, None);
5399    }
5400
5401    #[test]
5402    fn test_join_with_new_exprs() -> Result<()> {
5403        fn create_test_join(
5404            on: Vec<(Expr, Expr)>,
5405            filter: Option<Expr>,
5406        ) -> Result<LogicalPlan> {
5407            let schema = Schema::new(vec![
5408                Field::new("a", DataType::Int32, false),
5409                Field::new("b", DataType::Int32, false),
5410            ]);
5411
5412            let left_schema = DFSchema::try_from_qualified_schema("t1", &schema)?;
5413            let right_schema = DFSchema::try_from_qualified_schema("t2", &schema)?;
5414
5415            Ok(LogicalPlan::Join(Join {
5416                left: Arc::new(
5417                    table_scan(Some("t1"), left_schema.as_arrow(), None)?.build()?,
5418                ),
5419                right: Arc::new(
5420                    table_scan(Some("t2"), right_schema.as_arrow(), None)?.build()?,
5421                ),
5422                on,
5423                filter,
5424                join_type: JoinType::Inner,
5425                join_constraint: JoinConstraint::On,
5426                schema: Arc::new(left_schema.join(&right_schema)?),
5427                null_equality: NullEquality::NullEqualsNothing,
5428                null_aware: false,
5429            }))
5430        }
5431
5432        {
5433            let join = create_test_join(vec![(col("t1.a"), (col("t2.a")))], None)?;
5434            let LogicalPlan::Join(join) = join.with_new_exprs(
5435                join.expressions(),
5436                join.inputs().into_iter().cloned().collect(),
5437            )?
5438            else {
5439                unreachable!()
5440            };
5441            assert_eq!(join.on, vec![(col("t1.a"), (col("t2.a")))]);
5442            assert_eq!(join.filter, None);
5443        }
5444
5445        {
5446            let join = create_test_join(vec![], Some(col("t1.a").gt(col("t2.a"))))?;
5447            let LogicalPlan::Join(join) = join.with_new_exprs(
5448                join.expressions(),
5449                join.inputs().into_iter().cloned().collect(),
5450            )?
5451            else {
5452                unreachable!()
5453            };
5454            assert_eq!(join.on, vec![]);
5455            assert_eq!(join.filter, Some(col("t1.a").gt(col("t2.a"))));
5456        }
5457
5458        {
5459            let join = create_test_join(
5460                vec![(col("t1.a"), (col("t2.a")))],
5461                Some(col("t1.b").gt(col("t2.b"))),
5462            )?;
5463            let LogicalPlan::Join(join) = join.with_new_exprs(
5464                join.expressions(),
5465                join.inputs().into_iter().cloned().collect(),
5466            )?
5467            else {
5468                unreachable!()
5469            };
5470            assert_eq!(join.on, vec![(col("t1.a"), (col("t2.a")))]);
5471            assert_eq!(join.filter, Some(col("t1.b").gt(col("t2.b"))));
5472        }
5473
5474        {
5475            let join = create_test_join(
5476                vec![(col("t1.a"), (col("t2.a"))), (col("t1.b"), (col("t2.b")))],
5477                None,
5478            )?;
5479            let LogicalPlan::Join(join) = join.with_new_exprs(
5480                vec![
5481                    binary_expr(col("t1.a"), Operator::Plus, lit(1)),
5482                    binary_expr(col("t2.a"), Operator::Plus, lit(2)),
5483                    col("t1.b"),
5484                    col("t2.b"),
5485                    lit(true),
5486                ],
5487                join.inputs().into_iter().cloned().collect(),
5488            )?
5489            else {
5490                unreachable!()
5491            };
5492            assert_eq!(
5493                join.on,
5494                vec![
5495                    (
5496                        binary_expr(col("t1.a"), Operator::Plus, lit(1)),
5497                        binary_expr(col("t2.a"), Operator::Plus, lit(2))
5498                    ),
5499                    (col("t1.b"), (col("t2.b")))
5500                ]
5501            );
5502            assert_eq!(join.filter, Some(lit(true)));
5503        }
5504
5505        Ok(())
5506    }
5507
    #[test]
    fn test_join_try_new() -> Result<()> {
        // `Join::try_new` must derive the output schema from the join type:
        // semi/anti joins keep only one side's columns, mark joins append a
        // `mark` column, and outer joins adjust field nullability.
        let schema = Schema::new(vec![
            Field::new("a", DataType::Int32, false),
            Field::new("b", DataType::Int32, false),
        ]);

        let left_scan = table_scan(Some("t1"), &schema, None)?.build()?;

        let right_scan = table_scan(Some("t2"), &schema, None)?.build()?;

        let join_types = vec![
            JoinType::Inner,
            JoinType::Left,
            JoinType::Right,
            JoinType::Full,
            JoinType::LeftSemi,
            JoinType::LeftAnti,
            JoinType::RightSemi,
            JoinType::RightAnti,
            JoinType::LeftMark,
        ];

        for join_type in join_types {
            // Same equijoin condition and filter for every join type; only the
            // join type varies, so schema differences are attributable to it.
            let join = Join::try_new(
                Arc::new(left_scan.clone()),
                Arc::new(right_scan.clone()),
                vec![(col("t1.a"), col("t2.a"))],
                Some(col("t1.b").gt(col("t2.b"))),
                join_type,
                JoinConstraint::On,
                NullEquality::NullEqualsNothing,
                false,
            )?;

            match join_type {
                // Left semi/anti joins output only the left table's columns.
                JoinType::LeftSemi | JoinType::LeftAnti => {
                    assert_eq!(join.schema.fields().len(), 2);

                    let fields = join.schema.fields();
                    assert_eq!(
                        fields[0].name(),
                        "a",
                        "First field should be 'a' from left table"
                    );
                    assert_eq!(
                        fields[1].name(),
                        "b",
                        "Second field should be 'b' from left table"
                    );
                }
                // Right semi/anti joins output only the right table's columns.
                JoinType::RightSemi | JoinType::RightAnti => {
                    assert_eq!(join.schema.fields().len(), 2);

                    let fields = join.schema.fields();
                    assert_eq!(
                        fields[0].name(),
                        "a",
                        "First field should be 'a' from right table"
                    );
                    assert_eq!(
                        fields[1].name(),
                        "b",
                        "Second field should be 'b' from right table"
                    );
                }
                // A left mark join outputs the left table's columns plus a
                // non-nullable `mark` column.
                JoinType::LeftMark => {
                    assert_eq!(join.schema.fields().len(), 3);

                    let fields = join.schema.fields();
                    assert_eq!(
                        fields[0].name(),
                        "a",
                        "First field should be 'a' from left table"
                    );
                    assert_eq!(
                        fields[1].name(),
                        "b",
                        "Second field should be 'b' from left table"
                    );
                    assert_eq!(
                        fields[2].name(),
                        "mark",
                        "Third field should be the mark column"
                    );

                    assert!(!fields[0].is_nullable());
                    assert!(!fields[1].is_nullable());
                    assert!(!fields[2].is_nullable());
                }
                // Inner/Left/Right/Full joins output all columns from both
                // sides; nullability depends on which side can be unmatched.
                _ => {
                    assert_eq!(join.schema.fields().len(), 4);

                    let fields = join.schema.fields();
                    assert_eq!(
                        fields[0].name(),
                        "a",
                        "First field should be 'a' from left table"
                    );
                    assert_eq!(
                        fields[1].name(),
                        "b",
                        "Second field should be 'b' from left table"
                    );
                    assert_eq!(
                        fields[2].name(),
                        "a",
                        "Third field should be 'a' from right table"
                    );
                    assert_eq!(
                        fields[3].name(),
                        "b",
                        "Fourth field should be 'b' from right table"
                    );

                    if join_type == JoinType::Left {
                        // Left side fields (first two) shouldn't be nullable
                        assert!(!fields[0].is_nullable());
                        assert!(!fields[1].is_nullable());
                        // Right side fields (third and fourth) should be nullable
                        assert!(fields[2].is_nullable());
                        assert!(fields[3].is_nullable());
                    } else if join_type == JoinType::Right {
                        // Left side fields (first two) should be nullable
                        assert!(fields[0].is_nullable());
                        assert!(fields[1].is_nullable());
                        // Right side fields (third and fourth) shouldn't be nullable
                        assert!(!fields[2].is_nullable());
                        assert!(!fields[3].is_nullable());
                    } else if join_type == JoinType::Full {
                        // Full outer join: every field can be unmatched.
                        assert!(fields[0].is_nullable());
                        assert!(fields[1].is_nullable());
                        assert!(fields[2].is_nullable());
                        assert!(fields[3].is_nullable());
                    }
                }
            }

            // The non-schema parts of the join are passed through verbatim.
            assert_eq!(join.on, vec![(col("t1.a"), col("t2.a"))]);
            assert_eq!(join.filter, Some(col("t1.b").gt(col("t2.b"))));
            assert_eq!(join.join_type, join_type);
            assert_eq!(join.join_constraint, JoinConstraint::On);
            assert_eq!(join.null_equality, NullEquality::NullEqualsNothing);
        }

        Ok(())
    }
5655
    #[test]
    fn test_join_try_new_with_using_constraint_and_overlapping_columns() -> Result<()> {
        // Verifies `Join::try_new` with overlapping column names on both sides:
        // a USING constraint keeps both copies of the common column in the
        // logical schema, complex filters are carried through, and the
        // `null_equality` setting is preserved.
        let left_schema = Schema::new(vec![
            Field::new("id", DataType::Int32, false), // Common column in both tables
            Field::new("name", DataType::Utf8, false), // Unique to left
            Field::new("value", DataType::Int32, false), // Common column, different meaning
        ]);

        let right_schema = Schema::new(vec![
            Field::new("id", DataType::Int32, false), // Common column in both tables
            Field::new("category", DataType::Utf8, false), // Unique to right
            Field::new("value", DataType::Float64, true), // Common column, different meaning
        ]);

        let left_plan = table_scan(Some("t1"), &left_schema, None)?.build()?;

        let right_plan = table_scan(Some("t2"), &right_schema, None)?.build()?;

        // Test 1: USING constraint with a common column
        {
            // In the logical plan, both copies of the `id` column are preserved
            // The USING constraint is handled later during physical execution, where the common column appears once
            let join = Join::try_new(
                Arc::new(left_plan.clone()),
                Arc::new(right_plan.clone()),
                vec![(col("t1.id"), col("t2.id"))],
                None,
                JoinType::Inner,
                JoinConstraint::Using,
                NullEquality::NullEqualsNothing,
                false,
            )?;

            let fields = join.schema.fields();

            // All 3 + 3 columns survive, left side first.
            assert_eq!(fields.len(), 6);

            assert_eq!(
                fields[0].name(),
                "id",
                "First field should be 'id' from left table"
            );
            assert_eq!(
                fields[1].name(),
                "name",
                "Second field should be 'name' from left table"
            );
            assert_eq!(
                fields[2].name(),
                "value",
                "Third field should be 'value' from left table"
            );
            assert_eq!(
                fields[3].name(),
                "id",
                "Fourth field should be 'id' from right table"
            );
            assert_eq!(
                fields[4].name(),
                "category",
                "Fifth field should be 'category' from right table"
            );
            assert_eq!(
                fields[5].name(),
                "value",
                "Sixth field should be 'value' from right table"
            );

            assert_eq!(join.join_constraint, JoinConstraint::Using);
        }

        // Test 2: Complex join condition with expressions
        {
            // Complex condition: join on id equality AND where left.value < right.value
            let join = Join::try_new(
                Arc::new(left_plan.clone()),
                Arc::new(right_plan.clone()),
                vec![(col("t1.id"), col("t2.id"))], // Equijoin condition
                Some(col("t1.value").lt(col("t2.value"))), // Non-equi filter condition
                JoinType::Inner,
                JoinConstraint::On,
                NullEquality::NullEqualsNothing,
                false,
            )?;

            let fields = join.schema.fields();
            assert_eq!(fields.len(), 6);

            assert_eq!(
                fields[0].name(),
                "id",
                "First field should be 'id' from left table"
            );
            assert_eq!(
                fields[1].name(),
                "name",
                "Second field should be 'name' from left table"
            );
            assert_eq!(
                fields[2].name(),
                "value",
                "Third field should be 'value' from left table"
            );
            assert_eq!(
                fields[3].name(),
                "id",
                "Fourth field should be 'id' from right table"
            );
            assert_eq!(
                fields[4].name(),
                "category",
                "Fifth field should be 'category' from right table"
            );
            assert_eq!(
                fields[5].name(),
                "value",
                "Sixth field should be 'value' from right table"
            );

            // The non-equi filter is stored verbatim on the join node.
            assert_eq!(join.filter, Some(col("t1.value").lt(col("t2.value"))));
        }

        // Test 3: Join with null equality behavior set to true
        {
            let join = Join::try_new(
                Arc::new(left_plan.clone()),
                Arc::new(right_plan.clone()),
                vec![(col("t1.id"), col("t2.id"))],
                None,
                JoinType::Inner,
                JoinConstraint::On,
                NullEquality::NullEqualsNull,
                false,
            )?;

            // The requested null-equality semantics must be preserved.
            assert_eq!(join.null_equality, NullEquality::NullEqualsNull);
        }

        Ok(())
    }
5796
    #[test]
    fn test_join_try_new_schema_validation() -> Result<()> {
        // Checks the nullability of every output field produced by
        // `Join::try_new` for Inner/Left/Right/Full joins: fields of the
        // possibly-unmatched side become nullable, already-nullable fields
        // stay nullable.
        let left_schema = Schema::new(vec![
            Field::new("id", DataType::Int32, false),
            Field::new("name", DataType::Utf8, false),
            Field::new("value", DataType::Float64, true),
        ]);

        let right_schema = Schema::new(vec![
            Field::new("id", DataType::Int32, false),
            Field::new("category", DataType::Utf8, true),
            Field::new("code", DataType::Int16, false),
        ]);

        let left_plan = table_scan(Some("t1"), &left_schema, None)?.build()?;

        let right_plan = table_scan(Some("t2"), &right_schema, None)?.build()?;

        let join_types = vec![
            JoinType::Inner,
            JoinType::Left,
            JoinType::Right,
            JoinType::Full,
        ];

        for join_type in join_types {
            let join = Join::try_new(
                Arc::new(left_plan.clone()),
                Arc::new(right_plan.clone()),
                vec![(col("t1.id"), col("t2.id"))],
                Some(col("t1.value").gt(lit(5.0))),
                join_type,
                JoinConstraint::On,
                NullEquality::NullEqualsNothing,
                false,
            )?;

            let fields = join.schema.fields();
            assert_eq!(fields.len(), 6, "Expected 6 fields for {join_type} join");

            // Expected-nullability truth table, indexed by (field, join type).
            for (i, field) in fields.iter().enumerate() {
                let expected_nullable = match (i, &join_type) {
                    // Left table fields (indices 0, 1, 2)
                    (0, JoinType::Right | JoinType::Full) => true, // id becomes nullable in RIGHT/FULL
                    (1, JoinType::Right | JoinType::Full) => true, // name becomes nullable in RIGHT/FULL
                    (2, _) => true, // value is already nullable

                    // Right table fields (indices 3, 4, 5)
                    (3, JoinType::Left | JoinType::Full) => true, // id becomes nullable in LEFT/FULL
                    (4, _) => true, // category is already nullable
                    (5, JoinType::Left | JoinType::Full) => true, // code becomes nullable in LEFT/FULL

                    _ => false,
                };

                assert_eq!(
                    field.is_nullable(),
                    expected_nullable,
                    "Field {} ({}) nullability incorrect for {:?} join",
                    i,
                    field.name(),
                    join_type
                );
            }
        }

        // A USING join over the same inputs also keeps all six fields.
        let using_join = Join::try_new(
            Arc::new(left_plan.clone()),
            Arc::new(right_plan.clone()),
            vec![(col("t1.id"), col("t2.id"))],
            None,
            JoinType::Inner,
            JoinConstraint::Using,
            NullEquality::NullEqualsNothing,
            false,
        )?;

        assert_eq!(
            using_join.schema.fields().len(),
            6,
            "USING join should have all fields"
        );
        assert_eq!(using_join.join_constraint, JoinConstraint::Using);

        Ok(())
    }
5883}