datafusion_expr/logical_plan/
plan.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! Logical plan types
19
20use std::cmp::Ordering;
21use std::collections::{HashMap, HashSet};
22use std::fmt::{self, Debug, Display, Formatter};
23use std::hash::{Hash, Hasher};
24use std::sync::{Arc, LazyLock};
25
26use super::DdlStatement;
27use super::dml::CopyTo;
28use super::invariants::{
29    InvariantLevel, assert_always_invariants_at_current_node,
30    assert_executable_invariants,
31};
32use crate::builder::{unique_field_aliases, unnest_with_options};
33use crate::expr::{
34    Alias, Placeholder, Sort as SortExpr, WindowFunction, WindowFunctionParams,
35    intersect_metadata_for_union,
36};
37use crate::expr_rewriter::{
38    NamePreserver, create_col_from_scalar_expr, normalize_cols, normalize_sorts,
39};
40use crate::logical_plan::display::{GraphvizVisitor, IndentVisitor};
41use crate::logical_plan::extension::UserDefinedLogicalNode;
42use crate::logical_plan::{DmlStatement, Statement};
43use crate::utils::{
44    enumerate_grouping_sets, exprlist_to_fields, find_out_reference_exprs,
45    grouping_set_expr_count, grouping_set_to_exprlist, split_conjunction,
46};
47use crate::{
48    BinaryExpr, CreateMemoryTable, CreateView, Execute, Expr, ExprSchemable,
49    LogicalPlanBuilder, Operator, Prepare, TableProviderFilterPushDown, TableSource,
50    WindowFunctionDefinition, build_join_schema, expr_vec_fmt, requalify_sides_if_needed,
51};
52
53use arrow::datatypes::{DataType, Field, FieldRef, Schema, SchemaRef};
54use datafusion_common::cse::{NormalizeEq, Normalizeable};
55use datafusion_common::format::ExplainFormat;
56use datafusion_common::metadata::check_metadata_with_storage_equal;
57use datafusion_common::tree_node::{
58    Transformed, TreeNode, TreeNodeContainer, TreeNodeRecursion,
59};
60use datafusion_common::{
61    Column, Constraints, DFSchema, DFSchemaRef, DataFusionError, Dependency,
62    FunctionalDependence, FunctionalDependencies, NullEquality, ParamValues, Result,
63    ScalarValue, Spans, TableReference, UnnestOptions, aggregate_functional_dependencies,
64    assert_eq_or_internal_err, assert_or_internal_err, internal_err, plan_err,
65};
66use indexmap::IndexSet;
67
68// backwards compatibility
69use crate::display::PgJsonVisitor;
70pub use datafusion_common::display::{PlanType, StringifiedPlan, ToStringifiedPlan};
71pub use datafusion_common::{JoinConstraint, JoinType};
72
73/// A `LogicalPlan` is a node in a tree of relational operators (such as
74/// Projection or Filter).
75///
76/// Represents transforming an input relation (table) to an output relation
77/// (table) with a potentially different schema. Plans form a dataflow tree
78/// where data flows from leaves up to the root to produce the query result.
79///
80/// `LogicalPlan`s can be created by the SQL query planner, the DataFrame API,
81/// or programmatically (for example custom query languages).
82///
83/// # See also:
84/// * [`Expr`]: For the expressions that are evaluated by the plan
85/// * [`LogicalPlanBuilder`]: For building `LogicalPlan`s
86/// * [`tree_node`]: To inspect and rewrite `LogicalPlan`s
87///
88/// [`tree_node`]: crate::logical_plan::tree_node
89///
90/// # Examples
91///
92/// ## Creating a LogicalPlan from SQL:
93///
94/// See [`SessionContext::sql`](https://docs.rs/datafusion/latest/datafusion/execution/context/struct.SessionContext.html#method.sql)
95///
96/// ## Creating a LogicalPlan from the DataFrame API:
97///
98/// See [`DataFrame::logical_plan`](https://docs.rs/datafusion/latest/datafusion/dataframe/struct.DataFrame.html#method.logical_plan)
99///
100/// ## Creating a LogicalPlan programmatically:
101///
102/// See [`LogicalPlanBuilder`]
103///
104/// # Visiting and Rewriting `LogicalPlan`s
105///
106/// Using the [`tree_node`] API, you can recursively walk all nodes in a
107/// `LogicalPlan`. For example, to find all column references in a plan:
108///
109/// ```
110/// # use std::collections::HashSet;
111/// # use arrow::datatypes::{DataType, Field, Schema};
112/// # use datafusion_expr::{Expr, col, lit, LogicalPlan, LogicalPlanBuilder, table_scan};
113/// # use datafusion_common::tree_node::{TreeNodeRecursion, TreeNode};
114/// # use datafusion_common::{Column, Result};
115/// # fn employee_schema() -> Schema {
116/// #    Schema::new(vec![
117/// #           Field::new("name", DataType::Utf8, false),
118/// #           Field::new("salary", DataType::Int32, false),
119/// #       ])
120/// #   }
/// // Projection(name)
122/// //   Filter(salary > 1000)
123/// //     TableScan(employee)
124/// # fn main() -> Result<()> {
125/// let plan = table_scan(Some("employee"), &employee_schema(), None)?
126///  .filter(col("salary").gt(lit(1000)))?
127///  .project(vec![col("name")])?
128///  .build()?;
129///
130/// // use apply to walk the plan and collect all expressions
131/// let mut expressions = HashSet::new();
132/// plan.apply(|node| {
133///   // collect all expressions in the plan
134///   node.apply_expressions(|expr| {
135///    expressions.insert(expr.clone());
136///    Ok(TreeNodeRecursion::Continue) // control walk of expressions
137///   })?;
138///   Ok(TreeNodeRecursion::Continue) // control walk of plan nodes
139/// }).unwrap();
140///
141/// // we found the expression in projection and filter
142/// assert_eq!(expressions.len(), 2);
143/// println!("Found expressions: {:?}", expressions);
144/// // found predicate in the Filter: employee.salary > 1000
145/// let salary = Expr::Column(Column::new(Some("employee"), "salary"));
146/// assert!(expressions.contains(&salary.gt(lit(1000))));
147/// // found projection in the Projection: employee.name
148/// let name = Expr::Column(Column::new(Some("employee"), "name"));
149/// assert!(expressions.contains(&name));
150/// # Ok(())
151/// # }
152/// ```
153///
154/// You can also rewrite plans using the [`tree_node`] API. For example, to
155/// replace the filter predicate in a plan:
156///
157/// ```
158/// # use std::collections::HashSet;
159/// # use arrow::datatypes::{DataType, Field, Schema};
160/// # use datafusion_expr::{Expr, col, lit, LogicalPlan, LogicalPlanBuilder, table_scan};
161/// # use datafusion_common::tree_node::{TreeNodeRecursion, TreeNode};
162/// # use datafusion_common::{Column, Result};
163/// # fn employee_schema() -> Schema {
164/// #    Schema::new(vec![
165/// #           Field::new("name", DataType::Utf8, false),
166/// #           Field::new("salary", DataType::Int32, false),
167/// #       ])
168/// #   }
/// // Projection(name)
170/// //   Filter(salary > 1000)
171/// //     TableScan(employee)
172/// # fn main() -> Result<()> {
173/// use datafusion_common::tree_node::Transformed;
174/// let plan = table_scan(Some("employee"), &employee_schema(), None)?
175///  .filter(col("salary").gt(lit(1000)))?
176///  .project(vec![col("name")])?
177///  .build()?;
178///
179/// // use transform to rewrite the plan
180/// let transformed_result = plan.transform(|node| {
181///   // when we see the filter node
182///   if let LogicalPlan::Filter(mut filter) = node {
183///     // replace predicate with salary < 2000
184///     filter.predicate = Expr::Column(Column::new(Some("employee"), "salary")).lt(lit(2000));
185///     let new_plan = LogicalPlan::Filter(filter);
186///     return Ok(Transformed::yes(new_plan)); // communicate the node was changed
187///   }
188///   // return the node unchanged
189///   Ok(Transformed::no(node))
190/// }).unwrap();
191///
192/// // Transformed result contains rewritten plan and information about
193/// // whether the plan was changed
194/// assert!(transformed_result.transformed);
195/// let rewritten_plan = transformed_result.data;
196///
/// // the filter predicate in the rewritten plan has been replaced
198/// assert_eq!(rewritten_plan.display_indent().to_string(),
199/// "Projection: employee.name\
200/// \n  Filter: employee.salary < Int32(2000)\
201/// \n    TableScan: employee");
202/// # Ok(())
203/// # }
204/// ```
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Hash)]
pub enum LogicalPlan {
    /// Evaluates an arbitrary list of expressions (essentially a
    /// SELECT with an expression list) on its input.
    Projection(Projection),
    /// Filters rows from its input that do not match an
    /// expression (essentially a WHERE clause with a predicate
    /// expression).
    ///
    /// Semantically, `<predicate>` is evaluated for each row of the
    /// input; if the value of `<predicate>` is true, the input row is
    /// passed to the output. If the value of `<predicate>` is false
    /// (or null), the row is discarded.
    Filter(Filter),
    /// Windows input based on a set of window spec and window
    /// function (e.g. SUM or RANK).  This is used to implement SQL
    /// window functions, and the `OVER` clause.
    ///
    /// See [`Window`] for more details
    Window(Window),
    /// Aggregates its input based on a set of grouping and aggregate
    /// expressions (e.g. SUM). This is used to implement SQL aggregates
    /// and `GROUP BY`.
    ///
    /// See [`Aggregate`] for more details
    Aggregate(Aggregate),
    /// Sorts its input according to a list of sort expressions. This
    /// is used to implement SQL `ORDER BY`
    Sort(Sort),
    /// Join two logical plans on one or more join columns.
    /// This is used to implement SQL `JOIN`
    Join(Join),
    /// Repartitions the input based on a partitioning scheme. This is
    /// used to add parallelism and is sometimes referred to as an
    /// "exchange" operator in other systems
    Repartition(Repartition),
    /// Union multiple inputs with the same schema into a single
    /// output stream. This is used to implement SQL `UNION [ALL]` and
    /// `INTERSECT [ALL]`.
    Union(Union),
    /// Produces rows from a [`TableSource`], used to implement SQL
    /// `FROM` tables or views.
    TableScan(TableScan),
    /// Produces 0 or 1 row: an empty relation with an empty schema.
    /// This is used to implement SQL `SELECT` that has no values in
    /// the `FROM` clause.
    EmptyRelation(EmptyRelation),
    /// Produces the output of running another query.  This is used to
    /// implement SQL subqueries
    Subquery(Subquery),
    /// Aliased relation provides, or changes, the name of a relation.
    SubqueryAlias(SubqueryAlias),
    /// Skip some number of rows, and then fetch some number of rows.
    Limit(Limit),
    /// A DataFusion [`Statement`] such as `SET VARIABLE` or `START TRANSACTION`
    Statement(Statement),
    /// Values expression. See
    /// [Postgres VALUES](https://www.postgresql.org/docs/current/queries-values.html)
    /// documentation for more details. This is used to implement SQL such as
    /// `VALUES (1, 2), (3, 4)`
    Values(Values),
    /// Produces a relation with string representations of
    /// various parts of the plan. This is used to implement SQL `EXPLAIN`.
    Explain(Explain),
    /// Runs the input, and prints annotated physical plan as a string
    /// with execution metric. This is used to implement SQL
    /// `EXPLAIN ANALYZE`.
    Analyze(Analyze),
    /// Extension operator defined outside of DataFusion. This is used
    /// to extend DataFusion with custom relational operations.
    Extension(Extension),
    /// Remove duplicate rows from the input. This is used to
    /// implement SQL `SELECT DISTINCT ...`.
    Distinct(Distinct),
    /// Data Manipulation Language (DML): Insert / Update / Delete
    Dml(DmlStatement),
    /// Data Definition Language (DDL): CREATE / DROP TABLES / VIEWS / SCHEMAS
    Ddl(DdlStatement),
    /// `COPY TO` for writing plan results to files
    Copy(CopyTo),
    /// Describe the schema of the table. This is used to implement the
    /// SQL `DESCRIBE` command from MySQL.
    DescribeTable(DescribeTable),
    /// Unnest a column that contains a nested list type such as an
    /// ARRAY. This is used to implement SQL `UNNEST`
    Unnest(Unnest),
    /// A variadic query (e.g. "Recursive CTEs")
    RecursiveQuery(RecursiveQuery),
}
294
295impl Default for LogicalPlan {
296    fn default() -> Self {
297        LogicalPlan::EmptyRelation(EmptyRelation {
298            produce_one_row: false,
299            schema: Arc::new(DFSchema::empty()),
300        })
301    }
302}
303
304impl<'a> TreeNodeContainer<'a, Self> for LogicalPlan {
305    fn apply_elements<F: FnMut(&'a Self) -> Result<TreeNodeRecursion>>(
306        &'a self,
307        mut f: F,
308    ) -> Result<TreeNodeRecursion> {
309        f(self)
310    }
311
312    fn map_elements<F: FnMut(Self) -> Result<Transformed<Self>>>(
313        self,
314        mut f: F,
315    ) -> Result<Transformed<Self>> {
316        f(self)
317    }
318}
319
320impl LogicalPlan {
321    /// Get a reference to the logical plan's schema
322    pub fn schema(&self) -> &DFSchemaRef {
323        match self {
324            LogicalPlan::EmptyRelation(EmptyRelation { schema, .. }) => schema,
325            LogicalPlan::Values(Values { schema, .. }) => schema,
326            LogicalPlan::TableScan(TableScan {
327                projected_schema, ..
328            }) => projected_schema,
329            LogicalPlan::Projection(Projection { schema, .. }) => schema,
330            LogicalPlan::Filter(Filter { input, .. }) => input.schema(),
331            LogicalPlan::Distinct(Distinct::All(input)) => input.schema(),
332            LogicalPlan::Distinct(Distinct::On(DistinctOn { schema, .. })) => schema,
333            LogicalPlan::Window(Window { schema, .. }) => schema,
334            LogicalPlan::Aggregate(Aggregate { schema, .. }) => schema,
335            LogicalPlan::Sort(Sort { input, .. }) => input.schema(),
336            LogicalPlan::Join(Join { schema, .. }) => schema,
337            LogicalPlan::Repartition(Repartition { input, .. }) => input.schema(),
338            LogicalPlan::Limit(Limit { input, .. }) => input.schema(),
339            LogicalPlan::Statement(statement) => statement.schema(),
340            LogicalPlan::Subquery(Subquery { subquery, .. }) => subquery.schema(),
341            LogicalPlan::SubqueryAlias(SubqueryAlias { schema, .. }) => schema,
342            LogicalPlan::Explain(explain) => &explain.schema,
343            LogicalPlan::Analyze(analyze) => &analyze.schema,
344            LogicalPlan::Extension(extension) => extension.node.schema(),
345            LogicalPlan::Union(Union { schema, .. }) => schema,
346            LogicalPlan::DescribeTable(DescribeTable { output_schema, .. }) => {
347                output_schema
348            }
349            LogicalPlan::Dml(DmlStatement { output_schema, .. }) => output_schema,
350            LogicalPlan::Copy(CopyTo { output_schema, .. }) => output_schema,
351            LogicalPlan::Ddl(ddl) => ddl.schema(),
352            LogicalPlan::Unnest(Unnest { schema, .. }) => schema,
353            LogicalPlan::RecursiveQuery(RecursiveQuery { static_term, .. }) => {
354                // we take the schema of the static term as the schema of the entire recursive query
355                static_term.schema()
356            }
357        }
358    }
359
360    /// Used for normalizing columns, as the fallback schemas to the main schema
361    /// of the plan.
362    pub fn fallback_normalize_schemas(&self) -> Vec<&DFSchema> {
363        match self {
364            LogicalPlan::Window(_)
365            | LogicalPlan::Projection(_)
366            | LogicalPlan::Aggregate(_)
367            | LogicalPlan::Unnest(_)
368            | LogicalPlan::Join(_) => self
369                .inputs()
370                .iter()
371                .map(|input| input.schema().as_ref())
372                .collect(),
373            _ => vec![],
374        }
375    }
376
377    /// Returns the (fixed) output schema for explain plans
378    pub fn explain_schema() -> SchemaRef {
379        SchemaRef::new(Schema::new(vec![
380            Field::new("plan_type", DataType::Utf8, false),
381            Field::new("plan", DataType::Utf8, false),
382        ]))
383    }
384
385    /// Returns the (fixed) output schema for `DESCRIBE` plans
386    pub fn describe_schema() -> Schema {
387        Schema::new(vec![
388            Field::new("column_name", DataType::Utf8, false),
389            Field::new("data_type", DataType::Utf8, false),
390            Field::new("is_nullable", DataType::Utf8, false),
391        ])
392    }
393
394    /// Returns all expressions (non-recursively) evaluated by the current
395    /// logical plan node. This does not include expressions in any children.
396    ///
397    /// Note this method `clone`s all the expressions. When possible, the
398    /// [`tree_node`] API should be used instead of this API.
399    ///
400    /// The returned expressions do not necessarily represent or even
401    /// contributed to the output schema of this node. For example,
402    /// `LogicalPlan::Filter` returns the filter expression even though the
403    /// output of a Filter has the same columns as the input.
404    ///
405    /// The expressions do contain all the columns that are used by this plan,
406    /// so if there are columns not referenced by these expressions then
407    /// DataFusion's optimizer attempts to optimize them away.
408    ///
409    /// [`tree_node`]: crate::logical_plan::tree_node
410    pub fn expressions(self: &LogicalPlan) -> Vec<Expr> {
411        let mut exprs = vec![];
412        self.apply_expressions(|e| {
413            exprs.push(e.clone());
414            Ok(TreeNodeRecursion::Continue)
415        })
416        // closure always returns OK
417        .unwrap();
418        exprs
419    }
420
421    /// Returns all the out reference(correlated) expressions (recursively) in the current
422    /// logical plan nodes and all its descendant nodes.
423    pub fn all_out_ref_exprs(self: &LogicalPlan) -> Vec<Expr> {
424        let mut exprs = vec![];
425        self.apply_expressions(|e| {
426            find_out_reference_exprs(e).into_iter().for_each(|e| {
427                if !exprs.contains(&e) {
428                    exprs.push(e)
429                }
430            });
431            Ok(TreeNodeRecursion::Continue)
432        })
433        // closure always returns OK
434        .unwrap();
435        self.inputs()
436            .into_iter()
437            .flat_map(|child| child.all_out_ref_exprs())
438            .for_each(|e| {
439                if !exprs.contains(&e) {
440                    exprs.push(e)
441                }
442            });
443        exprs
444    }
445
446    /// Returns all inputs / children of this `LogicalPlan` node.
447    ///
448    /// Note does not include inputs to inputs, or subqueries.
449    pub fn inputs(&self) -> Vec<&LogicalPlan> {
450        match self {
451            LogicalPlan::Projection(Projection { input, .. }) => vec![input],
452            LogicalPlan::Filter(Filter { input, .. }) => vec![input],
453            LogicalPlan::Repartition(Repartition { input, .. }) => vec![input],
454            LogicalPlan::Window(Window { input, .. }) => vec![input],
455            LogicalPlan::Aggregate(Aggregate { input, .. }) => vec![input],
456            LogicalPlan::Sort(Sort { input, .. }) => vec![input],
457            LogicalPlan::Join(Join { left, right, .. }) => vec![left, right],
458            LogicalPlan::Limit(Limit { input, .. }) => vec![input],
459            LogicalPlan::Subquery(Subquery { subquery, .. }) => vec![subquery],
460            LogicalPlan::SubqueryAlias(SubqueryAlias { input, .. }) => vec![input],
461            LogicalPlan::Extension(extension) => extension.node.inputs(),
462            LogicalPlan::Union(Union { inputs, .. }) => {
463                inputs.iter().map(|arc| arc.as_ref()).collect()
464            }
465            LogicalPlan::Distinct(
466                Distinct::All(input) | Distinct::On(DistinctOn { input, .. }),
467            ) => vec![input],
468            LogicalPlan::Explain(explain) => vec![&explain.plan],
469            LogicalPlan::Analyze(analyze) => vec![&analyze.input],
470            LogicalPlan::Dml(write) => vec![&write.input],
471            LogicalPlan::Copy(copy) => vec![&copy.input],
472            LogicalPlan::Ddl(ddl) => ddl.inputs(),
473            LogicalPlan::Unnest(Unnest { input, .. }) => vec![input],
474            LogicalPlan::RecursiveQuery(RecursiveQuery {
475                static_term,
476                recursive_term,
477                ..
478            }) => vec![static_term, recursive_term],
479            LogicalPlan::Statement(stmt) => stmt.inputs(),
480            // plans without inputs
481            LogicalPlan::TableScan { .. }
482            | LogicalPlan::EmptyRelation { .. }
483            | LogicalPlan::Values { .. }
484            | LogicalPlan::DescribeTable(_) => vec![],
485        }
486    }
487
488    /// returns all `Using` join columns in a logical plan
489    pub fn using_columns(&self) -> Result<Vec<HashSet<Column>>, DataFusionError> {
490        let mut using_columns: Vec<HashSet<Column>> = vec![];
491
492        self.apply_with_subqueries(|plan| {
493            if let LogicalPlan::Join(Join {
494                join_constraint: JoinConstraint::Using,
495                on,
496                ..
497            }) = plan
498            {
499                // The join keys in using-join must be columns.
500                let columns =
501                    on.iter().try_fold(HashSet::new(), |mut accumu, (l, r)| {
502                        let Some(l) = l.get_as_join_column() else {
503                            return internal_err!(
504                                "Invalid join key. Expected column, found {l:?}"
505                            );
506                        };
507                        let Some(r) = r.get_as_join_column() else {
508                            return internal_err!(
509                                "Invalid join key. Expected column, found {r:?}"
510                            );
511                        };
512                        accumu.insert(l.to_owned());
513                        accumu.insert(r.to_owned());
514                        Result::<_, DataFusionError>::Ok(accumu)
515                    })?;
516                using_columns.push(columns);
517            }
518            Ok(TreeNodeRecursion::Continue)
519        })?;
520
521        Ok(using_columns)
522    }
523
524    /// returns the first output expression of this `LogicalPlan` node.
525    pub fn head_output_expr(&self) -> Result<Option<Expr>> {
526        match self {
527            LogicalPlan::Projection(projection) => {
528                Ok(Some(projection.expr.as_slice()[0].clone()))
529            }
530            LogicalPlan::Aggregate(agg) => {
531                if agg.group_expr.is_empty() {
532                    Ok(Some(agg.aggr_expr.as_slice()[0].clone()))
533                } else {
534                    Ok(Some(agg.group_expr.as_slice()[0].clone()))
535                }
536            }
537            LogicalPlan::Distinct(Distinct::On(DistinctOn { select_expr, .. })) => {
538                Ok(Some(select_expr[0].clone()))
539            }
540            LogicalPlan::Filter(Filter { input, .. })
541            | LogicalPlan::Distinct(Distinct::All(input))
542            | LogicalPlan::Sort(Sort { input, .. })
543            | LogicalPlan::Limit(Limit { input, .. })
544            | LogicalPlan::Repartition(Repartition { input, .. })
545            | LogicalPlan::Window(Window { input, .. }) => input.head_output_expr(),
546            LogicalPlan::Join(Join {
547                left,
548                right,
549                join_type,
550                ..
551            }) => match join_type {
552                JoinType::Inner | JoinType::Left | JoinType::Right | JoinType::Full => {
553                    if left.schema().fields().is_empty() {
554                        right.head_output_expr()
555                    } else {
556                        left.head_output_expr()
557                    }
558                }
559                JoinType::LeftSemi | JoinType::LeftAnti | JoinType::LeftMark => {
560                    left.head_output_expr()
561                }
562                JoinType::RightSemi | JoinType::RightAnti | JoinType::RightMark => {
563                    right.head_output_expr()
564                }
565            },
566            LogicalPlan::RecursiveQuery(RecursiveQuery { static_term, .. }) => {
567                static_term.head_output_expr()
568            }
569            LogicalPlan::Union(union) => Ok(Some(Expr::Column(Column::from(
570                union.schema.qualified_field(0),
571            )))),
572            LogicalPlan::TableScan(table) => Ok(Some(Expr::Column(Column::from(
573                table.projected_schema.qualified_field(0),
574            )))),
575            LogicalPlan::SubqueryAlias(subquery_alias) => {
576                let expr_opt = subquery_alias.input.head_output_expr()?;
577                expr_opt
578                    .map(|expr| {
579                        Ok(Expr::Column(create_col_from_scalar_expr(
580                            &expr,
581                            subquery_alias.alias.to_string(),
582                        )?))
583                    })
584                    .map_or(Ok(None), |v| v.map(Some))
585            }
586            LogicalPlan::Subquery(_) => Ok(None),
587            LogicalPlan::EmptyRelation(_)
588            | LogicalPlan::Statement(_)
589            | LogicalPlan::Values(_)
590            | LogicalPlan::Explain(_)
591            | LogicalPlan::Analyze(_)
592            | LogicalPlan::Extension(_)
593            | LogicalPlan::Dml(_)
594            | LogicalPlan::Copy(_)
595            | LogicalPlan::Ddl(_)
596            | LogicalPlan::DescribeTable(_)
597            | LogicalPlan::Unnest(_) => Ok(None),
598        }
599    }
600
601    /// Recomputes schema and type information for this LogicalPlan if needed.
602    ///
603    /// Some `LogicalPlan`s may need to recompute their schema if the number or
604    /// type of expressions have been changed (for example due to type
605    /// coercion). For example [`LogicalPlan::Projection`]s schema depends on
606    /// its expressions.
607    ///
608    /// Some `LogicalPlan`s schema is unaffected by any changes to their
609    /// expressions. For example [`LogicalPlan::Filter`] schema is always the
610    /// same as its input schema.
611    ///
612    /// This is useful after modifying a plans `Expr`s (or input plans) via
613    /// methods such as [Self::map_children] and [Self::map_expressions]. Unlike
614    /// [Self::with_new_exprs], this method does not require a new set of
615    /// expressions or inputs plans.
616    ///
617    /// # Return value
618    /// Returns an error if there is some issue recomputing the schema.
619    ///
620    /// # Notes
621    ///
622    /// * Does not recursively recompute schema for input (child) plans.
623    pub fn recompute_schema(self) -> Result<Self> {
624        match self {
625            // Since expr may be different than the previous expr, schema of the projection
626            // may change. We need to use try_new method instead of try_new_with_schema method.
627            LogicalPlan::Projection(Projection {
628                expr,
629                input,
630                schema: _,
631            }) => Projection::try_new(expr, input).map(LogicalPlan::Projection),
632            LogicalPlan::Dml(_) => Ok(self),
633            LogicalPlan::Copy(_) => Ok(self),
634            LogicalPlan::Values(Values { schema, values }) => {
635                // todo it isn't clear why the schema is not recomputed here
636                Ok(LogicalPlan::Values(Values { schema, values }))
637            }
638            LogicalPlan::Filter(Filter { predicate, input }) => {
639                Filter::try_new(predicate, input).map(LogicalPlan::Filter)
640            }
641            LogicalPlan::Repartition(_) => Ok(self),
642            LogicalPlan::Window(Window {
643                input,
644                window_expr,
645                schema: _,
646            }) => Window::try_new(window_expr, input).map(LogicalPlan::Window),
647            LogicalPlan::Aggregate(Aggregate {
648                input,
649                group_expr,
650                aggr_expr,
651                schema: _,
652            }) => Aggregate::try_new(input, group_expr, aggr_expr)
653                .map(LogicalPlan::Aggregate),
654            LogicalPlan::Sort(_) => Ok(self),
655            LogicalPlan::Join(Join {
656                left,
657                right,
658                filter,
659                join_type,
660                join_constraint,
661                on,
662                schema: _,
663                null_equality,
664            }) => {
665                let schema =
666                    build_join_schema(left.schema(), right.schema(), &join_type)?;
667
668                let new_on: Vec<_> = on
669                    .into_iter()
670                    .map(|equi_expr| {
671                        // SimplifyExpression rule may add alias to the equi_expr.
672                        (equi_expr.0.unalias(), equi_expr.1.unalias())
673                    })
674                    .collect();
675
676                Ok(LogicalPlan::Join(Join {
677                    left,
678                    right,
679                    join_type,
680                    join_constraint,
681                    on: new_on,
682                    filter,
683                    schema: DFSchemaRef::new(schema),
684                    null_equality,
685                }))
686            }
687            LogicalPlan::Subquery(_) => Ok(self),
688            LogicalPlan::SubqueryAlias(SubqueryAlias {
689                input,
690                alias,
691                schema: _,
692            }) => SubqueryAlias::try_new(input, alias).map(LogicalPlan::SubqueryAlias),
693            LogicalPlan::Limit(_) => Ok(self),
694            LogicalPlan::Ddl(_) => Ok(self),
695            LogicalPlan::Extension(Extension { node }) => {
696                // todo make an API that does not require cloning
697                // This requires a copy of the extension nodes expressions and inputs
698                let expr = node.expressions();
699                let inputs: Vec<_> = node.inputs().into_iter().cloned().collect();
700                Ok(LogicalPlan::Extension(Extension {
701                    node: node.with_exprs_and_inputs(expr, inputs)?,
702                }))
703            }
704            LogicalPlan::Union(Union { inputs, schema }) => {
705                let first_input_schema = inputs[0].schema();
706                if schema.fields().len() == first_input_schema.fields().len() {
707                    // If inputs are not pruned do not change schema
708                    Ok(LogicalPlan::Union(Union { inputs, schema }))
709                } else {
710                    // A note on `Union`s constructed via `try_new_by_name`:
711                    //
712                    // At this point, the schema for each input should have
713                    // the same width. Thus, we do not need to save whether a
714                    // `Union` was created `BY NAME`, and can safely rely on the
715                    // `try_new` initializer to derive the new schema based on
716                    // column positions.
717                    Ok(LogicalPlan::Union(Union::try_new(inputs)?))
718                }
719            }
720            LogicalPlan::Distinct(distinct) => {
721                let distinct = match distinct {
722                    Distinct::All(input) => Distinct::All(input),
723                    Distinct::On(DistinctOn {
724                        on_expr,
725                        select_expr,
726                        sort_expr,
727                        input,
728                        schema: _,
729                    }) => Distinct::On(DistinctOn::try_new(
730                        on_expr,
731                        select_expr,
732                        sort_expr,
733                        input,
734                    )?),
735                };
736                Ok(LogicalPlan::Distinct(distinct))
737            }
738            LogicalPlan::RecursiveQuery(_) => Ok(self),
739            LogicalPlan::Analyze(_) => Ok(self),
740            LogicalPlan::Explain(_) => Ok(self),
741            LogicalPlan::TableScan(_) => Ok(self),
742            LogicalPlan::EmptyRelation(_) => Ok(self),
743            LogicalPlan::Statement(_) => Ok(self),
744            LogicalPlan::DescribeTable(_) => Ok(self),
745            LogicalPlan::Unnest(Unnest {
746                input,
747                exec_columns,
748                options,
749                ..
750            }) => {
751                // Update schema with unnested column type.
752                unnest_with_options(Arc::unwrap_or_clone(input), exec_columns, options)
753            }
754        }
755    }
756
757    /// Returns a new `LogicalPlan` based on `self` with inputs and
758    /// expressions replaced.
759    ///
760    /// Note this method creates an entirely new node, which requires a large
761    /// amount of clone'ing. When possible, the [`tree_node`] API should be used
762    /// instead of this API.
763    ///
764    /// The exprs correspond to the same order of expressions returned
765    /// by [`Self::expressions`]. This function is used by optimizers
766    /// to rewrite plans using the following pattern:
767    ///
768    /// [`tree_node`]: crate::logical_plan::tree_node
769    ///
770    /// ```text
771    /// let new_inputs = optimize_children(..., plan, props);
772    ///
773    /// // get the plans expressions to optimize
774    /// let exprs = plan.expressions();
775    ///
776    /// // potentially rewrite plan expressions
777    /// let rewritten_exprs = rewrite_exprs(exprs);
778    ///
779    /// // create new plan using rewritten_exprs in same position
780    /// let new_plan = plan.new_with_exprs(rewritten_exprs, new_inputs);
781    /// ```
782    pub fn with_new_exprs(
783        &self,
784        mut expr: Vec<Expr>,
785        inputs: Vec<LogicalPlan>,
786    ) -> Result<LogicalPlan> {
787        match self {
788            // Since expr may be different than the previous expr, schema of the projection
789            // may change. We need to use try_new method instead of try_new_with_schema method.
790            LogicalPlan::Projection(Projection { .. }) => {
791                let input = self.only_input(inputs)?;
792                Projection::try_new(expr, Arc::new(input)).map(LogicalPlan::Projection)
793            }
794            LogicalPlan::Dml(DmlStatement {
795                table_name,
796                target,
797                op,
798                ..
799            }) => {
800                self.assert_no_expressions(expr)?;
801                let input = self.only_input(inputs)?;
802                Ok(LogicalPlan::Dml(DmlStatement::new(
803                    table_name.clone(),
804                    Arc::clone(target),
805                    op.clone(),
806                    Arc::new(input),
807                )))
808            }
809            LogicalPlan::Copy(CopyTo {
810                input: _,
811                output_url,
812                file_type,
813                options,
814                partition_by,
815                output_schema: _,
816            }) => {
817                self.assert_no_expressions(expr)?;
818                let input = self.only_input(inputs)?;
819                Ok(LogicalPlan::Copy(CopyTo::new(
820                    Arc::new(input),
821                    output_url.clone(),
822                    partition_by.clone(),
823                    Arc::clone(file_type),
824                    options.clone(),
825                )))
826            }
827            LogicalPlan::Values(Values { schema, .. }) => {
828                self.assert_no_inputs(inputs)?;
829                Ok(LogicalPlan::Values(Values {
830                    schema: Arc::clone(schema),
831                    values: expr
832                        .chunks_exact(schema.fields().len())
833                        .map(|s| s.to_vec())
834                        .collect(),
835                }))
836            }
837            LogicalPlan::Filter { .. } => {
838                let predicate = self.only_expr(expr)?;
839                let input = self.only_input(inputs)?;
840
841                Filter::try_new(predicate, Arc::new(input)).map(LogicalPlan::Filter)
842            }
843            LogicalPlan::Repartition(Repartition {
844                partitioning_scheme,
845                ..
846            }) => match partitioning_scheme {
847                Partitioning::RoundRobinBatch(n) => {
848                    self.assert_no_expressions(expr)?;
849                    let input = self.only_input(inputs)?;
850                    Ok(LogicalPlan::Repartition(Repartition {
851                        partitioning_scheme: Partitioning::RoundRobinBatch(*n),
852                        input: Arc::new(input),
853                    }))
854                }
855                Partitioning::Hash(_, n) => {
856                    let input = self.only_input(inputs)?;
857                    Ok(LogicalPlan::Repartition(Repartition {
858                        partitioning_scheme: Partitioning::Hash(expr, *n),
859                        input: Arc::new(input),
860                    }))
861                }
862                Partitioning::DistributeBy(_) => {
863                    let input = self.only_input(inputs)?;
864                    Ok(LogicalPlan::Repartition(Repartition {
865                        partitioning_scheme: Partitioning::DistributeBy(expr),
866                        input: Arc::new(input),
867                    }))
868                }
869            },
870            LogicalPlan::Window(Window { window_expr, .. }) => {
871                assert_eq!(window_expr.len(), expr.len());
872                let input = self.only_input(inputs)?;
873                Window::try_new(expr, Arc::new(input)).map(LogicalPlan::Window)
874            }
875            LogicalPlan::Aggregate(Aggregate { group_expr, .. }) => {
876                let input = self.only_input(inputs)?;
877                // group exprs are the first expressions
878                let agg_expr = expr.split_off(group_expr.len());
879
880                Aggregate::try_new(Arc::new(input), expr, agg_expr)
881                    .map(LogicalPlan::Aggregate)
882            }
883            LogicalPlan::Sort(Sort {
884                expr: sort_expr,
885                fetch,
886                ..
887            }) => {
888                let input = self.only_input(inputs)?;
889                Ok(LogicalPlan::Sort(Sort {
890                    expr: expr
891                        .into_iter()
892                        .zip(sort_expr.iter())
893                        .map(|(expr, sort)| sort.with_expr(expr))
894                        .collect(),
895                    input: Arc::new(input),
896                    fetch: *fetch,
897                }))
898            }
899            LogicalPlan::Join(Join {
900                join_type,
901                join_constraint,
902                on,
903                null_equality,
904                ..
905            }) => {
906                let (left, right) = self.only_two_inputs(inputs)?;
907                let schema = build_join_schema(left.schema(), right.schema(), join_type)?;
908
909                let equi_expr_count = on.len() * 2;
910                assert!(expr.len() >= equi_expr_count);
911
912                // Assume that the last expr, if any,
913                // is the filter_expr (non equality predicate from ON clause)
914                let filter_expr = if expr.len() > equi_expr_count {
915                    expr.pop()
916                } else {
917                    None
918                };
919
920                // The first part of expr is equi-exprs,
921                // and the struct of each equi-expr is like `left-expr = right-expr`.
922                assert_eq!(expr.len(), equi_expr_count);
923                let mut new_on = Vec::with_capacity(on.len());
924                let mut iter = expr.into_iter();
925                while let Some(left) = iter.next() {
926                    let Some(right) = iter.next() else {
927                        internal_err!(
928                            "Expected a pair of expressions to construct the join on expression"
929                        )?
930                    };
931
932                    // SimplifyExpression rule may add alias to the equi_expr.
933                    new_on.push((left.unalias(), right.unalias()));
934                }
935
936                Ok(LogicalPlan::Join(Join {
937                    left: Arc::new(left),
938                    right: Arc::new(right),
939                    join_type: *join_type,
940                    join_constraint: *join_constraint,
941                    on: new_on,
942                    filter: filter_expr,
943                    schema: DFSchemaRef::new(schema),
944                    null_equality: *null_equality,
945                }))
946            }
947            LogicalPlan::Subquery(Subquery {
948                outer_ref_columns,
949                spans,
950                ..
951            }) => {
952                self.assert_no_expressions(expr)?;
953                let input = self.only_input(inputs)?;
954                let subquery = LogicalPlanBuilder::from(input).build()?;
955                Ok(LogicalPlan::Subquery(Subquery {
956                    subquery: Arc::new(subquery),
957                    outer_ref_columns: outer_ref_columns.clone(),
958                    spans: spans.clone(),
959                }))
960            }
961            LogicalPlan::SubqueryAlias(SubqueryAlias { alias, .. }) => {
962                self.assert_no_expressions(expr)?;
963                let input = self.only_input(inputs)?;
964                SubqueryAlias::try_new(Arc::new(input), alias.clone())
965                    .map(LogicalPlan::SubqueryAlias)
966            }
967            LogicalPlan::Limit(Limit { skip, fetch, .. }) => {
968                let old_expr_len = skip.iter().chain(fetch.iter()).count();
969                assert_eq_or_internal_err!(
970                    old_expr_len,
971                    expr.len(),
972                    "Invalid number of new Limit expressions: expected {}, got {}",
973                    old_expr_len,
974                    expr.len()
975                );
976                // `LogicalPlan::expressions()` returns in [skip, fetch] order, so we can pop from the end.
977                let new_fetch = fetch.as_ref().and_then(|_| expr.pop());
978                let new_skip = skip.as_ref().and_then(|_| expr.pop());
979                let input = self.only_input(inputs)?;
980                Ok(LogicalPlan::Limit(Limit {
981                    skip: new_skip.map(Box::new),
982                    fetch: new_fetch.map(Box::new),
983                    input: Arc::new(input),
984                }))
985            }
986            LogicalPlan::Ddl(DdlStatement::CreateMemoryTable(CreateMemoryTable {
987                name,
988                if_not_exists,
989                or_replace,
990                column_defaults,
991                temporary,
992                ..
993            })) => {
994                self.assert_no_expressions(expr)?;
995                let input = self.only_input(inputs)?;
996                Ok(LogicalPlan::Ddl(DdlStatement::CreateMemoryTable(
997                    CreateMemoryTable {
998                        input: Arc::new(input),
999                        constraints: Constraints::default(),
1000                        name: name.clone(),
1001                        if_not_exists: *if_not_exists,
1002                        or_replace: *or_replace,
1003                        column_defaults: column_defaults.clone(),
1004                        temporary: *temporary,
1005                    },
1006                )))
1007            }
1008            LogicalPlan::Ddl(DdlStatement::CreateView(CreateView {
1009                name,
1010                or_replace,
1011                definition,
1012                temporary,
1013                ..
1014            })) => {
1015                self.assert_no_expressions(expr)?;
1016                let input = self.only_input(inputs)?;
1017                Ok(LogicalPlan::Ddl(DdlStatement::CreateView(CreateView {
1018                    input: Arc::new(input),
1019                    name: name.clone(),
1020                    or_replace: *or_replace,
1021                    temporary: *temporary,
1022                    definition: definition.clone(),
1023                })))
1024            }
1025            LogicalPlan::Extension(e) => Ok(LogicalPlan::Extension(Extension {
1026                node: e.node.with_exprs_and_inputs(expr, inputs)?,
1027            })),
1028            LogicalPlan::Union(Union { schema, .. }) => {
1029                self.assert_no_expressions(expr)?;
1030                let input_schema = inputs[0].schema();
1031                // If inputs are not pruned do not change schema.
1032                let schema = if schema.fields().len() == input_schema.fields().len() {
1033                    Arc::clone(schema)
1034                } else {
1035                    Arc::clone(input_schema)
1036                };
1037                Ok(LogicalPlan::Union(Union {
1038                    inputs: inputs.into_iter().map(Arc::new).collect(),
1039                    schema,
1040                }))
1041            }
1042            LogicalPlan::Distinct(distinct) => {
1043                let distinct = match distinct {
1044                    Distinct::All(_) => {
1045                        self.assert_no_expressions(expr)?;
1046                        let input = self.only_input(inputs)?;
1047                        Distinct::All(Arc::new(input))
1048                    }
1049                    Distinct::On(DistinctOn {
1050                        on_expr,
1051                        select_expr,
1052                        ..
1053                    }) => {
1054                        let input = self.only_input(inputs)?;
1055                        let sort_expr = expr.split_off(on_expr.len() + select_expr.len());
1056                        let select_expr = expr.split_off(on_expr.len());
1057                        assert!(
1058                            sort_expr.is_empty(),
1059                            "with_new_exprs for Distinct does not support sort expressions"
1060                        );
1061                        Distinct::On(DistinctOn::try_new(
1062                            expr,
1063                            select_expr,
1064                            None, // no sort expressions accepted
1065                            Arc::new(input),
1066                        )?)
1067                    }
1068                };
1069                Ok(LogicalPlan::Distinct(distinct))
1070            }
1071            LogicalPlan::RecursiveQuery(RecursiveQuery {
1072                name, is_distinct, ..
1073            }) => {
1074                self.assert_no_expressions(expr)?;
1075                let (static_term, recursive_term) = self.only_two_inputs(inputs)?;
1076                Ok(LogicalPlan::RecursiveQuery(RecursiveQuery {
1077                    name: name.clone(),
1078                    static_term: Arc::new(static_term),
1079                    recursive_term: Arc::new(recursive_term),
1080                    is_distinct: *is_distinct,
1081                }))
1082            }
1083            LogicalPlan::Analyze(a) => {
1084                self.assert_no_expressions(expr)?;
1085                let input = self.only_input(inputs)?;
1086                Ok(LogicalPlan::Analyze(Analyze {
1087                    verbose: a.verbose,
1088                    schema: Arc::clone(&a.schema),
1089                    input: Arc::new(input),
1090                }))
1091            }
1092            LogicalPlan::Explain(e) => {
1093                self.assert_no_expressions(expr)?;
1094                let input = self.only_input(inputs)?;
1095                Ok(LogicalPlan::Explain(Explain {
1096                    verbose: e.verbose,
1097                    plan: Arc::new(input),
1098                    explain_format: e.explain_format.clone(),
1099                    stringified_plans: e.stringified_plans.clone(),
1100                    schema: Arc::clone(&e.schema),
1101                    logical_optimization_succeeded: e.logical_optimization_succeeded,
1102                }))
1103            }
1104            LogicalPlan::Statement(Statement::Prepare(Prepare {
1105                name, fields, ..
1106            })) => {
1107                self.assert_no_expressions(expr)?;
1108                let input = self.only_input(inputs)?;
1109                Ok(LogicalPlan::Statement(Statement::Prepare(Prepare {
1110                    name: name.clone(),
1111                    fields: fields.clone(),
1112                    input: Arc::new(input),
1113                })))
1114            }
1115            LogicalPlan::Statement(Statement::Execute(Execute { name, .. })) => {
1116                self.assert_no_inputs(inputs)?;
1117                Ok(LogicalPlan::Statement(Statement::Execute(Execute {
1118                    name: name.clone(),
1119                    parameters: expr,
1120                })))
1121            }
1122            LogicalPlan::TableScan(ts) => {
1123                self.assert_no_inputs(inputs)?;
1124                Ok(LogicalPlan::TableScan(TableScan {
1125                    filters: expr,
1126                    ..ts.clone()
1127                }))
1128            }
1129            LogicalPlan::EmptyRelation(_)
1130            | LogicalPlan::Ddl(_)
1131            | LogicalPlan::Statement(_)
1132            | LogicalPlan::DescribeTable(_) => {
1133                // All of these plan types have no inputs / exprs so should not be called
1134                self.assert_no_expressions(expr)?;
1135                self.assert_no_inputs(inputs)?;
1136                Ok(self.clone())
1137            }
1138            LogicalPlan::Unnest(Unnest {
1139                exec_columns: columns,
1140                options,
1141                ..
1142            }) => {
1143                self.assert_no_expressions(expr)?;
1144                let input = self.only_input(inputs)?;
1145                // Update schema with unnested column type.
1146                let new_plan =
1147                    unnest_with_options(input, columns.clone(), options.clone())?;
1148                Ok(new_plan)
1149            }
1150        }
1151    }
1152
1153    /// checks that the plan conforms to the listed invariant level, returning an Error if not
1154    pub fn check_invariants(&self, check: InvariantLevel) -> Result<()> {
1155        match check {
1156            InvariantLevel::Always => assert_always_invariants_at_current_node(self),
1157            InvariantLevel::Executable => assert_executable_invariants(self),
1158        }
1159    }
1160
    /// Helper for [Self::with_new_exprs] to use when no expressions are expected.
    ///
    /// Returns `Ok(())` when `expr` is empty; otherwise returns early with the
    /// error produced by `assert_or_internal_err!`, whose message includes the
    /// debug form of this plan and of the unexpected expressions.
    #[inline]
    #[expect(clippy::needless_pass_by_value)] // expr is moved intentionally to ensure it's not used again
    fn assert_no_expressions(&self, expr: Vec<Expr>) -> Result<()> {
        assert_or_internal_err!(
            expr.is_empty(),
            "{self:?} should have no exprs, got {:?}",
            expr
        );
        Ok(())
    }
1172
    /// Helper for [Self::with_new_exprs] to use when no inputs are expected.
    ///
    /// Returns `Ok(())` when `inputs` is empty; otherwise returns early with
    /// the error produced by `assert_or_internal_err!`, whose message includes
    /// the debug form of this plan and of the unexpected inputs.
    #[inline]
    #[expect(clippy::needless_pass_by_value)] // inputs is moved intentionally to ensure it's not used again
    fn assert_no_inputs(&self, inputs: Vec<LogicalPlan>) -> Result<()> {
        assert_or_internal_err!(
            inputs.is_empty(),
            "{self:?} should have no inputs, got: {:?}",
            inputs
        );
        Ok(())
    }
1184
1185    /// Helper for [Self::with_new_exprs] to use when exactly one expression is expected.
1186    #[inline]
1187    fn only_expr(&self, mut expr: Vec<Expr>) -> Result<Expr> {
1188        assert_eq_or_internal_err!(
1189            expr.len(),
1190            1,
1191            "{self:?} should have exactly one expr, got {:?}",
1192            &expr
1193        );
1194        Ok(expr.remove(0))
1195    }
1196
1197    /// Helper for [Self::with_new_exprs] to use when exactly one input is expected.
1198    #[inline]
1199    fn only_input(&self, mut inputs: Vec<LogicalPlan>) -> Result<LogicalPlan> {
1200        assert_eq_or_internal_err!(
1201            inputs.len(),
1202            1,
1203            "{self:?} should have exactly one input, got {:?}",
1204            &inputs
1205        );
1206        Ok(inputs.remove(0))
1207    }
1208
1209    /// Helper for [Self::with_new_exprs] to use when exactly two inputs are expected.
1210    #[inline]
1211    fn only_two_inputs(
1212        &self,
1213        mut inputs: Vec<LogicalPlan>,
1214    ) -> Result<(LogicalPlan, LogicalPlan)> {
1215        assert_eq_or_internal_err!(
1216            inputs.len(),
1217            2,
1218            "{self:?} should have exactly two inputs, got {:?}",
1219            &inputs
1220        );
1221        let right = inputs.remove(1);
1222        let left = inputs.remove(0);
1223        Ok((left, right))
1224    }
1225
1226    /// Replaces placeholder param values (like `$1`, `$2`) in [`LogicalPlan`]
1227    /// with the specified `param_values`.
1228    ///
1229    /// [`Prepare`] statements are converted to
1230    /// their inner logical plan for execution.
1231    ///
1232    /// # Example
1233    /// ```
1234    /// # use arrow::datatypes::{Field, Schema, DataType};
1235    /// use datafusion_common::ScalarValue;
1236    /// # use datafusion_expr::{lit, col, LogicalPlanBuilder, logical_plan::table_scan, placeholder};
1237    /// # let schema = Schema::new(vec![
1238    /// #     Field::new("id", DataType::Int32, false),
1239    /// # ]);
1240    /// // Build SELECT * FROM t1 WHERE id = $1
1241    /// let plan = table_scan(Some("t1"), &schema, None).unwrap()
1242    ///     .filter(col("id").eq(placeholder("$1"))).unwrap()
1243    ///     .build().unwrap();
1244    ///
1245    /// assert_eq!(
1246    ///   "Filter: t1.id = $1\
1247    ///   \n  TableScan: t1",
1248    ///   plan.display_indent().to_string()
1249    /// );
1250    ///
1251    /// // Fill in the parameter $1 with a literal 3
1252    /// let plan = plan.with_param_values(vec![
1253    ///   ScalarValue::from(3i32) // value at index 0 --> $1
1254    /// ]).unwrap();
1255    ///
1256    /// assert_eq!(
1257    ///    "Filter: t1.id = Int32(3)\
1258    ///    \n  TableScan: t1",
1259    ///    plan.display_indent().to_string()
1260    ///  );
1261    ///
1262    /// // Note you can also used named parameters
1263    /// // Build SELECT * FROM t1 WHERE id = $my_param
1264    /// let plan = table_scan(Some("t1"), &schema, None).unwrap()
1265    ///     .filter(col("id").eq(placeholder("$my_param"))).unwrap()
1266    ///     .build().unwrap()
1267    ///     // Fill in the parameter $my_param with a literal 3
1268    ///     .with_param_values(vec![
1269    ///       ("my_param", ScalarValue::from(3i32)),
1270    ///     ]).unwrap();
1271    ///
1272    /// assert_eq!(
1273    ///    "Filter: t1.id = Int32(3)\
1274    ///    \n  TableScan: t1",
1275    ///    plan.display_indent().to_string()
1276    ///  );
1277    /// ```
1278    pub fn with_param_values(
1279        self,
1280        param_values: impl Into<ParamValues>,
1281    ) -> Result<LogicalPlan> {
1282        let param_values = param_values.into();
1283        let plan_with_values = self.replace_params_with_values(&param_values)?;
1284
1285        // unwrap Prepare
1286        Ok(
1287            if let LogicalPlan::Statement(Statement::Prepare(prepare_lp)) =
1288                plan_with_values
1289            {
1290                param_values.verify_fields(&prepare_lp.fields)?;
1291                // try and take ownership of the input if is not shared, clone otherwise
1292                Arc::unwrap_or_clone(prepare_lp.input)
1293            } else {
1294                plan_with_values
1295            },
1296        )
1297    }
1298
1299    /// Returns the maximum number of rows that this plan can output, if known.
1300    ///
1301    /// If `None`, the plan can return any number of rows.
1302    /// If `Some(n)` then the plan can return at most `n` rows but may return fewer.
1303    pub fn max_rows(self: &LogicalPlan) -> Option<usize> {
1304        match self {
1305            LogicalPlan::Projection(Projection { input, .. }) => input.max_rows(),
1306            LogicalPlan::Filter(filter) => {
1307                if filter.is_scalar() {
1308                    Some(1)
1309                } else {
1310                    filter.input.max_rows()
1311                }
1312            }
1313            LogicalPlan::Window(Window { input, .. }) => input.max_rows(),
1314            LogicalPlan::Aggregate(Aggregate {
1315                input, group_expr, ..
1316            }) => {
1317                // Empty group_expr will return Some(1)
1318                if group_expr
1319                    .iter()
1320                    .all(|expr| matches!(expr, Expr::Literal(_, _)))
1321                {
1322                    Some(1)
1323                } else {
1324                    input.max_rows()
1325                }
1326            }
1327            LogicalPlan::Sort(Sort { input, fetch, .. }) => {
1328                match (fetch, input.max_rows()) {
1329                    (Some(fetch_limit), Some(input_max)) => {
1330                        Some(input_max.min(*fetch_limit))
1331                    }
1332                    (Some(fetch_limit), None) => Some(*fetch_limit),
1333                    (None, Some(input_max)) => Some(input_max),
1334                    (None, None) => None,
1335                }
1336            }
1337            LogicalPlan::Join(Join {
1338                left,
1339                right,
1340                join_type,
1341                ..
1342            }) => match join_type {
1343                JoinType::Inner => Some(left.max_rows()? * right.max_rows()?),
1344                JoinType::Left | JoinType::Right | JoinType::Full => {
1345                    match (left.max_rows()?, right.max_rows()?, join_type) {
1346                        (0, 0, _) => Some(0),
1347                        (max_rows, 0, JoinType::Left | JoinType::Full) => Some(max_rows),
1348                        (0, max_rows, JoinType::Right | JoinType::Full) => Some(max_rows),
1349                        (left_max, right_max, _) => Some(left_max * right_max),
1350                    }
1351                }
1352                JoinType::LeftSemi | JoinType::LeftAnti | JoinType::LeftMark => {
1353                    left.max_rows()
1354                }
1355                JoinType::RightSemi | JoinType::RightAnti | JoinType::RightMark => {
1356                    right.max_rows()
1357                }
1358            },
1359            LogicalPlan::Repartition(Repartition { input, .. }) => input.max_rows(),
1360            LogicalPlan::Union(Union { inputs, .. }) => {
1361                inputs.iter().try_fold(0usize, |mut acc, plan| {
1362                    acc += plan.max_rows()?;
1363                    Some(acc)
1364                })
1365            }
1366            LogicalPlan::TableScan(TableScan { fetch, .. }) => *fetch,
1367            LogicalPlan::EmptyRelation(_) => Some(0),
1368            LogicalPlan::RecursiveQuery(_) => None,
1369            LogicalPlan::Subquery(_) => None,
1370            LogicalPlan::SubqueryAlias(SubqueryAlias { input, .. }) => input.max_rows(),
1371            LogicalPlan::Limit(limit) => match limit.get_fetch_type() {
1372                Ok(FetchType::Literal(s)) => s,
1373                _ => None,
1374            },
1375            LogicalPlan::Distinct(
1376                Distinct::All(input) | Distinct::On(DistinctOn { input, .. }),
1377            ) => input.max_rows(),
1378            LogicalPlan::Values(v) => Some(v.values.len()),
1379            LogicalPlan::Unnest(_) => None,
1380            LogicalPlan::Ddl(_)
1381            | LogicalPlan::Explain(_)
1382            | LogicalPlan::Analyze(_)
1383            | LogicalPlan::Dml(_)
1384            | LogicalPlan::Copy(_)
1385            | LogicalPlan::DescribeTable(_)
1386            | LogicalPlan::Statement(_)
1387            | LogicalPlan::Extension(_) => None,
1388        }
1389    }
1390
1391    /// If this node's expressions contains any references to an outer subquery
1392    pub fn contains_outer_reference(&self) -> bool {
1393        let mut contains = false;
1394        self.apply_expressions(|expr| {
1395            Ok(if expr.contains_outer() {
1396                contains = true;
1397                TreeNodeRecursion::Stop
1398            } else {
1399                TreeNodeRecursion::Continue
1400            })
1401        })
1402        .unwrap();
1403        contains
1404    }
1405
1406    /// Get the output expressions and their corresponding columns.
1407    ///
1408    /// The parent node may reference the output columns of the plan by expressions, such as
1409    /// projection over aggregate or window functions. This method helps to convert the
1410    /// referenced expressions into columns.
1411    ///
1412    /// See also: [`crate::utils::columnize_expr`]
1413    pub fn columnized_output_exprs(&self) -> Result<Vec<(&Expr, Column)>> {
1414        match self {
1415            LogicalPlan::Aggregate(aggregate) => Ok(aggregate
1416                .output_expressions()?
1417                .into_iter()
1418                .zip(self.schema().columns())
1419                .collect()),
1420            LogicalPlan::Window(Window {
1421                window_expr,
1422                input,
1423                schema,
1424            }) => {
1425                // The input could be another Window, so the result should also include the input's. For Example:
1426                // `EXPLAIN SELECT RANK() OVER (PARTITION BY a ORDER BY b), SUM(b) OVER (PARTITION BY a) FROM t`
1427                // Its plan is:
1428                // Projection: RANK() PARTITION BY [t.a] ORDER BY [t.b ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW, SUM(t.b) PARTITION BY [t.a] ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING
1429                //   WindowAggr: windowExpr=[[SUM(CAST(t.b AS Int64)) PARTITION BY [t.a] ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING]]
1430                //     WindowAggr: windowExpr=[[RANK() PARTITION BY [t.a] ORDER BY [t.b ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW]]/
1431                //       TableScan: t projection=[a, b]
1432                let mut output_exprs = input.columnized_output_exprs()?;
1433                let input_len = input.schema().fields().len();
1434                output_exprs.extend(
1435                    window_expr
1436                        .iter()
1437                        .zip(schema.columns().into_iter().skip(input_len)),
1438                );
1439                Ok(output_exprs)
1440            }
1441            _ => Ok(vec![]),
1442        }
1443    }
1444}
1445
1446impl LogicalPlan {
1447    /// Return a `LogicalPlan` with all placeholders (e.g $1 $2,
1448    /// ...) replaced with corresponding values provided in
1449    /// `params_values`
1450    ///
1451    /// See [`Self::with_param_values`] for examples and usage with an owned
1452    /// `ParamValues`
1453    pub fn replace_params_with_values(
1454        self,
1455        param_values: &ParamValues,
1456    ) -> Result<LogicalPlan> {
1457        self.transform_up_with_subqueries(|plan| {
1458            let schema = Arc::clone(plan.schema());
1459            let name_preserver = NamePreserver::new(&plan);
1460            plan.map_expressions(|e| {
1461                let (e, has_placeholder) = e.infer_placeholder_types(&schema)?;
1462                if !has_placeholder {
1463                    // Performance optimization:
1464                    // avoid NamePreserver copy and second pass over expression
1465                    // if no placeholders.
1466                    Ok(Transformed::no(e))
1467                } else {
1468                    let original_name = name_preserver.save(&e);
1469                    let transformed_expr = e.transform_up(|e| {
1470                        if let Expr::Placeholder(Placeholder { id, .. }) = e {
1471                            let (value, metadata) = param_values
1472                                .get_placeholders_with_values(&id)?
1473                                .into_inner();
1474                            Ok(Transformed::yes(Expr::Literal(value, metadata)))
1475                        } else {
1476                            Ok(Transformed::no(e))
1477                        }
1478                    })?;
1479                    // Preserve name to avoid breaking column references to this expression
1480                    Ok(transformed_expr.update_data(|expr| original_name.restore(expr)))
1481                }
1482            })?
1483            .map_data(|plan| plan.update_schema_data_type())
1484        })
1485        .map(|res| res.data)
1486    }
1487
1488    /// Recompute schema fields' data type after replacing params, ensuring fields data type can be
1489    /// updated according to the new parameters.
1490    ///
1491    /// Unlike `recompute_schema()`, this method rebuilds VALUES plans entirely to properly infer
1492    /// types types from literal values after placeholder substitution.
1493    fn update_schema_data_type(self) -> Result<LogicalPlan> {
1494        match self {
1495            // Build `LogicalPlan::Values` from the values for type inference.
1496            // We can't use `recompute_schema` because it skips recomputing for
1497            // `LogicalPlan::Values`.
1498            LogicalPlan::Values(Values { values, schema: _ }) => {
1499                LogicalPlanBuilder::values(values)?.build()
1500            }
1501            // other plans can just use `recompute_schema` directly.
1502            plan => plan.recompute_schema(),
1503        }
1504    }
1505
1506    /// Walk the logical plan, find any `Placeholder` tokens, and return a set of their names.
1507    pub fn get_parameter_names(&self) -> Result<HashSet<String>> {
1508        let mut param_names = HashSet::new();
1509        self.apply_with_subqueries(|plan| {
1510            plan.apply_expressions(|expr| {
1511                expr.apply(|expr| {
1512                    if let Expr::Placeholder(Placeholder { id, .. }) = expr {
1513                        param_names.insert(id.clone());
1514                    }
1515                    Ok(TreeNodeRecursion::Continue)
1516                })
1517            })
1518        })
1519        .map(|_| param_names)
1520    }
1521
1522    /// Walk the logical plan, find any `Placeholder` tokens, and return a map of their IDs and DataTypes
1523    ///
1524    /// Note that this will drop any extension or field metadata attached to parameters. Use
1525    /// [`LogicalPlan::get_parameter_fields`] to keep extension metadata.
1526    pub fn get_parameter_types(
1527        &self,
1528    ) -> Result<HashMap<String, Option<DataType>>, DataFusionError> {
1529        let mut parameter_fields = self.get_parameter_fields()?;
1530        Ok(parameter_fields
1531            .drain()
1532            .map(|(name, maybe_field)| {
1533                (name, maybe_field.map(|field| field.data_type().clone()))
1534            })
1535            .collect())
1536    }
1537
1538    /// Walk the logical plan, find any `Placeholder` tokens, and return a map of their IDs and FieldRefs
1539    pub fn get_parameter_fields(
1540        &self,
1541    ) -> Result<HashMap<String, Option<FieldRef>>, DataFusionError> {
1542        let mut param_types: HashMap<String, Option<FieldRef>> = HashMap::new();
1543
1544        self.apply_with_subqueries(|plan| {
1545            plan.apply_expressions(|expr| {
1546                expr.apply(|expr| {
1547                    if let Expr::Placeholder(Placeholder { id, field }) = expr {
1548                        let prev = param_types.get(id);
1549                        match (prev, field) {
1550                            (Some(Some(prev)), Some(field)) => {
1551                                check_metadata_with_storage_equal(
1552                                    (field.data_type(), Some(field.metadata())),
1553                                    (prev.data_type(), Some(prev.metadata())),
1554                                    "parameter",
1555                                    &format!(": Conflicting types for id {id}"),
1556                                )?;
1557                            }
1558                            (_, Some(field)) => {
1559                                param_types.insert(id.clone(), Some(Arc::clone(field)));
1560                            }
1561                            _ => {
1562                                param_types.insert(id.clone(), None);
1563                            }
1564                        }
1565                    }
1566                    Ok(TreeNodeRecursion::Continue)
1567                })
1568            })
1569        })
1570        .map(|_| param_types)
1571    }
1572
1573    // ------------
1574    // Various implementations for printing out LogicalPlans
1575    // ------------
1576
1577    /// Return a `format`able structure that produces a single line
1578    /// per node.
1579    ///
1580    /// # Example
1581    ///
1582    /// ```text
1583    /// Projection: employee.id
1584    ///    Filter: employee.state Eq Utf8(\"CO\")\
1585    ///       CsvScan: employee projection=Some([0, 3])
1586    /// ```
1587    ///
1588    /// ```
1589    /// use arrow::datatypes::{DataType, Field, Schema};
1590    /// use datafusion_expr::{col, lit, logical_plan::table_scan, LogicalPlanBuilder};
1591    /// let schema = Schema::new(vec![Field::new("id", DataType::Int32, false)]);
1592    /// let plan = table_scan(Some("t1"), &schema, None)
1593    ///     .unwrap()
1594    ///     .filter(col("id").eq(lit(5)))
1595    ///     .unwrap()
1596    ///     .build()
1597    ///     .unwrap();
1598    ///
1599    /// // Format using display_indent
1600    /// let display_string = format!("{}", plan.display_indent());
1601    ///
1602    /// assert_eq!("Filter: t1.id = Int32(5)\n  TableScan: t1", display_string);
1603    /// ```
1604    pub fn display_indent(&self) -> impl Display + '_ {
1605        // Boilerplate structure to wrap LogicalPlan with something
1606        // that that can be formatted
1607        struct Wrapper<'a>(&'a LogicalPlan);
1608        impl Display for Wrapper<'_> {
1609            fn fmt(&self, f: &mut Formatter) -> fmt::Result {
1610                let with_schema = false;
1611                let mut visitor = IndentVisitor::new(f, with_schema);
1612                match self.0.visit_with_subqueries(&mut visitor) {
1613                    Ok(_) => Ok(()),
1614                    Err(_) => Err(fmt::Error),
1615                }
1616            }
1617        }
1618        Wrapper(self)
1619    }
1620
1621    /// Return a `format`able structure that produces a single line
1622    /// per node that includes the output schema. For example:
1623    ///
1624    /// ```text
1625    /// Projection: employee.id [id:Int32]\
1626    ///    Filter: employee.state = Utf8(\"CO\") [id:Int32, state:Utf8]\
1627    ///      TableScan: employee projection=[0, 3] [id:Int32, state:Utf8]";
1628    /// ```
1629    ///
1630    /// ```
1631    /// use arrow::datatypes::{DataType, Field, Schema};
1632    /// use datafusion_expr::{col, lit, logical_plan::table_scan, LogicalPlanBuilder};
1633    /// let schema = Schema::new(vec![Field::new("id", DataType::Int32, false)]);
1634    /// let plan = table_scan(Some("t1"), &schema, None)
1635    ///     .unwrap()
1636    ///     .filter(col("id").eq(lit(5)))
1637    ///     .unwrap()
1638    ///     .build()
1639    ///     .unwrap();
1640    ///
1641    /// // Format using display_indent_schema
1642    /// let display_string = format!("{}", plan.display_indent_schema());
1643    ///
1644    /// assert_eq!(
1645    ///     "Filter: t1.id = Int32(5) [id:Int32]\
1646    ///             \n  TableScan: t1 [id:Int32]",
1647    ///     display_string
1648    /// );
1649    /// ```
1650    pub fn display_indent_schema(&self) -> impl Display + '_ {
1651        // Boilerplate structure to wrap LogicalPlan with something
1652        // that that can be formatted
1653        struct Wrapper<'a>(&'a LogicalPlan);
1654        impl Display for Wrapper<'_> {
1655            fn fmt(&self, f: &mut Formatter) -> fmt::Result {
1656                let with_schema = true;
1657                let mut visitor = IndentVisitor::new(f, with_schema);
1658                match self.0.visit_with_subqueries(&mut visitor) {
1659                    Ok(_) => Ok(()),
1660                    Err(_) => Err(fmt::Error),
1661                }
1662            }
1663        }
1664        Wrapper(self)
1665    }
1666
1667    /// Return a displayable structure that produces plan in postgresql JSON format.
1668    ///
1669    /// Users can use this format to visualize the plan in existing plan visualization tools, for example [dalibo](https://explain.dalibo.com/)
1670    pub fn display_pg_json(&self) -> impl Display + '_ {
1671        // Boilerplate structure to wrap LogicalPlan with something
1672        // that that can be formatted
1673        struct Wrapper<'a>(&'a LogicalPlan);
1674        impl Display for Wrapper<'_> {
1675            fn fmt(&self, f: &mut Formatter) -> fmt::Result {
1676                let mut visitor = PgJsonVisitor::new(f);
1677                visitor.with_schema(true);
1678                match self.0.visit_with_subqueries(&mut visitor) {
1679                    Ok(_) => Ok(()),
1680                    Err(_) => Err(fmt::Error),
1681                }
1682            }
1683        }
1684        Wrapper(self)
1685    }
1686
1687    /// Return a `format`able structure that produces lines meant for
1688    /// graphical display using the `DOT` language. This format can be
1689    /// visualized using software from
1690    /// [`graphviz`](https://graphviz.org/)
1691    ///
1692    /// This currently produces two graphs -- one with the basic
1693    /// structure, and one with additional details such as schema.
1694    ///
1695    /// ```
1696    /// use arrow::datatypes::{DataType, Field, Schema};
1697    /// use datafusion_expr::{col, lit, logical_plan::table_scan, LogicalPlanBuilder};
1698    /// let schema = Schema::new(vec![Field::new("id", DataType::Int32, false)]);
1699    /// let plan = table_scan(Some("t1"), &schema, None)
1700    ///     .unwrap()
1701    ///     .filter(col("id").eq(lit(5)))
1702    ///     .unwrap()
1703    ///     .build()
1704    ///     .unwrap();
1705    ///
1706    /// // Format using display_graphviz
1707    /// let graphviz_string = format!("{}", plan.display_graphviz());
1708    /// ```
1709    ///
1710    /// If graphviz string is saved to a file such as `/tmp/example.dot`, the following
1711    /// commands can be used to render it as a pdf:
1712    ///
1713    /// ```bash
1714    ///   dot -Tpdf < /tmp/example.dot  > /tmp/example.pdf
1715    /// ```
1716    pub fn display_graphviz(&self) -> impl Display + '_ {
1717        // Boilerplate structure to wrap LogicalPlan with something
1718        // that that can be formatted
1719        struct Wrapper<'a>(&'a LogicalPlan);
1720        impl Display for Wrapper<'_> {
1721            fn fmt(&self, f: &mut Formatter) -> fmt::Result {
1722                let mut visitor = GraphvizVisitor::new(f);
1723
1724                visitor.start_graph()?;
1725
1726                visitor.pre_visit_plan("LogicalPlan")?;
1727                self.0
1728                    .visit_with_subqueries(&mut visitor)
1729                    .map_err(|_| fmt::Error)?;
1730                visitor.post_visit_plan()?;
1731
1732                visitor.set_with_schema(true);
1733                visitor.pre_visit_plan("Detailed LogicalPlan")?;
1734                self.0
1735                    .visit_with_subqueries(&mut visitor)
1736                    .map_err(|_| fmt::Error)?;
1737                visitor.post_visit_plan()?;
1738
1739                visitor.end_graph()?;
1740                Ok(())
1741            }
1742        }
1743        Wrapper(self)
1744    }
1745
1746    /// Return a `format`able structure with the a human readable
1747    /// description of this LogicalPlan node per node, not including
1748    /// children. For example:
1749    ///
1750    /// ```text
1751    /// Projection: id
1752    /// ```
1753    /// ```
1754    /// use arrow::datatypes::{DataType, Field, Schema};
1755    /// use datafusion_expr::{col, lit, logical_plan::table_scan, LogicalPlanBuilder};
1756    /// let schema = Schema::new(vec![Field::new("id", DataType::Int32, false)]);
1757    /// let plan = table_scan(Some("t1"), &schema, None)
1758    ///     .unwrap()
1759    ///     .build()
1760    ///     .unwrap();
1761    ///
1762    /// // Format using display
1763    /// let display_string = format!("{}", plan.display());
1764    ///
1765    /// assert_eq!("TableScan: t1", display_string);
1766    /// ```
1767    pub fn display(&self) -> impl Display + '_ {
1768        // Boilerplate structure to wrap LogicalPlan with something
1769        // that that can be formatted
1770        struct Wrapper<'a>(&'a LogicalPlan);
1771        impl Display for Wrapper<'_> {
1772            fn fmt(&self, f: &mut Formatter) -> fmt::Result {
1773                match self.0 {
1774                    LogicalPlan::EmptyRelation(EmptyRelation {
1775                        produce_one_row,
1776                        schema: _,
1777                    }) => {
1778                        let rows = if *produce_one_row { 1 } else { 0 };
1779                        write!(f, "EmptyRelation: rows={rows}")
1780                    }
1781                    LogicalPlan::RecursiveQuery(RecursiveQuery {
1782                        is_distinct, ..
1783                    }) => {
1784                        write!(f, "RecursiveQuery: is_distinct={is_distinct}")
1785                    }
1786                    LogicalPlan::Values(Values { values, .. }) => {
1787                        let str_values: Vec<_> = values
1788                            .iter()
1789                            // limit to only 5 values to avoid horrible display
1790                            .take(5)
1791                            .map(|row| {
1792                                let item = row
1793                                    .iter()
1794                                    .map(|expr| expr.to_string())
1795                                    .collect::<Vec<_>>()
1796                                    .join(", ");
1797                                format!("({item})")
1798                            })
1799                            .collect();
1800
1801                        let eclipse = if values.len() > 5 { "..." } else { "" };
1802                        write!(f, "Values: {}{}", str_values.join(", "), eclipse)
1803                    }
1804
1805                    LogicalPlan::TableScan(TableScan {
1806                        source,
1807                        table_name,
1808                        projection,
1809                        filters,
1810                        fetch,
1811                        ..
1812                    }) => {
1813                        let projected_fields = match projection {
1814                            Some(indices) => {
1815                                let schema = source.schema();
1816                                let names: Vec<&str> = indices
1817                                    .iter()
1818                                    .map(|i| schema.field(*i).name().as_str())
1819                                    .collect();
1820                                format!(" projection=[{}]", names.join(", "))
1821                            }
1822                            _ => "".to_string(),
1823                        };
1824
1825                        write!(f, "TableScan: {table_name}{projected_fields}")?;
1826
1827                        if !filters.is_empty() {
1828                            let mut full_filter = vec![];
1829                            let mut partial_filter = vec![];
1830                            let mut unsupported_filters = vec![];
1831                            let filters: Vec<&Expr> = filters.iter().collect();
1832
1833                            if let Ok(results) =
1834                                source.supports_filters_pushdown(&filters)
1835                            {
1836                                filters.iter().zip(results.iter()).for_each(
1837                                    |(x, res)| match res {
1838                                        TableProviderFilterPushDown::Exact => {
1839                                            full_filter.push(x)
1840                                        }
1841                                        TableProviderFilterPushDown::Inexact => {
1842                                            partial_filter.push(x)
1843                                        }
1844                                        TableProviderFilterPushDown::Unsupported => {
1845                                            unsupported_filters.push(x)
1846                                        }
1847                                    },
1848                                );
1849                            }
1850
1851                            if !full_filter.is_empty() {
1852                                write!(
1853                                    f,
1854                                    ", full_filters=[{}]",
1855                                    expr_vec_fmt!(full_filter)
1856                                )?;
1857                            };
1858                            if !partial_filter.is_empty() {
1859                                write!(
1860                                    f,
1861                                    ", partial_filters=[{}]",
1862                                    expr_vec_fmt!(partial_filter)
1863                                )?;
1864                            }
1865                            if !unsupported_filters.is_empty() {
1866                                write!(
1867                                    f,
1868                                    ", unsupported_filters=[{}]",
1869                                    expr_vec_fmt!(unsupported_filters)
1870                                )?;
1871                            }
1872                        }
1873
1874                        if let Some(n) = fetch {
1875                            write!(f, ", fetch={n}")?;
1876                        }
1877
1878                        Ok(())
1879                    }
1880                    LogicalPlan::Projection(Projection { expr, .. }) => {
1881                        write!(f, "Projection:")?;
1882                        for (i, expr_item) in expr.iter().enumerate() {
1883                            if i > 0 {
1884                                write!(f, ",")?;
1885                            }
1886                            write!(f, " {expr_item}")?;
1887                        }
1888                        Ok(())
1889                    }
1890                    LogicalPlan::Dml(DmlStatement { table_name, op, .. }) => {
1891                        write!(f, "Dml: op=[{op}] table=[{table_name}]")
1892                    }
1893                    LogicalPlan::Copy(CopyTo {
1894                        input: _,
1895                        output_url,
1896                        file_type,
1897                        options,
1898                        ..
1899                    }) => {
1900                        let op_str = options
1901                            .iter()
1902                            .map(|(k, v)| format!("{k} {v}"))
1903                            .collect::<Vec<String>>()
1904                            .join(", ");
1905
1906                        write!(
1907                            f,
1908                            "CopyTo: format={} output_url={output_url} options: ({op_str})",
1909                            file_type.get_ext()
1910                        )
1911                    }
1912                    LogicalPlan::Ddl(ddl) => {
1913                        write!(f, "{}", ddl.display())
1914                    }
1915                    LogicalPlan::Filter(Filter {
1916                        predicate: expr, ..
1917                    }) => write!(f, "Filter: {expr}"),
1918                    LogicalPlan::Window(Window { window_expr, .. }) => {
1919                        write!(
1920                            f,
1921                            "WindowAggr: windowExpr=[[{}]]",
1922                            expr_vec_fmt!(window_expr)
1923                        )
1924                    }
1925                    LogicalPlan::Aggregate(Aggregate {
1926                        group_expr,
1927                        aggr_expr,
1928                        ..
1929                    }) => write!(
1930                        f,
1931                        "Aggregate: groupBy=[[{}]], aggr=[[{}]]",
1932                        expr_vec_fmt!(group_expr),
1933                        expr_vec_fmt!(aggr_expr)
1934                    ),
1935                    LogicalPlan::Sort(Sort { expr, fetch, .. }) => {
1936                        write!(f, "Sort: ")?;
1937                        for (i, expr_item) in expr.iter().enumerate() {
1938                            if i > 0 {
1939                                write!(f, ", ")?;
1940                            }
1941                            write!(f, "{expr_item}")?;
1942                        }
1943                        if let Some(a) = fetch {
1944                            write!(f, ", fetch={a}")?;
1945                        }
1946
1947                        Ok(())
1948                    }
1949                    LogicalPlan::Join(Join {
1950                        on: keys,
1951                        filter,
1952                        join_constraint,
1953                        join_type,
1954                        ..
1955                    }) => {
1956                        let join_expr: Vec<String> =
1957                            keys.iter().map(|(l, r)| format!("{l} = {r}")).collect();
1958                        let filter_expr = filter
1959                            .as_ref()
1960                            .map(|expr| format!(" Filter: {expr}"))
1961                            .unwrap_or_else(|| "".to_string());
1962                        let join_type = if filter.is_none()
1963                            && keys.is_empty()
1964                            && matches!(join_type, JoinType::Inner)
1965                        {
1966                            "Cross".to_string()
1967                        } else {
1968                            join_type.to_string()
1969                        };
1970                        match join_constraint {
1971                            JoinConstraint::On => {
1972                                write!(
1973                                    f,
1974                                    "{} Join: {}{}",
1975                                    join_type,
1976                                    join_expr.join(", "),
1977                                    filter_expr
1978                                )
1979                            }
1980                            JoinConstraint::Using => {
1981                                write!(
1982                                    f,
1983                                    "{} Join: Using {}{}",
1984                                    join_type,
1985                                    join_expr.join(", "),
1986                                    filter_expr,
1987                                )
1988                            }
1989                        }
1990                    }
1991                    LogicalPlan::Repartition(Repartition {
1992                        partitioning_scheme,
1993                        ..
1994                    }) => match partitioning_scheme {
1995                        Partitioning::RoundRobinBatch(n) => {
1996                            write!(f, "Repartition: RoundRobinBatch partition_count={n}")
1997                        }
1998                        Partitioning::Hash(expr, n) => {
1999                            let hash_expr: Vec<String> =
2000                                expr.iter().map(|e| format!("{e}")).collect();
2001                            write!(
2002                                f,
2003                                "Repartition: Hash({}) partition_count={}",
2004                                hash_expr.join(", "),
2005                                n
2006                            )
2007                        }
2008                        Partitioning::DistributeBy(expr) => {
2009                            let dist_by_expr: Vec<String> =
2010                                expr.iter().map(|e| format!("{e}")).collect();
2011                            write!(
2012                                f,
2013                                "Repartition: DistributeBy({})",
2014                                dist_by_expr.join(", "),
2015                            )
2016                        }
2017                    },
2018                    LogicalPlan::Limit(limit) => {
2019                        // Attempt to display `skip` and `fetch` as literals if possible, otherwise as expressions.
2020                        let skip_str = match limit.get_skip_type() {
2021                            Ok(SkipType::Literal(n)) => n.to_string(),
2022                            _ => limit
2023                                .skip
2024                                .as_ref()
2025                                .map_or_else(|| "None".to_string(), |x| x.to_string()),
2026                        };
2027                        let fetch_str = match limit.get_fetch_type() {
2028                            Ok(FetchType::Literal(Some(n))) => n.to_string(),
2029                            Ok(FetchType::Literal(None)) => "None".to_string(),
2030                            _ => limit
2031                                .fetch
2032                                .as_ref()
2033                                .map_or_else(|| "None".to_string(), |x| x.to_string()),
2034                        };
2035                        write!(f, "Limit: skip={skip_str}, fetch={fetch_str}",)
2036                    }
2037                    LogicalPlan::Subquery(Subquery { .. }) => {
2038                        write!(f, "Subquery:")
2039                    }
2040                    LogicalPlan::SubqueryAlias(SubqueryAlias { alias, .. }) => {
2041                        write!(f, "SubqueryAlias: {alias}")
2042                    }
2043                    LogicalPlan::Statement(statement) => {
2044                        write!(f, "{}", statement.display())
2045                    }
2046                    LogicalPlan::Distinct(distinct) => match distinct {
2047                        Distinct::All(_) => write!(f, "Distinct:"),
2048                        Distinct::On(DistinctOn {
2049                            on_expr,
2050                            select_expr,
2051                            sort_expr,
2052                            ..
2053                        }) => write!(
2054                            f,
2055                            "DistinctOn: on_expr=[[{}]], select_expr=[[{}]], sort_expr=[[{}]]",
2056                            expr_vec_fmt!(on_expr),
2057                            expr_vec_fmt!(select_expr),
2058                            if let Some(sort_expr) = sort_expr {
2059                                expr_vec_fmt!(sort_expr)
2060                            } else {
2061                                "".to_string()
2062                            },
2063                        ),
2064                    },
2065                    LogicalPlan::Explain { .. } => write!(f, "Explain"),
2066                    LogicalPlan::Analyze { .. } => write!(f, "Analyze"),
2067                    LogicalPlan::Union(_) => write!(f, "Union"),
2068                    LogicalPlan::Extension(e) => e.node.fmt_for_explain(f),
2069                    LogicalPlan::DescribeTable(DescribeTable { .. }) => {
2070                        write!(f, "DescribeTable")
2071                    }
2072                    LogicalPlan::Unnest(Unnest {
2073                        input: plan,
2074                        list_type_columns: list_col_indices,
2075                        struct_type_columns: struct_col_indices,
2076                        ..
2077                    }) => {
2078                        let input_columns = plan.schema().columns();
2079                        let list_type_columns = list_col_indices
2080                            .iter()
2081                            .map(|(i, unnest_info)| {
2082                                format!(
2083                                    "{}|depth={}",
2084                                    &input_columns[*i].to_string(),
2085                                    unnest_info.depth
2086                                )
2087                            })
2088                            .collect::<Vec<String>>();
2089                        let struct_type_columns = struct_col_indices
2090                            .iter()
2091                            .map(|i| &input_columns[*i])
2092                            .collect::<Vec<&Column>>();
2093                        // get items from input_columns indexed by list_col_indices
2094                        write!(
2095                            f,
2096                            "Unnest: lists[{}] structs[{}]",
2097                            expr_vec_fmt!(list_type_columns),
2098                            expr_vec_fmt!(struct_type_columns)
2099                        )
2100                    }
2101                }
2102            }
2103        }
2104        Wrapper(self)
2105    }
2106}
2107
2108impl Display for LogicalPlan {
2109    fn fmt(&self, f: &mut Formatter) -> fmt::Result {
2110        self.display_indent().fmt(f)
2111    }
2112}
2113
2114impl ToStringifiedPlan for LogicalPlan {
2115    fn to_stringified(&self, plan_type: PlanType) -> StringifiedPlan {
2116        StringifiedPlan::new(plan_type, self.display_indent().to_string())
2117    }
2118}
2119
/// A relation that produces either zero rows or a single placeholder row,
/// with the specified output schema.
///
/// In most cases the output schema of an `EmptyRelation` is empty; however,
/// it can be non-empty, typically when the node is produced by optimizer rules.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct EmptyRelation {
    /// Whether to produce a placeholder row (`true`: one row, `false`: no rows)
    pub produce_one_row: bool,
    /// The schema description of the output
    pub schema: DFSchemaRef,
}
2130
2131// Manual implementation needed because of `schema` field. Comparison excludes this field.
2132impl PartialOrd for EmptyRelation {
2133    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
2134        self.produce_one_row
2135            .partial_cmp(&other.produce_one_row)
2136            // TODO (https://github.com/apache/datafusion/issues/17477) avoid recomparing all fields
2137            .filter(|cmp| *cmp != Ordering::Equal || self == other)
2138    }
2139}
2140
/// A variadic query operation: a recursive common table expression (CTE).
///
/// # Recursive Query Evaluation
///
/// From the [Postgres Docs]:
///
/// 1. Evaluate the non-recursive term. For `UNION` (but not `UNION ALL`),
///    discard duplicate rows. Include all remaining rows in the result of the
///    recursive query, and also place them in a temporary working table.
///
/// 2. So long as the working table is not empty, repeat these steps:
///
///    * Evaluate the recursive term, substituting the current contents of the
///      working table for the recursive self-reference. For `UNION` (but not
///      `UNION ALL`), discard duplicate rows and rows that duplicate any
///      previous result row. Include all remaining rows in the result of the
///      recursive query, and also place them in a temporary intermediate table.
///
///    * Replace the contents of the working table with the contents of the
///      intermediate table, then empty the intermediate table.
///
/// [Postgres Docs]: https://www.postgresql.org/docs/current/queries-with.html#QUERIES-WITH-RECURSIVE
// NOTE: `PartialOrd` and `Hash` are derived, so the declaration order of the
// fields below is significant.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Hash)]
pub struct RecursiveQuery {
    /// Name of the query
    pub name: String,
    /// The static term (initial contents of the working table)
    pub static_term: Arc<LogicalPlan>,
    /// The recursive term (evaluated on the contents of the working table until
    /// it returns an empty set)
    pub recursive_term: Arc<LogicalPlan>,
    /// Should the output of the recursive term be deduplicated (`UNION`) or
    /// not (`UNION ALL`).
    pub is_distinct: bool,
}
2176
/// Values expression. See
/// [Postgres VALUES](https://www.postgresql.org/docs/current/queries-values.html)
/// documentation for more details.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct Values {
    /// The table schema
    pub schema: DFSchemaRef,
    /// The value expressions, stored as a list of lists
    /// (presumably one inner list per row of the `VALUES` clause; confirm
    /// against the code that builds this node).
    pub values: Vec<Vec<Expr>>,
}
2187
2188// Manual implementation needed because of `schema` field. Comparison excludes this field.
2189impl PartialOrd for Values {
2190    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
2191        self.values
2192            .partial_cmp(&other.values)
2193            // TODO (https://github.com/apache/datafusion/issues/17477) avoid recomparing all fields
2194            .filter(|cmp| *cmp != Ordering::Equal || self == other)
2195    }
2196}
2197
/// Evaluates an arbitrary list of expressions (essentially a
/// SELECT with an expression list) on its input.
///
/// Prefer the constructors on this type (such as `try_new`) over building
/// the struct by hand.
#[derive(Clone, PartialEq, Eq, Hash, Debug)]
// mark non_exhaustive to encourage use of try_new/new()
#[non_exhaustive]
pub struct Projection {
    /// The list of expressions
    pub expr: Vec<Expr>,
    /// The incoming logical plan
    pub input: Arc<LogicalPlan>,
    /// The schema description of the output
    pub schema: DFSchemaRef,
}
2211
2212// Manual implementation needed because of `schema` field. Comparison excludes this field.
2213impl PartialOrd for Projection {
2214    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
2215        match self.expr.partial_cmp(&other.expr) {
2216            Some(Ordering::Equal) => self.input.partial_cmp(&other.input),
2217            cmp => cmp,
2218        }
2219        // TODO (https://github.com/apache/datafusion/issues/17477) avoid recomparing all fields
2220        .filter(|cmp| *cmp != Ordering::Equal || self == other)
2221    }
2222}
2223
2224impl Projection {
2225    /// Create a new Projection
2226    pub fn try_new(expr: Vec<Expr>, input: Arc<LogicalPlan>) -> Result<Self> {
2227        let projection_schema = projection_schema(&input, &expr)?;
2228        Self::try_new_with_schema(expr, input, projection_schema)
2229    }
2230
2231    /// Create a new Projection using the specified output schema
2232    pub fn try_new_with_schema(
2233        expr: Vec<Expr>,
2234        input: Arc<LogicalPlan>,
2235        schema: DFSchemaRef,
2236    ) -> Result<Self> {
2237        #[expect(deprecated)]
2238        if !expr.iter().any(|e| matches!(e, Expr::Wildcard { .. }))
2239            && expr.len() != schema.fields().len()
2240        {
2241            return plan_err!(
2242                "Projection has mismatch between number of expressions ({}) and number of fields in schema ({})",
2243                expr.len(),
2244                schema.fields().len()
2245            );
2246        }
2247        Ok(Self {
2248            expr,
2249            input,
2250            schema,
2251        })
2252    }
2253
2254    /// Create a new Projection using the specified output schema
2255    pub fn new_from_schema(input: Arc<LogicalPlan>, schema: DFSchemaRef) -> Self {
2256        let expr: Vec<Expr> = schema.columns().into_iter().map(Expr::Column).collect();
2257        Self {
2258            expr,
2259            input,
2260            schema,
2261        }
2262    }
2263}
2264
2265/// Computes the schema of the result produced by applying a projection to the input logical plan.
2266///
2267/// # Arguments
2268///
2269/// * `input`: A reference to the input `LogicalPlan` for which the projection schema
2270///   will be computed.
2271/// * `exprs`: A slice of `Expr` expressions representing the projection operation to apply.
2272///
2273/// # Metadata Handling
2274///
2275/// - **Schema-level metadata**: Passed through unchanged from the input schema
2276/// - **Field-level metadata**: Determined by each expression via [`exprlist_to_fields`], which
2277///   calls [`Expr::to_field`] to handle expression-specific metadata (literals, aliases, etc.)
2278///
2279/// # Returns
2280///
2281/// A `Result` containing an `Arc<DFSchema>` representing the schema of the result
2282/// produced by the projection operation. If the schema computation is successful,
2283/// the `Result` will contain the schema; otherwise, it will contain an error.
2284pub fn projection_schema(input: &LogicalPlan, exprs: &[Expr]) -> Result<Arc<DFSchema>> {
2285    // Preserve input schema metadata at the schema level
2286    let metadata = input.schema().metadata().clone();
2287
2288    // Convert expressions to fields with Field properties determined by `Expr::to_field`
2289    let schema =
2290        DFSchema::new_with_metadata(exprlist_to_fields(exprs, input)?, metadata)?
2291            .with_functional_dependencies(calc_func_dependencies_for_project(
2292                exprs, input,
2293            )?)?;
2294
2295    Ok(Arc::new(schema))
2296}
2297
/// Aliased subquery: exposes the output of `input` under the relation name
/// `alias`.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
// mark non_exhaustive to encourage use of try_new/new()
#[non_exhaustive]
pub struct SubqueryAlias {
    /// The incoming logical plan
    pub input: Arc<LogicalPlan>,
    /// The alias for the input relation
    pub alias: TableReference,
    /// The schema with qualified field names
    pub schema: DFSchemaRef,
}
2310
2311impl SubqueryAlias {
2312    pub fn try_new(
2313        plan: Arc<LogicalPlan>,
2314        alias: impl Into<TableReference>,
2315    ) -> Result<Self> {
2316        let alias = alias.into();
2317
2318        // Since SubqueryAlias will replace all field qualification for the output schema of `plan`,
2319        // no field must share the same column name as this would lead to ambiguity when referencing
2320        // columns in parent logical nodes.
2321
2322        // Compute unique aliases, if any, for each column of the input's schema.
2323        let aliases = unique_field_aliases(plan.schema().fields());
2324        let is_projection_needed = aliases.iter().any(Option::is_some);
2325
2326        // Insert a projection node, if needed, to make sure aliases are applied.
2327        let plan = if is_projection_needed {
2328            let projection_expressions = aliases
2329                .iter()
2330                .zip(plan.schema().iter())
2331                .map(|(alias, (qualifier, field))| {
2332                    let column =
2333                        Expr::Column(Column::new(qualifier.cloned(), field.name()));
2334                    match alias {
2335                        None => column,
2336                        Some(alias) => {
2337                            Expr::Alias(Alias::new(column, qualifier.cloned(), alias))
2338                        }
2339                    }
2340                })
2341                .collect();
2342            let projection = Projection::try_new(projection_expressions, plan)?;
2343            Arc::new(LogicalPlan::Projection(projection))
2344        } else {
2345            plan
2346        };
2347
2348        // Requalify fields with the new `alias`.
2349        let fields = plan.schema().fields().clone();
2350        let meta_data = plan.schema().metadata().clone();
2351        let func_dependencies = plan.schema().functional_dependencies().clone();
2352
2353        let schema = DFSchema::from_unqualified_fields(fields, meta_data)?;
2354        let schema = schema.as_arrow();
2355
2356        let schema = DFSchemaRef::new(
2357            DFSchema::try_from_qualified_schema(alias.clone(), schema)?
2358                .with_functional_dependencies(func_dependencies)?,
2359        );
2360        Ok(SubqueryAlias {
2361            input: plan,
2362            alias,
2363            schema,
2364        })
2365    }
2366}
2367
2368// Manual implementation needed because of `schema` field. Comparison excludes this field.
2369impl PartialOrd for SubqueryAlias {
2370    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
2371        match self.input.partial_cmp(&other.input) {
2372            Some(Ordering::Equal) => self.alias.partial_cmp(&other.alias),
2373            cmp => cmp,
2374        }
2375        // TODO (https://github.com/apache/datafusion/issues/17477) avoid recomparing all fields
2376        .filter(|cmp| *cmp != Ordering::Equal || self == other)
2377    }
2378}
2379
/// Filters rows from its input that do not match an
/// expression (essentially a WHERE clause with a predicate
/// expression).
///
/// Semantically, `<predicate>` is evaluated for each row of the input;
/// If the value of `<predicate>` is true, the input row is passed to
/// the output. If the value of `<predicate>` is false, the row is
/// discarded.
///
/// A `Filter` should not be constructed directly; use `try_new()` instead.
/// The fields are `pub` only to support pattern matching.
// NOTE: `PartialOrd` and `Hash` are derived, so the declaration order of the
// fields below is significant.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Hash)]
#[non_exhaustive]
pub struct Filter {
    /// The predicate expression, which must have Boolean type.
    pub predicate: Expr,
    /// The incoming logical plan
    pub input: Arc<LogicalPlan>,
}
2399
2400impl Filter {
2401    /// Create a new filter operator.
2402    ///
2403    /// Notes: as Aliases have no effect on the output of a filter operator,
2404    /// they are removed from the predicate expression.
2405    pub fn try_new(predicate: Expr, input: Arc<LogicalPlan>) -> Result<Self> {
2406        Self::try_new_internal(predicate, input)
2407    }
2408
2409    /// Create a new filter operator for a having clause.
2410    /// This is similar to a filter, but its having flag is set to true.
2411    #[deprecated(since = "48.0.0", note = "Use `try_new` instead")]
2412    pub fn try_new_with_having(predicate: Expr, input: Arc<LogicalPlan>) -> Result<Self> {
2413        Self::try_new_internal(predicate, input)
2414    }
2415
2416    fn is_allowed_filter_type(data_type: &DataType) -> bool {
2417        match data_type {
2418            // Interpret NULL as a missing boolean value.
2419            DataType::Boolean | DataType::Null => true,
2420            DataType::Dictionary(_, value_type) => {
2421                Filter::is_allowed_filter_type(value_type.as_ref())
2422            }
2423            _ => false,
2424        }
2425    }
2426
2427    fn try_new_internal(predicate: Expr, input: Arc<LogicalPlan>) -> Result<Self> {
2428        // Filter predicates must return a boolean value so we try and validate that here.
2429        // Note that it is not always possible to resolve the predicate expression during plan
2430        // construction (such as with correlated subqueries) so we make a best effort here and
2431        // ignore errors resolving the expression against the schema.
2432        if let Ok(predicate_type) = predicate.get_type(input.schema())
2433            && !Filter::is_allowed_filter_type(&predicate_type)
2434        {
2435            return plan_err!(
2436                "Cannot create filter with non-boolean predicate '{predicate}' returning {predicate_type}"
2437            );
2438        }
2439
2440        Ok(Self {
2441            predicate: predicate.unalias_nested().data,
2442            input,
2443        })
2444    }
2445
2446    /// Is this filter guaranteed to return 0 or 1 row in a given instantiation?
2447    ///
2448    /// This function will return `true` if its predicate contains a conjunction of
2449    /// `col(a) = <expr>`, where its schema has a unique filter that is covered
2450    /// by this conjunction.
2451    ///
2452    /// For example, for the table:
2453    /// ```sql
2454    /// CREATE TABLE t (a INTEGER PRIMARY KEY, b INTEGER);
2455    /// ```
2456    /// `Filter(a = 2).is_scalar() == true`
2457    /// , whereas
2458    /// `Filter(b = 2).is_scalar() == false`
2459    /// and
2460    /// `Filter(a = 2 OR b = 2).is_scalar() == false`
2461    fn is_scalar(&self) -> bool {
2462        let schema = self.input.schema();
2463
2464        let functional_dependencies = self.input.schema().functional_dependencies();
2465        let unique_keys = functional_dependencies.iter().filter(|dep| {
2466            let nullable = dep.nullable
2467                && dep
2468                    .source_indices
2469                    .iter()
2470                    .any(|&source| schema.field(source).is_nullable());
2471            !nullable
2472                && dep.mode == Dependency::Single
2473                && dep.target_indices.len() == schema.fields().len()
2474        });
2475
2476        let exprs = split_conjunction(&self.predicate);
2477        let eq_pred_cols: HashSet<_> = exprs
2478            .iter()
2479            .filter_map(|expr| {
2480                let Expr::BinaryExpr(BinaryExpr {
2481                    left,
2482                    op: Operator::Eq,
2483                    right,
2484                }) = expr
2485                else {
2486                    return None;
2487                };
2488                // This is a no-op filter expression
2489                if left == right {
2490                    return None;
2491                }
2492
2493                match (left.as_ref(), right.as_ref()) {
2494                    (Expr::Column(_), Expr::Column(_)) => None,
2495                    (Expr::Column(c), _) | (_, Expr::Column(c)) => {
2496                        Some(schema.index_of_column(c).unwrap())
2497                    }
2498                    _ => None,
2499                }
2500            })
2501            .collect();
2502
2503        // If we have a functional dependence that is a subset of our predicate,
2504        // this filter is scalar
2505        for key in unique_keys {
2506            if key.source_indices.iter().all(|c| eq_pred_cols.contains(c)) {
2507                return true;
2508            }
2509        }
2510        false
2511    }
2512}
2513
/// Windows its input based on a set of window specs and window functions
/// (e.g. SUM or RANK).
///
/// # Output Schema
///
/// The output schema is the input schema followed by the window function
/// expressions, in order.
///
/// For example, given the input schema `"A", "B", "C"` and the window function
/// `SUM(A) OVER (PARTITION BY B+1 ORDER BY C)`, the output schema will be `"A",
/// "B", "C", "SUM(A) OVER ..."` where `"SUM(A) OVER ..."` is the name of the
/// output column.
///
/// Note that the `PARTITION BY` expression "B+1" is not produced in the output
/// schema.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct Window {
    /// The incoming logical plan
    pub input: Arc<LogicalPlan>,
    /// The window function expressions, in output order
    pub window_expr: Vec<Expr>,
    /// The schema description of the window output
    pub schema: DFSchemaRef,
}
2537
2538impl Window {
2539    /// Create a new window operator.
2540    pub fn try_new(window_expr: Vec<Expr>, input: Arc<LogicalPlan>) -> Result<Self> {
2541        let fields: Vec<(Option<TableReference>, Arc<Field>)> = input
2542            .schema()
2543            .iter()
2544            .map(|(q, f)| (q.cloned(), Arc::clone(f)))
2545            .collect();
2546        let input_len = fields.len();
2547        let mut window_fields = fields;
2548        let expr_fields = exprlist_to_fields(window_expr.as_slice(), &input)?;
2549        window_fields.extend_from_slice(expr_fields.as_slice());
2550        let metadata = input.schema().metadata().clone();
2551
2552        // Update functional dependencies for window:
2553        let mut window_func_dependencies =
2554            input.schema().functional_dependencies().clone();
2555        window_func_dependencies.extend_target_indices(window_fields.len());
2556
2557        // Since we know that ROW_NUMBER outputs will be unique (i.e. it consists
2558        // of consecutive numbers per partition), we can represent this fact with
2559        // functional dependencies.
2560        let mut new_dependencies = window_expr
2561            .iter()
2562            .enumerate()
2563            .filter_map(|(idx, expr)| {
2564                let Expr::WindowFunction(window_fun) = expr else {
2565                    return None;
2566                };
2567                let WindowFunction {
2568                    fun: WindowFunctionDefinition::WindowUDF(udwf),
2569                    params: WindowFunctionParams { partition_by, .. },
2570                } = window_fun.as_ref()
2571                else {
2572                    return None;
2573                };
2574                // When there is no PARTITION BY, row number will be unique
2575                // across the entire table.
2576                if udwf.name() == "row_number" && partition_by.is_empty() {
2577                    Some(idx + input_len)
2578                } else {
2579                    None
2580                }
2581            })
2582            .map(|idx| {
2583                FunctionalDependence::new(vec![idx], vec![], false)
2584                    .with_mode(Dependency::Single)
2585            })
2586            .collect::<Vec<_>>();
2587
2588        if !new_dependencies.is_empty() {
2589            for dependence in new_dependencies.iter_mut() {
2590                dependence.target_indices = (0..window_fields.len()).collect();
2591            }
2592            // Add the dependency introduced because of ROW_NUMBER window function to the functional dependency
2593            let new_deps = FunctionalDependencies::new(new_dependencies);
2594            window_func_dependencies.extend(new_deps);
2595        }
2596
2597        // Validate that FILTER clauses are only used with aggregate window functions
2598        if let Some(e) = window_expr.iter().find(|e| {
2599            matches!(
2600                e,
2601                Expr::WindowFunction(wf)
2602                    if !matches!(wf.fun, WindowFunctionDefinition::AggregateUDF(_))
2603                        && wf.params.filter.is_some()
2604            )
2605        }) {
2606            return plan_err!(
2607                "FILTER clause can only be used with aggregate window functions. Found in '{e}'"
2608            );
2609        }
2610
2611        Self::try_new_with_schema(
2612            window_expr,
2613            input,
2614            Arc::new(
2615                DFSchema::new_with_metadata(window_fields, metadata)?
2616                    .with_functional_dependencies(window_func_dependencies)?,
2617            ),
2618        )
2619    }
2620
2621    /// Create a new window function using the provided schema to avoid the overhead of
2622    /// building the schema again when the schema is already known.
2623    ///
2624    /// This method should only be called when you are absolutely sure that the schema being
2625    /// provided is correct for the window function. If in doubt, call [try_new](Self::try_new) instead.
2626    pub fn try_new_with_schema(
2627        window_expr: Vec<Expr>,
2628        input: Arc<LogicalPlan>,
2629        schema: DFSchemaRef,
2630    ) -> Result<Self> {
2631        let input_fields_count = input.schema().fields().len();
2632        if schema.fields().len() != input_fields_count + window_expr.len() {
2633            return plan_err!(
2634                "Window schema has wrong number of fields. Expected {} got {}",
2635                input_fields_count + window_expr.len(),
2636                schema.fields().len()
2637            );
2638        }
2639
2640        Ok(Window {
2641            input,
2642            window_expr,
2643            schema,
2644        })
2645    }
2646}
2647
2648// Manual implementation needed because of `schema` field. Comparison excludes this field.
2649impl PartialOrd for Window {
2650    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
2651        match self.input.partial_cmp(&other.input)? {
2652            Ordering::Equal => {} // continue
2653            not_equal => return Some(not_equal),
2654        }
2655
2656        match self.window_expr.partial_cmp(&other.window_expr)? {
2657            Ordering::Equal => {} // continue
2658            not_equal => return Some(not_equal),
2659        }
2660
2661        // Contract for PartialOrd and PartialEq consistency requires that
2662        // a == b if and only if partial_cmp(a, b) == Some(Equal).
2663        if self == other {
2664            Some(Ordering::Equal)
2665        } else {
2666            None
2667        }
2668    }
2669}
2670
/// Produces rows from a table provider by reference or from the context
///
/// `Debug`, `PartialEq`, `PartialOrd` and `Hash` are implemented by hand for
/// this type rather than derived.
#[derive(Clone)]
pub struct TableScan {
    /// The name of the table
    pub table_name: TableReference,
    /// The source of the table
    pub source: Arc<dyn TableSource>,
    /// Optional column indices to use as a projection
    pub projection: Option<Vec<usize>>,
    /// The schema description of the output
    pub projected_schema: DFSchemaRef,
    /// Optional expressions to be used as filters by the table provider
    pub filters: Vec<Expr>,
    /// Optional number of rows to read
    pub fetch: Option<usize>,
}
2687
2688impl Debug for TableScan {
2689    fn fmt(&self, f: &mut Formatter) -> fmt::Result {
2690        f.debug_struct("TableScan")
2691            .field("table_name", &self.table_name)
2692            .field("source", &"...")
2693            .field("projection", &self.projection)
2694            .field("projected_schema", &self.projected_schema)
2695            .field("filters", &self.filters)
2696            .field("fetch", &self.fetch)
2697            .finish_non_exhaustive()
2698    }
2699}
2700
2701impl PartialEq for TableScan {
2702    fn eq(&self, other: &Self) -> bool {
2703        self.table_name == other.table_name
2704            && self.projection == other.projection
2705            && self.projected_schema == other.projected_schema
2706            && self.filters == other.filters
2707            && self.fetch == other.fetch
2708    }
2709}
2710
2711impl Eq for TableScan {}
2712
2713// Manual implementation needed because of `source` and `projected_schema` fields.
2714// Comparison excludes these field.
2715impl PartialOrd for TableScan {
2716    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
2717        #[derive(PartialEq, PartialOrd)]
2718        struct ComparableTableScan<'a> {
2719            /// The name of the table
2720            pub table_name: &'a TableReference,
2721            /// Optional column indices to use as a projection
2722            pub projection: &'a Option<Vec<usize>>,
2723            /// Optional expressions to be used as filters by the table provider
2724            pub filters: &'a Vec<Expr>,
2725            /// Optional number of rows to read
2726            pub fetch: &'a Option<usize>,
2727        }
2728        let comparable_self = ComparableTableScan {
2729            table_name: &self.table_name,
2730            projection: &self.projection,
2731            filters: &self.filters,
2732            fetch: &self.fetch,
2733        };
2734        let comparable_other = ComparableTableScan {
2735            table_name: &other.table_name,
2736            projection: &other.projection,
2737            filters: &other.filters,
2738            fetch: &other.fetch,
2739        };
2740        comparable_self
2741            .partial_cmp(&comparable_other)
2742            // TODO (https://github.com/apache/datafusion/issues/17477) avoid recomparing all fields
2743            .filter(|cmp| *cmp != Ordering::Equal || self == other)
2744    }
2745}
2746
2747impl Hash for TableScan {
2748    fn hash<H: Hasher>(&self, state: &mut H) {
2749        self.table_name.hash(state);
2750        self.projection.hash(state);
2751        self.projected_schema.hash(state);
2752        self.filters.hash(state);
2753        self.fetch.hash(state);
2754    }
2755}
2756
2757impl TableScan {
2758    /// Initialize TableScan with appropriate schema from the given
2759    /// arguments.
2760    pub fn try_new(
2761        table_name: impl Into<TableReference>,
2762        table_source: Arc<dyn TableSource>,
2763        projection: Option<Vec<usize>>,
2764        filters: Vec<Expr>,
2765        fetch: Option<usize>,
2766    ) -> Result<Self> {
2767        let table_name = table_name.into();
2768
2769        if table_name.table().is_empty() {
2770            return plan_err!("table_name cannot be empty");
2771        }
2772        let schema = table_source.schema();
2773        let func_dependencies = FunctionalDependencies::new_from_constraints(
2774            table_source.constraints(),
2775            schema.fields.len(),
2776        );
2777        let projected_schema = projection
2778            .as_ref()
2779            .map(|p| {
2780                let projected_func_dependencies =
2781                    func_dependencies.project_functional_dependencies(p, p.len());
2782
2783                let df_schema = DFSchema::new_with_metadata(
2784                    p.iter()
2785                        .map(|i| {
2786                            (Some(table_name.clone()), Arc::clone(&schema.fields()[*i]))
2787                        })
2788                        .collect(),
2789                    schema.metadata.clone(),
2790                )?;
2791                df_schema.with_functional_dependencies(projected_func_dependencies)
2792            })
2793            .unwrap_or_else(|| {
2794                let df_schema =
2795                    DFSchema::try_from_qualified_schema(table_name.clone(), &schema)?;
2796                df_schema.with_functional_dependencies(func_dependencies)
2797            })?;
2798        let projected_schema = Arc::new(projected_schema);
2799
2800        Ok(Self {
2801            table_name,
2802            source: table_source,
2803            projection,
2804            projected_schema,
2805            filters,
2806            fetch,
2807        })
2808    }
2809}
2810
/// Repartition the plan based on a partitioning scheme.
// NOTE: `PartialOrd` and `Hash` are derived, so the declaration order of the
// fields below is significant.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Hash)]
pub struct Repartition {
    /// The incoming logical plan
    pub input: Arc<LogicalPlan>,
    /// The partitioning scheme
    pub partitioning_scheme: Partitioning,
}
2819
/// Union multiple inputs
///
/// Prefer the `try_new*` constructors, which derive `schema` from the inputs.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct Union {
    /// Inputs to merge
    pub inputs: Vec<Arc<LogicalPlan>>,
    /// Union schema. Should be the same for all inputs.
    pub schema: DFSchemaRef,
}
2828
2829impl Union {
2830    /// Constructs new Union instance deriving schema from inputs.
2831    /// Schema data types must match exactly.
2832    pub fn try_new(inputs: Vec<Arc<LogicalPlan>>) -> Result<Self> {
2833        let schema = Self::derive_schema_from_inputs(&inputs, false, false)?;
2834        Ok(Union { inputs, schema })
2835    }
2836
2837    /// Constructs new Union instance deriving schema from inputs.
2838    /// Inputs do not have to have matching types and produced schema will
2839    /// take type from the first input.
2840    // TODO (https://github.com/apache/datafusion/issues/14380): Avoid creating uncoerced union at all.
2841    pub fn try_new_with_loose_types(inputs: Vec<Arc<LogicalPlan>>) -> Result<Self> {
2842        let schema = Self::derive_schema_from_inputs(&inputs, true, false)?;
2843        Ok(Union { inputs, schema })
2844    }
2845
2846    /// Constructs a new Union instance that combines rows from different tables by name,
2847    /// instead of by position. This means that the specified inputs need not have schemas
2848    /// that are all the same width.
2849    pub fn try_new_by_name(inputs: Vec<Arc<LogicalPlan>>) -> Result<Self> {
2850        let schema = Self::derive_schema_from_inputs(&inputs, true, true)?;
2851        let inputs = Self::rewrite_inputs_from_schema(&schema, inputs)?;
2852
2853        Ok(Union { inputs, schema })
2854    }
2855
2856    /// When constructing a `UNION BY NAME`, we need to wrap inputs
2857    /// in an additional `Projection` to account for absence of columns
2858    /// in input schemas or differing projection orders.
2859    fn rewrite_inputs_from_schema(
2860        schema: &Arc<DFSchema>,
2861        inputs: Vec<Arc<LogicalPlan>>,
2862    ) -> Result<Vec<Arc<LogicalPlan>>> {
2863        let schema_width = schema.iter().count();
2864        let mut wrapped_inputs = Vec::with_capacity(inputs.len());
2865        for input in inputs {
2866            // Any columns that exist within the derived schema but do not exist
2867            // within an input's schema should be replaced with `NULL` aliased
2868            // to the appropriate column in the derived schema.
2869            let mut expr = Vec::with_capacity(schema_width);
2870            for column in schema.columns() {
2871                if input
2872                    .schema()
2873                    .has_column_with_unqualified_name(column.name())
2874                {
2875                    expr.push(Expr::Column(column));
2876                } else {
2877                    expr.push(
2878                        Expr::Literal(ScalarValue::Null, None).alias(column.name()),
2879                    );
2880                }
2881            }
2882            wrapped_inputs.push(Arc::new(LogicalPlan::Projection(
2883                Projection::try_new_with_schema(expr, input, Arc::clone(schema))?,
2884            )));
2885        }
2886
2887        Ok(wrapped_inputs)
2888    }
2889
2890    /// Constructs new Union instance deriving schema from inputs.
2891    ///
2892    /// If `loose_types` is true, inputs do not need to have matching types and
2893    /// the produced schema will use the type from the first input.
2894    /// TODO (<https://github.com/apache/datafusion/issues/14380>): This is not necessarily reasonable behavior.
2895    ///
2896    /// If `by_name` is `true`, input schemas need not be the same width. That is,
2897    /// the constructed schema follows `UNION BY NAME` semantics.
2898    fn derive_schema_from_inputs(
2899        inputs: &[Arc<LogicalPlan>],
2900        loose_types: bool,
2901        by_name: bool,
2902    ) -> Result<DFSchemaRef> {
2903        if inputs.len() < 2 {
2904            return plan_err!("UNION requires at least two inputs");
2905        }
2906
2907        if by_name {
2908            Self::derive_schema_from_inputs_by_name(inputs, loose_types)
2909        } else {
2910            Self::derive_schema_from_inputs_by_position(inputs, loose_types)
2911        }
2912    }
2913
2914    fn derive_schema_from_inputs_by_name(
2915        inputs: &[Arc<LogicalPlan>],
2916        loose_types: bool,
2917    ) -> Result<DFSchemaRef> {
2918        type FieldData<'a> =
2919            (&'a DataType, bool, Vec<&'a HashMap<String, String>>, usize);
2920        let mut cols: Vec<(&str, FieldData)> = Vec::new();
2921        for input in inputs.iter() {
2922            for field in input.schema().fields() {
2923                if let Some((_, (data_type, is_nullable, metadata, occurrences))) =
2924                    cols.iter_mut().find(|(name, _)| name == field.name())
2925                {
2926                    if !loose_types && *data_type != field.data_type() {
2927                        return plan_err!(
2928                            "Found different types for field {}",
2929                            field.name()
2930                        );
2931                    }
2932
2933                    metadata.push(field.metadata());
2934                    // If the field is nullable in any one of the inputs,
2935                    // then the field in the final schema is also nullable.
2936                    *is_nullable |= field.is_nullable();
2937                    *occurrences += 1;
2938                } else {
2939                    cols.push((
2940                        field.name(),
2941                        (
2942                            field.data_type(),
2943                            field.is_nullable(),
2944                            vec![field.metadata()],
2945                            1,
2946                        ),
2947                    ));
2948                }
2949            }
2950        }
2951
2952        let union_fields = cols
2953            .into_iter()
2954            .map(
2955                |(name, (data_type, is_nullable, unmerged_metadata, occurrences))| {
2956                    // If the final number of occurrences of the field is less
2957                    // than the number of inputs (i.e. the field is missing from
2958                    // one or more inputs), then it must be treated as nullable.
2959                    let final_is_nullable = if occurrences == inputs.len() {
2960                        is_nullable
2961                    } else {
2962                        true
2963                    };
2964
2965                    let mut field =
2966                        Field::new(name, data_type.clone(), final_is_nullable);
2967                    field.set_metadata(intersect_metadata_for_union(unmerged_metadata));
2968
2969                    (None, Arc::new(field))
2970                },
2971            )
2972            .collect::<Vec<(Option<TableReference>, _)>>();
2973
2974        let union_schema_metadata = intersect_metadata_for_union(
2975            inputs.iter().map(|input| input.schema().metadata()),
2976        );
2977
2978        // Functional Dependencies are not preserved after UNION operation
2979        let schema = DFSchema::new_with_metadata(union_fields, union_schema_metadata)?;
2980        let schema = Arc::new(schema);
2981
2982        Ok(schema)
2983    }
2984
2985    fn derive_schema_from_inputs_by_position(
2986        inputs: &[Arc<LogicalPlan>],
2987        loose_types: bool,
2988    ) -> Result<DFSchemaRef> {
2989        let first_schema = inputs[0].schema();
2990        let fields_count = first_schema.fields().len();
2991        for input in inputs.iter().skip(1) {
2992            if fields_count != input.schema().fields().len() {
2993                return plan_err!(
2994                    "UNION queries have different number of columns: \
2995                    left has {} columns whereas right has {} columns",
2996                    fields_count,
2997                    input.schema().fields().len()
2998                );
2999            }
3000        }
3001
3002        let mut name_counts: HashMap<String, usize> = HashMap::new();
3003        let union_fields = (0..fields_count)
3004            .map(|i| {
3005                let fields = inputs
3006                    .iter()
3007                    .map(|input| input.schema().field(i))
3008                    .collect::<Vec<_>>();
3009                let first_field = fields[0];
3010                let base_name = first_field.name().to_string();
3011
3012                let data_type = if loose_types {
3013                    // TODO apply type coercion here, or document why it's better to defer
3014                    // temporarily use the data type from the left input and later rely on the analyzer to
3015                    // coerce the two schemas into a common one.
3016                    first_field.data_type()
3017                } else {
3018                    fields.iter().skip(1).try_fold(
3019                        first_field.data_type(),
3020                        |acc, field| {
3021                            if acc != field.data_type() {
3022                                return plan_err!(
3023                                    "UNION field {i} have different type in inputs: \
3024                                    left has {} whereas right has {}",
3025                                    first_field.data_type(),
3026                                    field.data_type()
3027                                );
3028                            }
3029                            Ok(acc)
3030                        },
3031                    )?
3032                };
3033                let nullable = fields.iter().any(|field| field.is_nullable());
3034
3035                // Generate unique field name
3036                let name = if let Some(count) = name_counts.get_mut(&base_name) {
3037                    *count += 1;
3038                    format!("{base_name}_{count}")
3039                } else {
3040                    name_counts.insert(base_name.clone(), 0);
3041                    base_name
3042                };
3043
3044                let mut field = Field::new(&name, data_type.clone(), nullable);
3045                let field_metadata = intersect_metadata_for_union(
3046                    fields.iter().map(|field| field.metadata()),
3047                );
3048                field.set_metadata(field_metadata);
3049                Ok((None, Arc::new(field)))
3050            })
3051            .collect::<Result<_>>()?;
3052        let union_schema_metadata = intersect_metadata_for_union(
3053            inputs.iter().map(|input| input.schema().metadata()),
3054        );
3055
3056        // Functional Dependencies are not preserved after UNION operation
3057        let schema = DFSchema::new_with_metadata(union_fields, union_schema_metadata)?;
3058        let schema = Arc::new(schema);
3059
3060        Ok(schema)
3061    }
3062}
3063
3064// Manual implementation needed because of `schema` field. Comparison excludes this field.
3065impl PartialOrd for Union {
3066    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
3067        self.inputs
3068            .partial_cmp(&other.inputs)
3069            // TODO (https://github.com/apache/datafusion/issues/17477) avoid recomparing all fields
3070            .filter(|cmp| *cmp != Ordering::Equal || self == other)
3071    }
3072}
3073
/// Describes the schema of a table.
///
/// # Example output:
///
/// ```sql
/// > describe traces;
/// +--------------------+-----------------------------+-------------+
/// | column_name        | data_type                   | is_nullable |
/// +--------------------+-----------------------------+-------------+
/// | attributes         | Utf8                        | YES         |
/// | duration_nano      | Int64                       | YES         |
/// | end_time_unix_nano | Int64                       | YES         |
/// | service.name       | Dictionary(Int32, Utf8)     | YES         |
/// | span.kind          | Utf8                        | YES         |
/// | span.name          | Utf8                        | YES         |
/// | span_id            | Dictionary(Int32, Utf8)     | YES         |
/// | time               | Timestamp(Nanosecond, None) | NO          |
/// | trace_id           | Dictionary(Int32, Utf8)     | YES         |
/// | otel.status_code   | Utf8                        | YES         |
/// | parent_span_id     | Utf8                        | YES         |
/// +--------------------+-----------------------------+-------------+
/// ```
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct DescribeTable {
    /// Schema of the table being described
    pub schema: Arc<Schema>,
    /// Schema of the `DESCRIBE` output itself
    pub output_schema: DFSchemaRef,
}
3103
// Manual implementation of `PartialOrd` that always returns `None`, since
// `DescribeTable` contains no comparable types. It exists so that
// `LogicalPlan` can derive `PartialOrd`.
impl PartialOrd for DescribeTable {
    /// Always `None`: two `DescribeTable` values are never ordered relative to
    /// each other, not even a value and itself.
    fn partial_cmp(&self, _other: &Self) -> Option<Ordering> {
        // There is no relevant comparison for schemas
        None
    }
}
3112
/// Options for `EXPLAIN`.
///
/// The `Default` implementation turns `verbose` and `analyze` off and selects
/// `ExplainFormat::Indent`.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct ExplainOption {
    /// Include detailed debug info
    pub verbose: bool,
    /// Actually execute the plan and report metrics
    pub analyze: bool,
    /// Output syntax/format
    pub format: ExplainFormat,
}
3123
3124impl Default for ExplainOption {
3125    fn default() -> Self {
3126        ExplainOption {
3127            verbose: false,
3128            analyze: false,
3129            format: ExplainFormat::Indent,
3130        }
3131    }
3132}
3133
3134impl ExplainOption {
3135    /// Builder‐style setter for `verbose`
3136    pub fn with_verbose(mut self, verbose: bool) -> Self {
3137        self.verbose = verbose;
3138        self
3139    }
3140
3141    /// Builder‐style setter for `analyze`
3142    pub fn with_analyze(mut self, analyze: bool) -> Self {
3143        self.analyze = analyze;
3144        self
3145    }
3146
3147    /// Builder‐style setter for `format`
3148    pub fn with_format(mut self, format: ExplainFormat) -> Self {
3149        self.format = format;
3150        self
3151    }
3152}
3153
/// Produces a relation with string representations of
/// various parts of the plan
///
/// See [the documentation] for more information
///
/// [the documentation]: https://datafusion.apache.org/user-guide/sql/explain.html
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct Explain {
    /// Should extra (detailed, intermediate plans) be included?
    pub verbose: bool,
    /// Output format for explain, if specified.
    /// If none, defaults to `text`
    // NOTE(review): this field is a plain `ExplainFormat`, not an `Option`, so
    // the "if none" wording above looks stale -- confirm and update.
    pub explain_format: ExplainFormat,
    /// The logical plan that is being EXPLAIN'd
    pub plan: Arc<LogicalPlan>,
    /// Represent the various stages plans have gone through
    pub stringified_plans: Vec<StringifiedPlan>,
    /// The output schema of the explain (2 columns of text)
    pub schema: DFSchemaRef,
    /// Used by physical planner to check if should proceed with planning
    pub logical_optimization_succeeded: bool,
}
3176
3177// Manual implementation needed because of `schema` field. Comparison excludes this field.
3178impl PartialOrd for Explain {
3179    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
3180        #[derive(PartialEq, PartialOrd)]
3181        struct ComparableExplain<'a> {
3182            /// Should extra (detailed, intermediate plans) be included?
3183            pub verbose: &'a bool,
3184            /// The logical plan that is being EXPLAIN'd
3185            pub plan: &'a Arc<LogicalPlan>,
3186            /// Represent the various stages plans have gone through
3187            pub stringified_plans: &'a Vec<StringifiedPlan>,
3188            /// Used by physical planner to check if should proceed with planning
3189            pub logical_optimization_succeeded: &'a bool,
3190        }
3191        let comparable_self = ComparableExplain {
3192            verbose: &self.verbose,
3193            plan: &self.plan,
3194            stringified_plans: &self.stringified_plans,
3195            logical_optimization_succeeded: &self.logical_optimization_succeeded,
3196        };
3197        let comparable_other = ComparableExplain {
3198            verbose: &other.verbose,
3199            plan: &other.plan,
3200            stringified_plans: &other.stringified_plans,
3201            logical_optimization_succeeded: &other.logical_optimization_succeeded,
3202        };
3203        comparable_self
3204            .partial_cmp(&comparable_other)
3205            // TODO (https://github.com/apache/datafusion/issues/17477) avoid recomparing all fields
3206            .filter(|cmp| *cmp != Ordering::Equal || self == other)
3207    }
3208}
3209
/// Runs the actual plan, and then prints the physical plan
/// with execution metrics.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct Analyze {
    /// Should extra detail be included?
    pub verbose: bool,
    /// The logical plan that is being EXPLAIN ANALYZE'd
    pub input: Arc<LogicalPlan>,
    /// The output schema of the explain (2 columns of text)
    pub schema: DFSchemaRef,
}
3221
3222// Manual implementation needed because of `schema` field. Comparison excludes this field.
3223impl PartialOrd for Analyze {
3224    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
3225        match self.verbose.partial_cmp(&other.verbose) {
3226            Some(Ordering::Equal) => self.input.partial_cmp(&other.input),
3227            cmp => cmp,
3228        }
3229        // TODO (https://github.com/apache/datafusion/issues/17477) avoid recomparing all fields
3230        .filter(|cmp| *cmp != Ordering::Equal || self == other)
3231    }
3232}
3233
/// Extension operator defined outside of DataFusion.
///
/// Equality and ordering of an `Extension` are delegated to the wrapped
/// `node` (see the manual `PartialEq` / `PartialOrd` impls).
// TODO(clippy): This clippy `allow` should be removed if
// the manual `PartialEq` is removed in favor of a derive.
// (see `PartialEq` the impl for details.)
#[allow(clippy::allow_attributes)]
#[allow(clippy::derived_hash_with_manual_eq)]
#[derive(Debug, Clone, Eq, Hash)]
pub struct Extension {
    /// The runtime extension operator
    pub node: Arc<dyn UserDefinedLogicalNode>,
}
3245
// `PartialEq` cannot be derived for types containing `Arc<dyn Trait>`.
// This manual implementation should be removed if
// https://github.com/rust-lang/rust/issues/39128 is fixed.
impl PartialEq for Extension {
    /// Two extensions are equal when their wrapped nodes compare equal.
    fn eq(&self, other: &Self) -> bool {
        // Keep the explicit method-call form: it is the manual replacement
        // for the derive that the issue linked above prevents.
        self.node.eq(&other.node)
    }
}
3254
// Ordering of an `Extension` is whatever ordering its wrapped node defines.
impl PartialOrd for Extension {
    /// Delegates to the `partial_cmp` of the wrapped `node`.
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        self.node.partial_cmp(&other.node)
    }
}
3260
/// Produces the first `n` tuples from its input and discards the rest.
///
/// `skip` and `fetch` are stored as expressions; use `get_skip_type` and
/// `get_fetch_type` to obtain them as literal values when possible.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Hash)]
pub struct Limit {
    /// Number of rows to skip before fetch.
    /// `None` is treated as skipping zero rows.
    pub skip: Option<Box<Expr>>,
    /// Maximum number of rows to fetch,
    /// None means fetching all rows
    pub fetch: Option<Box<Expr>>,
    /// The logical plan
    pub input: Arc<LogicalPlan>,
}
3272
/// Different types of skip expression in Limit plan.
///
/// Implements the common comparison and formatting traits so that values can
/// be logged and compared directly (for example in assertions).
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum SkipType {
    /// The skip expression is a literal value.
    Literal(usize),
    /// Currently only supports expressions that can be folded into constants.
    UnsupportedExpr,
}
3280
/// Different types of fetch expression in Limit plan.
///
/// Implements the common comparison and formatting traits so that values can
/// be logged and compared directly (for example in assertions).
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum FetchType {
    /// The fetch expression is a literal value.
    /// `Literal(None)` means the fetch expression is not provided.
    Literal(Option<usize>),
    /// Currently only supports expressions that can be folded into constants.
    UnsupportedExpr,
}
3289
3290impl Limit {
3291    /// Get the skip type from the limit plan.
3292    pub fn get_skip_type(&self) -> Result<SkipType> {
3293        match self.skip.as_deref() {
3294            Some(expr) => match *expr {
3295                Expr::Literal(ScalarValue::Int64(s), _) => {
3296                    // `skip = NULL` is equivalent to `skip = 0`
3297                    let s = s.unwrap_or(0);
3298                    if s >= 0 {
3299                        Ok(SkipType::Literal(s as usize))
3300                    } else {
3301                        plan_err!("OFFSET must be >=0, '{}' was provided", s)
3302                    }
3303                }
3304                _ => Ok(SkipType::UnsupportedExpr),
3305            },
3306            // `skip = None` is equivalent to `skip = 0`
3307            None => Ok(SkipType::Literal(0)),
3308        }
3309    }
3310
3311    /// Get the fetch type from the limit plan.
3312    pub fn get_fetch_type(&self) -> Result<FetchType> {
3313        match self.fetch.as_deref() {
3314            Some(expr) => match *expr {
3315                Expr::Literal(ScalarValue::Int64(Some(s)), _) => {
3316                    if s >= 0 {
3317                        Ok(FetchType::Literal(Some(s as usize)))
3318                    } else {
3319                        plan_err!("LIMIT must be >= 0, '{}' was provided", s)
3320                    }
3321                }
3322                Expr::Literal(ScalarValue::Int64(None), _) => {
3323                    Ok(FetchType::Literal(None))
3324                }
3325                _ => Ok(FetchType::UnsupportedExpr),
3326            },
3327            None => Ok(FetchType::Literal(None)),
3328        }
3329    }
3330}
3331
/// Removes duplicate rows from the input.
///
/// Use `Distinct::input` to reach the input plan of either variant.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Hash)]
pub enum Distinct {
    /// Plain `DISTINCT` referencing all selection expressions
    All(Arc<LogicalPlan>),
    /// The `Postgres` addition, allowing separate control over DISTINCT'd and selected columns
    On(DistinctOn),
}
3340
3341impl Distinct {
3342    /// return a reference to the nodes input
3343    pub fn input(&self) -> &Arc<LogicalPlan> {
3344        match self {
3345            Distinct::All(input) => input,
3346            Distinct::On(DistinctOn { input, .. }) => input,
3347        }
3348    }
3349}
3350
/// Removes duplicate rows from the input, where "duplicate" is decided by the
/// `DISTINCT ON` expressions rather than by the whole row.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct DistinctOn {
    /// The `DISTINCT ON` clause expression list
    pub on_expr: Vec<Expr>,
    /// The selected projection expression list
    pub select_expr: Vec<Expr>,
    /// The `ORDER BY` clause, whose initial expressions must match those of the `ON` clause when
    /// present. Note that those matching expressions actually wrap the `ON` expressions with
    /// additional info pertaining to the sorting procedure (i.e. ASC/DESC, and NULLS FIRST/LAST).
    pub sort_expr: Option<Vec<SortExpr>>,
    /// The logical plan that is being DISTINCT'd
    pub input: Arc<LogicalPlan>,
    /// The schema description of the DISTINCT ON output
    pub schema: DFSchemaRef,
}
3367
3368impl DistinctOn {
3369    /// Create a new `DistinctOn` struct.
3370    pub fn try_new(
3371        on_expr: Vec<Expr>,
3372        select_expr: Vec<Expr>,
3373        sort_expr: Option<Vec<SortExpr>>,
3374        input: Arc<LogicalPlan>,
3375    ) -> Result<Self> {
3376        if on_expr.is_empty() {
3377            return plan_err!("No `ON` expressions provided");
3378        }
3379
3380        let on_expr = normalize_cols(on_expr, input.as_ref())?;
3381        let qualified_fields = exprlist_to_fields(select_expr.as_slice(), &input)?
3382            .into_iter()
3383            .collect();
3384
3385        let dfschema = DFSchema::new_with_metadata(
3386            qualified_fields,
3387            input.schema().metadata().clone(),
3388        )?;
3389
3390        let mut distinct_on = DistinctOn {
3391            on_expr,
3392            select_expr,
3393            sort_expr: None,
3394            input,
3395            schema: Arc::new(dfschema),
3396        };
3397
3398        if let Some(sort_expr) = sort_expr {
3399            distinct_on = distinct_on.with_sort_expr(sort_expr)?;
3400        }
3401
3402        Ok(distinct_on)
3403    }
3404
3405    /// Try to update `self` with a new sort expressions.
3406    ///
3407    /// Validates that the sort expressions are a super-set of the `ON` expressions.
3408    pub fn with_sort_expr(mut self, sort_expr: Vec<SortExpr>) -> Result<Self> {
3409        let sort_expr = normalize_sorts(sort_expr, self.input.as_ref())?;
3410
3411        // Check that the left-most sort expressions are the same as the `ON` expressions.
3412        let mut matched = true;
3413        for (on, sort) in self.on_expr.iter().zip(sort_expr.iter()) {
3414            if on != &sort.expr {
3415                matched = false;
3416                break;
3417            }
3418        }
3419
3420        if self.on_expr.len() > sort_expr.len() || !matched {
3421            return plan_err!(
3422                "SELECT DISTINCT ON expressions must match initial ORDER BY expressions"
3423            );
3424        }
3425
3426        self.sort_expr = Some(sort_expr);
3427        Ok(self)
3428    }
3429}
3430
3431// Manual implementation needed because of `schema` field. Comparison excludes this field.
3432impl PartialOrd for DistinctOn {
3433    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
3434        #[derive(PartialEq, PartialOrd)]
3435        struct ComparableDistinctOn<'a> {
3436            /// The `DISTINCT ON` clause expression list
3437            pub on_expr: &'a Vec<Expr>,
3438            /// The selected projection expression list
3439            pub select_expr: &'a Vec<Expr>,
3440            /// The `ORDER BY` clause, whose initial expressions must match those of the `ON` clause when
3441            /// present. Note that those matching expressions actually wrap the `ON` expressions with
3442            /// additional info pertaining to the sorting procedure (i.e. ASC/DESC, and NULLS FIRST/LAST).
3443            pub sort_expr: &'a Option<Vec<SortExpr>>,
3444            /// The logical plan that is being DISTINCT'd
3445            pub input: &'a Arc<LogicalPlan>,
3446        }
3447        let comparable_self = ComparableDistinctOn {
3448            on_expr: &self.on_expr,
3449            select_expr: &self.select_expr,
3450            sort_expr: &self.sort_expr,
3451            input: &self.input,
3452        };
3453        let comparable_other = ComparableDistinctOn {
3454            on_expr: &other.on_expr,
3455            select_expr: &other.select_expr,
3456            sort_expr: &other.sort_expr,
3457            input: &other.input,
3458        };
3459        comparable_self
3460            .partial_cmp(&comparable_other)
3461            // TODO (https://github.com/apache/datafusion/issues/17477) avoid recomparing all fields
3462            .filter(|cmp| *cmp != Ordering::Equal || self == other)
3463    }
3464}
3465
/// Aggregates its input based on a set of grouping and aggregate
/// expressions (e.g. SUM).
///
/// # Output Schema
///
/// The output schema is the group expressions followed by the aggregate
/// expressions in order.
///
/// For example, given the input schema `"A", "B", "C"` and the aggregate
/// `SUM(A) GROUP BY C+B`, the output schema will be `"C+B", "SUM(A)"` where
/// "C+B" and "SUM(A)" are the names of the output columns. Note that "C+B" is a
/// single new column.
///
/// When the grouping expression is a grouping set, `Aggregate::try_new` also
/// adds the internal `Aggregate::INTERNAL_GROUPING_ID` column after the group
/// columns.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
// mark non_exhaustive to encourage use of try_new/new()
#[non_exhaustive]
pub struct Aggregate {
    /// The incoming logical plan
    pub input: Arc<LogicalPlan>,
    /// Grouping expressions
    pub group_expr: Vec<Expr>,
    /// Aggregate expressions
    pub aggr_expr: Vec<Expr>,
    /// The schema description of the aggregate output
    pub schema: DFSchemaRef,
}
3491
3492impl Aggregate {
3493    /// Create a new aggregate operator.
3494    pub fn try_new(
3495        input: Arc<LogicalPlan>,
3496        group_expr: Vec<Expr>,
3497        aggr_expr: Vec<Expr>,
3498    ) -> Result<Self> {
3499        let group_expr = enumerate_grouping_sets(group_expr)?;
3500
3501        let is_grouping_set = matches!(group_expr.as_slice(), [Expr::GroupingSet(_)]);
3502
3503        let grouping_expr: Vec<&Expr> = grouping_set_to_exprlist(group_expr.as_slice())?;
3504
3505        let mut qualified_fields = exprlist_to_fields(grouping_expr, &input)?;
3506
3507        // Even columns that cannot be null will become nullable when used in a grouping set.
3508        if is_grouping_set {
3509            qualified_fields = qualified_fields
3510                .into_iter()
3511                .map(|(q, f)| (q, f.as_ref().clone().with_nullable(true).into()))
3512                .collect::<Vec<_>>();
3513            qualified_fields.push((
3514                None,
3515                Field::new(
3516                    Self::INTERNAL_GROUPING_ID,
3517                    Self::grouping_id_type(qualified_fields.len()),
3518                    false,
3519                )
3520                .into(),
3521            ));
3522        }
3523
3524        qualified_fields.extend(exprlist_to_fields(aggr_expr.as_slice(), &input)?);
3525
3526        let schema = DFSchema::new_with_metadata(
3527            qualified_fields,
3528            input.schema().metadata().clone(),
3529        )?;
3530
3531        Self::try_new_with_schema(input, group_expr, aggr_expr, Arc::new(schema))
3532    }
3533
3534    /// Create a new aggregate operator using the provided schema to avoid the overhead of
3535    /// building the schema again when the schema is already known.
3536    ///
3537    /// This method should only be called when you are absolutely sure that the schema being
3538    /// provided is correct for the aggregate. If in doubt, call [try_new](Self::try_new) instead.
3539    #[expect(clippy::needless_pass_by_value)]
3540    pub fn try_new_with_schema(
3541        input: Arc<LogicalPlan>,
3542        group_expr: Vec<Expr>,
3543        aggr_expr: Vec<Expr>,
3544        schema: DFSchemaRef,
3545    ) -> Result<Self> {
3546        if group_expr.is_empty() && aggr_expr.is_empty() {
3547            return plan_err!(
3548                "Aggregate requires at least one grouping or aggregate expression. \
3549                Aggregate without grouping expressions nor aggregate expressions is \
3550                logically equivalent to, but less efficient than, VALUES producing \
3551                single row. Please use VALUES instead."
3552            );
3553        }
3554        let group_expr_count = grouping_set_expr_count(&group_expr)?;
3555        if schema.fields().len() != group_expr_count + aggr_expr.len() {
3556            return plan_err!(
3557                "Aggregate schema has wrong number of fields. Expected {} got {}",
3558                group_expr_count + aggr_expr.len(),
3559                schema.fields().len()
3560            );
3561        }
3562
3563        let aggregate_func_dependencies =
3564            calc_func_dependencies_for_aggregate(&group_expr, &input, &schema)?;
3565        let new_schema = schema.as_ref().clone();
3566        let schema = Arc::new(
3567            new_schema.with_functional_dependencies(aggregate_func_dependencies)?,
3568        );
3569        Ok(Self {
3570            input,
3571            group_expr,
3572            aggr_expr,
3573            schema,
3574        })
3575    }
3576
3577    fn is_grouping_set(&self) -> bool {
3578        matches!(self.group_expr.as_slice(), [Expr::GroupingSet(_)])
3579    }
3580
3581    /// Get the output expressions.
3582    fn output_expressions(&self) -> Result<Vec<&Expr>> {
3583        static INTERNAL_ID_EXPR: LazyLock<Expr> = LazyLock::new(|| {
3584            Expr::Column(Column::from_name(Aggregate::INTERNAL_GROUPING_ID))
3585        });
3586        let mut exprs = grouping_set_to_exprlist(self.group_expr.as_slice())?;
3587        if self.is_grouping_set() {
3588            exprs.push(&INTERNAL_ID_EXPR);
3589        }
3590        exprs.extend(self.aggr_expr.iter());
3591        debug_assert!(exprs.len() == self.schema.fields().len());
3592        Ok(exprs)
3593    }
3594
3595    /// Get the length of the group by expression in the output schema
3596    /// This is not simply group by expression length. Expression may be
3597    /// GroupingSet, etc. In these case we need to get inner expression lengths.
3598    pub fn group_expr_len(&self) -> Result<usize> {
3599        grouping_set_expr_count(&self.group_expr)
3600    }
3601
3602    /// Returns the data type of the grouping id.
3603    /// The grouping ID value is a bitmask where each set bit
3604    /// indicates that the corresponding grouping expression is
3605    /// null
3606    pub fn grouping_id_type(group_exprs: usize) -> DataType {
3607        if group_exprs <= 8 {
3608            DataType::UInt8
3609        } else if group_exprs <= 16 {
3610            DataType::UInt16
3611        } else if group_exprs <= 32 {
3612            DataType::UInt32
3613        } else {
3614            DataType::UInt64
3615        }
3616    }
3617
    /// Name of the internal column used when the aggregation is a grouping set.
    ///
    /// This column contains a bitmask where each bit represents a grouping
    /// expression. The least significant bit corresponds to the rightmost
    /// grouping expression. A bit value of 0 indicates that the corresponding
    /// column is included in the grouping set, while a value of 1 means it is excluded.
    ///
    /// For example, for the grouping expressions `CUBE(a, b)`, the grouping ID
    /// column will have the following values:
    ///
    /// ```text
    /// 0b00: Both `a` and `b` are included
    /// 0b01: `b` is excluded
    /// 0b10: `a` is excluded
    /// 0b11: Both `a` and `b` are excluded
    /// ```
    ///
    /// This internal column is necessary because excluded columns are replaced
    /// with `NULL` values. To handle these cases correctly, we must distinguish
    /// between an actual `NULL` value in a column and a column being excluded from the set.
    pub const INTERNAL_GROUPING_ID: &'static str = "__grouping_id";
3636}
3637
3638// Manual implementation needed because of `schema` field. Comparison excludes this field.
3639impl PartialOrd for Aggregate {
3640    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
3641        match self.input.partial_cmp(&other.input) {
3642            Some(Ordering::Equal) => {
3643                match self.group_expr.partial_cmp(&other.group_expr) {
3644                    Some(Ordering::Equal) => self.aggr_expr.partial_cmp(&other.aggr_expr),
3645                    cmp => cmp,
3646                }
3647            }
3648            cmp => cmp,
3649        }
3650        // TODO (https://github.com/apache/datafusion/issues/17477) avoid recomparing all fields
3651        .filter(|cmp| *cmp != Ordering::Equal || self == other)
3652    }
3653}
3654
3655/// Checks whether any expression in `group_expr` contains `Expr::GroupingSet`.
3656fn contains_grouping_set(group_expr: &[Expr]) -> bool {
3657    group_expr
3658        .iter()
3659        .any(|expr| matches!(expr, Expr::GroupingSet(_)))
3660}
3661
3662/// Calculates functional dependencies for aggregate expressions.
3663fn calc_func_dependencies_for_aggregate(
3664    // Expressions in the GROUP BY clause:
3665    group_expr: &[Expr],
3666    // Input plan of the aggregate:
3667    input: &LogicalPlan,
3668    // Aggregate schema
3669    aggr_schema: &DFSchema,
3670) -> Result<FunctionalDependencies> {
3671    // We can do a case analysis on how to propagate functional dependencies based on
3672    // whether the GROUP BY in question contains a grouping set expression:
3673    // - If so, the functional dependencies will be empty because we cannot guarantee
3674    //   that GROUP BY expression results will be unique.
3675    // - Otherwise, it may be possible to propagate functional dependencies.
3676    if !contains_grouping_set(group_expr) {
3677        let group_by_expr_names = group_expr
3678            .iter()
3679            .map(|item| item.schema_name().to_string())
3680            .collect::<IndexSet<_>>()
3681            .into_iter()
3682            .collect::<Vec<_>>();
3683        let aggregate_func_dependencies = aggregate_functional_dependencies(
3684            input.schema(),
3685            &group_by_expr_names,
3686            aggr_schema,
3687        );
3688        Ok(aggregate_func_dependencies)
3689    } else {
3690        Ok(FunctionalDependencies::empty())
3691    }
3692}
3693
3694/// This function projects functional dependencies of the `input` plan according
3695/// to projection expressions `exprs`.
3696fn calc_func_dependencies_for_project(
3697    exprs: &[Expr],
3698    input: &LogicalPlan,
3699) -> Result<FunctionalDependencies> {
3700    let input_fields = input.schema().field_names();
3701    // Calculate expression indices (if present) in the input schema.
3702    let proj_indices = exprs
3703        .iter()
3704        .map(|expr| match expr {
3705            #[expect(deprecated)]
3706            Expr::Wildcard { qualifier, options } => {
3707                let wildcard_fields = exprlist_to_fields(
3708                    vec![&Expr::Wildcard {
3709                        qualifier: qualifier.clone(),
3710                        options: options.clone(),
3711                    }],
3712                    input,
3713                )?;
3714                Ok::<_, DataFusionError>(
3715                    wildcard_fields
3716                        .into_iter()
3717                        .filter_map(|(qualifier, f)| {
3718                            let flat_name = qualifier
3719                                .map(|t| format!("{}.{}", t, f.name()))
3720                                .unwrap_or_else(|| f.name().clone());
3721                            input_fields.iter().position(|item| *item == flat_name)
3722                        })
3723                        .collect::<Vec<_>>(),
3724                )
3725            }
3726            Expr::Alias(alias) => {
3727                let name = format!("{}", alias.expr);
3728                Ok(input_fields
3729                    .iter()
3730                    .position(|item| *item == name)
3731                    .map(|i| vec![i])
3732                    .unwrap_or(vec![]))
3733            }
3734            _ => {
3735                let name = format!("{expr}");
3736                Ok(input_fields
3737                    .iter()
3738                    .position(|item| *item == name)
3739                    .map(|i| vec![i])
3740                    .unwrap_or(vec![]))
3741            }
3742        })
3743        .collect::<Result<Vec<_>>>()?
3744        .into_iter()
3745        .flatten()
3746        .collect::<Vec<_>>();
3747
3748    Ok(input
3749        .schema()
3750        .functional_dependencies()
3751        .project_functional_dependencies(&proj_indices, exprs.len()))
3752}
3753
/// Sorts its input according to a list of sort expressions.
///
/// `PartialOrd` is derived, so two `Sort` nodes are compared field by field
/// in declaration order: `expr`, then `input`, then `fetch`.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Hash)]
pub struct Sort {
    /// The sort expressions, one [`SortExpr`] per sort key
    pub expr: Vec<SortExpr>,
    /// The incoming logical plan whose output is sorted
    pub input: Arc<LogicalPlan>,
    /// Optional fetch limit; `None` means no limit was specified
    pub fetch: Option<usize>,
}
3764
/// Join two logical plans on one or more join columns
///
/// `PartialEq`, `Eq` and `Hash` are derived over every field. `PartialOrd` is
/// implemented by hand elsewhere in this file so that `schema` is left out of
/// the ordering.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct Join {
    /// Left input
    pub left: Arc<LogicalPlan>,
    /// Right input
    pub right: Arc<LogicalPlan>,
    /// Equijoin clause expressed as pairs of (left, right) join expressions
    pub on: Vec<(Expr, Expr)>,
    /// Filters applied during join (non-equi conditions); `None` when there
    /// is no such filter
    pub filter: Option<Expr>,
    /// Join type
    pub join_type: JoinType,
    /// Join constraint
    pub join_constraint: JoinConstraint,
    /// The output schema, containing fields from the left and right inputs
    pub schema: DFSchemaRef,
    /// Defines the null equality for the join.
    pub null_equality: NullEquality,
}
3785
3786impl Join {
3787    /// Creates a new Join operator with automatically computed schema.
3788    ///
3789    /// This constructor computes the schema based on the join type and inputs,
3790    /// removing the need to manually specify the schema or call `recompute_schema`.
3791    ///
3792    /// # Arguments
3793    ///
3794    /// * `left` - Left input plan
3795    /// * `right` - Right input plan
3796    /// * `on` - Join condition as a vector of (left_expr, right_expr) pairs
3797    /// * `filter` - Optional filter expression (for non-equijoin conditions)
3798    /// * `join_type` - Type of join (Inner, Left, Right, etc.)
3799    /// * `join_constraint` - Join constraint (On, Using)
3800    /// * `null_equality` - How to handle nulls in join comparisons
3801    ///
3802    /// # Returns
3803    ///
3804    /// A new Join operator with the computed schema
3805    pub fn try_new(
3806        left: Arc<LogicalPlan>,
3807        right: Arc<LogicalPlan>,
3808        on: Vec<(Expr, Expr)>,
3809        filter: Option<Expr>,
3810        join_type: JoinType,
3811        join_constraint: JoinConstraint,
3812        null_equality: NullEquality,
3813    ) -> Result<Self> {
3814        let join_schema = build_join_schema(left.schema(), right.schema(), &join_type)?;
3815
3816        Ok(Join {
3817            left,
3818            right,
3819            on,
3820            filter,
3821            join_type,
3822            join_constraint,
3823            schema: Arc::new(join_schema),
3824            null_equality,
3825        })
3826    }
3827
3828    /// Create Join with input which wrapped with projection, this method is used in physical planning only to help
3829    /// create the physical join.
3830    pub fn try_new_with_project_input(
3831        original: &LogicalPlan,
3832        left: Arc<LogicalPlan>,
3833        right: Arc<LogicalPlan>,
3834        column_on: (Vec<Column>, Vec<Column>),
3835    ) -> Result<(Self, bool)> {
3836        let original_join = match original {
3837            LogicalPlan::Join(join) => join,
3838            _ => return plan_err!("Could not create join with project input"),
3839        };
3840
3841        let mut left_sch = LogicalPlanBuilder::from(Arc::clone(&left));
3842        let mut right_sch = LogicalPlanBuilder::from(Arc::clone(&right));
3843
3844        let mut requalified = false;
3845
3846        // By definition, the resulting schema of an inner/left/right & full join will have first the left side fields and then the right,
3847        // potentially having duplicate field names. Note this will only qualify fields if they have not been qualified before.
3848        if original_join.join_type == JoinType::Inner
3849            || original_join.join_type == JoinType::Left
3850            || original_join.join_type == JoinType::Right
3851            || original_join.join_type == JoinType::Full
3852        {
3853            (left_sch, right_sch, requalified) =
3854                requalify_sides_if_needed(left_sch.clone(), right_sch.clone())?;
3855        }
3856
3857        let on: Vec<(Expr, Expr)> = column_on
3858            .0
3859            .into_iter()
3860            .zip(column_on.1)
3861            .map(|(l, r)| (Expr::Column(l), Expr::Column(r)))
3862            .collect();
3863
3864        let join_schema = build_join_schema(
3865            left_sch.schema(),
3866            right_sch.schema(),
3867            &original_join.join_type,
3868        )?;
3869
3870        Ok((
3871            Join {
3872                left,
3873                right,
3874                on,
3875                filter: original_join.filter.clone(),
3876                join_type: original_join.join_type,
3877                join_constraint: original_join.join_constraint,
3878                schema: Arc::new(join_schema),
3879                null_equality: original_join.null_equality,
3880            },
3881            requalified,
3882        ))
3883    }
3884}
3885
3886// Manual implementation needed because of `schema` field. Comparison excludes this field.
3887impl PartialOrd for Join {
3888    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
3889        #[derive(PartialEq, PartialOrd)]
3890        struct ComparableJoin<'a> {
3891            /// Left input
3892            pub left: &'a Arc<LogicalPlan>,
3893            /// Right input
3894            pub right: &'a Arc<LogicalPlan>,
3895            /// Equijoin clause expressed as pairs of (left, right) join expressions
3896            pub on: &'a Vec<(Expr, Expr)>,
3897            /// Filters applied during join (non-equi conditions)
3898            pub filter: &'a Option<Expr>,
3899            /// Join type
3900            pub join_type: &'a JoinType,
3901            /// Join constraint
3902            pub join_constraint: &'a JoinConstraint,
3903            /// The null handling behavior for equalities
3904            pub null_equality: &'a NullEquality,
3905        }
3906        let comparable_self = ComparableJoin {
3907            left: &self.left,
3908            right: &self.right,
3909            on: &self.on,
3910            filter: &self.filter,
3911            join_type: &self.join_type,
3912            join_constraint: &self.join_constraint,
3913            null_equality: &self.null_equality,
3914        };
3915        let comparable_other = ComparableJoin {
3916            left: &other.left,
3917            right: &other.right,
3918            on: &other.on,
3919            filter: &other.filter,
3920            join_type: &other.join_type,
3921            join_constraint: &other.join_constraint,
3922            null_equality: &other.null_equality,
3923        };
3924        comparable_self
3925            .partial_cmp(&comparable_other)
3926            // TODO (https://github.com/apache/datafusion/issues/17477) avoid recomparing all fields
3927            .filter(|cmp| *cmp != Ordering::Equal || self == other)
3928    }
3929}
3930
/// A subquery: a logical plan together with the outer-query references it
/// uses.
///
/// `PartialOrd` is derived, so comparison proceeds field by field in
/// declaration order. `Debug` is implemented by hand elsewhere in this file.
#[derive(Clone, PartialEq, Eq, PartialOrd, Hash)]
pub struct Subquery {
    /// The subquery
    pub subquery: Arc<LogicalPlan>,
    /// The outer references used in the subquery
    pub outer_ref_columns: Vec<Expr>,
    /// Span information for subquery projection columns
    pub spans: Spans,
}
3941
impl Normalizeable for Subquery {
    /// Always returns `false`: a `Subquery` reports itself as not
    /// normalizable.
    fn can_normalize(&self) -> bool {
        false
    }
}
3947
3948impl NormalizeEq for Subquery {
3949    fn normalize_eq(&self, other: &Self) -> bool {
3950        // TODO: may be implement NormalizeEq for LogicalPlan?
3951        *self.subquery == *other.subquery
3952            && self.outer_ref_columns.len() == other.outer_ref_columns.len()
3953            && self
3954                .outer_ref_columns
3955                .iter()
3956                .zip(other.outer_ref_columns.iter())
3957                .all(|(a, b)| a.normalize_eq(b))
3958    }
3959}
3960
3961impl Subquery {
3962    pub fn try_from_expr(plan: &Expr) -> Result<&Subquery> {
3963        match plan {
3964            Expr::ScalarSubquery(it) => Ok(it),
3965            Expr::Cast(cast) => Subquery::try_from_expr(cast.expr.as_ref()),
3966            _ => plan_err!("Could not coerce into ScalarSubquery!"),
3967        }
3968    }
3969
3970    pub fn with_plan(&self, plan: Arc<LogicalPlan>) -> Subquery {
3971        Subquery {
3972            subquery: plan,
3973            outer_ref_columns: self.outer_ref_columns.clone(),
3974            spans: Spans::new(),
3975        }
3976    }
3977}
3978
3979impl Debug for Subquery {
3980    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
3981        write!(f, "<subquery>")
3982    }
3983}
3984
/// Logical partitioning schemes supported by [`LogicalPlan::Repartition`]
///
/// See [`Partitioning`] for more details on partitioning
///
/// [`Partitioning`]: https://docs.rs/datafusion/latest/datafusion/physical_expr/enum.Partitioning.html#
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Hash)]
pub enum Partitioning {
    /// Allocate batches using a round-robin algorithm and the specified number of partitions
    RoundRobinBatch(usize),
    /// Allocate rows based on a hash of one or more expressions and the specified number
    /// of partitions.
    Hash(Vec<Expr>, usize),
    /// The DISTRIBUTE BY clause is used to repartition the data based on the input expressions
    DistributeBy(Vec<Expr>),
}
4000
/// Represents the unnesting operation on a list column: the recursion depth
/// and the output column name after unnesting
///
/// Example: given `ColumnUnnestList { output_column: "output_name", depth: 2 }`
///
/// ```text
///   input             output_name
///  ┌─────────┐      ┌─────────┐
///  │{{1,2}}  │      │ 1       │
///  ├─────────┼─────►├─────────┤
///  │{{3}}    │      │ 2       │
///  ├─────────┤      ├─────────┤
///  │{{4},{5}}│      │ 3       │
///  └─────────┘      ├─────────┤
///                   │ 4       │
///                   ├─────────┤
///                   │ 5       │
///                   └─────────┘
/// ```
#[derive(Debug, Clone, PartialEq, Eq, Hash, PartialOrd)]
pub struct ColumnUnnestList {
    /// The output column produced by the unnest
    pub output_column: Column,
    /// The recursion depth, i.e. how many list levels are unnested
    pub depth: usize,
}
4025
4026impl Display for ColumnUnnestList {
4027    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
4028        write!(f, "{}|depth={}", self.output_column, self.depth)
4029    }
4030}
4031
/// Unnest a column that contains a nested list type. See
/// [`UnnestOptions`] for more details.
///
/// `PartialEq`, `Eq` and `Hash` are derived over every field. `PartialOrd` is
/// implemented by hand elsewhere in this file so that `schema` is left out of
/// the ordering.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct Unnest {
    /// The incoming logical plan
    pub input: Arc<LogicalPlan>,
    /// Columns to run unnest on, can be a list of (List/Struct) columns
    pub exec_columns: Vec<Column>,
    /// Indices (in the input schema) of the list-typed columns to unnest,
    /// each paired with the output column and depth to unnest it to
    pub list_type_columns: Vec<(usize, ColumnUnnestList)>,
    /// Indices (in the input schema) of the struct-typed columns to unnest
    pub struct_type_columns: Vec<usize>,
    /// One entry per output column, in output order: the index of the input
    /// schema column that output column depends on
    pub dependency_indices: Vec<usize>,
    /// The output schema, containing the unnested field column.
    pub schema: DFSchemaRef,
    /// Options
    pub options: UnnestOptions,
}
4054
4055// Manual implementation needed because of `schema` field. Comparison excludes this field.
4056impl PartialOrd for Unnest {
4057    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
4058        #[derive(PartialEq, PartialOrd)]
4059        struct ComparableUnnest<'a> {
4060            /// The incoming logical plan
4061            pub input: &'a Arc<LogicalPlan>,
4062            /// Columns to run unnest on, can be a list of (List/Struct) columns
4063            pub exec_columns: &'a Vec<Column>,
4064            /// refer to the indices(in the input schema) of columns
4065            /// that have type list to run unnest on
4066            pub list_type_columns: &'a Vec<(usize, ColumnUnnestList)>,
4067            /// refer to the indices (in the input schema) of columns
4068            /// that have type struct to run unnest on
4069            pub struct_type_columns: &'a Vec<usize>,
4070            /// Having items aligned with the output columns
4071            /// representing which column in the input schema each output column depends on
4072            pub dependency_indices: &'a Vec<usize>,
4073            /// Options
4074            pub options: &'a UnnestOptions,
4075        }
4076        let comparable_self = ComparableUnnest {
4077            input: &self.input,
4078            exec_columns: &self.exec_columns,
4079            list_type_columns: &self.list_type_columns,
4080            struct_type_columns: &self.struct_type_columns,
4081            dependency_indices: &self.dependency_indices,
4082            options: &self.options,
4083        };
4084        let comparable_other = ComparableUnnest {
4085            input: &other.input,
4086            exec_columns: &other.exec_columns,
4087            list_type_columns: &other.list_type_columns,
4088            struct_type_columns: &other.struct_type_columns,
4089            dependency_indices: &other.dependency_indices,
4090            options: &other.options,
4091        };
4092        comparable_self
4093            .partial_cmp(&comparable_other)
4094            // TODO (https://github.com/apache/datafusion/issues/17477) avoid recomparing all fields
4095            .filter(|cmp| *cmp != Ordering::Equal || self == other)
4096    }
4097}
4098
4099impl Unnest {
4100    pub fn try_new(
4101        input: Arc<LogicalPlan>,
4102        exec_columns: Vec<Column>,
4103        options: UnnestOptions,
4104    ) -> Result<Self> {
4105        if exec_columns.is_empty() {
4106            return plan_err!("unnest plan requires at least 1 column to unnest");
4107        }
4108
4109        let mut list_columns: Vec<(usize, ColumnUnnestList)> = vec![];
4110        let mut struct_columns = vec![];
4111        let indices_to_unnest = exec_columns
4112            .iter()
4113            .map(|c| Ok((input.schema().index_of_column(c)?, c)))
4114            .collect::<Result<HashMap<usize, &Column>>>()?;
4115
4116        let input_schema = input.schema();
4117
4118        let mut dependency_indices = vec![];
4119        // Transform input schema into new schema
4120        // Given this comprehensive example
4121        //
4122        // input schema:
4123        // 1.col1_unnest_placeholder: list[list[int]],
4124        // 2.col1: list[list[int]]
4125        // 3.col2: list[int]
4126        // with unnest on unnest(col1,depth=2), unnest(col1,depth=1) and unnest(col2,depth=1)
4127        // output schema:
4128        // 1.unnest_col1_depth_2: int
4129        // 2.unnest_col1_depth_1: list[int]
4130        // 3.col1: list[list[int]]
4131        // 4.unnest_col2_depth_1: int
4132        // Meaning the placeholder column will be replaced by its unnested variation(s), note
4133        // the plural.
4134        let fields = input_schema
4135            .iter()
4136            .enumerate()
4137            .map(|(index, (original_qualifier, original_field))| {
4138                match indices_to_unnest.get(&index) {
4139                    Some(column_to_unnest) => {
4140                        let recursions_on_column = options
4141                            .recursions
4142                            .iter()
4143                            .filter(|p| -> bool { &p.input_column == *column_to_unnest })
4144                            .collect::<Vec<_>>();
4145                        let mut transformed_columns = recursions_on_column
4146                            .iter()
4147                            .map(|r| {
4148                                list_columns.push((
4149                                    index,
4150                                    ColumnUnnestList {
4151                                        output_column: r.output_column.clone(),
4152                                        depth: r.depth,
4153                                    },
4154                                ));
4155                                Ok(get_unnested_columns(
4156                                    &r.output_column.name,
4157                                    original_field.data_type(),
4158                                    r.depth,
4159                                )?
4160                                .into_iter()
4161                                .next()
4162                                .unwrap()) // because unnesting a list column always result into one result
4163                            })
4164                            .collect::<Result<Vec<(Column, Arc<Field>)>>>()?;
4165                        if transformed_columns.is_empty() {
4166                            transformed_columns = get_unnested_columns(
4167                                &column_to_unnest.name,
4168                                original_field.data_type(),
4169                                1,
4170                            )?;
4171                            match original_field.data_type() {
4172                                DataType::Struct(_) => {
4173                                    struct_columns.push(index);
4174                                }
4175                                DataType::List(_)
4176                                | DataType::FixedSizeList(_, _)
4177                                | DataType::LargeList(_) => {
4178                                    list_columns.push((
4179                                        index,
4180                                        ColumnUnnestList {
4181                                            output_column: Column::from_name(
4182                                                &column_to_unnest.name,
4183                                            ),
4184                                            depth: 1,
4185                                        },
4186                                    ));
4187                                }
4188                                _ => {}
4189                            };
4190                        }
4191
4192                        // new columns dependent on the same original index
4193                        dependency_indices.extend(std::iter::repeat_n(
4194                            index,
4195                            transformed_columns.len(),
4196                        ));
4197                        Ok(transformed_columns
4198                            .iter()
4199                            .map(|(col, field)| {
4200                                (col.relation.to_owned(), field.to_owned())
4201                            })
4202                            .collect())
4203                    }
4204                    None => {
4205                        dependency_indices.push(index);
4206                        Ok(vec![(
4207                            original_qualifier.cloned(),
4208                            Arc::clone(original_field),
4209                        )])
4210                    }
4211                }
4212            })
4213            .collect::<Result<Vec<_>>>()?
4214            .into_iter()
4215            .flatten()
4216            .collect::<Vec<_>>();
4217
4218        let metadata = input_schema.metadata().clone();
4219        let df_schema = DFSchema::new_with_metadata(fields, metadata)?;
4220        // We can use the existing functional dependencies:
4221        let deps = input_schema.functional_dependencies().clone();
4222        let schema = Arc::new(df_schema.with_functional_dependencies(deps)?);
4223
4224        Ok(Unnest {
4225            input,
4226            exec_columns,
4227            list_type_columns: list_columns,
4228            struct_type_columns: struct_columns,
4229            dependency_indices,
4230            schema,
4231            options,
4232        })
4233    }
4234}
4235
4236// Based on data type, either struct or a variant of list
4237// return a set of columns as the result of unnesting
4238// the input columns.
4239// For example, given a column with name "a",
4240// - List(Element) returns ["a"] with data type Element
4241// - Struct(field1, field2) returns ["a.field1","a.field2"]
4242// For list data type, an argument depth is used to specify
4243// the recursion level
4244fn get_unnested_columns(
4245    col_name: &String,
4246    data_type: &DataType,
4247    depth: usize,
4248) -> Result<Vec<(Column, Arc<Field>)>> {
4249    let mut qualified_columns = Vec::with_capacity(1);
4250
4251    match data_type {
4252        DataType::List(_) | DataType::FixedSizeList(_, _) | DataType::LargeList(_) => {
4253            let data_type = get_unnested_list_datatype_recursive(data_type, depth)?;
4254            let new_field = Arc::new(Field::new(
4255                col_name, data_type,
4256                // Unnesting may produce NULLs even if the list is not null.
4257                // For example: unnest([1], []) -> 1, null
4258                true,
4259            ));
4260            let column = Column::from_name(col_name);
4261            // let column = Column::from((None, &new_field));
4262            qualified_columns.push((column, new_field));
4263        }
4264        DataType::Struct(fields) => {
4265            qualified_columns.extend(fields.iter().map(|f| {
4266                let new_name = format!("{}.{}", col_name, f.name());
4267                let column = Column::from_name(&new_name);
4268                let new_field = f.as_ref().clone().with_name(new_name);
4269                // let column = Column::from((None, &f));
4270                (column, Arc::new(new_field))
4271            }))
4272        }
4273        _ => {
4274            return internal_err!("trying to unnest on invalid data type {data_type}");
4275        }
4276    };
4277    Ok(qualified_columns)
4278}
4279
4280// Get the data type of a multi-dimensional type after unnesting it
4281// with a given depth
4282fn get_unnested_list_datatype_recursive(
4283    data_type: &DataType,
4284    depth: usize,
4285) -> Result<DataType> {
4286    match data_type {
4287        DataType::List(field)
4288        | DataType::FixedSizeList(field, _)
4289        | DataType::LargeList(field) => {
4290            if depth == 1 {
4291                return Ok(field.data_type().clone());
4292            }
4293            return get_unnested_list_datatype_recursive(field.data_type(), depth - 1);
4294        }
4295        _ => {}
4296    };
4297
4298    internal_err!("trying to unnest on invalid data type {data_type}")
4299}
4300
4301#[cfg(test)]
4302mod tests {
4303    use super::*;
4304    use crate::builder::LogicalTableSource;
4305    use crate::logical_plan::table_scan;
4306    use crate::select_expr::SelectExpr;
4307    use crate::test::function_stub::{count, count_udaf};
4308    use crate::{
4309        GroupingSet, binary_expr, col, exists, in_subquery, lit, placeholder,
4310        scalar_subquery,
4311    };
4312    use datafusion_common::metadata::ScalarAndMetadata;
4313    use datafusion_common::tree_node::{
4314        TransformedResult, TreeNodeRewriter, TreeNodeVisitor,
4315    };
4316    use datafusion_common::{Constraint, ScalarValue, not_impl_err};
4317    use insta::{assert_debug_snapshot, assert_snapshot};
4318    use std::hash::DefaultHasher;
4319
4320    fn employee_schema() -> Schema {
4321        Schema::new(vec![
4322            Field::new("id", DataType::Int32, false),
4323            Field::new("first_name", DataType::Utf8, false),
4324            Field::new("last_name", DataType::Utf8, false),
4325            Field::new("state", DataType::Utf8, false),
4326            Field::new("salary", DataType::Int32, false),
4327        ])
4328    }
4329
4330    fn display_plan() -> Result<LogicalPlan> {
4331        let plan1 = table_scan(Some("employee_csv"), &employee_schema(), Some(vec![3]))?
4332            .build()?;
4333
4334        table_scan(Some("employee_csv"), &employee_schema(), Some(vec![0, 3]))?
4335            .filter(in_subquery(col("state"), Arc::new(plan1)))?
4336            .project(vec![col("id")])?
4337            .build()
4338    }
4339
4340    #[test]
4341    fn test_display_indent() -> Result<()> {
4342        let plan = display_plan()?;
4343
4344        assert_snapshot!(plan.display_indent(), @r"
4345        Projection: employee_csv.id
4346          Filter: employee_csv.state IN (<subquery>)
4347            Subquery:
4348              TableScan: employee_csv projection=[state]
4349            TableScan: employee_csv projection=[id, state]
4350        ");
4351        Ok(())
4352    }
4353
4354    #[test]
4355    fn test_display_indent_schema() -> Result<()> {
4356        let plan = display_plan()?;
4357
4358        assert_snapshot!(plan.display_indent_schema(), @r"
4359        Projection: employee_csv.id [id:Int32]
4360          Filter: employee_csv.state IN (<subquery>) [id:Int32, state:Utf8]
4361            Subquery: [state:Utf8]
4362              TableScan: employee_csv projection=[state] [state:Utf8]
4363            TableScan: employee_csv projection=[id, state] [id:Int32, state:Utf8]
4364        ");
4365        Ok(())
4366    }
4367
4368    #[test]
4369    fn test_display_subquery_alias() -> Result<()> {
4370        let plan1 = table_scan(Some("employee_csv"), &employee_schema(), Some(vec![3]))?
4371            .build()?;
4372        let plan1 = Arc::new(plan1);
4373
4374        let plan =
4375            table_scan(Some("employee_csv"), &employee_schema(), Some(vec![0, 3]))?
4376                .project(vec![col("id"), exists(plan1).alias("exists")])?
4377                .build();
4378
4379        assert_snapshot!(plan?.display_indent(), @r"
4380        Projection: employee_csv.id, EXISTS (<subquery>) AS exists
4381          Subquery:
4382            TableScan: employee_csv projection=[state]
4383          TableScan: employee_csv projection=[id, state]
4384        ");
4385        Ok(())
4386    }
4387
4388    #[test]
4389    fn test_display_graphviz() -> Result<()> {
4390        let plan = display_plan()?;
4391
4392        // just test for a few key lines in the output rather than the
4393        // whole thing to make test maintenance easier.
4394        assert_snapshot!(plan.display_graphviz(), @r#"
4395        // Begin DataFusion GraphViz Plan,
4396        // display it online here: https://dreampuf.github.io/GraphvizOnline
4397
4398        digraph {
4399          subgraph cluster_1
4400          {
4401            graph[label="LogicalPlan"]
4402            2[shape=box label="Projection: employee_csv.id"]
4403            3[shape=box label="Filter: employee_csv.state IN (<subquery>)"]
4404            2 -> 3 [arrowhead=none, arrowtail=normal, dir=back]
4405            4[shape=box label="Subquery:"]
4406            3 -> 4 [arrowhead=none, arrowtail=normal, dir=back]
4407            5[shape=box label="TableScan: employee_csv projection=[state]"]
4408            4 -> 5 [arrowhead=none, arrowtail=normal, dir=back]
4409            6[shape=box label="TableScan: employee_csv projection=[id, state]"]
4410            3 -> 6 [arrowhead=none, arrowtail=normal, dir=back]
4411          }
4412          subgraph cluster_7
4413          {
4414            graph[label="Detailed LogicalPlan"]
4415            8[shape=box label="Projection: employee_csv.id\nSchema: [id:Int32]"]
4416            9[shape=box label="Filter: employee_csv.state IN (<subquery>)\nSchema: [id:Int32, state:Utf8]"]
4417            8 -> 9 [arrowhead=none, arrowtail=normal, dir=back]
4418            10[shape=box label="Subquery:\nSchema: [state:Utf8]"]
4419            9 -> 10 [arrowhead=none, arrowtail=normal, dir=back]
4420            11[shape=box label="TableScan: employee_csv projection=[state]\nSchema: [state:Utf8]"]
4421            10 -> 11 [arrowhead=none, arrowtail=normal, dir=back]
4422            12[shape=box label="TableScan: employee_csv projection=[id, state]\nSchema: [id:Int32, state:Utf8]"]
4423            9 -> 12 [arrowhead=none, arrowtail=normal, dir=back]
4424          }
4425        }
4426        // End DataFusion GraphViz Plan
4427        "#);
4428        Ok(())
4429    }
4430
4431    #[test]
4432    fn test_display_pg_json() -> Result<()> {
4433        let plan = display_plan()?;
4434
4435        assert_snapshot!(plan.display_pg_json(), @r#"
4436        [
4437          {
4438            "Plan": {
4439              "Expressions": [
4440                "employee_csv.id"
4441              ],
4442              "Node Type": "Projection",
4443              "Output": [
4444                "id"
4445              ],
4446              "Plans": [
4447                {
4448                  "Condition": "employee_csv.state IN (<subquery>)",
4449                  "Node Type": "Filter",
4450                  "Output": [
4451                    "id",
4452                    "state"
4453                  ],
4454                  "Plans": [
4455                    {
4456                      "Node Type": "Subquery",
4457                      "Output": [
4458                        "state"
4459                      ],
4460                      "Plans": [
4461                        {
4462                          "Node Type": "TableScan",
4463                          "Output": [
4464                            "state"
4465                          ],
4466                          "Plans": [],
4467                          "Relation Name": "employee_csv"
4468                        }
4469                      ]
4470                    },
4471                    {
4472                      "Node Type": "TableScan",
4473                      "Output": [
4474                        "id",
4475                        "state"
4476                      ],
4477                      "Plans": [],
4478                      "Relation Name": "employee_csv"
4479                    }
4480                  ]
4481                }
4482              ]
4483            }
4484          }
4485        ]
4486        "#);
4487        Ok(())
4488    }
4489
4490    /// Tests for the Visitor trait and walking logical plan nodes
4491    #[derive(Debug, Default)]
4492    struct OkVisitor {
4493        strings: Vec<String>,
4494    }
4495
4496    impl<'n> TreeNodeVisitor<'n> for OkVisitor {
4497        type Node = LogicalPlan;
4498
4499        fn f_down(&mut self, plan: &'n LogicalPlan) -> Result<TreeNodeRecursion> {
4500            let s = match plan {
4501                LogicalPlan::Projection { .. } => "pre_visit Projection",
4502                LogicalPlan::Filter { .. } => "pre_visit Filter",
4503                LogicalPlan::TableScan { .. } => "pre_visit TableScan",
4504                _ => {
4505                    return not_impl_err!("unknown plan type");
4506                }
4507            };
4508
4509            self.strings.push(s.into());
4510            Ok(TreeNodeRecursion::Continue)
4511        }
4512
4513        fn f_up(&mut self, plan: &'n LogicalPlan) -> Result<TreeNodeRecursion> {
4514            let s = match plan {
4515                LogicalPlan::Projection { .. } => "post_visit Projection",
4516                LogicalPlan::Filter { .. } => "post_visit Filter",
4517                LogicalPlan::TableScan { .. } => "post_visit TableScan",
4518                _ => {
4519                    return not_impl_err!("unknown plan type");
4520                }
4521            };
4522
4523            self.strings.push(s.into());
4524            Ok(TreeNodeRecursion::Continue)
4525        }
4526    }
4527
4528    #[test]
4529    fn visit_order() {
4530        let mut visitor = OkVisitor::default();
4531        let plan = test_plan();
4532        let res = plan.visit_with_subqueries(&mut visitor);
4533        assert!(res.is_ok());
4534
4535        assert_debug_snapshot!(visitor.strings, @r#"
4536        [
4537            "pre_visit Projection",
4538            "pre_visit Filter",
4539            "pre_visit TableScan",
4540            "post_visit TableScan",
4541            "post_visit Filter",
4542            "post_visit Projection",
4543        ]
4544        "#);
4545    }
4546
4547    #[derive(Debug, Default)]
4548    /// Counter than counts to zero and returns true when it gets there
4549    struct OptionalCounter {
4550        val: Option<usize>,
4551    }
4552
4553    impl OptionalCounter {
4554        fn new(val: usize) -> Self {
4555            Self { val: Some(val) }
4556        }
4557        // Decrements the counter by 1, if any, returning true if it hits zero
4558        fn dec(&mut self) -> bool {
4559            if Some(0) == self.val {
4560                true
4561            } else {
4562                self.val = self.val.take().map(|i| i - 1);
4563                false
4564            }
4565        }
4566    }
4567
4568    #[derive(Debug, Default)]
4569    /// Visitor that returns false after some number of visits
4570    struct StoppingVisitor {
4571        inner: OkVisitor,
4572        /// When Some(0) returns false from pre_visit
4573        return_false_from_pre_in: OptionalCounter,
4574        /// When Some(0) returns false from post_visit
4575        return_false_from_post_in: OptionalCounter,
4576    }
4577
4578    impl<'n> TreeNodeVisitor<'n> for StoppingVisitor {
4579        type Node = LogicalPlan;
4580
4581        fn f_down(&mut self, plan: &'n LogicalPlan) -> Result<TreeNodeRecursion> {
4582            if self.return_false_from_pre_in.dec() {
4583                return Ok(TreeNodeRecursion::Stop);
4584            }
4585            self.inner.f_down(plan)?;
4586
4587            Ok(TreeNodeRecursion::Continue)
4588        }
4589
4590        fn f_up(&mut self, plan: &'n LogicalPlan) -> Result<TreeNodeRecursion> {
4591            if self.return_false_from_post_in.dec() {
4592                return Ok(TreeNodeRecursion::Stop);
4593            }
4594
4595            self.inner.f_up(plan)
4596        }
4597    }
4598
4599    /// test early stopping in pre-visit
4600    #[test]
4601    fn early_stopping_pre_visit() {
4602        let mut visitor = StoppingVisitor {
4603            return_false_from_pre_in: OptionalCounter::new(2),
4604            ..Default::default()
4605        };
4606        let plan = test_plan();
4607        let res = plan.visit_with_subqueries(&mut visitor);
4608        assert!(res.is_ok());
4609
4610        assert_debug_snapshot!(
4611            visitor.inner.strings,
4612            @r#"
4613        [
4614            "pre_visit Projection",
4615            "pre_visit Filter",
4616        ]
4617        "#
4618        );
4619    }
4620
4621    #[test]
4622    fn early_stopping_post_visit() {
4623        let mut visitor = StoppingVisitor {
4624            return_false_from_post_in: OptionalCounter::new(1),
4625            ..Default::default()
4626        };
4627        let plan = test_plan();
4628        let res = plan.visit_with_subqueries(&mut visitor);
4629        assert!(res.is_ok());
4630
4631        assert_debug_snapshot!(
4632            visitor.inner.strings,
4633            @r#"
4634        [
4635            "pre_visit Projection",
4636            "pre_visit Filter",
4637            "pre_visit TableScan",
4638            "post_visit TableScan",
4639        ]
4640        "#
4641        );
4642    }
4643
4644    #[derive(Debug, Default)]
4645    /// Visitor that returns an error after some number of visits
4646    struct ErrorVisitor {
4647        inner: OkVisitor,
4648        /// When Some(0) returns false from pre_visit
4649        return_error_from_pre_in: OptionalCounter,
4650        /// When Some(0) returns false from post_visit
4651        return_error_from_post_in: OptionalCounter,
4652    }
4653
4654    impl<'n> TreeNodeVisitor<'n> for ErrorVisitor {
4655        type Node = LogicalPlan;
4656
4657        fn f_down(&mut self, plan: &'n LogicalPlan) -> Result<TreeNodeRecursion> {
4658            if self.return_error_from_pre_in.dec() {
4659                return not_impl_err!("Error in pre_visit");
4660            }
4661
4662            self.inner.f_down(plan)
4663        }
4664
4665        fn f_up(&mut self, plan: &'n LogicalPlan) -> Result<TreeNodeRecursion> {
4666            if self.return_error_from_post_in.dec() {
4667                return not_impl_err!("Error in post_visit");
4668            }
4669
4670            self.inner.f_up(plan)
4671        }
4672    }
4673
4674    #[test]
4675    fn error_pre_visit() {
4676        let mut visitor = ErrorVisitor {
4677            return_error_from_pre_in: OptionalCounter::new(2),
4678            ..Default::default()
4679        };
4680        let plan = test_plan();
4681        let res = plan.visit_with_subqueries(&mut visitor).unwrap_err();
4682        assert_snapshot!(
4683            res.strip_backtrace(),
4684            @"This feature is not implemented: Error in pre_visit"
4685        );
4686        assert_debug_snapshot!(
4687            visitor.inner.strings,
4688            @r#"
4689        [
4690            "pre_visit Projection",
4691            "pre_visit Filter",
4692        ]
4693        "#
4694        );
4695    }
4696
4697    #[test]
4698    fn error_post_visit() {
4699        let mut visitor = ErrorVisitor {
4700            return_error_from_post_in: OptionalCounter::new(1),
4701            ..Default::default()
4702        };
4703        let plan = test_plan();
4704        let res = plan.visit_with_subqueries(&mut visitor).unwrap_err();
4705        assert_snapshot!(
4706            res.strip_backtrace(),
4707            @"This feature is not implemented: Error in post_visit"
4708        );
4709        assert_debug_snapshot!(
4710            visitor.inner.strings,
4711            @r#"
4712        [
4713            "pre_visit Projection",
4714            "pre_visit Filter",
4715            "pre_visit TableScan",
4716            "post_visit TableScan",
4717        ]
4718        "#
4719        );
4720    }
4721
4722    #[test]
4723    fn test_partial_eq_hash_and_partial_ord() {
4724        let empty_values = Arc::new(LogicalPlan::EmptyRelation(EmptyRelation {
4725            produce_one_row: true,
4726            schema: Arc::new(DFSchema::empty()),
4727        }));
4728
4729        let count_window_function = |schema| {
4730            Window::try_new_with_schema(
4731                vec![Expr::WindowFunction(Box::new(WindowFunction::new(
4732                    WindowFunctionDefinition::AggregateUDF(count_udaf()),
4733                    vec![],
4734                )))],
4735                Arc::clone(&empty_values),
4736                Arc::new(schema),
4737            )
4738            .unwrap()
4739        };
4740
4741        let schema_without_metadata = || {
4742            DFSchema::from_unqualified_fields(
4743                vec![Field::new("count", DataType::Int64, false)].into(),
4744                HashMap::new(),
4745            )
4746            .unwrap()
4747        };
4748
4749        let schema_with_metadata = || {
4750            DFSchema::from_unqualified_fields(
4751                vec![Field::new("count", DataType::Int64, false)].into(),
4752                [("key".to_string(), "value".to_string())].into(),
4753            )
4754            .unwrap()
4755        };
4756
4757        // A Window
4758        let f = count_window_function(schema_without_metadata());
4759
4760        // Same like `f`, different instance
4761        let f2 = count_window_function(schema_without_metadata());
4762        assert_eq!(f, f2);
4763        assert_eq!(hash(&f), hash(&f2));
4764        assert_eq!(f.partial_cmp(&f2), Some(Ordering::Equal));
4765
4766        // Same like `f`, except for schema metadata
4767        let o = count_window_function(schema_with_metadata());
4768        assert_ne!(f, o);
4769        assert_ne!(hash(&f), hash(&o)); // hash can collide for different values but does not collide in this test
4770        assert_eq!(f.partial_cmp(&o), None);
4771    }
4772
4773    fn hash<T: Hash>(value: &T) -> u64 {
4774        let hasher = &mut DefaultHasher::new();
4775        value.hash(hasher);
4776        hasher.finish()
4777    }
4778
4779    #[test]
4780    fn projection_expr_schema_mismatch() -> Result<()> {
4781        let empty_schema = Arc::new(DFSchema::empty());
4782        let p = Projection::try_new_with_schema(
4783            vec![col("a")],
4784            Arc::new(LogicalPlan::EmptyRelation(EmptyRelation {
4785                produce_one_row: false,
4786                schema: Arc::clone(&empty_schema),
4787            })),
4788            empty_schema,
4789        );
4790        assert_snapshot!(p.unwrap_err().strip_backtrace(), @"Error during planning: Projection has mismatch between number of expressions (1) and number of fields in schema (0)");
4791        Ok(())
4792    }
4793
4794    fn test_plan() -> LogicalPlan {
4795        let schema = Schema::new(vec![
4796            Field::new("id", DataType::Int32, false),
4797            Field::new("state", DataType::Utf8, false),
4798        ]);
4799
4800        table_scan(TableReference::none(), &schema, Some(vec![0, 1]))
4801            .unwrap()
4802            .filter(col("state").eq(lit("CO")))
4803            .unwrap()
4804            .project(vec![col("id")])
4805            .unwrap()
4806            .build()
4807            .unwrap()
4808    }
4809
4810    #[test]
4811    fn test_replace_invalid_placeholder() {
4812        // test empty placeholder
4813        let schema = Schema::new(vec![Field::new("id", DataType::Int32, false)]);
4814
4815        let plan = table_scan(TableReference::none(), &schema, None)
4816            .unwrap()
4817            .filter(col("id").eq(placeholder("")))
4818            .unwrap()
4819            .build()
4820            .unwrap();
4821
4822        let param_values = vec![ScalarValue::Int32(Some(42))];
4823        plan.replace_params_with_values(&param_values.clone().into())
4824            .expect_err("unexpectedly succeeded to replace an invalid placeholder");
4825
4826        // test $0 placeholder
4827        let schema = Schema::new(vec![Field::new("id", DataType::Int32, false)]);
4828
4829        let plan = table_scan(TableReference::none(), &schema, None)
4830            .unwrap()
4831            .filter(col("id").eq(placeholder("$0")))
4832            .unwrap()
4833            .build()
4834            .unwrap();
4835
4836        plan.replace_params_with_values(&param_values.clone().into())
4837            .expect_err("unexpectedly succeeded to replace an invalid placeholder");
4838
4839        // test $00 placeholder
4840        let schema = Schema::new(vec![Field::new("id", DataType::Int32, false)]);
4841
4842        let plan = table_scan(TableReference::none(), &schema, None)
4843            .unwrap()
4844            .filter(col("id").eq(placeholder("$00")))
4845            .unwrap()
4846            .build()
4847            .unwrap();
4848
4849        plan.replace_params_with_values(&param_values.into())
4850            .expect_err("unexpectedly succeeded to replace an invalid placeholder");
4851    }
4852
4853    #[test]
4854    fn test_replace_placeholder_mismatched_metadata() {
4855        let schema = Schema::new(vec![Field::new("id", DataType::Int32, false)]);
4856
4857        // Create a prepared statement with explicit fields that do not have metadata
4858        let plan = table_scan(TableReference::none(), &schema, None)
4859            .unwrap()
4860            .filter(col("id").eq(placeholder("$1")))
4861            .unwrap()
4862            .build()
4863            .unwrap();
4864        let prepared_builder = LogicalPlanBuilder::new(plan)
4865            .prepare(
4866                "".to_string(),
4867                vec![Field::new("", DataType::Int32, true).into()],
4868            )
4869            .unwrap();
4870
4871        // Attempt to bind a parameter with metadata
4872        let mut scalar_meta = HashMap::new();
4873        scalar_meta.insert("some_key".to_string(), "some_value".to_string());
4874        let param_values = ParamValues::List(vec![ScalarAndMetadata::new(
4875            ScalarValue::Int32(Some(42)),
4876            Some(scalar_meta.into()),
4877        )]);
4878        prepared_builder
4879            .plan()
4880            .clone()
4881            .with_param_values(param_values)
4882            .expect_err("prepared field metadata mismatch unexpectedly succeeded");
4883    }
4884
4885    #[test]
4886    fn test_replace_placeholder_empty_relation_valid_schema() {
4887        // SELECT $1, $2;
4888        let plan = LogicalPlanBuilder::empty(false)
4889            .project(vec![
4890                SelectExpr::from(placeholder("$1")),
4891                SelectExpr::from(placeholder("$2")),
4892            ])
4893            .unwrap()
4894            .build()
4895            .unwrap();
4896
4897        // original
4898        assert_snapshot!(plan.display_indent_schema(), @r"
4899        Projection: $1, $2 [$1:Null;N, $2:Null;N]
4900          EmptyRelation: rows=0 []
4901        ");
4902
4903        let plan = plan
4904            .with_param_values(vec![ScalarValue::from(1i32), ScalarValue::from("s")])
4905            .unwrap();
4906
4907        // replaced
4908        assert_snapshot!(plan.display_indent_schema(), @r#"
4909        Projection: Int32(1) AS $1, Utf8("s") AS $2 [$1:Int32, $2:Utf8]
4910          EmptyRelation: rows=0 []
4911        "#);
4912    }
4913
4914    #[test]
4915    fn test_nullable_schema_after_grouping_set() {
4916        let schema = Schema::new(vec![
4917            Field::new("foo", DataType::Int32, false),
4918            Field::new("bar", DataType::Int32, false),
4919        ]);
4920
4921        let plan = table_scan(TableReference::none(), &schema, None)
4922            .unwrap()
4923            .aggregate(
4924                vec![Expr::GroupingSet(GroupingSet::GroupingSets(vec![
4925                    vec![col("foo")],
4926                    vec![col("bar")],
4927                ]))],
4928                vec![count(lit(true))],
4929            )
4930            .unwrap()
4931            .build()
4932            .unwrap();
4933
4934        let output_schema = plan.schema();
4935
4936        assert!(
4937            output_schema
4938                .field_with_name(None, "foo")
4939                .unwrap()
4940                .is_nullable(),
4941        );
4942        assert!(
4943            output_schema
4944                .field_with_name(None, "bar")
4945                .unwrap()
4946                .is_nullable()
4947        );
4948    }
4949
4950    #[test]
4951    fn test_filter_is_scalar() {
4952        // test empty placeholder
4953        let schema =
4954            Arc::new(Schema::new(vec![Field::new("id", DataType::Int32, false)]));
4955
4956        let source = Arc::new(LogicalTableSource::new(schema));
4957        let schema = Arc::new(
4958            DFSchema::try_from_qualified_schema(
4959                TableReference::bare("tab"),
4960                &source.schema(),
4961            )
4962            .unwrap(),
4963        );
4964        let scan = Arc::new(LogicalPlan::TableScan(TableScan {
4965            table_name: TableReference::bare("tab"),
4966            source: Arc::clone(&source) as Arc<dyn TableSource>,
4967            projection: None,
4968            projected_schema: Arc::clone(&schema),
4969            filters: vec![],
4970            fetch: None,
4971        }));
4972        let col = schema.field_names()[0].clone();
4973
4974        let filter = Filter::try_new(
4975            Expr::Column(col.into()).eq(Expr::Literal(ScalarValue::Int32(Some(1)), None)),
4976            scan,
4977        )
4978        .unwrap();
4979        assert!(!filter.is_scalar());
4980        let unique_schema = Arc::new(
4981            schema
4982                .as_ref()
4983                .clone()
4984                .with_functional_dependencies(
4985                    FunctionalDependencies::new_from_constraints(
4986                        Some(&Constraints::new_unverified(vec![Constraint::Unique(
4987                            vec![0],
4988                        )])),
4989                        1,
4990                    ),
4991                )
4992                .unwrap(),
4993        );
4994        let scan = Arc::new(LogicalPlan::TableScan(TableScan {
4995            table_name: TableReference::bare("tab"),
4996            source,
4997            projection: None,
4998            projected_schema: Arc::clone(&unique_schema),
4999            filters: vec![],
5000            fetch: None,
5001        }));
5002        let col = schema.field_names()[0].clone();
5003
5004        let filter =
5005            Filter::try_new(Expr::Column(col.into()).eq(lit(1i32)), scan).unwrap();
5006        assert!(filter.is_scalar());
5007    }
5008
5009    #[test]
5010    fn test_transform_explain() {
5011        let schema = Schema::new(vec![
5012            Field::new("foo", DataType::Int32, false),
5013            Field::new("bar", DataType::Int32, false),
5014        ]);
5015
5016        let plan = table_scan(TableReference::none(), &schema, None)
5017            .unwrap()
5018            .explain(false, false)
5019            .unwrap()
5020            .build()
5021            .unwrap();
5022
5023        let external_filter = col("foo").eq(lit(true));
5024
5025        // after transformation, because plan is not the same anymore,
5026        // the parent plan is built again with call to LogicalPlan::with_new_inputs -> with_new_exprs
5027        let plan = plan
5028            .transform(|plan| match plan {
5029                LogicalPlan::TableScan(table) => {
5030                    let filter = Filter::try_new(
5031                        external_filter.clone(),
5032                        Arc::new(LogicalPlan::TableScan(table)),
5033                    )
5034                    .unwrap();
5035                    Ok(Transformed::yes(LogicalPlan::Filter(filter)))
5036                }
5037                x => Ok(Transformed::no(x)),
5038            })
5039            .data()
5040            .unwrap();
5041
5042        let actual = format!("{}", plan.display_indent());
5043        assert_snapshot!(actual, @r"
5044        Explain
5045          Filter: foo = Boolean(true)
5046            TableScan: ?table?
5047        ")
5048    }
5049
5050    #[test]
5051    fn test_plan_partial_ord() {
5052        let empty_relation = LogicalPlan::EmptyRelation(EmptyRelation {
5053            produce_one_row: false,
5054            schema: Arc::new(DFSchema::empty()),
5055        });
5056
5057        let describe_table = LogicalPlan::DescribeTable(DescribeTable {
5058            schema: Arc::new(Schema::new(vec![Field::new(
5059                "foo",
5060                DataType::Int32,
5061                false,
5062            )])),
5063            output_schema: DFSchemaRef::new(DFSchema::empty()),
5064        });
5065
5066        let describe_table_clone = LogicalPlan::DescribeTable(DescribeTable {
5067            schema: Arc::new(Schema::new(vec![Field::new(
5068                "foo",
5069                DataType::Int32,
5070                false,
5071            )])),
5072            output_schema: DFSchemaRef::new(DFSchema::empty()),
5073        });
5074
5075        assert_eq!(
5076            empty_relation.partial_cmp(&describe_table),
5077            Some(Ordering::Less)
5078        );
5079        assert_eq!(
5080            describe_table.partial_cmp(&empty_relation),
5081            Some(Ordering::Greater)
5082        );
5083        assert_eq!(describe_table.partial_cmp(&describe_table_clone), None);
5084    }
5085
5086    #[test]
5087    fn test_limit_with_new_children() {
5088        let input = Arc::new(LogicalPlan::Values(Values {
5089            schema: Arc::new(DFSchema::empty()),
5090            values: vec![vec![]],
5091        }));
5092        let cases = [
5093            LogicalPlan::Limit(Limit {
5094                skip: None,
5095                fetch: None,
5096                input: Arc::clone(&input),
5097            }),
5098            LogicalPlan::Limit(Limit {
5099                skip: None,
5100                fetch: Some(Box::new(Expr::Literal(
5101                    ScalarValue::new_ten(&DataType::UInt32).unwrap(),
5102                    None,
5103                ))),
5104                input: Arc::clone(&input),
5105            }),
5106            LogicalPlan::Limit(Limit {
5107                skip: Some(Box::new(Expr::Literal(
5108                    ScalarValue::new_ten(&DataType::UInt32).unwrap(),
5109                    None,
5110                ))),
5111                fetch: None,
5112                input: Arc::clone(&input),
5113            }),
5114            LogicalPlan::Limit(Limit {
5115                skip: Some(Box::new(Expr::Literal(
5116                    ScalarValue::new_one(&DataType::UInt32).unwrap(),
5117                    None,
5118                ))),
5119                fetch: Some(Box::new(Expr::Literal(
5120                    ScalarValue::new_ten(&DataType::UInt32).unwrap(),
5121                    None,
5122                ))),
5123                input,
5124            }),
5125        ];
5126
5127        for limit in cases {
5128            let new_limit = limit
5129                .with_new_exprs(
5130                    limit.expressions(),
5131                    limit.inputs().into_iter().cloned().collect(),
5132                )
5133                .unwrap();
5134            assert_eq!(limit, new_limit);
5135        }
5136    }
5137
5138    #[test]
5139    fn test_with_subqueries_jump() {
5140        // The test plan contains a `Project` node above a `Filter` node, and the
5141        // `Project` node contains a subquery plan with a `Filter` root node, so returning
5142        // `TreeNodeRecursion::Jump` on `Project` should cause not visiting any of the
5143        // `Filter`s.
5144        let subquery_schema =
5145            Schema::new(vec![Field::new("sub_id", DataType::Int32, false)]);
5146
5147        let subquery_plan =
5148            table_scan(TableReference::none(), &subquery_schema, Some(vec![0]))
5149                .unwrap()
5150                .filter(col("sub_id").eq(lit(0)))
5151                .unwrap()
5152                .build()
5153                .unwrap();
5154
5155        let schema = Schema::new(vec![Field::new("id", DataType::Int32, false)]);
5156
5157        let plan = table_scan(TableReference::none(), &schema, Some(vec![0]))
5158            .unwrap()
5159            .filter(col("id").eq(lit(0)))
5160            .unwrap()
5161            .project(vec![col("id"), scalar_subquery(Arc::new(subquery_plan))])
5162            .unwrap()
5163            .build()
5164            .unwrap();
5165
5166        let mut filter_found = false;
5167        plan.apply_with_subqueries(|plan| {
5168            match plan {
5169                LogicalPlan::Projection(..) => return Ok(TreeNodeRecursion::Jump),
5170                LogicalPlan::Filter(..) => filter_found = true,
5171                _ => {}
5172            }
5173            Ok(TreeNodeRecursion::Continue)
5174        })
5175        .unwrap();
5176        assert!(!filter_found);
5177
5178        struct ProjectJumpVisitor {
5179            filter_found: bool,
5180        }
5181
5182        impl ProjectJumpVisitor {
5183            fn new() -> Self {
5184                Self {
5185                    filter_found: false,
5186                }
5187            }
5188        }
5189
5190        impl<'n> TreeNodeVisitor<'n> for ProjectJumpVisitor {
5191            type Node = LogicalPlan;
5192
5193            fn f_down(&mut self, node: &'n Self::Node) -> Result<TreeNodeRecursion> {
5194                match node {
5195                    LogicalPlan::Projection(..) => return Ok(TreeNodeRecursion::Jump),
5196                    LogicalPlan::Filter(..) => self.filter_found = true,
5197                    _ => {}
5198                }
5199                Ok(TreeNodeRecursion::Continue)
5200            }
5201        }
5202
5203        let mut visitor = ProjectJumpVisitor::new();
5204        plan.visit_with_subqueries(&mut visitor).unwrap();
5205        assert!(!visitor.filter_found);
5206
5207        let mut filter_found = false;
5208        plan.clone()
5209            .transform_down_with_subqueries(|plan| {
5210                match plan {
5211                    LogicalPlan::Projection(..) => {
5212                        return Ok(Transformed::new(
5213                            plan,
5214                            false,
5215                            TreeNodeRecursion::Jump,
5216                        ));
5217                    }
5218                    LogicalPlan::Filter(..) => filter_found = true,
5219                    _ => {}
5220                }
5221                Ok(Transformed::no(plan))
5222            })
5223            .unwrap();
5224        assert!(!filter_found);
5225
5226        let mut filter_found = false;
5227        plan.clone()
5228            .transform_down_up_with_subqueries(
5229                |plan| {
5230                    match plan {
5231                        LogicalPlan::Projection(..) => {
5232                            return Ok(Transformed::new(
5233                                plan,
5234                                false,
5235                                TreeNodeRecursion::Jump,
5236                            ));
5237                        }
5238                        LogicalPlan::Filter(..) => filter_found = true,
5239                        _ => {}
5240                    }
5241                    Ok(Transformed::no(plan))
5242                },
5243                |plan| Ok(Transformed::no(plan)),
5244            )
5245            .unwrap();
5246        assert!(!filter_found);
5247
5248        struct ProjectJumpRewriter {
5249            filter_found: bool,
5250        }
5251
5252        impl ProjectJumpRewriter {
5253            fn new() -> Self {
5254                Self {
5255                    filter_found: false,
5256                }
5257            }
5258        }
5259
5260        impl TreeNodeRewriter for ProjectJumpRewriter {
5261            type Node = LogicalPlan;
5262
5263            fn f_down(&mut self, node: Self::Node) -> Result<Transformed<Self::Node>> {
5264                match node {
5265                    LogicalPlan::Projection(..) => {
5266                        return Ok(Transformed::new(
5267                            node,
5268                            false,
5269                            TreeNodeRecursion::Jump,
5270                        ));
5271                    }
5272                    LogicalPlan::Filter(..) => self.filter_found = true,
5273                    _ => {}
5274                }
5275                Ok(Transformed::no(node))
5276            }
5277        }
5278
5279        let mut rewriter = ProjectJumpRewriter::new();
5280        plan.rewrite_with_subqueries(&mut rewriter).unwrap();
5281        assert!(!rewriter.filter_found);
5282    }
5283
5284    #[test]
5285    fn test_with_unresolved_placeholders() {
5286        let field_name = "id";
5287        let placeholder_value = "$1";
5288        let schema = Schema::new(vec![Field::new(field_name, DataType::Int32, false)]);
5289
5290        let plan = table_scan(TableReference::none(), &schema, None)
5291            .unwrap()
5292            .filter(col(field_name).eq(placeholder(placeholder_value)))
5293            .unwrap()
5294            .build()
5295            .unwrap();
5296
5297        // Check that the placeholder parameters have not received a DataType.
5298        let params = plan.get_parameter_fields().unwrap();
5299        assert_eq!(params.len(), 1);
5300
5301        let parameter_type = params.clone().get(placeholder_value).unwrap().clone();
5302        assert_eq!(parameter_type, None);
5303    }
5304
5305    #[test]
5306    fn test_join_with_new_exprs() -> Result<()> {
5307        fn create_test_join(
5308            on: Vec<(Expr, Expr)>,
5309            filter: Option<Expr>,
5310        ) -> Result<LogicalPlan> {
5311            let schema = Schema::new(vec![
5312                Field::new("a", DataType::Int32, false),
5313                Field::new("b", DataType::Int32, false),
5314            ]);
5315
5316            let left_schema = DFSchema::try_from_qualified_schema("t1", &schema)?;
5317            let right_schema = DFSchema::try_from_qualified_schema("t2", &schema)?;
5318
5319            Ok(LogicalPlan::Join(Join {
5320                left: Arc::new(
5321                    table_scan(Some("t1"), left_schema.as_arrow(), None)?.build()?,
5322                ),
5323                right: Arc::new(
5324                    table_scan(Some("t2"), right_schema.as_arrow(), None)?.build()?,
5325                ),
5326                on,
5327                filter,
5328                join_type: JoinType::Inner,
5329                join_constraint: JoinConstraint::On,
5330                schema: Arc::new(left_schema.join(&right_schema)?),
5331                null_equality: NullEquality::NullEqualsNothing,
5332            }))
5333        }
5334
5335        {
5336            let join = create_test_join(vec![(col("t1.a"), (col("t2.a")))], None)?;
5337            let LogicalPlan::Join(join) = join.with_new_exprs(
5338                join.expressions(),
5339                join.inputs().into_iter().cloned().collect(),
5340            )?
5341            else {
5342                unreachable!()
5343            };
5344            assert_eq!(join.on, vec![(col("t1.a"), (col("t2.a")))]);
5345            assert_eq!(join.filter, None);
5346        }
5347
5348        {
5349            let join = create_test_join(vec![], Some(col("t1.a").gt(col("t2.a"))))?;
5350            let LogicalPlan::Join(join) = join.with_new_exprs(
5351                join.expressions(),
5352                join.inputs().into_iter().cloned().collect(),
5353            )?
5354            else {
5355                unreachable!()
5356            };
5357            assert_eq!(join.on, vec![]);
5358            assert_eq!(join.filter, Some(col("t1.a").gt(col("t2.a"))));
5359        }
5360
5361        {
5362            let join = create_test_join(
5363                vec![(col("t1.a"), (col("t2.a")))],
5364                Some(col("t1.b").gt(col("t2.b"))),
5365            )?;
5366            let LogicalPlan::Join(join) = join.with_new_exprs(
5367                join.expressions(),
5368                join.inputs().into_iter().cloned().collect(),
5369            )?
5370            else {
5371                unreachable!()
5372            };
5373            assert_eq!(join.on, vec![(col("t1.a"), (col("t2.a")))]);
5374            assert_eq!(join.filter, Some(col("t1.b").gt(col("t2.b"))));
5375        }
5376
5377        {
5378            let join = create_test_join(
5379                vec![(col("t1.a"), (col("t2.a"))), (col("t1.b"), (col("t2.b")))],
5380                None,
5381            )?;
5382            let LogicalPlan::Join(join) = join.with_new_exprs(
5383                vec![
5384                    binary_expr(col("t1.a"), Operator::Plus, lit(1)),
5385                    binary_expr(col("t2.a"), Operator::Plus, lit(2)),
5386                    col("t1.b"),
5387                    col("t2.b"),
5388                    lit(true),
5389                ],
5390                join.inputs().into_iter().cloned().collect(),
5391            )?
5392            else {
5393                unreachable!()
5394            };
5395            assert_eq!(
5396                join.on,
5397                vec![
5398                    (
5399                        binary_expr(col("t1.a"), Operator::Plus, lit(1)),
5400                        binary_expr(col("t2.a"), Operator::Plus, lit(2))
5401                    ),
5402                    (col("t1.b"), (col("t2.b")))
5403                ]
5404            );
5405            assert_eq!(join.filter, Some(lit(true)));
5406        }
5407
5408        Ok(())
5409    }
5410
5411    #[test]
5412    fn test_join_try_new() -> Result<()> {
5413        let schema = Schema::new(vec![
5414            Field::new("a", DataType::Int32, false),
5415            Field::new("b", DataType::Int32, false),
5416        ]);
5417
5418        let left_scan = table_scan(Some("t1"), &schema, None)?.build()?;
5419
5420        let right_scan = table_scan(Some("t2"), &schema, None)?.build()?;
5421
5422        let join_types = vec![
5423            JoinType::Inner,
5424            JoinType::Left,
5425            JoinType::Right,
5426            JoinType::Full,
5427            JoinType::LeftSemi,
5428            JoinType::LeftAnti,
5429            JoinType::RightSemi,
5430            JoinType::RightAnti,
5431            JoinType::LeftMark,
5432        ];
5433
5434        for join_type in join_types {
5435            let join = Join::try_new(
5436                Arc::new(left_scan.clone()),
5437                Arc::new(right_scan.clone()),
5438                vec![(col("t1.a"), col("t2.a"))],
5439                Some(col("t1.b").gt(col("t2.b"))),
5440                join_type,
5441                JoinConstraint::On,
5442                NullEquality::NullEqualsNothing,
5443            )?;
5444
5445            match join_type {
5446                JoinType::LeftSemi | JoinType::LeftAnti => {
5447                    assert_eq!(join.schema.fields().len(), 2);
5448
5449                    let fields = join.schema.fields();
5450                    assert_eq!(
5451                        fields[0].name(),
5452                        "a",
5453                        "First field should be 'a' from left table"
5454                    );
5455                    assert_eq!(
5456                        fields[1].name(),
5457                        "b",
5458                        "Second field should be 'b' from left table"
5459                    );
5460                }
5461                JoinType::RightSemi | JoinType::RightAnti => {
5462                    assert_eq!(join.schema.fields().len(), 2);
5463
5464                    let fields = join.schema.fields();
5465                    assert_eq!(
5466                        fields[0].name(),
5467                        "a",
5468                        "First field should be 'a' from right table"
5469                    );
5470                    assert_eq!(
5471                        fields[1].name(),
5472                        "b",
5473                        "Second field should be 'b' from right table"
5474                    );
5475                }
5476                JoinType::LeftMark => {
5477                    assert_eq!(join.schema.fields().len(), 3);
5478
5479                    let fields = join.schema.fields();
5480                    assert_eq!(
5481                        fields[0].name(),
5482                        "a",
5483                        "First field should be 'a' from left table"
5484                    );
5485                    assert_eq!(
5486                        fields[1].name(),
5487                        "b",
5488                        "Second field should be 'b' from left table"
5489                    );
5490                    assert_eq!(
5491                        fields[2].name(),
5492                        "mark",
5493                        "Third field should be the mark column"
5494                    );
5495
5496                    assert!(!fields[0].is_nullable());
5497                    assert!(!fields[1].is_nullable());
5498                    assert!(!fields[2].is_nullable());
5499                }
5500                _ => {
5501                    assert_eq!(join.schema.fields().len(), 4);
5502
5503                    let fields = join.schema.fields();
5504                    assert_eq!(
5505                        fields[0].name(),
5506                        "a",
5507                        "First field should be 'a' from left table"
5508                    );
5509                    assert_eq!(
5510                        fields[1].name(),
5511                        "b",
5512                        "Second field should be 'b' from left table"
5513                    );
5514                    assert_eq!(
5515                        fields[2].name(),
5516                        "a",
5517                        "Third field should be 'a' from right table"
5518                    );
5519                    assert_eq!(
5520                        fields[3].name(),
5521                        "b",
5522                        "Fourth field should be 'b' from right table"
5523                    );
5524
5525                    if join_type == JoinType::Left {
5526                        // Left side fields (first two) shouldn't be nullable
5527                        assert!(!fields[0].is_nullable());
5528                        assert!(!fields[1].is_nullable());
5529                        // Right side fields (third and fourth) should be nullable
5530                        assert!(fields[2].is_nullable());
5531                        assert!(fields[3].is_nullable());
5532                    } else if join_type == JoinType::Right {
5533                        // Left side fields (first two) should be nullable
5534                        assert!(fields[0].is_nullable());
5535                        assert!(fields[1].is_nullable());
5536                        // Right side fields (third and fourth) shouldn't be nullable
5537                        assert!(!fields[2].is_nullable());
5538                        assert!(!fields[3].is_nullable());
5539                    } else if join_type == JoinType::Full {
5540                        assert!(fields[0].is_nullable());
5541                        assert!(fields[1].is_nullable());
5542                        assert!(fields[2].is_nullable());
5543                        assert!(fields[3].is_nullable());
5544                    }
5545                }
5546            }
5547
5548            assert_eq!(join.on, vec![(col("t1.a"), col("t2.a"))]);
5549            assert_eq!(join.filter, Some(col("t1.b").gt(col("t2.b"))));
5550            assert_eq!(join.join_type, join_type);
5551            assert_eq!(join.join_constraint, JoinConstraint::On);
5552            assert_eq!(join.null_equality, NullEquality::NullEqualsNothing);
5553        }
5554
5555        Ok(())
5556    }
5557
5558    #[test]
5559    fn test_join_try_new_with_using_constraint_and_overlapping_columns() -> Result<()> {
5560        let left_schema = Schema::new(vec![
5561            Field::new("id", DataType::Int32, false), // Common column in both tables
5562            Field::new("name", DataType::Utf8, false), // Unique to left
5563            Field::new("value", DataType::Int32, false), // Common column, different meaning
5564        ]);
5565
5566        let right_schema = Schema::new(vec![
5567            Field::new("id", DataType::Int32, false), // Common column in both tables
5568            Field::new("category", DataType::Utf8, false), // Unique to right
5569            Field::new("value", DataType::Float64, true), // Common column, different meaning
5570        ]);
5571
5572        let left_plan = table_scan(Some("t1"), &left_schema, None)?.build()?;
5573
5574        let right_plan = table_scan(Some("t2"), &right_schema, None)?.build()?;
5575
5576        // Test 1: USING constraint with a common column
5577        {
5578            // In the logical plan, both copies of the `id` column are preserved
5579            // The USING constraint is handled later during physical execution, where the common column appears once
5580            let join = Join::try_new(
5581                Arc::new(left_plan.clone()),
5582                Arc::new(right_plan.clone()),
5583                vec![(col("t1.id"), col("t2.id"))],
5584                None,
5585                JoinType::Inner,
5586                JoinConstraint::Using,
5587                NullEquality::NullEqualsNothing,
5588            )?;
5589
5590            let fields = join.schema.fields();
5591
5592            assert_eq!(fields.len(), 6);
5593
5594            assert_eq!(
5595                fields[0].name(),
5596                "id",
5597                "First field should be 'id' from left table"
5598            );
5599            assert_eq!(
5600                fields[1].name(),
5601                "name",
5602                "Second field should be 'name' from left table"
5603            );
5604            assert_eq!(
5605                fields[2].name(),
5606                "value",
5607                "Third field should be 'value' from left table"
5608            );
5609            assert_eq!(
5610                fields[3].name(),
5611                "id",
5612                "Fourth field should be 'id' from right table"
5613            );
5614            assert_eq!(
5615                fields[4].name(),
5616                "category",
5617                "Fifth field should be 'category' from right table"
5618            );
5619            assert_eq!(
5620                fields[5].name(),
5621                "value",
5622                "Sixth field should be 'value' from right table"
5623            );
5624
5625            assert_eq!(join.join_constraint, JoinConstraint::Using);
5626        }
5627
5628        // Test 2: Complex join condition with expressions
5629        {
5630            // Complex condition: join on id equality AND where left.value < right.value
5631            let join = Join::try_new(
5632                Arc::new(left_plan.clone()),
5633                Arc::new(right_plan.clone()),
5634                vec![(col("t1.id"), col("t2.id"))], // Equijoin condition
5635                Some(col("t1.value").lt(col("t2.value"))), // Non-equi filter condition
5636                JoinType::Inner,
5637                JoinConstraint::On,
5638                NullEquality::NullEqualsNothing,
5639            )?;
5640
5641            let fields = join.schema.fields();
5642            assert_eq!(fields.len(), 6);
5643
5644            assert_eq!(
5645                fields[0].name(),
5646                "id",
5647                "First field should be 'id' from left table"
5648            );
5649            assert_eq!(
5650                fields[1].name(),
5651                "name",
5652                "Second field should be 'name' from left table"
5653            );
5654            assert_eq!(
5655                fields[2].name(),
5656                "value",
5657                "Third field should be 'value' from left table"
5658            );
5659            assert_eq!(
5660                fields[3].name(),
5661                "id",
5662                "Fourth field should be 'id' from right table"
5663            );
5664            assert_eq!(
5665                fields[4].name(),
5666                "category",
5667                "Fifth field should be 'category' from right table"
5668            );
5669            assert_eq!(
5670                fields[5].name(),
5671                "value",
5672                "Sixth field should be 'value' from right table"
5673            );
5674
5675            assert_eq!(join.filter, Some(col("t1.value").lt(col("t2.value"))));
5676        }
5677
5678        // Test 3: Join with null equality behavior set to true
5679        {
5680            let join = Join::try_new(
5681                Arc::new(left_plan.clone()),
5682                Arc::new(right_plan.clone()),
5683                vec![(col("t1.id"), col("t2.id"))],
5684                None,
5685                JoinType::Inner,
5686                JoinConstraint::On,
5687                NullEquality::NullEqualsNull,
5688            )?;
5689
5690            assert_eq!(join.null_equality, NullEquality::NullEqualsNull);
5691        }
5692
5693        Ok(())
5694    }
5695
5696    #[test]
5697    fn test_join_try_new_schema_validation() -> Result<()> {
5698        let left_schema = Schema::new(vec![
5699            Field::new("id", DataType::Int32, false),
5700            Field::new("name", DataType::Utf8, false),
5701            Field::new("value", DataType::Float64, true),
5702        ]);
5703
5704        let right_schema = Schema::new(vec![
5705            Field::new("id", DataType::Int32, false),
5706            Field::new("category", DataType::Utf8, true),
5707            Field::new("code", DataType::Int16, false),
5708        ]);
5709
5710        let left_plan = table_scan(Some("t1"), &left_schema, None)?.build()?;
5711
5712        let right_plan = table_scan(Some("t2"), &right_schema, None)?.build()?;
5713
5714        let join_types = vec![
5715            JoinType::Inner,
5716            JoinType::Left,
5717            JoinType::Right,
5718            JoinType::Full,
5719        ];
5720
5721        for join_type in join_types {
5722            let join = Join::try_new(
5723                Arc::new(left_plan.clone()),
5724                Arc::new(right_plan.clone()),
5725                vec![(col("t1.id"), col("t2.id"))],
5726                Some(col("t1.value").gt(lit(5.0))),
5727                join_type,
5728                JoinConstraint::On,
5729                NullEquality::NullEqualsNothing,
5730            )?;
5731
5732            let fields = join.schema.fields();
5733            assert_eq!(fields.len(), 6, "Expected 6 fields for {join_type} join");
5734
5735            for (i, field) in fields.iter().enumerate() {
5736                let expected_nullable = match (i, &join_type) {
5737                    // Left table fields (indices 0, 1, 2)
5738                    (0, JoinType::Right | JoinType::Full) => true, // id becomes nullable in RIGHT/FULL
5739                    (1, JoinType::Right | JoinType::Full) => true, // name becomes nullable in RIGHT/FULL
5740                    (2, _) => true, // value is already nullable
5741
5742                    // Right table fields (indices 3, 4, 5)
5743                    (3, JoinType::Left | JoinType::Full) => true, // id becomes nullable in LEFT/FULL
5744                    (4, _) => true, // category is already nullable
5745                    (5, JoinType::Left | JoinType::Full) => true, // code becomes nullable in LEFT/FULL
5746
5747                    _ => false,
5748                };
5749
5750                assert_eq!(
5751                    field.is_nullable(),
5752                    expected_nullable,
5753                    "Field {} ({}) nullability incorrect for {:?} join",
5754                    i,
5755                    field.name(),
5756                    join_type
5757                );
5758            }
5759        }
5760
5761        let using_join = Join::try_new(
5762            Arc::new(left_plan.clone()),
5763            Arc::new(right_plan.clone()),
5764            vec![(col("t1.id"), col("t2.id"))],
5765            None,
5766            JoinType::Inner,
5767            JoinConstraint::Using,
5768            NullEquality::NullEqualsNothing,
5769        )?;
5770
5771        assert_eq!(
5772            using_join.schema.fields().len(),
5773            6,
5774            "USING join should have all fields"
5775        );
5776        assert_eq!(using_join.join_constraint, JoinConstraint::Using);
5777
5778        Ok(())
5779    }
5780}