// Source: datafusion_expr/logical_plan/plan.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! Logical plan types
19
20use std::cmp::Ordering;
21use std::collections::{HashMap, HashSet};
22use std::fmt::{self, Debug, Display, Formatter};
23use std::hash::{Hash, Hasher};
24use std::sync::{Arc, LazyLock};
25
26use super::DdlStatement;
27use super::dml::CopyTo;
28use super::invariants::{
29    InvariantLevel, assert_always_invariants_at_current_node,
30    assert_executable_invariants,
31};
32use crate::builder::{unique_field_aliases, unnest_with_options};
33use crate::expr::{
34    Alias, Placeholder, Sort as SortExpr, WindowFunction, WindowFunctionParams,
35    intersect_metadata_for_union,
36};
37use crate::expr_rewriter::{
38    NamePreserver, create_col_from_scalar_expr, normalize_cols, normalize_sorts,
39};
40use crate::logical_plan::display::{GraphvizVisitor, IndentVisitor};
41use crate::logical_plan::extension::UserDefinedLogicalNode;
42use crate::logical_plan::{DmlStatement, Statement};
43use crate::utils::{
44    enumerate_grouping_sets, exprlist_to_fields, find_out_reference_exprs,
45    grouping_set_expr_count, grouping_set_to_exprlist, split_conjunction,
46};
47use crate::{
48    BinaryExpr, CreateMemoryTable, CreateView, Execute, Expr, ExprSchemable,
49    LogicalPlanBuilder, Operator, Prepare, TableProviderFilterPushDown, TableSource,
50    WindowFunctionDefinition, build_join_schema, expr_vec_fmt, requalify_sides_if_needed,
51};
52
53use arrow::datatypes::{DataType, Field, FieldRef, Schema, SchemaRef};
54use datafusion_common::cse::{NormalizeEq, Normalizeable};
55use datafusion_common::format::ExplainFormat;
56use datafusion_common::metadata::check_metadata_with_storage_equal;
57use datafusion_common::tree_node::{
58    Transformed, TreeNode, TreeNodeContainer, TreeNodeRecursion,
59};
60use datafusion_common::{
61    Column, Constraints, DFSchema, DFSchemaRef, DataFusionError, Dependency,
62    FunctionalDependence, FunctionalDependencies, NullEquality, ParamValues, Result,
63    ScalarValue, Spans, TableReference, UnnestOptions, aggregate_functional_dependencies,
64    assert_eq_or_internal_err, assert_or_internal_err, internal_err, plan_err,
65};
66use indexmap::IndexSet;
67
68// backwards compatibility
69use crate::display::PgJsonVisitor;
70pub use datafusion_common::display::{PlanType, StringifiedPlan, ToStringifiedPlan};
71pub use datafusion_common::{JoinConstraint, JoinType};
72
73/// A `LogicalPlan` is a node in a tree of relational operators (such as
74/// Projection or Filter).
75///
76/// Represents transforming an input relation (table) to an output relation
77/// (table) with a potentially different schema. Plans form a dataflow tree
78/// where data flows from leaves up to the root to produce the query result.
79///
80/// `LogicalPlan`s can be created by the SQL query planner, the DataFrame API,
81/// or programmatically (for example custom query languages).
82///
83/// # See also:
84/// * [`Expr`]: For the expressions that are evaluated by the plan
85/// * [`LogicalPlanBuilder`]: For building `LogicalPlan`s
86/// * [`tree_node`]: To inspect and rewrite `LogicalPlan`s
87///
88/// [`tree_node`]: crate::logical_plan::tree_node
89///
90/// # Examples
91///
92/// ## Creating a LogicalPlan from SQL:
93///
94/// See [`SessionContext::sql`](https://docs.rs/datafusion/latest/datafusion/execution/context/struct.SessionContext.html#method.sql)
95///
96/// ## Creating a LogicalPlan from the DataFrame API:
97///
98/// See [`DataFrame::logical_plan`](https://docs.rs/datafusion/latest/datafusion/dataframe/struct.DataFrame.html#method.logical_plan)
99///
100/// ## Creating a LogicalPlan programmatically:
101///
102/// See [`LogicalPlanBuilder`]
103///
104/// # Visiting and Rewriting `LogicalPlan`s
105///
106/// Using the [`tree_node`] API, you can recursively walk all nodes in a
107/// `LogicalPlan`. For example, to find all column references in a plan:
108///
109/// ```
110/// # use std::collections::HashSet;
111/// # use arrow::datatypes::{DataType, Field, Schema};
112/// # use datafusion_expr::{Expr, col, lit, LogicalPlan, LogicalPlanBuilder, table_scan};
113/// # use datafusion_common::tree_node::{TreeNodeRecursion, TreeNode};
114/// # use datafusion_common::{Column, Result};
115/// # fn employee_schema() -> Schema {
116/// #    Schema::new(vec![
117/// #           Field::new("name", DataType::Utf8, false),
118/// #           Field::new("salary", DataType::Int32, false),
119/// #       ])
120/// #   }
121/// // Projection(name, salary)
122/// //   Filter(salary > 1000)
123/// //     TableScan(employee)
124/// # fn main() -> Result<()> {
125/// let plan = table_scan(Some("employee"), &employee_schema(), None)?
126///  .filter(col("salary").gt(lit(1000)))?
127///  .project(vec![col("name")])?
128///  .build()?;
129///
130/// // use apply to walk the plan and collect all expressions
131/// let mut expressions = HashSet::new();
132/// plan.apply(|node| {
133///   // collect all expressions in the plan
134///   node.apply_expressions(|expr| {
135///    expressions.insert(expr.clone());
136///    Ok(TreeNodeRecursion::Continue) // control walk of expressions
137///   })?;
138///   Ok(TreeNodeRecursion::Continue) // control walk of plan nodes
139/// }).unwrap();
140///
141/// // we found the expression in projection and filter
142/// assert_eq!(expressions.len(), 2);
143/// println!("Found expressions: {:?}", expressions);
144/// // found predicate in the Filter: employee.salary > 1000
145/// let salary = Expr::Column(Column::new(Some("employee"), "salary"));
146/// assert!(expressions.contains(&salary.gt(lit(1000))));
147/// // found projection in the Projection: employee.name
148/// let name = Expr::Column(Column::new(Some("employee"), "name"));
149/// assert!(expressions.contains(&name));
150/// # Ok(())
151/// # }
152/// ```
153///
154/// You can also rewrite plans using the [`tree_node`] API. For example, to
155/// replace the filter predicate in a plan:
156///
157/// ```
158/// # use std::collections::HashSet;
159/// # use arrow::datatypes::{DataType, Field, Schema};
160/// # use datafusion_expr::{Expr, col, lit, LogicalPlan, LogicalPlanBuilder, table_scan};
161/// # use datafusion_common::tree_node::{TreeNodeRecursion, TreeNode};
162/// # use datafusion_common::{Column, Result};
163/// # fn employee_schema() -> Schema {
164/// #    Schema::new(vec![
165/// #           Field::new("name", DataType::Utf8, false),
166/// #           Field::new("salary", DataType::Int32, false),
167/// #       ])
168/// #   }
169/// // Projection(name, salary)
170/// //   Filter(salary > 1000)
171/// //     TableScan(employee)
172/// # fn main() -> Result<()> {
173/// use datafusion_common::tree_node::Transformed;
174/// let plan = table_scan(Some("employee"), &employee_schema(), None)?
175///  .filter(col("salary").gt(lit(1000)))?
176///  .project(vec![col("name")])?
177///  .build()?;
178///
179/// // use transform to rewrite the plan
180/// let transformed_result = plan.transform(|node| {
181///   // when we see the filter node
182///   if let LogicalPlan::Filter(mut filter) = node {
183///     // replace predicate with salary < 2000
184///     filter.predicate = Expr::Column(Column::new(Some("employee"), "salary")).lt(lit(2000));
185///     let new_plan = LogicalPlan::Filter(filter);
186///     return Ok(Transformed::yes(new_plan)); // communicate the node was changed
187///   }
188///   // return the node unchanged
189///   Ok(Transformed::no(node))
190/// }).unwrap();
191///
192/// // Transformed result contains rewritten plan and information about
193/// // whether the plan was changed
194/// assert!(transformed_result.transformed);
195/// let rewritten_plan = transformed_result.data;
196///
197/// // we found the filter
198/// assert_eq!(rewritten_plan.display_indent().to_string(),
199/// "Projection: employee.name\
200/// \n  Filter: employee.salary < Int32(2000)\
201/// \n    TableScan: employee");
202/// # Ok(())
203/// # }
204/// ```
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Hash)]
pub enum LogicalPlan {
    /// Evaluates an arbitrary list of expressions (essentially a
    /// SELECT with an expression list) on its input.
    Projection(Projection),
    /// Filters rows from its input that do not match an
    /// expression (essentially a WHERE clause with a predicate
    /// expression).
    ///
    /// Semantically, `<predicate>` is evaluated for each row of the
    /// input; If the value of `<predicate>` is true, the input row is
    /// passed to the output. If the value of `<predicate>` is false
    /// (or null), the row is discarded.
    Filter(Filter),
    /// Windows input based on a set of window spec and window
    /// function (e.g. SUM or RANK).  This is used to implement SQL
    /// window functions, and the `OVER` clause.
    ///
    /// See [`Window`] for more details
    Window(Window),
    /// Aggregates its input based on a set of grouping and aggregate
    /// expressions (e.g. SUM). This is used to implement SQL aggregates
    /// and `GROUP BY`.
    ///
    /// See [`Aggregate`] for more details
    Aggregate(Aggregate),
    /// Sorts its input according to a list of sort expressions. This
    /// is used to implement SQL `ORDER BY`
    Sort(Sort),
    /// Join two logical plans on one or more join columns.
    /// This is used to implement SQL `JOIN`
    Join(Join),
    /// Repartitions the input based on a partitioning scheme. This is
    /// used to add parallelism and is sometimes referred to as an
    /// "exchange" operator in other systems
    Repartition(Repartition),
    /// Union multiple inputs with the same schema into a single
    /// output stream. This is used to implement SQL `UNION [ALL]` and
    /// `INTERSECT [ALL]`.
    Union(Union),
    /// Produces rows from a [`TableSource`], used to implement SQL
    /// `FROM` tables or views.
    TableScan(TableScan),
    /// An empty relation with an empty schema that produces 0 or 1 row
    /// (depending on its `produce_one_row` flag). This is used to
    /// implement SQL `SELECT` that has no values in the `FROM` clause.
    EmptyRelation(EmptyRelation),
    /// Produces the output of running another query.  This is used to
    /// implement SQL subqueries
    Subquery(Subquery),
    /// Aliased relation provides, or changes, the name of a relation.
    SubqueryAlias(SubqueryAlias),
    /// Skip some number of rows, and then fetch some number of rows.
    Limit(Limit),
    /// A DataFusion [`Statement`] such as `SET VARIABLE` or `START TRANSACTION`
    Statement(Statement),
    /// Values expression. See
    /// [Postgres VALUES](https://www.postgresql.org/docs/current/queries-values.html)
    /// documentation for more details. This is used to implement SQL such as
    /// `VALUES (1, 2), (3, 4)`
    Values(Values),
    /// Produces a relation with string representations of
    /// various parts of the plan. This is used to implement SQL `EXPLAIN`.
    Explain(Explain),
    /// Runs the input, and prints annotated physical plan as a string
    /// with execution metric. This is used to implement SQL
    /// `EXPLAIN ANALYZE`.
    Analyze(Analyze),
    /// Extension operator defined outside of DataFusion. This is used
    /// to extend DataFusion with custom relational operations that
    /// are not part of the built-in set of plan nodes.
    Extension(Extension),
    /// Remove duplicate rows from the input. This is used to
    /// implement SQL `SELECT DISTINCT ...`.
    Distinct(Distinct),
    /// Data Manipulation Language (DML): Insert / Update / Delete
    Dml(DmlStatement),
    /// Data Definition Language (DDL): CREATE / DROP TABLES / VIEWS / SCHEMAS
    Ddl(DdlStatement),
    /// `COPY TO` for writing plan results to files
    Copy(CopyTo),
    /// Describe the schema of the table. This is used to implement the
    /// SQL `DESCRIBE` command from MySQL.
    DescribeTable(DescribeTable),
    /// Unnest a column that contains a nested list type such as an
    /// ARRAY. This is used to implement SQL `UNNEST`
    Unnest(Unnest),
    /// A variadic query (e.g. "Recursive CTEs")
    RecursiveQuery(RecursiveQuery),
}
294
295impl Default for LogicalPlan {
296    fn default() -> Self {
297        LogicalPlan::EmptyRelation(EmptyRelation {
298            produce_one_row: false,
299            schema: Arc::new(DFSchema::empty()),
300        })
301    }
302}
303
304impl<'a> TreeNodeContainer<'a, Self> for LogicalPlan {
305    fn apply_elements<F: FnMut(&'a Self) -> Result<TreeNodeRecursion>>(
306        &'a self,
307        mut f: F,
308    ) -> Result<TreeNodeRecursion> {
309        f(self)
310    }
311
312    fn map_elements<F: FnMut(Self) -> Result<Transformed<Self>>>(
313        self,
314        mut f: F,
315    ) -> Result<Transformed<Self>> {
316        f(self)
317    }
318}
319
320impl LogicalPlan {
321    /// Get a reference to the logical plan's schema
322    pub fn schema(&self) -> &DFSchemaRef {
323        match self {
324            LogicalPlan::EmptyRelation(EmptyRelation { schema, .. }) => schema,
325            LogicalPlan::Values(Values { schema, .. }) => schema,
326            LogicalPlan::TableScan(TableScan {
327                projected_schema, ..
328            }) => projected_schema,
329            LogicalPlan::Projection(Projection { schema, .. }) => schema,
330            LogicalPlan::Filter(Filter { input, .. }) => input.schema(),
331            LogicalPlan::Distinct(Distinct::All(input)) => input.schema(),
332            LogicalPlan::Distinct(Distinct::On(DistinctOn { schema, .. })) => schema,
333            LogicalPlan::Window(Window { schema, .. }) => schema,
334            LogicalPlan::Aggregate(Aggregate { schema, .. }) => schema,
335            LogicalPlan::Sort(Sort { input, .. }) => input.schema(),
336            LogicalPlan::Join(Join { schema, .. }) => schema,
337            LogicalPlan::Repartition(Repartition { input, .. }) => input.schema(),
338            LogicalPlan::Limit(Limit { input, .. }) => input.schema(),
339            LogicalPlan::Statement(statement) => statement.schema(),
340            LogicalPlan::Subquery(Subquery { subquery, .. }) => subquery.schema(),
341            LogicalPlan::SubqueryAlias(SubqueryAlias { schema, .. }) => schema,
342            LogicalPlan::Explain(explain) => &explain.schema,
343            LogicalPlan::Analyze(analyze) => &analyze.schema,
344            LogicalPlan::Extension(extension) => extension.node.schema(),
345            LogicalPlan::Union(Union { schema, .. }) => schema,
346            LogicalPlan::DescribeTable(DescribeTable { output_schema, .. }) => {
347                output_schema
348            }
349            LogicalPlan::Dml(DmlStatement { output_schema, .. }) => output_schema,
350            LogicalPlan::Copy(CopyTo { output_schema, .. }) => output_schema,
351            LogicalPlan::Ddl(ddl) => ddl.schema(),
352            LogicalPlan::Unnest(Unnest { schema, .. }) => schema,
353            LogicalPlan::RecursiveQuery(RecursiveQuery { static_term, .. }) => {
354                // we take the schema of the static term as the schema of the entire recursive query
355                static_term.schema()
356            }
357        }
358    }
359
360    /// Used for normalizing columns, as the fallback schemas to the main schema
361    /// of the plan.
362    pub fn fallback_normalize_schemas(&self) -> Vec<&DFSchema> {
363        match self {
364            LogicalPlan::Window(_)
365            | LogicalPlan::Projection(_)
366            | LogicalPlan::Aggregate(_)
367            | LogicalPlan::Unnest(_)
368            | LogicalPlan::Join(_) => self
369                .inputs()
370                .iter()
371                .map(|input| input.schema().as_ref())
372                .collect(),
373            _ => vec![],
374        }
375    }
376
377    /// Returns the (fixed) output schema for explain plans
378    pub fn explain_schema() -> SchemaRef {
379        SchemaRef::new(Schema::new(vec![
380            Field::new("plan_type", DataType::Utf8, false),
381            Field::new("plan", DataType::Utf8, false),
382        ]))
383    }
384
385    /// Returns the (fixed) output schema for `DESCRIBE` plans
386    pub fn describe_schema() -> Schema {
387        Schema::new(vec![
388            Field::new("column_name", DataType::Utf8, false),
389            Field::new("data_type", DataType::Utf8, false),
390            Field::new("is_nullable", DataType::Utf8, false),
391        ])
392    }
393
394    /// Returns all expressions (non-recursively) evaluated by the current
395    /// logical plan node. This does not include expressions in any children.
396    ///
397    /// Note this method `clone`s all the expressions. When possible, the
398    /// [`tree_node`] API should be used instead of this API.
399    ///
400    /// The returned expressions do not necessarily represent or even
401    /// contributed to the output schema of this node. For example,
402    /// `LogicalPlan::Filter` returns the filter expression even though the
403    /// output of a Filter has the same columns as the input.
404    ///
405    /// The expressions do contain all the columns that are used by this plan,
406    /// so if there are columns not referenced by these expressions then
407    /// DataFusion's optimizer attempts to optimize them away.
408    ///
409    /// [`tree_node`]: crate::logical_plan::tree_node
410    pub fn expressions(self: &LogicalPlan) -> Vec<Expr> {
411        let mut exprs = vec![];
412        self.apply_expressions(|e| {
413            exprs.push(e.clone());
414            Ok(TreeNodeRecursion::Continue)
415        })
416        // closure always returns OK
417        .unwrap();
418        exprs
419    }
420
421    /// Returns all the out reference(correlated) expressions (recursively) in the current
422    /// logical plan nodes and all its descendant nodes.
423    pub fn all_out_ref_exprs(self: &LogicalPlan) -> Vec<Expr> {
424        let mut exprs = vec![];
425        self.apply_expressions(|e| {
426            find_out_reference_exprs(e).into_iter().for_each(|e| {
427                if !exprs.contains(&e) {
428                    exprs.push(e)
429                }
430            });
431            Ok(TreeNodeRecursion::Continue)
432        })
433        // closure always returns OK
434        .unwrap();
435        self.inputs()
436            .into_iter()
437            .flat_map(|child| child.all_out_ref_exprs())
438            .for_each(|e| {
439                if !exprs.contains(&e) {
440                    exprs.push(e)
441                }
442            });
443        exprs
444    }
445
446    /// Returns all inputs / children of this `LogicalPlan` node.
447    ///
448    /// Note does not include inputs to inputs, or subqueries.
449    pub fn inputs(&self) -> Vec<&LogicalPlan> {
450        match self {
451            LogicalPlan::Projection(Projection { input, .. }) => vec![input],
452            LogicalPlan::Filter(Filter { input, .. }) => vec![input],
453            LogicalPlan::Repartition(Repartition { input, .. }) => vec![input],
454            LogicalPlan::Window(Window { input, .. }) => vec![input],
455            LogicalPlan::Aggregate(Aggregate { input, .. }) => vec![input],
456            LogicalPlan::Sort(Sort { input, .. }) => vec![input],
457            LogicalPlan::Join(Join { left, right, .. }) => vec![left, right],
458            LogicalPlan::Limit(Limit { input, .. }) => vec![input],
459            LogicalPlan::Subquery(Subquery { subquery, .. }) => vec![subquery],
460            LogicalPlan::SubqueryAlias(SubqueryAlias { input, .. }) => vec![input],
461            LogicalPlan::Extension(extension) => extension.node.inputs(),
462            LogicalPlan::Union(Union { inputs, .. }) => {
463                inputs.iter().map(|arc| arc.as_ref()).collect()
464            }
465            LogicalPlan::Distinct(
466                Distinct::All(input) | Distinct::On(DistinctOn { input, .. }),
467            ) => vec![input],
468            LogicalPlan::Explain(explain) => vec![&explain.plan],
469            LogicalPlan::Analyze(analyze) => vec![&analyze.input],
470            LogicalPlan::Dml(write) => vec![&write.input],
471            LogicalPlan::Copy(copy) => vec![&copy.input],
472            LogicalPlan::Ddl(ddl) => ddl.inputs(),
473            LogicalPlan::Unnest(Unnest { input, .. }) => vec![input],
474            LogicalPlan::RecursiveQuery(RecursiveQuery {
475                static_term,
476                recursive_term,
477                ..
478            }) => vec![static_term, recursive_term],
479            LogicalPlan::Statement(stmt) => stmt.inputs(),
480            // plans without inputs
481            LogicalPlan::TableScan { .. }
482            | LogicalPlan::EmptyRelation { .. }
483            | LogicalPlan::Values { .. }
484            | LogicalPlan::DescribeTable(_) => vec![],
485        }
486    }
487
488    /// returns all `Using` join columns in a logical plan
489    pub fn using_columns(&self) -> Result<Vec<HashSet<Column>>, DataFusionError> {
490        let mut using_columns: Vec<HashSet<Column>> = vec![];
491
492        self.apply_with_subqueries(|plan| {
493            if let LogicalPlan::Join(Join {
494                join_constraint: JoinConstraint::Using,
495                on,
496                ..
497            }) = plan
498            {
499                // The join keys in using-join must be columns.
500                let columns =
501                    on.iter().try_fold(HashSet::new(), |mut accumu, (l, r)| {
502                        let Some(l) = l.get_as_join_column() else {
503                            return internal_err!(
504                                "Invalid join key. Expected column, found {l:?}"
505                            );
506                        };
507                        let Some(r) = r.get_as_join_column() else {
508                            return internal_err!(
509                                "Invalid join key. Expected column, found {r:?}"
510                            );
511                        };
512                        accumu.insert(l.to_owned());
513                        accumu.insert(r.to_owned());
514                        Result::<_, DataFusionError>::Ok(accumu)
515                    })?;
516                using_columns.push(columns);
517            }
518            Ok(TreeNodeRecursion::Continue)
519        })?;
520
521        Ok(using_columns)
522    }
523
    /// returns the first output expression of this `LogicalPlan` node.
    ///
    /// Returns `Ok(None)` for nodes with no meaningful "head column"
    /// (subqueries, DDL/DML, EXPLAIN, etc.); wrapper nodes that preserve
    /// their input's columns delegate to the input.
    ///
    /// NOTE(review): the `[0]` indexing below assumes the relevant
    /// expression lists are non-empty (e.g. a `Projection` always has at
    /// least one expression) — otherwise this panics; confirm that
    /// invariant is upheld by the plan builders.
    pub fn head_output_expr(&self) -> Result<Option<Expr>> {
        match self {
            // First projected expression.
            LogicalPlan::Projection(projection) => {
                Ok(Some(projection.expr.as_slice()[0].clone()))
            }
            // Grouping expressions come first in aggregate output; fall back
            // to the first aggregate expression when there is no GROUP BY.
            LogicalPlan::Aggregate(agg) => {
                if agg.group_expr.is_empty() {
                    Ok(Some(agg.aggr_expr.as_slice()[0].clone()))
                } else {
                    Ok(Some(agg.group_expr.as_slice()[0].clone()))
                }
            }
            LogicalPlan::Distinct(Distinct::On(DistinctOn { select_expr, .. })) => {
                Ok(Some(select_expr[0].clone()))
            }
            // These nodes keep the same head column as their input.
            LogicalPlan::Filter(Filter { input, .. })
            | LogicalPlan::Distinct(Distinct::All(input))
            | LogicalPlan::Sort(Sort { input, .. })
            | LogicalPlan::Limit(Limit { input, .. })
            | LogicalPlan::Repartition(Repartition { input, .. })
            | LogicalPlan::Window(Window { input, .. }) => input.head_output_expr(),
            LogicalPlan::Join(Join {
                left,
                right,
                join_type,
                ..
            }) => match join_type {
                // Joins that output both sides: head comes from the left
                // side unless it contributes no fields.
                JoinType::Inner | JoinType::Left | JoinType::Right | JoinType::Full => {
                    if left.schema().fields().is_empty() {
                        right.head_output_expr()
                    } else {
                        left.head_output_expr()
                    }
                }
                // Semi/anti/mark joins only output the columns of one side.
                JoinType::LeftSemi | JoinType::LeftAnti | JoinType::LeftMark => {
                    left.head_output_expr()
                }
                JoinType::RightSemi | JoinType::RightAnti | JoinType::RightMark => {
                    right.head_output_expr()
                }
            },
            // The static term defines the schema of a recursive query.
            LogicalPlan::RecursiveQuery(RecursiveQuery { static_term, .. }) => {
                static_term.head_output_expr()
            }
            // Schema-carrying nodes: build a column from qualified field 0.
            LogicalPlan::Union(union) => Ok(Some(Expr::Column(Column::from(
                union.schema.qualified_field(0),
            )))),
            LogicalPlan::TableScan(table) => Ok(Some(Expr::Column(Column::from(
                table.projected_schema.qualified_field(0),
            )))),
            // Requalify the input's head column with the subquery alias.
            LogicalPlan::SubqueryAlias(subquery_alias) => {
                let expr_opt = subquery_alias.input.head_output_expr()?;
                expr_opt
                    .map(|expr| {
                        Ok(Expr::Column(create_col_from_scalar_expr(
                            &expr,
                            subquery_alias.alias.to_string(),
                        )?))
                    })
                    .map_or(Ok(None), |v| v.map(Some))
            }
            // No meaningful head output expression for these nodes.
            LogicalPlan::Subquery(_) => Ok(None),
            LogicalPlan::EmptyRelation(_)
            | LogicalPlan::Statement(_)
            | LogicalPlan::Values(_)
            | LogicalPlan::Explain(_)
            | LogicalPlan::Analyze(_)
            | LogicalPlan::Extension(_)
            | LogicalPlan::Dml(_)
            | LogicalPlan::Copy(_)
            | LogicalPlan::Ddl(_)
            | LogicalPlan::DescribeTable(_)
            | LogicalPlan::Unnest(_) => Ok(None),
        }
    }
600
601    /// Recomputes schema and type information for this LogicalPlan if needed.
602    ///
603    /// Some `LogicalPlan`s may need to recompute their schema if the number or
604    /// type of expressions have been changed (for example due to type
605    /// coercion). For example [`LogicalPlan::Projection`]s schema depends on
606    /// its expressions.
607    ///
608    /// Some `LogicalPlan`s schema is unaffected by any changes to their
609    /// expressions. For example [`LogicalPlan::Filter`] schema is always the
610    /// same as its input schema.
611    ///
612    /// This is useful after modifying a plans `Expr`s (or input plans) via
613    /// methods such as [Self::map_children] and [Self::map_expressions]. Unlike
614    /// [Self::with_new_exprs], this method does not require a new set of
615    /// expressions or inputs plans.
616    ///
617    /// # Return value
618    /// Returns an error if there is some issue recomputing the schema.
619    ///
620    /// # Notes
621    ///
622    /// * Does not recursively recompute schema for input (child) plans.
623    pub fn recompute_schema(self) -> Result<Self> {
624        match self {
625            // Since expr may be different than the previous expr, schema of the projection
626            // may change. We need to use try_new method instead of try_new_with_schema method.
627            LogicalPlan::Projection(Projection {
628                expr,
629                input,
630                schema: _,
631            }) => Projection::try_new(expr, input).map(LogicalPlan::Projection),
632            LogicalPlan::Dml(_) => Ok(self),
633            LogicalPlan::Copy(_) => Ok(self),
634            LogicalPlan::Values(Values { schema, values }) => {
635                // todo it isn't clear why the schema is not recomputed here
636                Ok(LogicalPlan::Values(Values { schema, values }))
637            }
638            LogicalPlan::Filter(Filter { predicate, input }) => {
639                Filter::try_new(predicate, input).map(LogicalPlan::Filter)
640            }
641            LogicalPlan::Repartition(_) => Ok(self),
642            LogicalPlan::Window(Window {
643                input,
644                window_expr,
645                schema: _,
646            }) => Window::try_new(window_expr, input).map(LogicalPlan::Window),
647            LogicalPlan::Aggregate(Aggregate {
648                input,
649                group_expr,
650                aggr_expr,
651                schema: _,
652            }) => Aggregate::try_new(input, group_expr, aggr_expr)
653                .map(LogicalPlan::Aggregate),
654            LogicalPlan::Sort(_) => Ok(self),
655            LogicalPlan::Join(Join {
656                left,
657                right,
658                filter,
659                join_type,
660                join_constraint,
661                on,
662                schema: _,
663                null_equality,
664                null_aware,
665            }) => {
666                let schema =
667                    build_join_schema(left.schema(), right.schema(), &join_type)?;
668
669                let new_on: Vec<_> = on
670                    .into_iter()
671                    .map(|equi_expr| {
672                        // SimplifyExpression rule may add alias to the equi_expr.
673                        (equi_expr.0.unalias(), equi_expr.1.unalias())
674                    })
675                    .collect();
676
677                Ok(LogicalPlan::Join(Join {
678                    left,
679                    right,
680                    join_type,
681                    join_constraint,
682                    on: new_on,
683                    filter,
684                    schema: DFSchemaRef::new(schema),
685                    null_equality,
686                    null_aware,
687                }))
688            }
689            LogicalPlan::Subquery(_) => Ok(self),
690            LogicalPlan::SubqueryAlias(SubqueryAlias {
691                input,
692                alias,
693                schema: _,
694            }) => SubqueryAlias::try_new(input, alias).map(LogicalPlan::SubqueryAlias),
695            LogicalPlan::Limit(_) => Ok(self),
696            LogicalPlan::Ddl(_) => Ok(self),
697            LogicalPlan::Extension(Extension { node }) => {
698                // todo make an API that does not require cloning
699                // This requires a copy of the extension nodes expressions and inputs
700                let expr = node.expressions();
701                let inputs: Vec<_> = node.inputs().into_iter().cloned().collect();
702                Ok(LogicalPlan::Extension(Extension {
703                    node: node.with_exprs_and_inputs(expr, inputs)?,
704                }))
705            }
706            LogicalPlan::Union(Union { inputs, schema }) => {
707                let first_input_schema = inputs[0].schema();
708                if schema.fields().len() == first_input_schema.fields().len() {
709                    // If inputs are not pruned do not change schema
710                    Ok(LogicalPlan::Union(Union { inputs, schema }))
711                } else {
712                    // A note on `Union`s constructed via `try_new_by_name`:
713                    //
714                    // At this point, the schema for each input should have
715                    // the same width. Thus, we do not need to save whether a
716                    // `Union` was created `BY NAME`, and can safely rely on the
717                    // `try_new` initializer to derive the new schema based on
718                    // column positions.
719                    Ok(LogicalPlan::Union(Union::try_new(inputs)?))
720                }
721            }
722            LogicalPlan::Distinct(distinct) => {
723                let distinct = match distinct {
724                    Distinct::All(input) => Distinct::All(input),
725                    Distinct::On(DistinctOn {
726                        on_expr,
727                        select_expr,
728                        sort_expr,
729                        input,
730                        schema: _,
731                    }) => Distinct::On(DistinctOn::try_new(
732                        on_expr,
733                        select_expr,
734                        sort_expr,
735                        input,
736                    )?),
737                };
738                Ok(LogicalPlan::Distinct(distinct))
739            }
740            LogicalPlan::RecursiveQuery(_) => Ok(self),
741            LogicalPlan::Analyze(_) => Ok(self),
742            LogicalPlan::Explain(_) => Ok(self),
743            LogicalPlan::TableScan(_) => Ok(self),
744            LogicalPlan::EmptyRelation(_) => Ok(self),
745            LogicalPlan::Statement(_) => Ok(self),
746            LogicalPlan::DescribeTable(_) => Ok(self),
747            LogicalPlan::Unnest(Unnest {
748                input,
749                exec_columns,
750                options,
751                ..
752            }) => {
753                // Update schema with unnested column type.
754                unnest_with_options(Arc::unwrap_or_clone(input), exec_columns, options)
755            }
756        }
757    }
758
    /// Returns a new `LogicalPlan` based on `self` with inputs and
    /// expressions replaced.
    ///
    /// Note this method creates an entirely new node, which requires a large
    /// amount of clone'ing. When possible, the [`tree_node`] API should be used
    /// instead of this API.
    ///
    /// The exprs correspond to the same order of expressions returned
    /// by [`Self::expressions`]. This function is used by optimizers
    /// to rewrite plans using the following pattern:
    ///
    /// [`tree_node`]: crate::logical_plan::tree_node
    ///
    /// ```text
    /// let new_inputs = optimize_children(..., plan, props);
    ///
    /// // get the plans expressions to optimize
    /// let exprs = plan.expressions();
    ///
    /// // potentially rewrite plan expressions
    /// let rewritten_exprs = rewrite_exprs(exprs);
    ///
    /// // create new plan using rewritten_exprs in same position
    /// let new_plan = plan.new_with_exprs(rewritten_exprs, new_inputs);
    /// ```
    ///
    /// # Errors
    ///
    /// Returns an internal error if the number of supplied expressions or
    /// inputs does not match what the plan variant requires.
    ///
    /// # Panics
    ///
    /// Some variants (`Window`, `Join`, `Distinct::On`) `assert!` that the
    /// supplied expression count matches the original plan's expression layout.
    pub fn with_new_exprs(
        &self,
        mut expr: Vec<Expr>,
        inputs: Vec<LogicalPlan>,
    ) -> Result<LogicalPlan> {
        match self {
            // Since expr may be different than the previous expr, schema of the projection
            // may change. We need to use try_new method instead of try_new_with_schema method.
            LogicalPlan::Projection(Projection { .. }) => {
                let input = self.only_input(inputs)?;
                Projection::try_new(expr, Arc::new(input)).map(LogicalPlan::Projection)
            }
            // DML carries no expressions of its own; only its input plan is replaced.
            LogicalPlan::Dml(DmlStatement {
                table_name,
                target,
                op,
                ..
            }) => {
                self.assert_no_expressions(expr)?;
                let input = self.only_input(inputs)?;
                Ok(LogicalPlan::Dml(DmlStatement::new(
                    table_name.clone(),
                    Arc::clone(target),
                    op.clone(),
                    Arc::new(input),
                )))
            }
            LogicalPlan::Copy(CopyTo {
                input: _,
                output_url,
                file_type,
                options,
                partition_by,
                output_schema: _,
            }) => {
                self.assert_no_expressions(expr)?;
                let input = self.only_input(inputs)?;
                Ok(LogicalPlan::Copy(CopyTo::new(
                    Arc::new(input),
                    output_url.clone(),
                    partition_by.clone(),
                    Arc::clone(file_type),
                    options.clone(),
                )))
            }
            // Values rows are flattened in `expr`; re-chunk them into rows of
            // schema width to rebuild the two-dimensional `values` layout.
            LogicalPlan::Values(Values { schema, .. }) => {
                self.assert_no_inputs(inputs)?;
                Ok(LogicalPlan::Values(Values {
                    schema: Arc::clone(schema),
                    values: expr
                        .chunks_exact(schema.fields().len())
                        .map(|s| s.to_vec())
                        .collect(),
                }))
            }
            LogicalPlan::Filter { .. } => {
                let predicate = self.only_expr(expr)?;
                let input = self.only_input(inputs)?;

                Filter::try_new(predicate, Arc::new(input)).map(LogicalPlan::Filter)
            }
            LogicalPlan::Repartition(Repartition {
                partitioning_scheme,
                ..
            }) => match partitioning_scheme {
                // Round-robin partitioning has no expressions; keep the batch count.
                Partitioning::RoundRobinBatch(n) => {
                    self.assert_no_expressions(expr)?;
                    let input = self.only_input(inputs)?;
                    Ok(LogicalPlan::Repartition(Repartition {
                        partitioning_scheme: Partitioning::RoundRobinBatch(*n),
                        input: Arc::new(input),
                    }))
                }
                // The new expressions become the hash keys; partition count is kept.
                Partitioning::Hash(_, n) => {
                    let input = self.only_input(inputs)?;
                    Ok(LogicalPlan::Repartition(Repartition {
                        partitioning_scheme: Partitioning::Hash(expr, *n),
                        input: Arc::new(input),
                    }))
                }
                Partitioning::DistributeBy(_) => {
                    let input = self.only_input(inputs)?;
                    Ok(LogicalPlan::Repartition(Repartition {
                        partitioning_scheme: Partitioning::DistributeBy(expr),
                        input: Arc::new(input),
                    }))
                }
            },
            LogicalPlan::Window(Window { window_expr, .. }) => {
                assert_eq!(window_expr.len(), expr.len());
                let input = self.only_input(inputs)?;
                Window::try_new(expr, Arc::new(input)).map(LogicalPlan::Window)
            }
            LogicalPlan::Aggregate(Aggregate { group_expr, .. }) => {
                let input = self.only_input(inputs)?;
                // group exprs are the first expressions
                let agg_expr = expr.split_off(group_expr.len());

                Aggregate::try_new(Arc::new(input), expr, agg_expr)
                    .map(LogicalPlan::Aggregate)
            }
            // Pair each new expression with the original sort options
            // (direction / nulls ordering are preserved positionally).
            LogicalPlan::Sort(Sort {
                expr: sort_expr,
                fetch,
                ..
            }) => {
                let input = self.only_input(inputs)?;
                Ok(LogicalPlan::Sort(Sort {
                    expr: expr
                        .into_iter()
                        .zip(sort_expr.iter())
                        .map(|(expr, sort)| sort.with_expr(expr))
                        .collect(),
                    input: Arc::new(input),
                    fetch: *fetch,
                }))
            }
            LogicalPlan::Join(Join {
                join_type,
                join_constraint,
                on,
                null_equality,
                null_aware,
                ..
            }) => {
                let (left, right) = self.only_two_inputs(inputs)?;
                let schema = build_join_schema(left.schema(), right.schema(), join_type)?;

                let equi_expr_count = on.len() * 2;
                assert!(expr.len() >= equi_expr_count);

                // Assume that the last expr, if any,
                // is the filter_expr (non equality predicate from ON clause)
                let filter_expr = if expr.len() > equi_expr_count {
                    expr.pop()
                } else {
                    None
                };

                // The first part of expr is equi-exprs,
                // and the struct of each equi-expr is like `left-expr = right-expr`.
                assert_eq!(expr.len(), equi_expr_count);
                let mut new_on = Vec::with_capacity(on.len());
                let mut iter = expr.into_iter();
                while let Some(left) = iter.next() {
                    let Some(right) = iter.next() else {
                        internal_err!(
                            "Expected a pair of expressions to construct the join on expression"
                        )?
                    };

                    // SimplifyExpression rule may add alias to the equi_expr.
                    new_on.push((left.unalias(), right.unalias()));
                }

                Ok(LogicalPlan::Join(Join {
                    left: Arc::new(left),
                    right: Arc::new(right),
                    join_type: *join_type,
                    join_constraint: *join_constraint,
                    on: new_on,
                    filter: filter_expr,
                    schema: DFSchemaRef::new(schema),
                    null_equality: *null_equality,
                    null_aware: *null_aware,
                }))
            }
            LogicalPlan::Subquery(Subquery {
                outer_ref_columns,
                spans,
                ..
            }) => {
                self.assert_no_expressions(expr)?;
                let input = self.only_input(inputs)?;
                let subquery = LogicalPlanBuilder::from(input).build()?;
                Ok(LogicalPlan::Subquery(Subquery {
                    subquery: Arc::new(subquery),
                    outer_ref_columns: outer_ref_columns.clone(),
                    spans: spans.clone(),
                }))
            }
            LogicalPlan::SubqueryAlias(SubqueryAlias { alias, .. }) => {
                self.assert_no_expressions(expr)?;
                let input = self.only_input(inputs)?;
                SubqueryAlias::try_new(Arc::new(input), alias.clone())
                    .map(LogicalPlan::SubqueryAlias)
            }
            LogicalPlan::Limit(Limit { skip, fetch, .. }) => {
                let old_expr_len = skip.iter().chain(fetch.iter()).count();
                assert_eq_or_internal_err!(
                    old_expr_len,
                    expr.len(),
                    "Invalid number of new Limit expressions: expected {}, got {}",
                    old_expr_len,
                    expr.len()
                );
                // `LogicalPlan::expressions()` returns in [skip, fetch] order, so we can pop from the end.
                let new_fetch = fetch.as_ref().and_then(|_| expr.pop());
                let new_skip = skip.as_ref().and_then(|_| expr.pop());
                let input = self.only_input(inputs)?;
                Ok(LogicalPlan::Limit(Limit {
                    skip: new_skip.map(Box::new),
                    fetch: new_fetch.map(Box::new),
                    input: Arc::new(input),
                }))
            }
            LogicalPlan::Ddl(DdlStatement::CreateMemoryTable(CreateMemoryTable {
                name,
                if_not_exists,
                or_replace,
                column_defaults,
                temporary,
                ..
            })) => {
                self.assert_no_expressions(expr)?;
                let input = self.only_input(inputs)?;
                Ok(LogicalPlan::Ddl(DdlStatement::CreateMemoryTable(
                    CreateMemoryTable {
                        input: Arc::new(input),
                        // NOTE(review): constraints are reset to the default here rather
                        // than copied from the original node — confirm this is intended.
                        constraints: Constraints::default(),
                        name: name.clone(),
                        if_not_exists: *if_not_exists,
                        or_replace: *or_replace,
                        column_defaults: column_defaults.clone(),
                        temporary: *temporary,
                    },
                )))
            }
            LogicalPlan::Ddl(DdlStatement::CreateView(CreateView {
                name,
                or_replace,
                definition,
                temporary,
                ..
            })) => {
                self.assert_no_expressions(expr)?;
                let input = self.only_input(inputs)?;
                Ok(LogicalPlan::Ddl(DdlStatement::CreateView(CreateView {
                    input: Arc::new(input),
                    name: name.clone(),
                    or_replace: *or_replace,
                    temporary: *temporary,
                    definition: definition.clone(),
                })))
            }
            // Extension nodes rebuild themselves from the new exprs/inputs.
            LogicalPlan::Extension(e) => Ok(LogicalPlan::Extension(Extension {
                node: e.node.with_exprs_and_inputs(expr, inputs)?,
            })),
            LogicalPlan::Union(Union { schema, .. }) => {
                self.assert_no_expressions(expr)?;
                let input_schema = inputs[0].schema();
                // If inputs are not pruned do not change schema.
                let schema = if schema.fields().len() == input_schema.fields().len() {
                    Arc::clone(schema)
                } else {
                    Arc::clone(input_schema)
                };
                Ok(LogicalPlan::Union(Union {
                    inputs: inputs.into_iter().map(Arc::new).collect(),
                    schema,
                }))
            }
            LogicalPlan::Distinct(distinct) => {
                let distinct = match distinct {
                    Distinct::All(_) => {
                        self.assert_no_expressions(expr)?;
                        let input = self.only_input(inputs)?;
                        Distinct::All(Arc::new(input))
                    }
                    // `expr` layout is [on_expr..., select_expr..., sort_expr...];
                    // split it back apart positionally (from the tail forward).
                    Distinct::On(DistinctOn {
                        on_expr,
                        select_expr,
                        ..
                    }) => {
                        let input = self.only_input(inputs)?;
                        let sort_expr = expr.split_off(on_expr.len() + select_expr.len());
                        let select_expr = expr.split_off(on_expr.len());
                        assert!(
                            sort_expr.is_empty(),
                            "with_new_exprs for Distinct does not support sort expressions"
                        );
                        Distinct::On(DistinctOn::try_new(
                            expr,
                            select_expr,
                            None, // no sort expressions accepted
                            Arc::new(input),
                        )?)
                    }
                };
                Ok(LogicalPlan::Distinct(distinct))
            }
            LogicalPlan::RecursiveQuery(RecursiveQuery {
                name, is_distinct, ..
            }) => {
                self.assert_no_expressions(expr)?;
                let (static_term, recursive_term) = self.only_two_inputs(inputs)?;
                Ok(LogicalPlan::RecursiveQuery(RecursiveQuery {
                    name: name.clone(),
                    static_term: Arc::new(static_term),
                    recursive_term: Arc::new(recursive_term),
                    is_distinct: *is_distinct,
                }))
            }
            LogicalPlan::Analyze(a) => {
                self.assert_no_expressions(expr)?;
                let input = self.only_input(inputs)?;
                Ok(LogicalPlan::Analyze(Analyze {
                    verbose: a.verbose,
                    schema: Arc::clone(&a.schema),
                    input: Arc::new(input),
                }))
            }
            LogicalPlan::Explain(e) => {
                self.assert_no_expressions(expr)?;
                let input = self.only_input(inputs)?;
                Ok(LogicalPlan::Explain(Explain {
                    verbose: e.verbose,
                    plan: Arc::new(input),
                    explain_format: e.explain_format.clone(),
                    stringified_plans: e.stringified_plans.clone(),
                    schema: Arc::clone(&e.schema),
                    logical_optimization_succeeded: e.logical_optimization_succeeded,
                }))
            }
            LogicalPlan::Statement(Statement::Prepare(Prepare {
                name, fields, ..
            })) => {
                self.assert_no_expressions(expr)?;
                let input = self.only_input(inputs)?;
                Ok(LogicalPlan::Statement(Statement::Prepare(Prepare {
                    name: name.clone(),
                    fields: fields.clone(),
                    input: Arc::new(input),
                })))
            }
            // EXECUTE takes its parameter expressions but has no plan inputs.
            LogicalPlan::Statement(Statement::Execute(Execute { name, .. })) => {
                self.assert_no_inputs(inputs)?;
                Ok(LogicalPlan::Statement(Statement::Execute(Execute {
                    name: name.clone(),
                    parameters: expr,
                })))
            }
            // For scans the expressions are the pushed-down filter predicates.
            LogicalPlan::TableScan(ts) => {
                self.assert_no_inputs(inputs)?;
                Ok(LogicalPlan::TableScan(TableScan {
                    filters: expr,
                    ..ts.clone()
                }))
            }
            LogicalPlan::EmptyRelation(_)
            | LogicalPlan::Ddl(_)
            | LogicalPlan::Statement(_)
            | LogicalPlan::DescribeTable(_) => {
                // All of these plan types have no inputs / exprs so should not be called
                self.assert_no_expressions(expr)?;
                self.assert_no_inputs(inputs)?;
                Ok(self.clone())
            }
            LogicalPlan::Unnest(Unnest {
                exec_columns: columns,
                options,
                ..
            }) => {
                self.assert_no_expressions(expr)?;
                let input = self.only_input(inputs)?;
                // Update schema with unnested column type.
                let new_plan =
                    unnest_with_options(input, columns.clone(), options.clone())?;
                Ok(new_plan)
            }
        }
    }
1156
1157    /// checks that the plan conforms to the listed invariant level, returning an Error if not
1158    pub fn check_invariants(&self, check: InvariantLevel) -> Result<()> {
1159        match check {
1160            InvariantLevel::Always => assert_always_invariants_at_current_node(self),
1161            InvariantLevel::Executable => assert_executable_invariants(self),
1162        }
1163    }
1164
    /// Helper for [Self::with_new_exprs] to use when no expressions are expected.
    ///
    /// Returns an internal error (naming this plan node and echoing the
    /// offending expressions) when `expr` is non-empty; otherwise `Ok(())`.
    #[inline]
    #[expect(clippy::needless_pass_by_value)] // expr is moved intentionally to ensure it's not used again
    fn assert_no_expressions(&self, expr: Vec<Expr>) -> Result<()> {
        assert_or_internal_err!(
            expr.is_empty(),
            "{self:?} should have no exprs, got {:?}",
            expr
        );
        Ok(())
    }
1176
    /// Helper for [Self::with_new_exprs] to use when no inputs are expected.
    ///
    /// Returns an internal error (naming this plan node and echoing the
    /// offending inputs) when `inputs` is non-empty; otherwise `Ok(())`.
    #[inline]
    #[expect(clippy::needless_pass_by_value)] // inputs is moved intentionally to ensure it's not used again
    fn assert_no_inputs(&self, inputs: Vec<LogicalPlan>) -> Result<()> {
        assert_or_internal_err!(
            inputs.is_empty(),
            "{self:?} should have no inputs, got: {:?}",
            inputs
        );
        Ok(())
    }
1188
1189    /// Helper for [Self::with_new_exprs] to use when exactly one expression is expected.
1190    #[inline]
1191    fn only_expr(&self, mut expr: Vec<Expr>) -> Result<Expr> {
1192        assert_eq_or_internal_err!(
1193            expr.len(),
1194            1,
1195            "{self:?} should have exactly one expr, got {:?}",
1196            &expr
1197        );
1198        Ok(expr.remove(0))
1199    }
1200
1201    /// Helper for [Self::with_new_exprs] to use when exactly one input is expected.
1202    #[inline]
1203    fn only_input(&self, mut inputs: Vec<LogicalPlan>) -> Result<LogicalPlan> {
1204        assert_eq_or_internal_err!(
1205            inputs.len(),
1206            1,
1207            "{self:?} should have exactly one input, got {:?}",
1208            &inputs
1209        );
1210        Ok(inputs.remove(0))
1211    }
1212
1213    /// Helper for [Self::with_new_exprs] to use when exactly two inputs are expected.
1214    #[inline]
1215    fn only_two_inputs(
1216        &self,
1217        mut inputs: Vec<LogicalPlan>,
1218    ) -> Result<(LogicalPlan, LogicalPlan)> {
1219        assert_eq_or_internal_err!(
1220            inputs.len(),
1221            2,
1222            "{self:?} should have exactly two inputs, got {:?}",
1223            &inputs
1224        );
1225        let right = inputs.remove(1);
1226        let left = inputs.remove(0);
1227        Ok((left, right))
1228    }
1229
1230    /// Replaces placeholder param values (like `$1`, `$2`) in [`LogicalPlan`]
1231    /// with the specified `param_values`.
1232    ///
1233    /// [`Prepare`] statements are converted to
1234    /// their inner logical plan for execution.
1235    ///
1236    /// # Example
1237    /// ```
1238    /// # use arrow::datatypes::{Field, Schema, DataType};
1239    /// use datafusion_common::ScalarValue;
1240    /// # use datafusion_expr::{lit, col, LogicalPlanBuilder, logical_plan::table_scan, placeholder};
1241    /// # let schema = Schema::new(vec![
1242    /// #     Field::new("id", DataType::Int32, false),
1243    /// # ]);
1244    /// // Build SELECT * FROM t1 WHERE id = $1
1245    /// let plan = table_scan(Some("t1"), &schema, None).unwrap()
1246    ///     .filter(col("id").eq(placeholder("$1"))).unwrap()
1247    ///     .build().unwrap();
1248    ///
1249    /// assert_eq!(
1250    ///   "Filter: t1.id = $1\
1251    ///   \n  TableScan: t1",
1252    ///   plan.display_indent().to_string()
1253    /// );
1254    ///
1255    /// // Fill in the parameter $1 with a literal 3
1256    /// let plan = plan.with_param_values(vec![
1257    ///   ScalarValue::from(3i32) // value at index 0 --> $1
1258    /// ]).unwrap();
1259    ///
1260    /// assert_eq!(
1261    ///    "Filter: t1.id = Int32(3)\
1262    ///    \n  TableScan: t1",
1263    ///    plan.display_indent().to_string()
1264    ///  );
1265    ///
1266    /// // Note you can also used named parameters
1267    /// // Build SELECT * FROM t1 WHERE id = $my_param
1268    /// let plan = table_scan(Some("t1"), &schema, None).unwrap()
1269    ///     .filter(col("id").eq(placeholder("$my_param"))).unwrap()
1270    ///     .build().unwrap()
1271    ///     // Fill in the parameter $my_param with a literal 3
1272    ///     .with_param_values(vec![
1273    ///       ("my_param", ScalarValue::from(3i32)),
1274    ///     ]).unwrap();
1275    ///
1276    /// assert_eq!(
1277    ///    "Filter: t1.id = Int32(3)\
1278    ///    \n  TableScan: t1",
1279    ///    plan.display_indent().to_string()
1280    ///  );
1281    /// ```
1282    pub fn with_param_values(
1283        self,
1284        param_values: impl Into<ParamValues>,
1285    ) -> Result<LogicalPlan> {
1286        let param_values = param_values.into();
1287        let plan_with_values = self.replace_params_with_values(&param_values)?;
1288
1289        // unwrap Prepare
1290        Ok(
1291            if let LogicalPlan::Statement(Statement::Prepare(prepare_lp)) =
1292                plan_with_values
1293            {
1294                param_values.verify_fields(&prepare_lp.fields)?;
1295                // try and take ownership of the input if is not shared, clone otherwise
1296                Arc::unwrap_or_clone(prepare_lp.input)
1297            } else {
1298                plan_with_values
1299            },
1300        )
1301    }
1302
1303    /// Returns the maximum number of rows that this plan can output, if known.
1304    ///
1305    /// If `None`, the plan can return any number of rows.
1306    /// If `Some(n)` then the plan can return at most `n` rows but may return fewer.
1307    pub fn max_rows(self: &LogicalPlan) -> Option<usize> {
1308        match self {
1309            LogicalPlan::Projection(Projection { input, .. }) => input.max_rows(),
1310            LogicalPlan::Filter(filter) => {
1311                if filter.is_scalar() {
1312                    Some(1)
1313                } else {
1314                    filter.input.max_rows()
1315                }
1316            }
1317            LogicalPlan::Window(Window { input, .. }) => input.max_rows(),
1318            LogicalPlan::Aggregate(Aggregate {
1319                input, group_expr, ..
1320            }) => {
1321                // Empty group_expr will return Some(1)
1322                if group_expr
1323                    .iter()
1324                    .all(|expr| matches!(expr, Expr::Literal(_, _)))
1325                {
1326                    Some(1)
1327                } else {
1328                    input.max_rows()
1329                }
1330            }
1331            LogicalPlan::Sort(Sort { input, fetch, .. }) => {
1332                match (fetch, input.max_rows()) {
1333                    (Some(fetch_limit), Some(input_max)) => {
1334                        Some(input_max.min(*fetch_limit))
1335                    }
1336                    (Some(fetch_limit), None) => Some(*fetch_limit),
1337                    (None, Some(input_max)) => Some(input_max),
1338                    (None, None) => None,
1339                }
1340            }
1341            LogicalPlan::Join(Join {
1342                left,
1343                right,
1344                join_type,
1345                ..
1346            }) => match join_type {
1347                JoinType::Inner => Some(left.max_rows()? * right.max_rows()?),
1348                JoinType::Left | JoinType::Right | JoinType::Full => {
1349                    match (left.max_rows()?, right.max_rows()?, join_type) {
1350                        (0, 0, _) => Some(0),
1351                        (max_rows, 0, JoinType::Left | JoinType::Full) => Some(max_rows),
1352                        (0, max_rows, JoinType::Right | JoinType::Full) => Some(max_rows),
1353                        (left_max, right_max, _) => Some(left_max * right_max),
1354                    }
1355                }
1356                JoinType::LeftSemi | JoinType::LeftAnti | JoinType::LeftMark => {
1357                    left.max_rows()
1358                }
1359                JoinType::RightSemi | JoinType::RightAnti | JoinType::RightMark => {
1360                    right.max_rows()
1361                }
1362            },
1363            LogicalPlan::Repartition(Repartition { input, .. }) => input.max_rows(),
1364            LogicalPlan::Union(Union { inputs, .. }) => {
1365                inputs.iter().try_fold(0usize, |mut acc, plan| {
1366                    acc += plan.max_rows()?;
1367                    Some(acc)
1368                })
1369            }
1370            LogicalPlan::TableScan(TableScan { fetch, .. }) => *fetch,
1371            LogicalPlan::EmptyRelation(_) => Some(0),
1372            LogicalPlan::RecursiveQuery(_) => None,
1373            LogicalPlan::Subquery(_) => None,
1374            LogicalPlan::SubqueryAlias(SubqueryAlias { input, .. }) => input.max_rows(),
1375            LogicalPlan::Limit(limit) => match limit.get_fetch_type() {
1376                Ok(FetchType::Literal(s)) => s,
1377                _ => None,
1378            },
1379            LogicalPlan::Distinct(
1380                Distinct::All(input) | Distinct::On(DistinctOn { input, .. }),
1381            ) => input.max_rows(),
1382            LogicalPlan::Values(v) => Some(v.values.len()),
1383            LogicalPlan::Unnest(_) => None,
1384            LogicalPlan::Ddl(_)
1385            | LogicalPlan::Explain(_)
1386            | LogicalPlan::Analyze(_)
1387            | LogicalPlan::Dml(_)
1388            | LogicalPlan::Copy(_)
1389            | LogicalPlan::DescribeTable(_)
1390            | LogicalPlan::Statement(_)
1391            | LogicalPlan::Extension(_) => None,
1392        }
1393    }
1394
1395    /// If this node's expressions contains any references to an outer subquery
1396    pub fn contains_outer_reference(&self) -> bool {
1397        let mut contains = false;
1398        self.apply_expressions(|expr| {
1399            Ok(if expr.contains_outer() {
1400                contains = true;
1401                TreeNodeRecursion::Stop
1402            } else {
1403                TreeNodeRecursion::Continue
1404            })
1405        })
1406        .unwrap();
1407        contains
1408    }
1409
1410    /// Get the output expressions and their corresponding columns.
1411    ///
1412    /// The parent node may reference the output columns of the plan by expressions, such as
1413    /// projection over aggregate or window functions. This method helps to convert the
1414    /// referenced expressions into columns.
1415    ///
1416    /// See also: [`crate::utils::columnize_expr`]
1417    pub fn columnized_output_exprs(&self) -> Result<Vec<(&Expr, Column)>> {
1418        match self {
1419            LogicalPlan::Aggregate(aggregate) => Ok(aggregate
1420                .output_expressions()?
1421                .into_iter()
1422                .zip(self.schema().columns())
1423                .collect()),
1424            LogicalPlan::Window(Window {
1425                window_expr,
1426                input,
1427                schema,
1428            }) => {
1429                // The input could be another Window, so the result should also include the input's. For Example:
1430                // `EXPLAIN SELECT RANK() OVER (PARTITION BY a ORDER BY b), SUM(b) OVER (PARTITION BY a) FROM t`
1431                // Its plan is:
1432                // Projection: RANK() PARTITION BY [t.a] ORDER BY [t.b ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW, SUM(t.b) PARTITION BY [t.a] ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING
1433                //   WindowAggr: windowExpr=[[SUM(CAST(t.b AS Int64)) PARTITION BY [t.a] ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING]]
1434                //     WindowAggr: windowExpr=[[RANK() PARTITION BY [t.a] ORDER BY [t.b ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW]]/
1435                //       TableScan: t projection=[a, b]
1436                let mut output_exprs = input.columnized_output_exprs()?;
1437                let input_len = input.schema().fields().len();
1438                output_exprs.extend(
1439                    window_expr
1440                        .iter()
1441                        .zip(schema.columns().into_iter().skip(input_len)),
1442                );
1443                Ok(output_exprs)
1444            }
1445            _ => Ok(vec![]),
1446        }
1447    }
1448}
1449
1450impl LogicalPlan {
    /// Return a `LogicalPlan` with all placeholders (e.g $1 $2,
    /// ...) replaced with corresponding values provided in
    /// `params_values`
    ///
    /// See [`Self::with_param_values`] for examples and usage with an owned
    /// `ParamValues`
    pub fn replace_params_with_values(
        self,
        param_values: &ParamValues,
    ) -> Result<LogicalPlan> {
        // Bottom-up traversal (including subquery plans) so that child
        // schemas are finalized before parents are rewritten.
        self.transform_up_with_subqueries(|plan| {
            let schema = Arc::clone(plan.schema());
            let name_preserver = NamePreserver::new(&plan);
            plan.map_expressions(|e| {
                // First pass: infer placeholder data types from the node's
                // schema and learn whether the expression has any
                // placeholders at all.
                let (e, has_placeholder) = e.infer_placeholder_types(&schema)?;
                if !has_placeholder {
                    // Performance optimization:
                    // avoid NamePreserver copy and second pass over expression
                    // if no placeholders.
                    Ok(Transformed::no(e))
                } else {
                    let original_name = name_preserver.save(&e);
                    // Second pass: substitute each placeholder with the
                    // literal value supplied in `param_values`.
                    let transformed_expr = e.transform_up(|e| {
                        if let Expr::Placeholder(Placeholder { id, .. }) = e {
                            let (value, metadata) = param_values
                                .get_placeholders_with_values(&id)?
                                .into_inner();
                            Ok(Transformed::yes(Expr::Literal(value, metadata)))
                        } else {
                            Ok(Transformed::no(e))
                        }
                    })?;
                    // Preserve name to avoid breaking column references to this expression
                    Ok(transformed_expr.update_data(|expr| original_name.restore(expr)))
                }
            })?
            // Literal substitution may change output types, so recompute the
            // node's schema afterwards.
            .map_data(|plan| plan.update_schema_data_type())
        })
        .map(|res| res.data)
    }
1491
    /// Recompute schema fields' data type after replacing params, ensuring fields data type can be
    /// updated according to the new parameters.
    ///
    /// Unlike `recompute_schema()`, this method rebuilds VALUES plans entirely to properly infer
    /// types from literal values after placeholder substitution.
    fn update_schema_data_type(self) -> Result<LogicalPlan> {
        match self {
            // Build `LogicalPlan::Values` from the values for type inference.
            // We can't use `recompute_schema` because it skips recomputing for
            // `LogicalPlan::Values`.
            LogicalPlan::Values(Values { values, schema: _ }) => {
                LogicalPlanBuilder::values(values)?.build()
            }
            // other plans can just use `recompute_schema` directly.
            plan => plan.recompute_schema(),
        }
    }
1509
1510    /// Walk the logical plan, find any `Placeholder` tokens, and return a set of their names.
1511    pub fn get_parameter_names(&self) -> Result<HashSet<String>> {
1512        let mut param_names = HashSet::new();
1513        self.apply_with_subqueries(|plan| {
1514            plan.apply_expressions(|expr| {
1515                expr.apply(|expr| {
1516                    if let Expr::Placeholder(Placeholder { id, .. }) = expr {
1517                        param_names.insert(id.clone());
1518                    }
1519                    Ok(TreeNodeRecursion::Continue)
1520                })
1521            })
1522        })
1523        .map(|_| param_names)
1524    }
1525
1526    /// Walk the logical plan, find any `Placeholder` tokens, and return a map of their IDs and DataTypes
1527    ///
1528    /// Note that this will drop any extension or field metadata attached to parameters. Use
1529    /// [`LogicalPlan::get_parameter_fields`] to keep extension metadata.
1530    pub fn get_parameter_types(
1531        &self,
1532    ) -> Result<HashMap<String, Option<DataType>>, DataFusionError> {
1533        let mut parameter_fields = self.get_parameter_fields()?;
1534        Ok(parameter_fields
1535            .drain()
1536            .map(|(name, maybe_field)| {
1537                (name, maybe_field.map(|field| field.data_type().clone()))
1538            })
1539            .collect())
1540    }
1541
    /// Walk the logical plan, find any `Placeholder` tokens, and return a map of their IDs and FieldRefs
    pub fn get_parameter_fields(
        &self,
    ) -> Result<HashMap<String, Option<FieldRef>>, DataFusionError> {
        // Maps placeholder id -> inferred field; `None` means the type could
        // not be determined for that placeholder.
        let mut param_types: HashMap<String, Option<FieldRef>> = HashMap::new();

        self.apply_with_subqueries(|plan| {
            plan.apply_expressions(|expr| {
                expr.apply(|expr| {
                    if let Expr::Placeholder(Placeholder { id, field }) = expr {
                        let prev = param_types.get(id);
                        match (prev, field) {
                            // Same id seen twice with a concrete type: both
                            // occurrences must agree on storage type and
                            // metadata, otherwise error.
                            (Some(Some(prev)), Some(field)) => {
                                check_metadata_with_storage_equal(
                                    (field.data_type(), Some(field.metadata())),
                                    (prev.data_type(), Some(prev.metadata())),
                                    "parameter",
                                    &format!(": Conflicting types for id {id}"),
                                )?;
                            }
                            // First typed occurrence (or previously untyped):
                            // record the field.
                            (_, Some(field)) => {
                                param_types.insert(id.clone(), Some(Arc::clone(field)));
                            }
                            // Untyped occurrence.
                            // NOTE(review): this arm also runs when a typed
                            // entry already exists for `id`, overwriting it
                            // with `None` — confirm whether that type erasure
                            // is intended.
                            _ => {
                                param_types.insert(id.clone(), None);
                            }
                        }
                    }
                    Ok(TreeNodeRecursion::Continue)
                })
            })
        })
        .map(|_| param_types)
    }
1576
1577    // ------------
1578    // Various implementations for printing out LogicalPlans
1579    // ------------
1580
1581    /// Return a `format`able structure that produces a single line
1582    /// per node.
1583    ///
1584    /// # Example
1585    ///
1586    /// ```text
1587    /// Projection: employee.id
1588    ///    Filter: employee.state Eq Utf8(\"CO\")\
1589    ///       CsvScan: employee projection=Some([0, 3])
1590    /// ```
1591    ///
1592    /// ```
1593    /// use arrow::datatypes::{DataType, Field, Schema};
1594    /// use datafusion_expr::{col, lit, logical_plan::table_scan, LogicalPlanBuilder};
1595    /// let schema = Schema::new(vec![Field::new("id", DataType::Int32, false)]);
1596    /// let plan = table_scan(Some("t1"), &schema, None)
1597    ///     .unwrap()
1598    ///     .filter(col("id").eq(lit(5)))
1599    ///     .unwrap()
1600    ///     .build()
1601    ///     .unwrap();
1602    ///
1603    /// // Format using display_indent
1604    /// let display_string = format!("{}", plan.display_indent());
1605    ///
1606    /// assert_eq!("Filter: t1.id = Int32(5)\n  TableScan: t1", display_string);
1607    /// ```
1608    pub fn display_indent(&self) -> impl Display + '_ {
1609        // Boilerplate structure to wrap LogicalPlan with something
1610        // that that can be formatted
1611        struct Wrapper<'a>(&'a LogicalPlan);
1612        impl Display for Wrapper<'_> {
1613            fn fmt(&self, f: &mut Formatter) -> fmt::Result {
1614                let with_schema = false;
1615                let mut visitor = IndentVisitor::new(f, with_schema);
1616                match self.0.visit_with_subqueries(&mut visitor) {
1617                    Ok(_) => Ok(()),
1618                    Err(_) => Err(fmt::Error),
1619                }
1620            }
1621        }
1622        Wrapper(self)
1623    }
1624
1625    /// Return a `format`able structure that produces a single line
1626    /// per node that includes the output schema. For example:
1627    ///
1628    /// ```text
1629    /// Projection: employee.id [id:Int32]\
1630    ///    Filter: employee.state = Utf8(\"CO\") [id:Int32, state:Utf8]\
1631    ///      TableScan: employee projection=[0, 3] [id:Int32, state:Utf8]";
1632    /// ```
1633    ///
1634    /// ```
1635    /// use arrow::datatypes::{DataType, Field, Schema};
1636    /// use datafusion_expr::{col, lit, logical_plan::table_scan, LogicalPlanBuilder};
1637    /// let schema = Schema::new(vec![Field::new("id", DataType::Int32, false)]);
1638    /// let plan = table_scan(Some("t1"), &schema, None)
1639    ///     .unwrap()
1640    ///     .filter(col("id").eq(lit(5)))
1641    ///     .unwrap()
1642    ///     .build()
1643    ///     .unwrap();
1644    ///
1645    /// // Format using display_indent_schema
1646    /// let display_string = format!("{}", plan.display_indent_schema());
1647    ///
1648    /// assert_eq!(
1649    ///     "Filter: t1.id = Int32(5) [id:Int32]\
1650    ///             \n  TableScan: t1 [id:Int32]",
1651    ///     display_string
1652    /// );
1653    /// ```
1654    pub fn display_indent_schema(&self) -> impl Display + '_ {
1655        // Boilerplate structure to wrap LogicalPlan with something
1656        // that that can be formatted
1657        struct Wrapper<'a>(&'a LogicalPlan);
1658        impl Display for Wrapper<'_> {
1659            fn fmt(&self, f: &mut Formatter) -> fmt::Result {
1660                let with_schema = true;
1661                let mut visitor = IndentVisitor::new(f, with_schema);
1662                match self.0.visit_with_subqueries(&mut visitor) {
1663                    Ok(_) => Ok(()),
1664                    Err(_) => Err(fmt::Error),
1665                }
1666            }
1667        }
1668        Wrapper(self)
1669    }
1670
1671    /// Return a displayable structure that produces plan in postgresql JSON format.
1672    ///
1673    /// Users can use this format to visualize the plan in existing plan visualization tools, for example [dalibo](https://explain.dalibo.com/)
1674    pub fn display_pg_json(&self) -> impl Display + '_ {
1675        // Boilerplate structure to wrap LogicalPlan with something
1676        // that that can be formatted
1677        struct Wrapper<'a>(&'a LogicalPlan);
1678        impl Display for Wrapper<'_> {
1679            fn fmt(&self, f: &mut Formatter) -> fmt::Result {
1680                let mut visitor = PgJsonVisitor::new(f);
1681                visitor.with_schema(true);
1682                match self.0.visit_with_subqueries(&mut visitor) {
1683                    Ok(_) => Ok(()),
1684                    Err(_) => Err(fmt::Error),
1685                }
1686            }
1687        }
1688        Wrapper(self)
1689    }
1690
1691    /// Return a `format`able structure that produces lines meant for
1692    /// graphical display using the `DOT` language. This format can be
1693    /// visualized using software from
1694    /// [`graphviz`](https://graphviz.org/)
1695    ///
1696    /// This currently produces two graphs -- one with the basic
1697    /// structure, and one with additional details such as schema.
1698    ///
1699    /// ```
1700    /// use arrow::datatypes::{DataType, Field, Schema};
1701    /// use datafusion_expr::{col, lit, logical_plan::table_scan, LogicalPlanBuilder};
1702    /// let schema = Schema::new(vec![Field::new("id", DataType::Int32, false)]);
1703    /// let plan = table_scan(Some("t1"), &schema, None)
1704    ///     .unwrap()
1705    ///     .filter(col("id").eq(lit(5)))
1706    ///     .unwrap()
1707    ///     .build()
1708    ///     .unwrap();
1709    ///
1710    /// // Format using display_graphviz
1711    /// let graphviz_string = format!("{}", plan.display_graphviz());
1712    /// ```
1713    ///
1714    /// If graphviz string is saved to a file such as `/tmp/example.dot`, the following
1715    /// commands can be used to render it as a pdf:
1716    ///
1717    /// ```bash
1718    ///   dot -Tpdf < /tmp/example.dot  > /tmp/example.pdf
1719    /// ```
1720    pub fn display_graphviz(&self) -> impl Display + '_ {
1721        // Boilerplate structure to wrap LogicalPlan with something
1722        // that that can be formatted
1723        struct Wrapper<'a>(&'a LogicalPlan);
1724        impl Display for Wrapper<'_> {
1725            fn fmt(&self, f: &mut Formatter) -> fmt::Result {
1726                let mut visitor = GraphvizVisitor::new(f);
1727
1728                visitor.start_graph()?;
1729
1730                visitor.pre_visit_plan("LogicalPlan")?;
1731                self.0
1732                    .visit_with_subqueries(&mut visitor)
1733                    .map_err(|_| fmt::Error)?;
1734                visitor.post_visit_plan()?;
1735
1736                visitor.set_with_schema(true);
1737                visitor.pre_visit_plan("Detailed LogicalPlan")?;
1738                self.0
1739                    .visit_with_subqueries(&mut visitor)
1740                    .map_err(|_| fmt::Error)?;
1741                visitor.post_visit_plan()?;
1742
1743                visitor.end_graph()?;
1744                Ok(())
1745            }
1746        }
1747        Wrapper(self)
1748    }
1749
    /// Return a `format`able structure with a human readable
    /// description of this LogicalPlan node per node, not including
    /// children. For example:
    ///
    /// ```text
    /// Projection: id
    /// ```
    /// ```
    /// use arrow::datatypes::{DataType, Field, Schema};
    /// use datafusion_expr::{col, lit, logical_plan::table_scan, LogicalPlanBuilder};
    /// let schema = Schema::new(vec![Field::new("id", DataType::Int32, false)]);
    /// let plan = table_scan(Some("t1"), &schema, None)
    ///     .unwrap()
    ///     .build()
    ///     .unwrap();
    ///
    /// // Format using display
    /// let display_string = format!("{}", plan.display());
    ///
    /// assert_eq!("TableScan: t1", display_string);
    /// ```
    pub fn display(&self) -> impl Display + '_ {
        // Boilerplate structure to wrap LogicalPlan with something
        // that can be formatted
        struct Wrapper<'a>(&'a LogicalPlan);
        impl Display for Wrapper<'_> {
            fn fmt(&self, f: &mut Formatter) -> fmt::Result {
                // One arm per LogicalPlan variant; each writes a single-line
                // description of just this node (children are not rendered).
                match self.0 {
                    LogicalPlan::EmptyRelation(EmptyRelation {
                        produce_one_row,
                        schema: _,
                    }) => {
                        let rows = if *produce_one_row { 1 } else { 0 };
                        write!(f, "EmptyRelation: rows={rows}")
                    }
                    LogicalPlan::RecursiveQuery(RecursiveQuery {
                        is_distinct, ..
                    }) => {
                        write!(f, "RecursiveQuery: is_distinct={is_distinct}")
                    }
                    LogicalPlan::Values(Values { values, .. }) => {
                        let str_values: Vec<_> = values
                            .iter()
                            // limit to only 5 values to avoid horrible display
                            .take(5)
                            .map(|row| {
                                let item = row
                                    .iter()
                                    .map(|expr| expr.to_string())
                                    .collect::<Vec<_>>()
                                    .join(", ");
                                format!("({item})")
                            })
                            .collect();

                        // "eclipse" (sic, should be "ellipsis"): marker
                        // appended when rows were truncated above.
                        let eclipse = if values.len() > 5 { "..." } else { "" };
                        write!(f, "Values: {}{}", str_values.join(", "), eclipse)
                    }

                    LogicalPlan::TableScan(TableScan {
                        source,
                        table_name,
                        projection,
                        filters,
                        fetch,
                        ..
                    }) => {
                        // Resolve projected column indices to names for display.
                        let projected_fields = match projection {
                            Some(indices) => {
                                let schema = source.schema();
                                let names: Vec<&str> = indices
                                    .iter()
                                    .map(|i| schema.field(*i).name().as_str())
                                    .collect();
                                format!(" projection=[{}]", names.join(", "))
                            }
                            _ => "".to_string(),
                        };

                        write!(f, "TableScan: {table_name}{projected_fields}")?;

                        if !filters.is_empty() {
                            // Bucket each filter by how far the table provider
                            // can push it down (exact / inexact / not at all).
                            let mut full_filter = vec![];
                            let mut partial_filter = vec![];
                            let mut unsupported_filters = vec![];
                            let filters: Vec<&Expr> = filters.iter().collect();

                            // If the provider errors here, all buckets stay
                            // empty and no filter details are printed.
                            if let Ok(results) =
                                source.supports_filters_pushdown(&filters)
                            {
                                filters.iter().zip(results.iter()).for_each(
                                    |(x, res)| match res {
                                        TableProviderFilterPushDown::Exact => {
                                            full_filter.push(x)
                                        }
                                        TableProviderFilterPushDown::Inexact => {
                                            partial_filter.push(x)
                                        }
                                        TableProviderFilterPushDown::Unsupported => {
                                            unsupported_filters.push(x)
                                        }
                                    },
                                );
                            }

                            if !full_filter.is_empty() {
                                write!(
                                    f,
                                    ", full_filters=[{}]",
                                    expr_vec_fmt!(full_filter)
                                )?;
                            };
                            if !partial_filter.is_empty() {
                                write!(
                                    f,
                                    ", partial_filters=[{}]",
                                    expr_vec_fmt!(partial_filter)
                                )?;
                            }
                            if !unsupported_filters.is_empty() {
                                write!(
                                    f,
                                    ", unsupported_filters=[{}]",
                                    expr_vec_fmt!(unsupported_filters)
                                )?;
                            }
                        }

                        if let Some(n) = fetch {
                            write!(f, ", fetch={n}")?;
                        }

                        Ok(())
                    }
                    LogicalPlan::Projection(Projection { expr, .. }) => {
                        write!(f, "Projection:")?;
                        for (i, expr_item) in expr.iter().enumerate() {
                            if i > 0 {
                                write!(f, ",")?;
                            }
                            write!(f, " {expr_item}")?;
                        }
                        Ok(())
                    }
                    LogicalPlan::Dml(DmlStatement { table_name, op, .. }) => {
                        write!(f, "Dml: op=[{op}] table=[{table_name}]")
                    }
                    LogicalPlan::Copy(CopyTo {
                        input: _,
                        output_url,
                        file_type,
                        options,
                        ..
                    }) => {
                        let op_str = options
                            .iter()
                            .map(|(k, v)| format!("{k} {v}"))
                            .collect::<Vec<String>>()
                            .join(", ");

                        write!(
                            f,
                            "CopyTo: format={} output_url={output_url} options: ({op_str})",
                            file_type.get_ext()
                        )
                    }
                    LogicalPlan::Ddl(ddl) => {
                        write!(f, "{}", ddl.display())
                    }
                    LogicalPlan::Filter(Filter {
                        predicate: expr, ..
                    }) => write!(f, "Filter: {expr}"),
                    LogicalPlan::Window(Window { window_expr, .. }) => {
                        write!(
                            f,
                            "WindowAggr: windowExpr=[[{}]]",
                            expr_vec_fmt!(window_expr)
                        )
                    }
                    LogicalPlan::Aggregate(Aggregate {
                        group_expr,
                        aggr_expr,
                        ..
                    }) => write!(
                        f,
                        "Aggregate: groupBy=[[{}]], aggr=[[{}]]",
                        expr_vec_fmt!(group_expr),
                        expr_vec_fmt!(aggr_expr)
                    ),
                    LogicalPlan::Sort(Sort { expr, fetch, .. }) => {
                        write!(f, "Sort: ")?;
                        for (i, expr_item) in expr.iter().enumerate() {
                            if i > 0 {
                                write!(f, ", ")?;
                            }
                            write!(f, "{expr_item}")?;
                        }
                        if let Some(a) = fetch {
                            write!(f, ", fetch={a}")?;
                        }

                        Ok(())
                    }
                    LogicalPlan::Join(Join {
                        on: keys,
                        filter,
                        join_constraint,
                        join_type,
                        ..
                    }) => {
                        let join_expr: Vec<String> =
                            keys.iter().map(|(l, r)| format!("{l} = {r}")).collect();
                        let filter_expr = filter
                            .as_ref()
                            .map(|expr| format!(" Filter: {expr}"))
                            .unwrap_or_else(|| "".to_string());
                        // An inner join with no keys and no filter is
                        // displayed as a cross join.
                        let join_type = if filter.is_none()
                            && keys.is_empty()
                            && *join_type == JoinType::Inner
                        {
                            "Cross".to_string()
                        } else {
                            join_type.to_string()
                        };
                        match join_constraint {
                            JoinConstraint::On => {
                                write!(f, "{join_type} Join:",)?;
                                if !join_expr.is_empty() || !filter_expr.is_empty() {
                                    write!(
                                        f,
                                        " {}{}",
                                        join_expr.join(", "),
                                        filter_expr
                                    )?;
                                }
                                Ok(())
                            }
                            JoinConstraint::Using => {
                                write!(
                                    f,
                                    "{} Join: Using {}{}",
                                    join_type,
                                    join_expr.join(", "),
                                    filter_expr,
                                )
                            }
                        }
                    }
                    LogicalPlan::Repartition(Repartition {
                        partitioning_scheme,
                        ..
                    }) => match partitioning_scheme {
                        Partitioning::RoundRobinBatch(n) => {
                            write!(f, "Repartition: RoundRobinBatch partition_count={n}")
                        }
                        Partitioning::Hash(expr, n) => {
                            let hash_expr: Vec<String> =
                                expr.iter().map(|e| format!("{e}")).collect();
                            write!(
                                f,
                                "Repartition: Hash({}) partition_count={}",
                                hash_expr.join(", "),
                                n
                            )
                        }
                        Partitioning::DistributeBy(expr) => {
                            let dist_by_expr: Vec<String> =
                                expr.iter().map(|e| format!("{e}")).collect();
                            write!(
                                f,
                                "Repartition: DistributeBy({})",
                                dist_by_expr.join(", "),
                            )
                        }
                    },
                    LogicalPlan::Limit(limit) => {
                        // Attempt to display `skip` and `fetch` as literals if possible, otherwise as expressions.
                        let skip_str = match limit.get_skip_type() {
                            Ok(SkipType::Literal(n)) => n.to_string(),
                            _ => limit
                                .skip
                                .as_ref()
                                .map_or_else(|| "None".to_string(), |x| x.to_string()),
                        };
                        let fetch_str = match limit.get_fetch_type() {
                            Ok(FetchType::Literal(Some(n))) => n.to_string(),
                            Ok(FetchType::Literal(None)) => "None".to_string(),
                            _ => limit
                                .fetch
                                .as_ref()
                                .map_or_else(|| "None".to_string(), |x| x.to_string()),
                        };
                        write!(f, "Limit: skip={skip_str}, fetch={fetch_str}",)
                    }
                    LogicalPlan::Subquery(Subquery { .. }) => {
                        write!(f, "Subquery:")
                    }
                    LogicalPlan::SubqueryAlias(SubqueryAlias { alias, .. }) => {
                        write!(f, "SubqueryAlias: {alias}")
                    }
                    LogicalPlan::Statement(statement) => {
                        write!(f, "{}", statement.display())
                    }
                    LogicalPlan::Distinct(distinct) => match distinct {
                        Distinct::All(_) => write!(f, "Distinct:"),
                        Distinct::On(DistinctOn {
                            on_expr,
                            select_expr,
                            sort_expr,
                            ..
                        }) => write!(
                            f,
                            "DistinctOn: on_expr=[[{}]], select_expr=[[{}]], sort_expr=[[{}]]",
                            expr_vec_fmt!(on_expr),
                            expr_vec_fmt!(select_expr),
                            if let Some(sort_expr) = sort_expr {
                                expr_vec_fmt!(sort_expr)
                            } else {
                                "".to_string()
                            },
                        ),
                    },
                    LogicalPlan::Explain { .. } => write!(f, "Explain"),
                    LogicalPlan::Analyze { .. } => write!(f, "Analyze"),
                    LogicalPlan::Union(_) => write!(f, "Union"),
                    // Extension nodes render themselves.
                    LogicalPlan::Extension(e) => e.node.fmt_for_explain(f),
                    LogicalPlan::DescribeTable(DescribeTable { .. }) => {
                        write!(f, "DescribeTable")
                    }
                    LogicalPlan::Unnest(Unnest {
                        input: plan,
                        list_type_columns: list_col_indices,
                        struct_type_columns: struct_col_indices,
                        ..
                    }) => {
                        let input_columns = plan.schema().columns();
                        // Render each list column with its unnesting depth.
                        let list_type_columns = list_col_indices
                            .iter()
                            .map(|(i, unnest_info)| {
                                format!(
                                    "{}|depth={}",
                                    &input_columns[*i].to_string(),
                                    unnest_info.depth
                                )
                            })
                            .collect::<Vec<String>>();
                        let struct_type_columns = struct_col_indices
                            .iter()
                            .map(|i| &input_columns[*i])
                            .collect::<Vec<&Column>>();
                        // get items from input_columns indexed by list_col_indices
                        write!(
                            f,
                            "Unnest: lists[{}] structs[{}]",
                            expr_vec_fmt!(list_type_columns),
                            expr_vec_fmt!(struct_type_columns)
                        )
                    }
                }
            }
        }
        Wrapper(self)
    }
2113}
2114
2115impl Display for LogicalPlan {
2116    fn fmt(&self, f: &mut Formatter) -> fmt::Result {
2117        self.display_indent().fmt(f)
2118    }
2119}
2120
2121impl ToStringifiedPlan for LogicalPlan {
2122    fn to_stringified(&self, plan_type: PlanType) -> StringifiedPlan {
2123        StringifiedPlan::new(plan_type, self.display_indent().to_string())
2124    }
2125}
2126
/// Relationship produces 0 or 1 placeholder rows with specified output schema
/// In most cases the output schema for `EmptyRelation` would be empty,
/// however, it can be non-empty typically for optimizer rules
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct EmptyRelation {
    /// Whether to produce a single placeholder row (`true`) or no rows at all (`false`)
    pub produce_one_row: bool,
    /// The schema description of the output
    pub schema: DFSchemaRef,
}
2137
2138// Manual implementation needed because of `schema` field. Comparison excludes this field.
2139impl PartialOrd for EmptyRelation {
2140    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
2141        self.produce_one_row
2142            .partial_cmp(&other.produce_one_row)
2143            // TODO (https://github.com/apache/datafusion/issues/17477) avoid recomparing all fields
2144            .filter(|cmp| *cmp != Ordering::Equal || self == other)
2145    }
2146}
2147
/// A variadic query operation, Recursive CTE.
///
/// # Recursive Query Evaluation
///
/// From the [Postgres Docs]:
///
/// 1. Evaluate the non-recursive term. For `UNION` (but not `UNION ALL`),
///    discard duplicate rows. Include all remaining rows in the result of the
///    recursive query, and also place them in a temporary working table.
///
/// 2. So long as the working table is not empty, repeat these steps:
///
/// * Evaluate the recursive term, substituting the current contents of the
///   working table for the recursive self-reference. For `UNION` (but not `UNION
///   ALL`), discard duplicate rows and rows that duplicate any previous result
///   row. Include all remaining rows in the result of the recursive query, and
///   also place them in a temporary intermediate table.
///
/// * Replace the contents of the working table with the contents of the
///   intermediate table, then empty the intermediate table.
///
/// [Postgres Docs]: https://www.postgresql.org/docs/current/queries-with.html#QUERIES-WITH-RECURSIVE
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Hash)]
pub struct RecursiveQuery {
    /// Name of the query (the name of the recursive CTE)
    pub name: String,
    /// The static term (initial contents of the working table)
    pub static_term: Arc<LogicalPlan>,
    /// The recursive term (evaluated on the contents of the working table until
    /// it returns an empty set)
    pub recursive_term: Arc<LogicalPlan>,
    /// Should the output of the recursive term be deduplicated (`UNION`) or
    /// not (`UNION ALL`).
    pub is_distinct: bool,
}
2183
/// Values expression. See
/// [Postgres VALUES](https://www.postgresql.org/docs/current/queries-values.html)
/// documentation for more details.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct Values {
    /// The table schema
    pub schema: DFSchemaRef,
    /// Values: one inner `Vec<Expr>` per `VALUES` row
    pub values: Vec<Vec<Expr>>,
}
2194
2195// Manual implementation needed because of `schema` field. Comparison excludes this field.
2196impl PartialOrd for Values {
2197    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
2198        self.values
2199            .partial_cmp(&other.values)
2200            // TODO (https://github.com/apache/datafusion/issues/17477) avoid recomparing all fields
2201            .filter(|cmp| *cmp != Ordering::Equal || self == other)
2202    }
2203}
2204
/// Evaluates an arbitrary list of expressions (essentially a
/// SELECT with an expression list) on its input.
#[derive(Clone, PartialEq, Eq, Hash, Debug)]
// mark non_exhaustive to encourage use of try_new/new()
#[non_exhaustive]
pub struct Projection {
    /// The list of expressions (one per output field, unless a wildcard is present)
    pub expr: Vec<Expr>,
    /// The incoming logical plan
    pub input: Arc<LogicalPlan>,
    /// The schema description of the output
    pub schema: DFSchemaRef,
}
2218
2219// Manual implementation needed because of `schema` field. Comparison excludes this field.
2220impl PartialOrd for Projection {
2221    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
2222        match self.expr.partial_cmp(&other.expr) {
2223            Some(Ordering::Equal) => self.input.partial_cmp(&other.input),
2224            cmp => cmp,
2225        }
2226        // TODO (https://github.com/apache/datafusion/issues/17477) avoid recomparing all fields
2227        .filter(|cmp| *cmp != Ordering::Equal || self == other)
2228    }
2229}
2230
2231impl Projection {
2232    /// Create a new Projection
2233    pub fn try_new(expr: Vec<Expr>, input: Arc<LogicalPlan>) -> Result<Self> {
2234        let projection_schema = projection_schema(&input, &expr)?;
2235        Self::try_new_with_schema(expr, input, projection_schema)
2236    }
2237
2238    /// Create a new Projection using the specified output schema
2239    pub fn try_new_with_schema(
2240        expr: Vec<Expr>,
2241        input: Arc<LogicalPlan>,
2242        schema: DFSchemaRef,
2243    ) -> Result<Self> {
2244        #[expect(deprecated)]
2245        if !expr.iter().any(|e| matches!(e, Expr::Wildcard { .. }))
2246            && expr.len() != schema.fields().len()
2247        {
2248            return plan_err!(
2249                "Projection has mismatch between number of expressions ({}) and number of fields in schema ({})",
2250                expr.len(),
2251                schema.fields().len()
2252            );
2253        }
2254        Ok(Self {
2255            expr,
2256            input,
2257            schema,
2258        })
2259    }
2260
2261    /// Create a new Projection using the specified output schema
2262    pub fn new_from_schema(input: Arc<LogicalPlan>, schema: DFSchemaRef) -> Self {
2263        let expr: Vec<Expr> = schema.columns().into_iter().map(Expr::Column).collect();
2264        Self {
2265            expr,
2266            input,
2267            schema,
2268        }
2269    }
2270}
2271
2272/// Computes the schema of the result produced by applying a projection to the input logical plan.
2273///
2274/// # Arguments
2275///
2276/// * `input`: A reference to the input `LogicalPlan` for which the projection schema
2277///   will be computed.
2278/// * `exprs`: A slice of `Expr` expressions representing the projection operation to apply.
2279///
2280/// # Metadata Handling
2281///
2282/// - **Schema-level metadata**: Passed through unchanged from the input schema
2283/// - **Field-level metadata**: Determined by each expression via [`exprlist_to_fields`], which
2284///   calls [`Expr::to_field`] to handle expression-specific metadata (literals, aliases, etc.)
2285///
2286/// # Returns
2287///
2288/// A `Result` containing an `Arc<DFSchema>` representing the schema of the result
2289/// produced by the projection operation. If the schema computation is successful,
2290/// the `Result` will contain the schema; otherwise, it will contain an error.
2291pub fn projection_schema(input: &LogicalPlan, exprs: &[Expr]) -> Result<Arc<DFSchema>> {
2292    // Preserve input schema metadata at the schema level
2293    let metadata = input.schema().metadata().clone();
2294
2295    // Convert expressions to fields with Field properties determined by `Expr::to_field`
2296    let schema =
2297        DFSchema::new_with_metadata(exprlist_to_fields(exprs, input)?, metadata)?
2298            .with_functional_dependencies(calc_func_dependencies_for_project(
2299                exprs, input,
2300            )?)?;
2301
2302    Ok(Arc::new(schema))
2303}
2304
/// Aliased subquery
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
// mark non_exhaustive to encourage use of try_new/new()
#[non_exhaustive]
pub struct SubqueryAlias {
    /// The incoming logical plan
    pub input: Arc<LogicalPlan>,
    /// The alias for the input relation
    pub alias: TableReference,
    /// The schema with qualified field names (every field qualified by `alias`)
    pub schema: DFSchemaRef,
}
2317
impl SubqueryAlias {
    /// Creates a `SubqueryAlias` that re-qualifies every output column of
    /// `plan` with `alias`.
    ///
    /// If the input schema contains duplicate (unqualified) field names, a
    /// `Projection` is inserted first to give the duplicates unique aliases,
    /// since after re-qualification the duplicates would otherwise become
    /// ambiguous for parent nodes.
    pub fn try_new(
        plan: Arc<LogicalPlan>,
        alias: impl Into<TableReference>,
    ) -> Result<Self> {
        let alias = alias.into();

        // Since SubqueryAlias will replace all field qualification for the output schema of `plan`,
        // no field must share the same column name as this would lead to ambiguity when referencing
        // columns in parent logical nodes.

        // Compute unique aliases, if any, for each column of the input's schema.
        let aliases = unique_field_aliases(plan.schema().fields());
        let is_projection_needed = aliases.iter().any(Option::is_some);

        // Insert a projection node, if needed, to make sure aliases are applied.
        let plan = if is_projection_needed {
            let projection_expressions = aliases
                .iter()
                .zip(plan.schema().iter())
                .map(|(alias, (qualifier, field))| {
                    // Keep the original qualifier on the column reference;
                    // only the output name changes (when an alias exists).
                    let column =
                        Expr::Column(Column::new(qualifier.cloned(), field.name()));
                    match alias {
                        None => column,
                        Some(alias) => {
                            Expr::Alias(Alias::new(column, qualifier.cloned(), alias))
                        }
                    }
                })
                .collect();
            let projection = Projection::try_new(projection_expressions, plan)?;
            Arc::new(LogicalPlan::Projection(projection))
        } else {
            plan
        };

        // Requalify fields with the new `alias`.
        let fields = plan.schema().fields().clone();
        let meta_data = plan.schema().metadata().clone();
        let func_dependencies = plan.schema().functional_dependencies().clone();

        // Round-trip through an unqualified schema (and its Arrow form) to
        // drop any existing qualifiers before attaching `alias` to every field.
        let schema = DFSchema::from_unqualified_fields(fields, meta_data)?;
        let schema = schema.as_arrow();

        let schema = DFSchemaRef::new(
            DFSchema::try_from_qualified_schema(alias.clone(), schema)?
                .with_functional_dependencies(func_dependencies)?,
        );
        Ok(SubqueryAlias {
            input: plan,
            alias,
            schema,
        })
    }
}
2374
2375// Manual implementation needed because of `schema` field. Comparison excludes this field.
2376impl PartialOrd for SubqueryAlias {
2377    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
2378        match self.input.partial_cmp(&other.input) {
2379            Some(Ordering::Equal) => self.alias.partial_cmp(&other.alias),
2380            cmp => cmp,
2381        }
2382        // TODO (https://github.com/apache/datafusion/issues/17477) avoid recomparing all fields
2383        .filter(|cmp| *cmp != Ordering::Equal || self == other)
2384    }
2385}
2386
/// Filters rows from its input that do not match an
/// expression (essentially a WHERE clause with a predicate
/// expression).
///
/// Semantically, `<predicate>` is evaluated for each row of the input;
/// If the value of `<predicate>` is true, the input row is passed to
/// the output. If the value of `<predicate>` is false, the row is
/// discarded.
///
/// Note there is no `schema` field: a filter never changes the schema,
/// so the output schema is the input's schema.
///
/// Filter should not be created directly but instead use `try_new()`
/// and that these fields are only pub to support pattern matching
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Hash)]
#[non_exhaustive]
pub struct Filter {
    /// The predicate expression, which must have Boolean type.
    pub predicate: Expr,
    /// The incoming logical plan
    pub input: Arc<LogicalPlan>,
}
2406
impl Filter {
    /// Create a new filter operator.
    ///
    /// Notes: as Aliases have no effect on the output of a filter operator,
    /// they are removed from the predicate expression.
    pub fn try_new(predicate: Expr, input: Arc<LogicalPlan>) -> Result<Self> {
        Self::try_new_internal(predicate, input)
    }

    /// Create a new filter operator for a having clause.
    /// This is similar to a filter, but its having flag is set to true.
    #[deprecated(since = "48.0.0", note = "Use `try_new` instead")]
    pub fn try_new_with_having(predicate: Expr, input: Arc<LogicalPlan>) -> Result<Self> {
        Self::try_new_internal(predicate, input)
    }

    /// Returns `true` if `data_type` is acceptable for a filter predicate:
    /// `Boolean`, `Null` (treated as a missing boolean), or a dictionary
    /// whose value type is itself acceptable.
    fn is_allowed_filter_type(data_type: &DataType) -> bool {
        match data_type {
            // Interpret NULL as a missing boolean value.
            DataType::Boolean | DataType::Null => true,
            // Dictionaries are allowed if their decoded value type is allowed.
            DataType::Dictionary(_, value_type) => {
                Filter::is_allowed_filter_type(value_type.as_ref())
            }
            _ => false,
        }
    }

    /// Shared constructor: validates the predicate's type (best effort) and
    /// strips aliases before storing it.
    fn try_new_internal(predicate: Expr, input: Arc<LogicalPlan>) -> Result<Self> {
        // Filter predicates must return a boolean value so we try and validate that here.
        // Note that it is not always possible to resolve the predicate expression during plan
        // construction (such as with correlated subqueries) so we make a best effort here and
        // ignore errors resolving the expression against the schema.
        if let Ok(predicate_type) = predicate.get_type(input.schema())
            && !Filter::is_allowed_filter_type(&predicate_type)
        {
            return plan_err!(
                "Cannot create filter with non-boolean predicate '{predicate}' returning {predicate_type}"
            );
        }

        Ok(Self {
            // Aliases have no effect on filter semantics, so remove them.
            predicate: predicate.unalias_nested().data,
            input,
        })
    }

    /// Is this filter guaranteed to return 0 or 1 row in a given instantiation?
    ///
    /// This function will return `true` if its predicate contains a conjunction of
    /// `col(a) = <expr>`, where its schema has a unique filter that is covered
    /// by this conjunction.
    ///
    /// For example, for the table:
    /// ```sql
    /// CREATE TABLE t (a INTEGER PRIMARY KEY, b INTEGER);
    /// ```
    /// `Filter(a = 2).is_scalar() == true`
    /// , whereas
    /// `Filter(b = 2).is_scalar() == false`
    /// and
    /// `Filter(a = 2 OR b = 2).is_scalar() == false`
    fn is_scalar(&self) -> bool {
        let schema = self.input.schema();

        // Candidate unique keys: single-mode dependencies that determine every
        // field of the schema and whose source columns are all non-nullable.
        let functional_dependencies = self.input.schema().functional_dependencies();
        let unique_keys = functional_dependencies.iter().filter(|dep| {
            let nullable = dep.nullable
                && dep
                    .source_indices
                    .iter()
                    .any(|&source| schema.field(source).is_nullable());
            !nullable
                && dep.mode == Dependency::Single
                && dep.target_indices.len() == schema.fields().len()
        });

        // Collect schema indices of columns pinned to a single value by an
        // equality conjunct of the form `col = <expr>` (or `<expr> = col`).
        let exprs = split_conjunction(&self.predicate);
        let eq_pred_cols: HashSet<_> = exprs
            .iter()
            .filter_map(|expr| {
                let Expr::BinaryExpr(BinaryExpr {
                    left,
                    op: Operator::Eq,
                    right,
                }) = expr
                else {
                    return None;
                };
                // This is a no-op filter expression
                if left == right {
                    return None;
                }

                match (left.as_ref(), right.as_ref()) {
                    // `col = col` does not pin either column to one value.
                    (Expr::Column(_), Expr::Column(_)) => None,
                    (Expr::Column(c), _) | (_, Expr::Column(c)) => {
                        Some(schema.index_of_column(c).unwrap())
                    }
                    _ => None,
                }
            })
            .collect();

        // If we have a functional dependence that is a subset of our predicate,
        // this filter is scalar
        for key in unique_keys {
            if key.source_indices.iter().all(|c| eq_pred_cols.contains(c)) {
                return true;
            }
        }
        false
    }
}
2520
/// Window its input based on a set of window spec and window function (e.g. SUM or RANK)
///
/// # Output Schema
///
/// The output schema is the input schema followed by the window function
/// expressions, in order.
///
/// For example, given the input schema `"A", "B", "C"` and the window function
/// `SUM(A) OVER (PARTITION BY B+1 ORDER BY C)`, the output schema will be `"A",
/// "B", "C", "SUM(A) OVER ..."` where `"SUM(A) OVER ..."` is the name of the
/// output column.
///
/// Note that the `PARTITION BY` expression "B+1" is not produced in the output
/// schema.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct Window {
    /// The incoming logical plan
    pub input: Arc<LogicalPlan>,
    /// The window function expression
    pub window_expr: Vec<Expr>,
    /// The schema description of the window output
    /// (input fields followed by one field per window expression)
    pub schema: DFSchemaRef,
}
2544
impl Window {
    /// Create a new window operator.
    ///
    /// Builds the output schema as the input's fields followed by one field
    /// per window expression, carries over the input's functional
    /// dependencies, and validates that `FILTER` clauses only appear on
    /// aggregate window functions.
    pub fn try_new(window_expr: Vec<Expr>, input: Arc<LogicalPlan>) -> Result<Self> {
        // Start from the input's (qualifier, field) pairs ...
        let fields: Vec<(Option<TableReference>, Arc<Field>)> = input
            .schema()
            .iter()
            .map(|(q, f)| (q.cloned(), Arc::clone(f)))
            .collect();
        let input_len = fields.len();
        let mut window_fields = fields;
        // ... and append one field per window expression, in order.
        let expr_fields = exprlist_to_fields(window_expr.as_slice(), &input)?;
        window_fields.extend_from_slice(expr_fields.as_slice());
        let metadata = input.schema().metadata().clone();

        // Update functional dependencies for window:
        let mut window_func_dependencies =
            input.schema().functional_dependencies().clone();
        window_func_dependencies.extend_target_indices(window_fields.len());

        // Since we know that ROW_NUMBER outputs will be unique (i.e. it consists
        // of consecutive numbers per partition), we can represent this fact with
        // functional dependencies.
        let mut new_dependencies = window_expr
            .iter()
            .enumerate()
            .filter_map(|(idx, expr)| {
                let Expr::WindowFunction(window_fun) = expr else {
                    return None;
                };
                let WindowFunction {
                    fun: WindowFunctionDefinition::WindowUDF(udwf),
                    params: WindowFunctionParams { partition_by, .. },
                } = window_fun.as_ref()
                else {
                    return None;
                };
                // When there is no PARTITION BY, row number will be unique
                // across the entire table.
                if udwf.name() == "row_number" && partition_by.is_empty() {
                    // Index of this expression's output field in the schema.
                    Some(idx + input_len)
                } else {
                    None
                }
            })
            .map(|idx| {
                FunctionalDependence::new(vec![idx], vec![], false)
                    .with_mode(Dependency::Single)
            })
            .collect::<Vec<_>>();

        if !new_dependencies.is_empty() {
            // Each row_number column determines every output field.
            for dependence in new_dependencies.iter_mut() {
                dependence.target_indices = (0..window_fields.len()).collect();
            }
            // Add the dependency introduced because of ROW_NUMBER window function to the functional dependency
            let new_deps = FunctionalDependencies::new(new_dependencies);
            window_func_dependencies.extend(new_deps);
        }

        // Validate that FILTER clauses are only used with aggregate window functions
        if let Some(e) = window_expr.iter().find(|e| {
            matches!(
                e,
                Expr::WindowFunction(wf)
                    if !matches!(wf.fun, WindowFunctionDefinition::AggregateUDF(_))
                        && wf.params.filter.is_some()
            )
        }) {
            return plan_err!(
                "FILTER clause can only be used with aggregate window functions. Found in '{e}'"
            );
        }

        Self::try_new_with_schema(
            window_expr,
            input,
            Arc::new(
                DFSchema::new_with_metadata(window_fields, metadata)?
                    .with_functional_dependencies(window_func_dependencies)?,
            ),
        )
    }

    /// Create a new window function using the provided schema to avoid the overhead of
    /// building the schema again when the schema is already known.
    ///
    /// This method should only be called when you are absolutely sure that the schema being
    /// provided is correct for the window function. If in doubt, call [try_new](Self::try_new) instead.
    ///
    /// # Errors
    ///
    /// Returns a plan error if `schema` does not have exactly
    /// `input_fields + window_expr.len()` fields.
    pub fn try_new_with_schema(
        window_expr: Vec<Expr>,
        input: Arc<LogicalPlan>,
        schema: DFSchemaRef,
    ) -> Result<Self> {
        let input_fields_count = input.schema().fields().len();
        if schema.fields().len() != input_fields_count + window_expr.len() {
            return plan_err!(
                "Window schema has wrong number of fields. Expected {} got {}",
                input_fields_count + window_expr.len(),
                schema.fields().len()
            );
        }

        Ok(Window {
            input,
            window_expr,
            schema,
        })
    }
}
2654
2655// Manual implementation needed because of `schema` field. Comparison excludes this field.
2656impl PartialOrd for Window {
2657    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
2658        match self.input.partial_cmp(&other.input)? {
2659            Ordering::Equal => {} // continue
2660            not_equal => return Some(not_equal),
2661        }
2662
2663        match self.window_expr.partial_cmp(&other.window_expr)? {
2664            Ordering::Equal => {} // continue
2665            not_equal => return Some(not_equal),
2666        }
2667
2668        // Contract for PartialOrd and PartialEq consistency requires that
2669        // a == b if and only if partial_cmp(a, b) == Some(Equal).
2670        if self == other {
2671            Some(Ordering::Equal)
2672        } else {
2673            None
2674        }
2675    }
2676}
2677
/// Produces rows from a table provider by reference or from the context
#[derive(Clone)]
pub struct TableScan {
    /// The name of the table
    pub table_name: TableReference,
    /// The source of the table
    pub source: Arc<dyn TableSource>,
    /// Optional column indices (into the source's schema) to use as a projection
    pub projection: Option<Vec<usize>>,
    /// The schema description of the output (after applying `projection`)
    pub projected_schema: DFSchemaRef,
    /// Optional expressions to be used as filters by the table provider
    pub filters: Vec<Expr>,
    /// Optional number of rows to read
    pub fetch: Option<usize>,
}
2694
2695impl Debug for TableScan {
2696    fn fmt(&self, f: &mut Formatter) -> fmt::Result {
2697        f.debug_struct("TableScan")
2698            .field("table_name", &self.table_name)
2699            .field("source", &"...")
2700            .field("projection", &self.projection)
2701            .field("projected_schema", &self.projected_schema)
2702            .field("filters", &self.filters)
2703            .field("fetch", &self.fetch)
2704            .finish_non_exhaustive()
2705    }
2706}
2707
2708impl PartialEq for TableScan {
2709    fn eq(&self, other: &Self) -> bool {
2710        self.table_name == other.table_name
2711            && self.projection == other.projection
2712            && self.projected_schema == other.projected_schema
2713            && self.filters == other.filters
2714            && self.fetch == other.fetch
2715    }
2716}
2717
// `Eq` is manual because `PartialEq` is manual (it ignores `source`).
impl Eq for TableScan {}
2719
2720// Manual implementation needed because of `source` and `projected_schema` fields.
2721// Comparison excludes these field.
2722impl PartialOrd for TableScan {
2723    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
2724        #[derive(PartialEq, PartialOrd)]
2725        struct ComparableTableScan<'a> {
2726            /// The name of the table
2727            pub table_name: &'a TableReference,
2728            /// Optional column indices to use as a projection
2729            pub projection: &'a Option<Vec<usize>>,
2730            /// Optional expressions to be used as filters by the table provider
2731            pub filters: &'a Vec<Expr>,
2732            /// Optional number of rows to read
2733            pub fetch: &'a Option<usize>,
2734        }
2735        let comparable_self = ComparableTableScan {
2736            table_name: &self.table_name,
2737            projection: &self.projection,
2738            filters: &self.filters,
2739            fetch: &self.fetch,
2740        };
2741        let comparable_other = ComparableTableScan {
2742            table_name: &other.table_name,
2743            projection: &other.projection,
2744            filters: &other.filters,
2745            fetch: &other.fetch,
2746        };
2747        comparable_self
2748            .partial_cmp(&comparable_other)
2749            // TODO (https://github.com/apache/datafusion/issues/17477) avoid recomparing all fields
2750            .filter(|cmp| *cmp != Ordering::Equal || self == other)
2751    }
2752}
2753
2754impl Hash for TableScan {
2755    fn hash<H: Hasher>(&self, state: &mut H) {
2756        self.table_name.hash(state);
2757        self.projection.hash(state);
2758        self.projected_schema.hash(state);
2759        self.filters.hash(state);
2760        self.fetch.hash(state);
2761    }
2762}
2763
impl TableScan {
    /// Initialize TableScan with appropriate schema from the given
    /// arguments.
    ///
    /// * `table_name` - name used to qualify output columns; must not be empty
    /// * `table_source` - supplies the table's schema and constraints
    /// * `projection` - optional indices into the source schema selecting output columns
    /// * `filters` - expressions the provider may use to filter rows
    /// * `fetch` - optional limit on the number of rows to read
    pub fn try_new(
        table_name: impl Into<TableReference>,
        table_source: Arc<dyn TableSource>,
        projection: Option<Vec<usize>>,
        filters: Vec<Expr>,
        fetch: Option<usize>,
    ) -> Result<Self> {
        let table_name = table_name.into();

        if table_name.table().is_empty() {
            return plan_err!("table_name cannot be empty");
        }
        let schema = table_source.schema();
        // Derive functional dependencies from the source's declared
        // constraints so downstream optimizer rules can use them.
        let func_dependencies = FunctionalDependencies::new_from_constraints(
            table_source.constraints(),
            schema.fields.len(),
        );
        let projected_schema = projection
            .as_ref()
            .map(|p| {
                // Project both the fields and the functional dependencies
                // through the projection indices.
                let projected_func_dependencies =
                    func_dependencies.project_functional_dependencies(p, p.len());

                let df_schema = DFSchema::new_with_metadata(
                    p.iter()
                        .map(|i| {
                            // Every projected field is qualified by the table name.
                            (Some(table_name.clone()), Arc::clone(&schema.fields()[*i]))
                        })
                        .collect(),
                    schema.metadata.clone(),
                )?;
                df_schema.with_functional_dependencies(projected_func_dependencies)
            })
            .unwrap_or_else(|| {
                // No projection: qualify the full source schema with the table name.
                let df_schema =
                    DFSchema::try_from_qualified_schema(table_name.clone(), &schema)?;
                df_schema.with_functional_dependencies(func_dependencies)
            })?;
        let projected_schema = Arc::new(projected_schema);

        Ok(Self {
            table_name,
            source: table_source,
            projection,
            projected_schema,
            filters,
            fetch,
        })
    }
}
2817
/// Repartition the plan based on a partitioning scheme.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Hash)]
pub struct Repartition {
    /// The incoming logical plan
    pub input: Arc<LogicalPlan>,
    /// The partitioning scheme
    pub partitioning_scheme: Partitioning,
}
2826
/// Union multiple inputs
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct Union {
    /// Inputs to merge
    pub inputs: Vec<Arc<LogicalPlan>>,
    /// Union schema. Should be the same for all inputs.
    pub schema: DFSchemaRef,
}
2835
impl Union {
    /// Constructs new Union instance deriving schema from inputs.
    /// Schema data types must match exactly.
    pub fn try_new(inputs: Vec<Arc<LogicalPlan>>) -> Result<Self> {
        let schema = Self::derive_schema_from_inputs(&inputs, false, false)?;
        Ok(Union { inputs, schema })
    }

    /// Constructs new Union instance deriving schema from inputs.
    /// Inputs do not have to have matching types and produced schema will
    /// take type from the first input.
    // TODO (https://github.com/apache/datafusion/issues/14380): Avoid creating uncoerced union at all.
    pub fn try_new_with_loose_types(inputs: Vec<Arc<LogicalPlan>>) -> Result<Self> {
        let schema = Self::derive_schema_from_inputs(&inputs, true, false)?;
        Ok(Union { inputs, schema })
    }

    /// Constructs a new Union instance that combines rows from different tables by name,
    /// instead of by position. This means that the specified inputs need not have schemas
    /// that are all the same width.
    ///
    /// Columns absent from an input are filled with `NULL` by wrapping that
    /// input in a `Projection` (see `rewrite_inputs_from_schema`).
    pub fn try_new_by_name(inputs: Vec<Arc<LogicalPlan>>) -> Result<Self> {
        let schema = Self::derive_schema_from_inputs(&inputs, true, true)?;
        let inputs = Self::rewrite_inputs_from_schema(&schema, inputs)?;

        Ok(Union { inputs, schema })
    }

    /// When constructing a `UNION BY NAME`, we need to wrap inputs
    /// in an additional `Projection` to account for absence of columns
    /// in input schemas or differing projection orders.
    ///
    /// `schema` is the previously-derived union schema; every returned input
    /// is re-projected to exactly that schema (same width and column order).
    fn rewrite_inputs_from_schema(
        schema: &Arc<DFSchema>,
        inputs: Vec<Arc<LogicalPlan>>,
    ) -> Result<Vec<Arc<LogicalPlan>>> {
        let schema_width = schema.iter().count();
        let mut wrapped_inputs = Vec::with_capacity(inputs.len());
        for input in inputs {
            // Any columns that exist within the derived schema but do not exist
            // within an input's schema should be replaced with `NULL` aliased
            // to the appropriate column in the derived schema.
            let mut expr = Vec::with_capacity(schema_width);
            for column in schema.columns() {
                // Matching is by unqualified name, consistent with how the
                // by-name schema was derived.
                if input
                    .schema()
                    .has_column_with_unqualified_name(column.name())
                {
                    expr.push(Expr::Column(column));
                } else {
                    expr.push(
                        Expr::Literal(ScalarValue::Null, None).alias(column.name()),
                    );
                }
            }
            wrapped_inputs.push(Arc::new(LogicalPlan::Projection(
                Projection::try_new_with_schema(expr, input, Arc::clone(schema))?,
            )));
        }

        Ok(wrapped_inputs)
    }

    /// Constructs new Union instance deriving schema from inputs.
    ///
    /// If `loose_types` is true, inputs do not need to have matching types and
    /// the produced schema will use the type from the first input.
    /// TODO (<https://github.com/apache/datafusion/issues/14380>): This is not necessarily reasonable behavior.
    ///
    /// If `by_name` is `true`, input schemas need not be the same width. That is,
    /// the constructed schema follows `UNION BY NAME` semantics.
    fn derive_schema_from_inputs(
        inputs: &[Arc<LogicalPlan>],
        loose_types: bool,
        by_name: bool,
    ) -> Result<DFSchemaRef> {
        // A UNION is only meaningful with two or more branches.
        if inputs.len() < 2 {
            return plan_err!("UNION requires at least two inputs");
        }

        if by_name {
            Self::derive_schema_from_inputs_by_name(inputs, loose_types)
        } else {
            Self::derive_schema_from_inputs_by_position(inputs, loose_types)
        }
    }

    /// Derives the `UNION BY NAME` output schema: the set of all field names
    /// across inputs in first-seen order. A field that does not appear in
    /// every input is forced nullable in the result.
    fn derive_schema_from_inputs_by_name(
        inputs: &[Arc<LogicalPlan>],
        loose_types: bool,
    ) -> Result<DFSchemaRef> {
        // Per-name accumulator: (data type, nullability seen so far,
        // metadata from each occurrence, number of occurrences).
        type FieldData<'a> =
            (&'a DataType, bool, Vec<&'a HashMap<String, String>>, usize);
        // A Vec (rather than a map) preserves first-seen column order.
        let mut cols: Vec<(&str, FieldData)> = Vec::new();
        for input in inputs.iter() {
            for field in input.schema().fields() {
                if let Some((_, (data_type, is_nullable, metadata, occurrences))) =
                    cols.iter_mut().find(|(name, _)| name == field.name())
                {
                    if !loose_types && *data_type != field.data_type() {
                        return plan_err!(
                            "Found different types for field {}",
                            field.name()
                        );
                    }

                    metadata.push(field.metadata());
                    // If the field is nullable in any one of the inputs,
                    // then the field in the final schema is also nullable.
                    *is_nullable |= field.is_nullable();
                    *occurrences += 1;
                } else {
                    cols.push((
                        field.name(),
                        (
                            field.data_type(),
                            field.is_nullable(),
                            vec![field.metadata()],
                            1,
                        ),
                    ));
                }
            }
        }

        let union_fields = cols
            .into_iter()
            .map(
                |(name, (data_type, is_nullable, unmerged_metadata, occurrences))| {
                    // If the final number of occurrences of the field is less
                    // than the number of inputs (i.e. the field is missing from
                    // one or more inputs), then it must be treated as nullable.
                    let final_is_nullable = if occurrences == inputs.len() {
                        is_nullable
                    } else {
                        true
                    };

                    let mut field =
                        Field::new(name, data_type.clone(), final_is_nullable);
                    field.set_metadata(intersect_metadata_for_union(unmerged_metadata));

                    // `None` qualifier: union output columns are unqualified.
                    (None, Arc::new(field))
                },
            )
            .collect::<Vec<(Option<TableReference>, _)>>();

        // Schema-level metadata keeps only entries common to all inputs.
        let union_schema_metadata = intersect_metadata_for_union(
            inputs.iter().map(|input| input.schema().metadata()),
        );

        // Functional Dependencies are not preserved after UNION operation
        let schema = DFSchema::new_with_metadata(union_fields, union_schema_metadata)?;
        let schema = Arc::new(schema);

        Ok(schema)
    }

    /// Derives the positional `UNION` output schema: all inputs must have the
    /// same column count; output column `i` is built from column `i` of every
    /// input, with duplicate names disambiguated by a numeric suffix.
    fn derive_schema_from_inputs_by_position(
        inputs: &[Arc<LogicalPlan>],
        loose_types: bool,
    ) -> Result<DFSchemaRef> {
        let first_schema = inputs[0].schema();
        let fields_count = first_schema.fields().len();
        for input in inputs.iter().skip(1) {
            if fields_count != input.schema().fields().len() {
                return plan_err!(
                    "UNION queries have different number of columns: \
                    left has {} columns whereas right has {} columns",
                    fields_count,
                    input.schema().fields().len()
                );
            }
        }

        // Tracks how many times each base name has been emitted so that
        // duplicates become `name_1`, `name_2`, ...
        let mut name_counts: HashMap<String, usize> = HashMap::new();
        let union_fields = (0..fields_count)
            .map(|i| {
                // Column `i` from every input.
                let fields = inputs
                    .iter()
                    .map(|input| input.schema().field(i))
                    .collect::<Vec<_>>();
                let first_field = fields[0];
                let base_name = first_field.name().to_string();

                let data_type = if loose_types {
                    // TODO apply type coercion here, or document why it's better to defer
                    // temporarily use the data type from the left input and later rely on the analyzer to
                    // coerce the two schemas into a common one.
                    first_field.data_type()
                } else {
                    // Strict mode: every input must agree with the first
                    // input's type for this column.
                    fields.iter().skip(1).try_fold(
                        first_field.data_type(),
                        |acc, field| {
                            if acc != field.data_type() {
                                return plan_err!(
                                    "UNION field {i} have different type in inputs: \
                                    left has {} whereas right has {}",
                                    first_field.data_type(),
                                    field.data_type()
                                );
                            }
                            Ok(acc)
                        },
                    )?
                };
                // Nullable if the column is nullable in any input.
                let nullable = fields.iter().any(|field| field.is_nullable());

                // Generate unique field name
                let name = if let Some(count) = name_counts.get_mut(&base_name) {
                    *count += 1;
                    format!("{base_name}_{count}")
                } else {
                    name_counts.insert(base_name.clone(), 0);
                    base_name
                };

                let mut field = Field::new(&name, data_type.clone(), nullable);
                let field_metadata = intersect_metadata_for_union(
                    fields.iter().map(|field| field.metadata()),
                );
                field.set_metadata(field_metadata);
                Ok((None, Arc::new(field)))
            })
            .collect::<Result<_>>()?;
        let union_schema_metadata = intersect_metadata_for_union(
            inputs.iter().map(|input| input.schema().metadata()),
        );

        // Functional Dependencies are not preserved after UNION operation
        let schema = DFSchema::new_with_metadata(union_fields, union_schema_metadata)?;
        let schema = Arc::new(schema);

        Ok(schema)
    }
}
3070
3071// Manual implementation needed because of `schema` field. Comparison excludes this field.
3072impl PartialOrd for Union {
3073    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
3074        self.inputs
3075            .partial_cmp(&other.inputs)
3076            // TODO (https://github.com/apache/datafusion/issues/17477) avoid recomparing all fields
3077            .filter(|cmp| *cmp != Ordering::Equal || self == other)
3078    }
3079}
3080
/// Describe the schema of table
///
/// # Example output:
///
/// ```sql
/// > describe traces;
/// +--------------------+-----------------------------+-------------+
/// | column_name        | data_type                   | is_nullable |
/// +--------------------+-----------------------------+-------------+
/// | attributes         | Utf8                        | YES         |
/// | duration_nano      | Int64                       | YES         |
/// | end_time_unix_nano | Int64                       | YES         |
/// | service.name       | Dictionary(Int32, Utf8)     | YES         |
/// | span.kind          | Utf8                        | YES         |
/// | span.name          | Utf8                        | YES         |
/// | span_id            | Dictionary(Int32, Utf8)     | YES         |
/// | time               | Timestamp(Nanosecond, None) | NO          |
/// | trace_id           | Dictionary(Int32, Utf8)     | YES         |
/// | otel.status_code   | Utf8                        | YES         |
/// | parent_span_id     | Utf8                        | YES         |
/// +--------------------+-----------------------------+-------------+
/// ```
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct DescribeTable {
    /// Table schema: the schema being described
    pub schema: Arc<Schema>,
    /// schema of describe table output (the three text columns shown above)
    pub output_schema: DFSchemaRef,
}
3110
// Manual implementation of `PartialOrd`, returning none since there are no comparable types in
// `DescribeTable`. This allows `LogicalPlan` to derive `PartialOrd`.
impl PartialOrd for DescribeTable {
    fn partial_cmp(&self, _other: &Self) -> Option<Ordering> {
        // There is no relevant comparison for schemas, so no two
        // `DescribeTable` nodes are ever ordered relative to each other.
        None
    }
}
3119
/// Options for EXPLAIN
///
/// Controls how an `EXPLAIN` / `EXPLAIN ANALYZE` statement behaves.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct ExplainOption {
    /// Include detailed debug info
    pub verbose: bool,
    /// Actually execute the plan and report metrics
    pub analyze: bool,
    /// Output syntax/format
    pub format: ExplainFormat,
}
3130
3131impl Default for ExplainOption {
3132    fn default() -> Self {
3133        ExplainOption {
3134            verbose: false,
3135            analyze: false,
3136            format: ExplainFormat::Indent,
3137        }
3138    }
3139}
3140
3141impl ExplainOption {
3142    /// Builder‐style setter for `verbose`
3143    pub fn with_verbose(mut self, verbose: bool) -> Self {
3144        self.verbose = verbose;
3145        self
3146    }
3147
3148    /// Builder‐style setter for `analyze`
3149    pub fn with_analyze(mut self, analyze: bool) -> Self {
3150        self.analyze = analyze;
3151        self
3152    }
3153
3154    /// Builder‐style setter for `format`
3155    pub fn with_format(mut self, format: ExplainFormat) -> Self {
3156        self.format = format;
3157        self
3158    }
3159}
3160
/// Produces a relation with string representations of
/// various parts of the plan
///
/// See [the documentation] for more information
///
/// [the documentation]: https://datafusion.apache.org/user-guide/sql/explain.html
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct Explain {
    /// Should extra (detailed, intermediate plans) be included?
    pub verbose: bool,
    /// Output format for explain, if specified.
    /// If none, defaults to `text`
    pub explain_format: ExplainFormat,
    /// The logical plan that is being EXPLAIN'd
    pub plan: Arc<LogicalPlan>,
    /// Represent the various stages plans have gone through
    pub stringified_plans: Vec<StringifiedPlan>,
    /// The output schema of the explain (2 columns of text)
    pub schema: DFSchemaRef,
    /// Used by physical planner to check if should proceed with planning
    pub logical_optimization_succeeded: bool,
}
3183
3184// Manual implementation needed because of `schema` field. Comparison excludes this field.
3185impl PartialOrd for Explain {
3186    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
3187        #[derive(PartialEq, PartialOrd)]
3188        struct ComparableExplain<'a> {
3189            /// Should extra (detailed, intermediate plans) be included?
3190            pub verbose: &'a bool,
3191            /// The logical plan that is being EXPLAIN'd
3192            pub plan: &'a Arc<LogicalPlan>,
3193            /// Represent the various stages plans have gone through
3194            pub stringified_plans: &'a Vec<StringifiedPlan>,
3195            /// Used by physical planner to check if should proceed with planning
3196            pub logical_optimization_succeeded: &'a bool,
3197        }
3198        let comparable_self = ComparableExplain {
3199            verbose: &self.verbose,
3200            plan: &self.plan,
3201            stringified_plans: &self.stringified_plans,
3202            logical_optimization_succeeded: &self.logical_optimization_succeeded,
3203        };
3204        let comparable_other = ComparableExplain {
3205            verbose: &other.verbose,
3206            plan: &other.plan,
3207            stringified_plans: &other.stringified_plans,
3208            logical_optimization_succeeded: &other.logical_optimization_succeeded,
3209        };
3210        comparable_self
3211            .partial_cmp(&comparable_other)
3212            // TODO (https://github.com/apache/datafusion/issues/17477) avoid recomparing all fields
3213            .filter(|cmp| *cmp != Ordering::Equal || self == other)
3214    }
3215}
3216
/// Runs the actual plan, and then prints the physical plan with
/// with execution metrics.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct Analyze {
    /// Should extra detail be included?
    pub verbose: bool,
    /// The logical plan that is being EXPLAIN ANALYZE'd
    pub input: Arc<LogicalPlan>,
    /// The output schema of the explain (2 columns of text)
    pub schema: DFSchemaRef,
}
3228
3229// Manual implementation needed because of `schema` field. Comparison excludes this field.
3230impl PartialOrd for Analyze {
3231    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
3232        match self.verbose.partial_cmp(&other.verbose) {
3233            Some(Ordering::Equal) => self.input.partial_cmp(&other.input),
3234            cmp => cmp,
3235        }
3236        // TODO (https://github.com/apache/datafusion/issues/17477) avoid recomparing all fields
3237        .filter(|cmp| *cmp != Ordering::Equal || self == other)
3238    }
3239}
3240
/// Extension operator defined outside of DataFusion
///
/// Wraps a user-provided [`UserDefinedLogicalNode`] so that custom operators
/// can participate in a `LogicalPlan`.
// TODO(clippy): This clippy `allow` should be removed if
// the manual `PartialEq` is removed in favor of a derive.
// (see `PartialEq` the impl for details.)
#[allow(clippy::allow_attributes)]
#[allow(clippy::derived_hash_with_manual_eq)]
#[derive(Debug, Clone, Eq, Hash)]
pub struct Extension {
    /// The runtime extension operator
    pub node: Arc<dyn UserDefinedLogicalNode>,
}
3252
// `PartialEq` cannot be derived for types containing `Arc<dyn Trait>`.
// This manual implementation should be removed if
// https://github.com/rust-lang/rust/issues/39128 is fixed.
impl PartialEq for Extension {
    fn eq(&self, other: &Self) -> bool {
        // Equality is delegated to the wrapped user-defined node.
        self.node.eq(&other.node)
    }
}
3261
impl PartialOrd for Extension {
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        // Ordering is delegated to the wrapped user-defined node.
        self.node.partial_cmp(&other.node)
    }
}
3267
/// Produces the first `n` tuples from its input and discards the rest.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Hash)]
pub struct Limit {
    /// Number of rows to skip before fetch.
    /// `None` is treated as skipping zero rows (see `get_skip_type`).
    pub skip: Option<Box<Expr>>,
    /// Maximum number of rows to fetch,
    /// None means fetching all rows
    pub fetch: Option<Box<Expr>>,
    /// The logical plan
    pub input: Arc<LogicalPlan>,
}
3279
/// Different types of skip expression in Limit plan.
pub enum SkipType {
    /// The skip expression is a literal value.
    Literal(usize),
    /// The skip expression is not a literal.
    /// Currently only supports expressions that can be folded into constants.
    UnsupportedExpr,
}
3287
/// Different types of fetch expression in Limit plan.
pub enum FetchType {
    /// The fetch expression is a literal value.
    /// `Literal(None)` means the fetch expression is not provided.
    Literal(Option<usize>),
    /// The fetch expression is not a literal.
    /// Currently only supports expressions that can be folded into constants.
    UnsupportedExpr,
}
3296
3297impl Limit {
3298    /// Get the skip type from the limit plan.
3299    pub fn get_skip_type(&self) -> Result<SkipType> {
3300        match self.skip.as_deref() {
3301            Some(expr) => match *expr {
3302                Expr::Literal(ScalarValue::Int64(s), _) => {
3303                    // `skip = NULL` is equivalent to `skip = 0`
3304                    let s = s.unwrap_or(0);
3305                    if s >= 0 {
3306                        Ok(SkipType::Literal(s as usize))
3307                    } else {
3308                        plan_err!("OFFSET must be >=0, '{}' was provided", s)
3309                    }
3310                }
3311                _ => Ok(SkipType::UnsupportedExpr),
3312            },
3313            // `skip = None` is equivalent to `skip = 0`
3314            None => Ok(SkipType::Literal(0)),
3315        }
3316    }
3317
3318    /// Get the fetch type from the limit plan.
3319    pub fn get_fetch_type(&self) -> Result<FetchType> {
3320        match self.fetch.as_deref() {
3321            Some(expr) => match *expr {
3322                Expr::Literal(ScalarValue::Int64(Some(s)), _) => {
3323                    if s >= 0 {
3324                        Ok(FetchType::Literal(Some(s as usize)))
3325                    } else {
3326                        plan_err!("LIMIT must be >= 0, '{}' was provided", s)
3327                    }
3328                }
3329                Expr::Literal(ScalarValue::Int64(None), _) => {
3330                    Ok(FetchType::Literal(None))
3331                }
3332                _ => Ok(FetchType::UnsupportedExpr),
3333            },
3334            None => Ok(FetchType::Literal(None)),
3335        }
3336    }
3337}
3338
/// Removes duplicate rows from the input
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Hash)]
pub enum Distinct {
    /// Plain `DISTINCT` referencing all selection expressions
    All(Arc<LogicalPlan>),
    /// The `Postgres` addition, allowing separate control over DISTINCT'd and selected columns
    On(DistinctOn),
}
3347
3348impl Distinct {
3349    /// return a reference to the nodes input
3350    pub fn input(&self) -> &Arc<LogicalPlan> {
3351        match self {
3352            Distinct::All(input) => input,
3353            Distinct::On(DistinctOn { input, .. }) => input,
3354        }
3355    }
3356}
3357
/// Removes duplicate rows from the input
///
/// `DISTINCT ON` variant: rows are deduplicated on `on_expr` while the output
/// columns come from `select_expr`. Construct via `DistinctOn::try_new`.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct DistinctOn {
    /// The `DISTINCT ON` clause expression list
    pub on_expr: Vec<Expr>,
    /// The selected projection expression list
    pub select_expr: Vec<Expr>,
    /// The `ORDER BY` clause, whose initial expressions must match those of the `ON` clause when
    /// present. Note that those matching expressions actually wrap the `ON` expressions with
    /// additional info pertaining to the sorting procedure (i.e. ASC/DESC, and NULLS FIRST/LAST).
    pub sort_expr: Option<Vec<SortExpr>>,
    /// The logical plan that is being DISTINCT'd
    pub input: Arc<LogicalPlan>,
    /// The schema description of the DISTINCT ON output
    pub schema: DFSchemaRef,
}
3374
3375impl DistinctOn {
3376    /// Create a new `DistinctOn` struct.
3377    pub fn try_new(
3378        on_expr: Vec<Expr>,
3379        select_expr: Vec<Expr>,
3380        sort_expr: Option<Vec<SortExpr>>,
3381        input: Arc<LogicalPlan>,
3382    ) -> Result<Self> {
3383        if on_expr.is_empty() {
3384            return plan_err!("No `ON` expressions provided");
3385        }
3386
3387        let on_expr = normalize_cols(on_expr, input.as_ref())?;
3388        let qualified_fields = exprlist_to_fields(select_expr.as_slice(), &input)?
3389            .into_iter()
3390            .collect();
3391
3392        let dfschema = DFSchema::new_with_metadata(
3393            qualified_fields,
3394            input.schema().metadata().clone(),
3395        )?;
3396
3397        let mut distinct_on = DistinctOn {
3398            on_expr,
3399            select_expr,
3400            sort_expr: None,
3401            input,
3402            schema: Arc::new(dfschema),
3403        };
3404
3405        if let Some(sort_expr) = sort_expr {
3406            distinct_on = distinct_on.with_sort_expr(sort_expr)?;
3407        }
3408
3409        Ok(distinct_on)
3410    }
3411
3412    /// Try to update `self` with a new sort expressions.
3413    ///
3414    /// Validates that the sort expressions are a super-set of the `ON` expressions.
3415    pub fn with_sort_expr(mut self, sort_expr: Vec<SortExpr>) -> Result<Self> {
3416        let sort_expr = normalize_sorts(sort_expr, self.input.as_ref())?;
3417
3418        // Check that the left-most sort expressions are the same as the `ON` expressions.
3419        let mut matched = true;
3420        for (on, sort) in self.on_expr.iter().zip(sort_expr.iter()) {
3421            if on != &sort.expr {
3422                matched = false;
3423                break;
3424            }
3425        }
3426
3427        if self.on_expr.len() > sort_expr.len() || !matched {
3428            return plan_err!(
3429                "SELECT DISTINCT ON expressions must match initial ORDER BY expressions"
3430            );
3431        }
3432
3433        self.sort_expr = Some(sort_expr);
3434        Ok(self)
3435    }
3436}
3437
3438// Manual implementation needed because of `schema` field. Comparison excludes this field.
3439impl PartialOrd for DistinctOn {
3440    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
3441        #[derive(PartialEq, PartialOrd)]
3442        struct ComparableDistinctOn<'a> {
3443            /// The `DISTINCT ON` clause expression list
3444            pub on_expr: &'a Vec<Expr>,
3445            /// The selected projection expression list
3446            pub select_expr: &'a Vec<Expr>,
3447            /// The `ORDER BY` clause, whose initial expressions must match those of the `ON` clause when
3448            /// present. Note that those matching expressions actually wrap the `ON` expressions with
3449            /// additional info pertaining to the sorting procedure (i.e. ASC/DESC, and NULLS FIRST/LAST).
3450            pub sort_expr: &'a Option<Vec<SortExpr>>,
3451            /// The logical plan that is being DISTINCT'd
3452            pub input: &'a Arc<LogicalPlan>,
3453        }
3454        let comparable_self = ComparableDistinctOn {
3455            on_expr: &self.on_expr,
3456            select_expr: &self.select_expr,
3457            sort_expr: &self.sort_expr,
3458            input: &self.input,
3459        };
3460        let comparable_other = ComparableDistinctOn {
3461            on_expr: &other.on_expr,
3462            select_expr: &other.select_expr,
3463            sort_expr: &other.sort_expr,
3464            input: &other.input,
3465        };
3466        comparable_self
3467            .partial_cmp(&comparable_other)
3468            // TODO (https://github.com/apache/datafusion/issues/17477) avoid recomparing all fields
3469            .filter(|cmp| *cmp != Ordering::Equal || self == other)
3470    }
3471}
3472
/// Aggregates its input based on a set of grouping and aggregate
/// expressions (e.g. SUM).
///
/// # Output Schema
///
/// The output schema is the group expressions followed by the aggregate
/// expressions in order.
///
/// For example, given the input schema `"A", "B", "C"` and the aggregate
/// `SUM(A) GROUP BY C+B`, the output schema will be `"C+B", "SUM(A)"` where
/// "C+B" and "SUM(A)" are the names of the output columns. Note that "C+B" is a
/// single new column
///
/// When the grouping is a grouping set (`GROUPING SETS` / `ROLLUP` / `CUBE`),
/// an internal [`Aggregate::INTERNAL_GROUPING_ID`] column is inserted between
/// the group and aggregate columns.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
// mark non_exhaustive to encourage use of try_new/new()
#[non_exhaustive]
pub struct Aggregate {
    /// The incoming logical plan
    pub input: Arc<LogicalPlan>,
    /// Grouping expressions
    pub group_expr: Vec<Expr>,
    /// Aggregate expressions
    pub aggr_expr: Vec<Expr>,
    /// The schema description of the aggregate output
    pub schema: DFSchemaRef,
}
3498
impl Aggregate {
    /// Create a new aggregate operator.
    ///
    /// Derives the output schema (group columns, optional internal grouping
    /// id, then aggregate columns) and delegates to
    /// [`Self::try_new_with_schema`].
    pub fn try_new(
        input: Arc<LogicalPlan>,
        group_expr: Vec<Expr>,
        aggr_expr: Vec<Expr>,
    ) -> Result<Self> {
        // Expand ROLLUP / CUBE shorthand into an explicit grouping-set list.
        let group_expr = enumerate_grouping_sets(group_expr)?;

        let is_grouping_set = matches!(group_expr.as_slice(), [Expr::GroupingSet(_)]);

        // The distinct expressions referenced by the grouping (sets).
        let grouping_expr: Vec<&Expr> = grouping_set_to_exprlist(group_expr.as_slice())?;

        let mut qualified_fields = exprlist_to_fields(grouping_expr, &input)?;

        // Even columns that cannot be null will become nullable when used in a grouping set.
        if is_grouping_set {
            qualified_fields = qualified_fields
                .into_iter()
                .map(|(q, f)| (q, f.as_ref().clone().with_nullable(true).into()))
                .collect::<Vec<_>>();
            // Append the internal grouping-id column; its integer width is
            // chosen from the number of grouping columns pushed so far.
            qualified_fields.push((
                None,
                Field::new(
                    Self::INTERNAL_GROUPING_ID,
                    Self::grouping_id_type(qualified_fields.len()),
                    false,
                )
                .into(),
            ));
        }

        // Aggregate output columns follow the grouping columns.
        qualified_fields.extend(exprlist_to_fields(aggr_expr.as_slice(), &input)?);

        let schema = DFSchema::new_with_metadata(
            qualified_fields,
            input.schema().metadata().clone(),
        )?;

        Self::try_new_with_schema(input, group_expr, aggr_expr, Arc::new(schema))
    }

    /// Create a new aggregate operator using the provided schema to avoid the overhead of
    /// building the schema again when the schema is already known.
    ///
    /// This method should only be called when you are absolutely sure that the schema being
    /// provided is correct for the aggregate. If in doubt, call [try_new](Self::try_new) instead.
    #[expect(clippy::needless_pass_by_value)]
    pub fn try_new_with_schema(
        input: Arc<LogicalPlan>,
        group_expr: Vec<Expr>,
        aggr_expr: Vec<Expr>,
        schema: DFSchemaRef,
    ) -> Result<Self> {
        if group_expr.is_empty() && aggr_expr.is_empty() {
            return plan_err!(
                "Aggregate requires at least one grouping or aggregate expression. \
                Aggregate without grouping expressions nor aggregate expressions is \
                logically equivalent to, but less efficient than, VALUES producing \
                single row. Please use VALUES instead."
            );
        }
        // Sanity-check the caller-provided schema width against the
        // expressions (grouping sets may expand to multiple columns).
        let group_expr_count = grouping_set_expr_count(&group_expr)?;
        if schema.fields().len() != group_expr_count + aggr_expr.len() {
            return plan_err!(
                "Aggregate schema has wrong number of fields. Expected {} got {}",
                group_expr_count + aggr_expr.len(),
                schema.fields().len()
            );
        }

        // Re-derive the functional dependencies that survive aggregation and
        // attach them to the schema.
        let aggregate_func_dependencies =
            calc_func_dependencies_for_aggregate(&group_expr, &input, &schema)?;
        let new_schema = schema.as_ref().clone();
        let schema = Arc::new(
            new_schema.with_functional_dependencies(aggregate_func_dependencies)?,
        );
        Ok(Self {
            input,
            group_expr,
            aggr_expr,
            schema,
        })
    }

    /// True when the grouping is a single `GroupingSet` expression
    /// (i.e. `GROUPING SETS` / `ROLLUP` / `CUBE`).
    fn is_grouping_set(&self) -> bool {
        matches!(self.group_expr.as_slice(), [Expr::GroupingSet(_)])
    }

    /// Get the output expressions.
    fn output_expressions(&self) -> Result<Vec<&Expr>> {
        // Shared, lazily-initialized column expression for the internal
        // grouping id, so that a `&'static Expr` can be returned alongside
        // references into `self`.
        static INTERNAL_ID_EXPR: LazyLock<Expr> = LazyLock::new(|| {
            Expr::Column(Column::from_name(Aggregate::INTERNAL_GROUPING_ID))
        });
        let mut exprs = grouping_set_to_exprlist(self.group_expr.as_slice())?;
        if self.is_grouping_set() {
            exprs.push(&INTERNAL_ID_EXPR);
        }
        exprs.extend(self.aggr_expr.iter());
        // Output expressions must line up 1:1 with the schema fields.
        debug_assert!(exprs.len() == self.schema.fields().len());
        Ok(exprs)
    }

    /// Get the length of the group by expression in the output schema
    /// This is not simply group by expression length. Expression may be
    /// GroupingSet, etc. In these case we need to get inner expression lengths.
    pub fn group_expr_len(&self) -> Result<usize> {
        grouping_set_expr_count(&self.group_expr)
    }

    /// Returns the data type of the grouping id.
    /// The grouping ID value is a bitmask where each set bit
    /// indicates that the corresponding grouping expression is
    /// null
    pub fn grouping_id_type(group_exprs: usize) -> DataType {
        // Pick the narrowest unsigned type that has one bit available per
        // grouping expression.
        if group_exprs <= 8 {
            DataType::UInt8
        } else if group_exprs <= 16 {
            DataType::UInt16
        } else if group_exprs <= 32 {
            DataType::UInt32
        } else {
            DataType::UInt64
        }
    }

    /// Internal column used when the aggregation is a grouping set.
    ///
    /// This column contains a bitmask where each bit represents a grouping
    /// expression. The least significant bit corresponds to the rightmost
    /// grouping expression. A bit value of 0 indicates that the corresponding
    /// column is included in the grouping set, while a value of 1 means it is excluded.
    ///
    /// For example, for the grouping expressions CUBE(a, b), the grouping ID
    /// column will have the following values:
    ///     0b00: Both `a` and `b` are included
    ///     0b01: `b` is excluded
    ///     0b10: `a` is excluded
    ///     0b11: Both `a` and `b` are excluded
    ///
    /// This internal column is necessary because excluded columns are replaced
    /// with `NULL` values. To handle these cases correctly, we must distinguish
    /// between an actual `NULL` value in a column and a column being excluded from the set.
    pub const INTERNAL_GROUPING_ID: &'static str = "__grouping_id";
}
3644
3645// Manual implementation needed because of `schema` field. Comparison excludes this field.
3646impl PartialOrd for Aggregate {
3647    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
3648        match self.input.partial_cmp(&other.input) {
3649            Some(Ordering::Equal) => {
3650                match self.group_expr.partial_cmp(&other.group_expr) {
3651                    Some(Ordering::Equal) => self.aggr_expr.partial_cmp(&other.aggr_expr),
3652                    cmp => cmp,
3653                }
3654            }
3655            cmp => cmp,
3656        }
3657        // TODO (https://github.com/apache/datafusion/issues/17477) avoid recomparing all fields
3658        .filter(|cmp| *cmp != Ordering::Equal || self == other)
3659    }
3660}
3661
3662/// Checks whether any expression in `group_expr` contains `Expr::GroupingSet`.
3663fn contains_grouping_set(group_expr: &[Expr]) -> bool {
3664    group_expr
3665        .iter()
3666        .any(|expr| matches!(expr, Expr::GroupingSet(_)))
3667}
3668
3669/// Calculates functional dependencies for aggregate expressions.
3670fn calc_func_dependencies_for_aggregate(
3671    // Expressions in the GROUP BY clause:
3672    group_expr: &[Expr],
3673    // Input plan of the aggregate:
3674    input: &LogicalPlan,
3675    // Aggregate schema
3676    aggr_schema: &DFSchema,
3677) -> Result<FunctionalDependencies> {
3678    // We can do a case analysis on how to propagate functional dependencies based on
3679    // whether the GROUP BY in question contains a grouping set expression:
3680    // - If so, the functional dependencies will be empty because we cannot guarantee
3681    //   that GROUP BY expression results will be unique.
3682    // - Otherwise, it may be possible to propagate functional dependencies.
3683    if !contains_grouping_set(group_expr) {
3684        let group_by_expr_names = group_expr
3685            .iter()
3686            .map(|item| item.schema_name().to_string())
3687            .collect::<IndexSet<_>>()
3688            .into_iter()
3689            .collect::<Vec<_>>();
3690        let aggregate_func_dependencies = aggregate_functional_dependencies(
3691            input.schema(),
3692            &group_by_expr_names,
3693            aggr_schema,
3694        );
3695        Ok(aggregate_func_dependencies)
3696    } else {
3697        Ok(FunctionalDependencies::empty())
3698    }
3699}
3700
/// This function projects functional dependencies of the `input` plan according
/// to projection expressions `exprs`.
///
/// Only expressions that resolve to a field of the input schema contribute an
/// index: wildcards are expanded to all matching fields, and any expression
/// with no matching input field (e.g. a computed expression) contributes
/// nothing.
fn calc_func_dependencies_for_project(
    exprs: &[Expr],
    input: &LogicalPlan,
) -> Result<FunctionalDependencies> {
    let input_fields = input.schema().field_names();
    // Calculate expression indices (if present) in the input schema.
    let proj_indices = exprs
        .iter()
        .map(|expr| match expr {
            #[expect(deprecated)]
            Expr::Wildcard { qualifier, options } => {
                // Expand the wildcard to concrete fields, then map each
                // (possibly qualified) flattened name back to its input index.
                let wildcard_fields = exprlist_to_fields(
                    vec![&Expr::Wildcard {
                        qualifier: qualifier.clone(),
                        options: options.clone(),
                    }],
                    input,
                )?;
                Ok::<_, DataFusionError>(
                    wildcard_fields
                        .into_iter()
                        .filter_map(|(qualifier, f)| {
                            let flat_name = qualifier
                                .map(|t| format!("{}.{}", t, f.name()))
                                .unwrap_or_else(|| f.name().clone());
                            input_fields.iter().position(|item| *item == flat_name)
                        })
                        .collect::<Vec<_>>(),
                )
            }
            Expr::Alias(alias) => {
                // Look up the aliased (inner) expression's name, since the
                // alias itself does not exist in the input schema.
                let name = format!("{}", alias.expr);
                Ok(input_fields
                    .iter()
                    .position(|item| *item == name)
                    .map(|i| vec![i])
                    .unwrap_or(vec![]))
            }
            _ => {
                let name = format!("{expr}");
                Ok(input_fields
                    .iter()
                    .position(|item| *item == name)
                    .map(|i| vec![i])
                    .unwrap_or(vec![]))
            }
        })
        .collect::<Result<Vec<_>>>()?
        .into_iter()
        .flatten()
        .collect::<Vec<_>>();

    Ok(input
        .schema()
        .functional_dependencies()
        .project_functional_dependencies(&proj_indices, exprs.len()))
}
3760
/// Sorts its input according to a list of sort expressions.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Hash)]
pub struct Sort {
    /// The sort expressions, applied in order (earlier entries take priority)
    pub expr: Vec<SortExpr>,
    /// The incoming logical plan
    pub input: Arc<LogicalPlan>,
    /// Optional fetch limit: when set, only this many rows need to be
    /// produced after sorting
    pub fetch: Option<usize>,
}
3771
/// Join two logical plans on one or more join columns
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct Join {
    /// Left input
    pub left: Arc<LogicalPlan>,
    /// Right input
    pub right: Arc<LogicalPlan>,
    /// Equijoin clause expressed as pairs of (left, right) join expressions
    pub on: Vec<(Expr, Expr)>,
    /// Filters applied during join (non-equi conditions)
    pub filter: Option<Expr>,
    /// Join type
    pub join_type: JoinType,
    /// Join constraint
    pub join_constraint: JoinConstraint,
    /// The output schema, containing fields from the left and right inputs
    /// (excluded from the manual `PartialOrd` comparison)
    pub schema: DFSchemaRef,
    /// Defines the null equality for the join.
    pub null_equality: NullEquality,
    /// Whether this is a null-aware anti join (for NOT IN semantics).
    ///
    /// Only applies to LeftAnti joins. When true, implements SQL NOT IN semantics where:
    /// - If the right side (subquery) contains any NULL in join keys, no rows are output
    /// - Left side rows with NULL in join keys are not output
    ///
    /// This is required for correct NOT IN subquery behavior with three-valued logic.
    pub null_aware: bool,
}
3800
3801impl Join {
3802    /// Creates a new Join operator with automatically computed schema.
3803    ///
3804    /// This constructor computes the schema based on the join type and inputs,
3805    /// removing the need to manually specify the schema or call `recompute_schema`.
3806    ///
3807    /// # Arguments
3808    ///
3809    /// * `left` - Left input plan
3810    /// * `right` - Right input plan
3811    /// * `on` - Join condition as a vector of (left_expr, right_expr) pairs
3812    /// * `filter` - Optional filter expression (for non-equijoin conditions)
3813    /// * `join_type` - Type of join (Inner, Left, Right, etc.)
3814    /// * `join_constraint` - Join constraint (On, Using)
3815    /// * `null_equality` - How to handle nulls in join comparisons
3816    /// * `null_aware` - Whether this is a null-aware anti join (for NOT IN semantics)
3817    ///
3818    /// # Returns
3819    ///
3820    /// A new Join operator with the computed schema
3821    #[expect(clippy::too_many_arguments)]
3822    pub fn try_new(
3823        left: Arc<LogicalPlan>,
3824        right: Arc<LogicalPlan>,
3825        on: Vec<(Expr, Expr)>,
3826        filter: Option<Expr>,
3827        join_type: JoinType,
3828        join_constraint: JoinConstraint,
3829        null_equality: NullEquality,
3830        null_aware: bool,
3831    ) -> Result<Self> {
3832        let join_schema = build_join_schema(left.schema(), right.schema(), &join_type)?;
3833
3834        Ok(Join {
3835            left,
3836            right,
3837            on,
3838            filter,
3839            join_type,
3840            join_constraint,
3841            schema: Arc::new(join_schema),
3842            null_equality,
3843            null_aware,
3844        })
3845    }
3846
3847    /// Create Join with input which wrapped with projection, this method is used in physical planning only to help
3848    /// create the physical join.
3849    pub fn try_new_with_project_input(
3850        original: &LogicalPlan,
3851        left: Arc<LogicalPlan>,
3852        right: Arc<LogicalPlan>,
3853        column_on: (Vec<Column>, Vec<Column>),
3854    ) -> Result<(Self, bool)> {
3855        let original_join = match original {
3856            LogicalPlan::Join(join) => join,
3857            _ => return plan_err!("Could not create join with project input"),
3858        };
3859
3860        let mut left_sch = LogicalPlanBuilder::from(Arc::clone(&left));
3861        let mut right_sch = LogicalPlanBuilder::from(Arc::clone(&right));
3862
3863        let mut requalified = false;
3864
3865        // By definition, the resulting schema of an inner/left/right & full join will have first the left side fields and then the right,
3866        // potentially having duplicate field names. Note this will only qualify fields if they have not been qualified before.
3867        if original_join.join_type == JoinType::Inner
3868            || original_join.join_type == JoinType::Left
3869            || original_join.join_type == JoinType::Right
3870            || original_join.join_type == JoinType::Full
3871        {
3872            (left_sch, right_sch, requalified) =
3873                requalify_sides_if_needed(left_sch.clone(), right_sch.clone())?;
3874        }
3875
3876        let on: Vec<(Expr, Expr)> = column_on
3877            .0
3878            .into_iter()
3879            .zip(column_on.1)
3880            .map(|(l, r)| (Expr::Column(l), Expr::Column(r)))
3881            .collect();
3882
3883        let join_schema = build_join_schema(
3884            left_sch.schema(),
3885            right_sch.schema(),
3886            &original_join.join_type,
3887        )?;
3888
3889        Ok((
3890            Join {
3891                left,
3892                right,
3893                on,
3894                filter: original_join.filter.clone(),
3895                join_type: original_join.join_type,
3896                join_constraint: original_join.join_constraint,
3897                schema: Arc::new(join_schema),
3898                null_equality: original_join.null_equality,
3899                null_aware: original_join.null_aware,
3900            },
3901            requalified,
3902        ))
3903    }
3904}
3905
3906// Manual implementation needed because of `schema` field. Comparison excludes this field.
3907impl PartialOrd for Join {
3908    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
3909        #[derive(PartialEq, PartialOrd)]
3910        struct ComparableJoin<'a> {
3911            /// Left input
3912            pub left: &'a Arc<LogicalPlan>,
3913            /// Right input
3914            pub right: &'a Arc<LogicalPlan>,
3915            /// Equijoin clause expressed as pairs of (left, right) join expressions
3916            pub on: &'a Vec<(Expr, Expr)>,
3917            /// Filters applied during join (non-equi conditions)
3918            pub filter: &'a Option<Expr>,
3919            /// Join type
3920            pub join_type: &'a JoinType,
3921            /// Join constraint
3922            pub join_constraint: &'a JoinConstraint,
3923            /// The null handling behavior for equalities
3924            pub null_equality: &'a NullEquality,
3925        }
3926        let comparable_self = ComparableJoin {
3927            left: &self.left,
3928            right: &self.right,
3929            on: &self.on,
3930            filter: &self.filter,
3931            join_type: &self.join_type,
3932            join_constraint: &self.join_constraint,
3933            null_equality: &self.null_equality,
3934        };
3935        let comparable_other = ComparableJoin {
3936            left: &other.left,
3937            right: &other.right,
3938            on: &other.on,
3939            filter: &other.filter,
3940            join_type: &other.join_type,
3941            join_constraint: &other.join_constraint,
3942            null_equality: &other.null_equality,
3943        };
3944        comparable_self
3945            .partial_cmp(&comparable_other)
3946            // TODO (https://github.com/apache/datafusion/issues/17477) avoid recomparing all fields
3947            .filter(|cmp| *cmp != Ordering::Equal || self == other)
3948    }
3949}
3950
/// Subquery
#[derive(Clone, PartialEq, Eq, PartialOrd, Hash)]
pub struct Subquery {
    /// The subquery plan
    pub subquery: Arc<LogicalPlan>,
    /// The outer references used in the subquery
    pub outer_ref_columns: Vec<Expr>,
    /// Span information for subquery projection columns
    pub spans: Spans,
}
3961
impl Normalizeable for Subquery {
    fn can_normalize(&self) -> bool {
        // Subqueries are never considered normalizable.
        false
    }
}
3967
3968impl NormalizeEq for Subquery {
3969    fn normalize_eq(&self, other: &Self) -> bool {
3970        // TODO: may be implement NormalizeEq for LogicalPlan?
3971        *self.subquery == *other.subquery
3972            && self.outer_ref_columns.len() == other.outer_ref_columns.len()
3973            && self
3974                .outer_ref_columns
3975                .iter()
3976                .zip(other.outer_ref_columns.iter())
3977                .all(|(a, b)| a.normalize_eq(b))
3978    }
3979}
3980
3981impl Subquery {
3982    pub fn try_from_expr(plan: &Expr) -> Result<&Subquery> {
3983        match plan {
3984            Expr::ScalarSubquery(it) => Ok(it),
3985            Expr::Cast(cast) => Subquery::try_from_expr(cast.expr.as_ref()),
3986            _ => plan_err!("Could not coerce into ScalarSubquery!"),
3987        }
3988    }
3989
3990    pub fn with_plan(&self, plan: Arc<LogicalPlan>) -> Subquery {
3991        Subquery {
3992            subquery: plan,
3993            outer_ref_columns: self.outer_ref_columns.clone(),
3994            spans: Spans::new(),
3995        }
3996    }
3997}
3998
3999impl Debug for Subquery {
4000    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
4001        write!(f, "<subquery>")
4002    }
4003}
4004
/// Logical partitioning schemes supported by [`LogicalPlan::Repartition`]
///
/// See [`Partitioning`] for more details on partitioning
///
/// [`Partitioning`]: https://docs.rs/datafusion/latest/datafusion/physical_expr/enum.Partitioning.html#
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Hash)]
pub enum Partitioning {
    /// Allocate batches using a round-robin algorithm and the specified number of partitions
    RoundRobinBatch(usize),
    /// Allocate rows based on a hash of one or more expressions and the specified number
    /// of partitions.
    Hash(Vec<Expr>, usize),
    /// The DISTRIBUTE BY clause is used to repartition the data based on the input expressions
    DistributeBy(Vec<Expr>),
}
4020
/// Represent the unnesting operation on a list column, such as the recursion depth and
/// the output column name after unnesting
///
/// Example: given `ColumnUnnestList { output_column: "output_name", depth: 2 }`
///
/// ```text
///   input             output_name
///  ┌─────────┐      ┌─────────┐
///  │{{1,2}}  │      │ 1       │
///  ├─────────┼─────►├─────────┤
///  │{{3}}    │      │ 2       │
///  ├─────────┤      ├─────────┤
///  │{{4},{5}}│      │ 3       │
///  └─────────┘      ├─────────┤
///                   │ 4       │
///                   ├─────────┤
///                   │ 5       │
///                   └─────────┘
/// ```
#[derive(Debug, Clone, PartialEq, Eq, Hash, PartialOrd)]
pub struct ColumnUnnestList {
    /// Name of the column holding the unnested values in the output schema
    pub output_column: Column,
    /// Number of nested list levels to unwrap
    pub depth: usize,
}
4045
4046impl Display for ColumnUnnestList {
4047    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
4048        write!(f, "{}|depth={}", self.output_column, self.depth)
4049    }
4050}
4051
/// Unnest a column that contains a nested list type. See
/// [`UnnestOptions`] for more details.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct Unnest {
    /// The incoming logical plan
    pub input: Arc<LogicalPlan>,
    /// Columns to run unnest on, can be a list of (List/Struct) columns
    pub exec_columns: Vec<Column>,
    /// refer to the indices (in the input schema) of columns
    /// that have type list to run unnest on
    pub list_type_columns: Vec<(usize, ColumnUnnestList)>,
    /// refer to the indices (in the input schema) of columns
    /// that have type struct to run unnest on
    pub struct_type_columns: Vec<usize>,
    /// Having items aligned with the output columns,
    /// representing which column in the input schema each output column depends on
    pub dependency_indices: Vec<usize>,
    /// The output schema, containing the unnested field column
    /// (excluded from the manual `PartialOrd` comparison).
    pub schema: DFSchemaRef,
    /// Options controlling the unnest behavior (e.g. recursion depths)
    pub options: UnnestOptions,
}
4074
4075// Manual implementation needed because of `schema` field. Comparison excludes this field.
4076impl PartialOrd for Unnest {
4077    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
4078        #[derive(PartialEq, PartialOrd)]
4079        struct ComparableUnnest<'a> {
4080            /// The incoming logical plan
4081            pub input: &'a Arc<LogicalPlan>,
4082            /// Columns to run unnest on, can be a list of (List/Struct) columns
4083            pub exec_columns: &'a Vec<Column>,
4084            /// refer to the indices(in the input schema) of columns
4085            /// that have type list to run unnest on
4086            pub list_type_columns: &'a Vec<(usize, ColumnUnnestList)>,
4087            /// refer to the indices (in the input schema) of columns
4088            /// that have type struct to run unnest on
4089            pub struct_type_columns: &'a Vec<usize>,
4090            /// Having items aligned with the output columns
4091            /// representing which column in the input schema each output column depends on
4092            pub dependency_indices: &'a Vec<usize>,
4093            /// Options
4094            pub options: &'a UnnestOptions,
4095        }
4096        let comparable_self = ComparableUnnest {
4097            input: &self.input,
4098            exec_columns: &self.exec_columns,
4099            list_type_columns: &self.list_type_columns,
4100            struct_type_columns: &self.struct_type_columns,
4101            dependency_indices: &self.dependency_indices,
4102            options: &self.options,
4103        };
4104        let comparable_other = ComparableUnnest {
4105            input: &other.input,
4106            exec_columns: &other.exec_columns,
4107            list_type_columns: &other.list_type_columns,
4108            struct_type_columns: &other.struct_type_columns,
4109            dependency_indices: &other.dependency_indices,
4110            options: &other.options,
4111        };
4112        comparable_self
4113            .partial_cmp(&comparable_other)
4114            // TODO (https://github.com/apache/datafusion/issues/17477) avoid recomparing all fields
4115            .filter(|cmp| *cmp != Ordering::Equal || self == other)
4116    }
4117}
4118
impl Unnest {
    /// Create a new [`Unnest`] over `exec_columns` of `input`, computing the
    /// unnested output schema and recording which input columns are lists vs
    /// structs.
    ///
    /// # Errors
    ///
    /// Returns a plan error if `exec_columns` is empty or a column cannot be
    /// resolved against the input schema; an internal error if a column to
    /// unnest is neither a list variant nor a struct.
    pub fn try_new(
        input: Arc<LogicalPlan>,
        exec_columns: Vec<Column>,
        options: UnnestOptions,
    ) -> Result<Self> {
        if exec_columns.is_empty() {
            return plan_err!("unnest plan requires at least 1 column to unnest");
        }

        // Accumulators filled in while walking the input schema below.
        let mut list_columns: Vec<(usize, ColumnUnnestList)> = vec![];
        let mut struct_columns = vec![];
        // Map from input-schema index to the column being unnested there.
        let indices_to_unnest = exec_columns
            .iter()
            .map(|c| Ok((input.schema().index_of_column(c)?, c)))
            .collect::<Result<HashMap<usize, &Column>>>()?;

        let input_schema = input.schema();

        let mut dependency_indices = vec![];
        // Transform input schema into new schema
        // Given this comprehensive example
        //
        // input schema:
        // 1.col1_unnest_placeholder: list[list[int]],
        // 2.col1: list[list[int]]
        // 3.col2: list[int]
        // with unnest on unnest(col1,depth=2), unnest(col1,depth=1) and unnest(col2,depth=1)
        // output schema:
        // 1.unnest_col1_depth_2: int
        // 2.unnest_col1_depth_1: list[int]
        // 3.col1: list[list[int]]
        // 4.unnest_col2_depth_1: int
        // Meaning the placeholder column will be replaced by its unnested variation(s), note
        // the plural.
        let fields = input_schema
            .iter()
            .enumerate()
            .map(|(index, (original_qualifier, original_field))| {
                match indices_to_unnest.get(&index) {
                    Some(column_to_unnest) => {
                        // Explicit recursion depths requested for this column
                        // via options (may be empty).
                        let recursions_on_column = options
                            .recursions
                            .iter()
                            .filter(|p| -> bool { &p.input_column == *column_to_unnest })
                            .collect::<Vec<_>>();
                        let mut transformed_columns = recursions_on_column
                            .iter()
                            .map(|r| {
                                list_columns.push((
                                    index,
                                    ColumnUnnestList {
                                        output_column: r.output_column.clone(),
                                        depth: r.depth,
                                    },
                                ));
                                Ok(get_unnested_columns(
                                    &r.output_column.name,
                                    original_field.data_type(),
                                    r.depth,
                                )?
                                .into_iter()
                                .next()
                                .unwrap()) // because unnesting a list column always result into one result
                            })
                            .collect::<Result<Vec<(Column, Arc<Field>)>>>()?;
                        // No explicit recursion requested: default to depth 1
                        // and classify the column as list or struct.
                        if transformed_columns.is_empty() {
                            transformed_columns = get_unnested_columns(
                                &column_to_unnest.name,
                                original_field.data_type(),
                                1,
                            )?;
                            match original_field.data_type() {
                                DataType::Struct(_) => {
                                    struct_columns.push(index);
                                }
                                DataType::List(_)
                                | DataType::FixedSizeList(_, _)
                                | DataType::LargeList(_) => {
                                    list_columns.push((
                                        index,
                                        ColumnUnnestList {
                                            output_column: Column::from_name(
                                                &column_to_unnest.name,
                                            ),
                                            depth: 1,
                                        },
                                    ));
                                }
                                _ => {}
                            };
                        }

                        // new columns dependent on the same original index
                        dependency_indices.extend(std::iter::repeat_n(
                            index,
                            transformed_columns.len(),
                        ));
                        Ok(transformed_columns
                            .iter()
                            .map(|(col, field)| {
                                (col.relation.to_owned(), field.to_owned())
                            })
                            .collect())
                    }
                    None => {
                        // Column not unnested: passes through unchanged.
                        dependency_indices.push(index);
                        Ok(vec![(
                            original_qualifier.cloned(),
                            Arc::clone(original_field),
                        )])
                    }
                }
            })
            .collect::<Result<Vec<_>>>()?
            .into_iter()
            .flatten()
            .collect::<Vec<_>>();

        let metadata = input_schema.metadata().clone();
        let df_schema = DFSchema::new_with_metadata(fields, metadata)?;
        // We can use the existing functional dependencies:
        let deps = input_schema.functional_dependencies().clone();
        let schema = Arc::new(df_schema.with_functional_dependencies(deps)?);

        Ok(Unnest {
            input,
            exec_columns,
            list_type_columns: list_columns,
            struct_type_columns: struct_columns,
            dependency_indices,
            schema,
            options,
        })
    }
}
4255
4256// Based on data type, either struct or a variant of list
4257// return a set of columns as the result of unnesting
4258// the input columns.
4259// For example, given a column with name "a",
4260// - List(Element) returns ["a"] with data type Element
4261// - Struct(field1, field2) returns ["a.field1","a.field2"]
4262// For list data type, an argument depth is used to specify
4263// the recursion level
4264fn get_unnested_columns(
4265    col_name: &String,
4266    data_type: &DataType,
4267    depth: usize,
4268) -> Result<Vec<(Column, Arc<Field>)>> {
4269    let mut qualified_columns = Vec::with_capacity(1);
4270
4271    match data_type {
4272        DataType::List(_) | DataType::FixedSizeList(_, _) | DataType::LargeList(_) => {
4273            let data_type = get_unnested_list_datatype_recursive(data_type, depth)?;
4274            let new_field = Arc::new(Field::new(
4275                col_name, data_type,
4276                // Unnesting may produce NULLs even if the list is not null.
4277                // For example: unnest([1], []) -> 1, null
4278                true,
4279            ));
4280            let column = Column::from_name(col_name);
4281            // let column = Column::from((None, &new_field));
4282            qualified_columns.push((column, new_field));
4283        }
4284        DataType::Struct(fields) => {
4285            qualified_columns.extend(fields.iter().map(|f| {
4286                let new_name = format!("{}.{}", col_name, f.name());
4287                let column = Column::from_name(&new_name);
4288                let new_field = f.as_ref().clone().with_name(new_name);
4289                // let column = Column::from((None, &f));
4290                (column, Arc::new(new_field))
4291            }))
4292        }
4293        _ => {
4294            return internal_err!("trying to unnest on invalid data type {data_type}");
4295        }
4296    };
4297    Ok(qualified_columns)
4298}
4299
4300// Get the data type of a multi-dimensional type after unnesting it
4301// with a given depth
4302fn get_unnested_list_datatype_recursive(
4303    data_type: &DataType,
4304    depth: usize,
4305) -> Result<DataType> {
4306    match data_type {
4307        DataType::List(field)
4308        | DataType::FixedSizeList(field, _)
4309        | DataType::LargeList(field) => {
4310            if depth == 1 {
4311                return Ok(field.data_type().clone());
4312            }
4313            return get_unnested_list_datatype_recursive(field.data_type(), depth - 1);
4314        }
4315        _ => {}
4316    };
4317
4318    internal_err!("trying to unnest on invalid data type {data_type}")
4319}
4320
4321#[cfg(test)]
4322mod tests {
4323    use super::*;
4324    use crate::builder::LogicalTableSource;
4325    use crate::logical_plan::table_scan;
4326    use crate::select_expr::SelectExpr;
4327    use crate::test::function_stub::{count, count_udaf};
4328    use crate::{
4329        GroupingSet, binary_expr, col, exists, in_subquery, lit, placeholder,
4330        scalar_subquery,
4331    };
4332    use datafusion_common::metadata::ScalarAndMetadata;
4333    use datafusion_common::tree_node::{
4334        TransformedResult, TreeNodeRewriter, TreeNodeVisitor,
4335    };
4336    use datafusion_common::{Constraint, ScalarValue, not_impl_err};
4337    use insta::{assert_debug_snapshot, assert_snapshot};
4338    use std::hash::DefaultHasher;
4339
    /// Five-column employee schema used as a table-scan fixture by the
    /// display tests below.
    fn employee_schema() -> Schema {
        Schema::new(vec![
            Field::new("id", DataType::Int32, false),
            Field::new("first_name", DataType::Utf8, false),
            Field::new("last_name", DataType::Utf8, false),
            Field::new("state", DataType::Utf8, false),
            Field::new("salary", DataType::Int32, false),
        ])
    }
4349
    /// Builds a projection-over-filter plan containing an IN-subquery,
    /// shared by the display-format tests.
    fn display_plan() -> Result<LogicalPlan> {
        let plan1 = table_scan(Some("employee_csv"), &employee_schema(), Some(vec![3]))?
            .build()?;

        table_scan(Some("employee_csv"), &employee_schema(), Some(vec![0, 3]))?
            .filter(in_subquery(col("state"), Arc::new(plan1)))?
            .project(vec![col("id")])?
            .build()
    }
4359
    /// Verifies the indented (non-schema) text rendering of a plan with a
    /// subquery.
    #[test]
    fn test_display_indent() -> Result<()> {
        let plan = display_plan()?;

        assert_snapshot!(plan.display_indent(), @r"
        Projection: employee_csv.id
          Filter: employee_csv.state IN (<subquery>)
            Subquery:
              TableScan: employee_csv projection=[state]
            TableScan: employee_csv projection=[id, state]
        ");
        Ok(())
    }
4373
    /// Verifies the indented rendering that also prints each node's schema.
    #[test]
    fn test_display_indent_schema() -> Result<()> {
        let plan = display_plan()?;

        assert_snapshot!(plan.display_indent_schema(), @r"
        Projection: employee_csv.id [id:Int32]
          Filter: employee_csv.state IN (<subquery>) [id:Int32, state:Utf8]
            Subquery: [state:Utf8]
              TableScan: employee_csv projection=[state] [state:Utf8]
            TableScan: employee_csv projection=[id, state] [id:Int32, state:Utf8]
        ");
        Ok(())
    }
4387
    /// Verifies that an aliased EXISTS subquery renders with its alias in the
    /// indented display.
    #[test]
    fn test_display_subquery_alias() -> Result<()> {
        let plan1 = table_scan(Some("employee_csv"), &employee_schema(), Some(vec![3]))?
            .build()?;
        let plan1 = Arc::new(plan1);

        let plan =
            table_scan(Some("employee_csv"), &employee_schema(), Some(vec![0, 3]))?
                .project(vec![col("id"), exists(plan1).alias("exists")])?
                .build();

        assert_snapshot!(plan?.display_indent(), @r"
        Projection: employee_csv.id, EXISTS (<subquery>) AS exists
          Subquery:
            TableScan: employee_csv projection=[state]
          TableScan: employee_csv projection=[id, state]
        ");
        Ok(())
    }
4407
    #[test]
    fn test_display_graphviz() -> Result<()> {
        let plan = display_plan()?;

        // The inline snapshot pins the complete GraphViz output: two
        // subgraphs, one with the plain plan and one ("Detailed LogicalPlan")
        // that additionally shows each node's output schema.
        assert_snapshot!(plan.display_graphviz(), @r#"
        // Begin DataFusion GraphViz Plan,
        // display it online here: https://dreampuf.github.io/GraphvizOnline

        digraph {
          subgraph cluster_1
          {
            graph[label="LogicalPlan"]
            2[shape=box label="Projection: employee_csv.id"]
            3[shape=box label="Filter: employee_csv.state IN (<subquery>)"]
            2 -> 3 [arrowhead=none, arrowtail=normal, dir=back]
            4[shape=box label="Subquery:"]
            3 -> 4 [arrowhead=none, arrowtail=normal, dir=back]
            5[shape=box label="TableScan: employee_csv projection=[state]"]
            4 -> 5 [arrowhead=none, arrowtail=normal, dir=back]
            6[shape=box label="TableScan: employee_csv projection=[id, state]"]
            3 -> 6 [arrowhead=none, arrowtail=normal, dir=back]
          }
          subgraph cluster_7
          {
            graph[label="Detailed LogicalPlan"]
            8[shape=box label="Projection: employee_csv.id\nSchema: [id:Int32]"]
            9[shape=box label="Filter: employee_csv.state IN (<subquery>)\nSchema: [id:Int32, state:Utf8]"]
            8 -> 9 [arrowhead=none, arrowtail=normal, dir=back]
            10[shape=box label="Subquery:\nSchema: [state:Utf8]"]
            9 -> 10 [arrowhead=none, arrowtail=normal, dir=back]
            11[shape=box label="TableScan: employee_csv projection=[state]\nSchema: [state:Utf8]"]
            10 -> 11 [arrowhead=none, arrowtail=normal, dir=back]
            12[shape=box label="TableScan: employee_csv projection=[id, state]\nSchema: [id:Int32, state:Utf8]"]
            9 -> 12 [arrowhead=none, arrowtail=normal, dir=back]
          }
        }
        // End DataFusion GraphViz Plan
        "#);
        Ok(())
    }
4450
    #[test]
    fn test_display_pg_json() -> Result<()> {
        let plan = display_plan()?;

        // Postgres-style JSON rendering: each node becomes an object with
        // "Node Type", "Output" columns and nested "Plans" for its children.
        assert_snapshot!(plan.display_pg_json(), @r#"
        [
          {
            "Plan": {
              "Expressions": [
                "employee_csv.id"
              ],
              "Node Type": "Projection",
              "Output": [
                "id"
              ],
              "Plans": [
                {
                  "Condition": "employee_csv.state IN (<subquery>)",
                  "Node Type": "Filter",
                  "Output": [
                    "id",
                    "state"
                  ],
                  "Plans": [
                    {
                      "Node Type": "Subquery",
                      "Output": [
                        "state"
                      ],
                      "Plans": [
                        {
                          "Node Type": "TableScan",
                          "Output": [
                            "state"
                          ],
                          "Plans": [],
                          "Relation Name": "employee_csv"
                        }
                      ]
                    },
                    {
                      "Node Type": "TableScan",
                      "Output": [
                        "id",
                        "state"
                      ],
                      "Plans": [],
                      "Relation Name": "employee_csv"
                    }
                  ]
                }
              ]
            }
          }
        ]
        "#);
        Ok(())
    }
4509
    /// Visitor used by the walking/visitor tests below: records the order in
    /// which plan nodes are visited by pushing a label per `f_down`/`f_up`
    /// call.
    #[derive(Debug, Default)]
    struct OkVisitor {
        /// Visit log, in the order the events occurred.
        strings: Vec<String>,
    }
4515
4516    impl<'n> TreeNodeVisitor<'n> for OkVisitor {
4517        type Node = LogicalPlan;
4518
4519        fn f_down(&mut self, plan: &'n LogicalPlan) -> Result<TreeNodeRecursion> {
4520            let s = match plan {
4521                LogicalPlan::Projection { .. } => "pre_visit Projection",
4522                LogicalPlan::Filter { .. } => "pre_visit Filter",
4523                LogicalPlan::TableScan { .. } => "pre_visit TableScan",
4524                _ => {
4525                    return not_impl_err!("unknown plan type");
4526                }
4527            };
4528
4529            self.strings.push(s.into());
4530            Ok(TreeNodeRecursion::Continue)
4531        }
4532
4533        fn f_up(&mut self, plan: &'n LogicalPlan) -> Result<TreeNodeRecursion> {
4534            let s = match plan {
4535                LogicalPlan::Projection { .. } => "post_visit Projection",
4536                LogicalPlan::Filter { .. } => "post_visit Filter",
4537                LogicalPlan::TableScan { .. } => "post_visit TableScan",
4538                _ => {
4539                    return not_impl_err!("unknown plan type");
4540                }
4541            };
4542
4543            self.strings.push(s.into());
4544            Ok(TreeNodeRecursion::Continue)
4545        }
4546    }
4547
4548    #[test]
4549    fn visit_order() {
4550        let mut visitor = OkVisitor::default();
4551        let plan = test_plan();
4552        let res = plan.visit_with_subqueries(&mut visitor);
4553        assert!(res.is_ok());
4554
4555        assert_debug_snapshot!(visitor.strings, @r#"
4556        [
4557            "pre_visit Projection",
4558            "pre_visit Filter",
4559            "pre_visit TableScan",
4560            "post_visit TableScan",
4561            "post_visit Filter",
4562            "post_visit Projection",
4563        ]
4564        "#);
4565    }
4566
    #[derive(Debug, Default)]
    /// Counter that counts down to zero and reports true when it gets there.
    /// With `val == None` (the default) it never fires: `dec` always returns
    /// false.
    struct OptionalCounter {
        /// Remaining number of `dec` calls before the counter fires.
        val: Option<usize>,
    }
4572
4573    impl OptionalCounter {
4574        fn new(val: usize) -> Self {
4575            Self { val: Some(val) }
4576        }
4577        // Decrements the counter by 1, if any, returning true if it hits zero
4578        fn dec(&mut self) -> bool {
4579            if Some(0) == self.val {
4580                true
4581            } else {
4582                self.val = self.val.take().map(|i| i - 1);
4583                false
4584            }
4585        }
4586    }
4587
    #[derive(Debug, Default)]
    /// Visitor that stops traversal after some number of visits
    struct StoppingVisitor {
        inner: OkVisitor,
        /// When the counter reaches Some(0), `f_down` returns
        /// `TreeNodeRecursion::Stop`
        return_false_from_pre_in: OptionalCounter,
        /// When the counter reaches Some(0), `f_up` returns
        /// `TreeNodeRecursion::Stop`
        return_false_from_post_in: OptionalCounter,
    }
4597
4598    impl<'n> TreeNodeVisitor<'n> for StoppingVisitor {
4599        type Node = LogicalPlan;
4600
4601        fn f_down(&mut self, plan: &'n LogicalPlan) -> Result<TreeNodeRecursion> {
4602            if self.return_false_from_pre_in.dec() {
4603                return Ok(TreeNodeRecursion::Stop);
4604            }
4605            self.inner.f_down(plan)?;
4606
4607            Ok(TreeNodeRecursion::Continue)
4608        }
4609
4610        fn f_up(&mut self, plan: &'n LogicalPlan) -> Result<TreeNodeRecursion> {
4611            if self.return_false_from_post_in.dec() {
4612                return Ok(TreeNodeRecursion::Stop);
4613            }
4614
4615            self.inner.f_up(plan)
4616        }
4617    }
4618
    /// test early stopping in pre-visit
    #[test]
    fn early_stopping_pre_visit() {
        // Stop on the second pre-visit (counter fires on the TableScan), so
        // no TableScan events and no post-visits are recorded.
        let mut visitor = StoppingVisitor {
            return_false_from_pre_in: OptionalCounter::new(2),
            ..Default::default()
        };
        let plan = test_plan();
        let res = plan.visit_with_subqueries(&mut visitor);
        assert!(res.is_ok());

        assert_debug_snapshot!(
            visitor.inner.strings,
            @r#"
        [
            "pre_visit Projection",
            "pre_visit Filter",
        ]
        "#
        );
    }
4640
    #[test]
    fn early_stopping_post_visit() {
        // Stop on the second post-visit: the full pre-order and the TableScan
        // post-visit are recorded, then traversal halts.
        let mut visitor = StoppingVisitor {
            return_false_from_post_in: OptionalCounter::new(1),
            ..Default::default()
        };
        let plan = test_plan();
        let res = plan.visit_with_subqueries(&mut visitor);
        assert!(res.is_ok());

        assert_debug_snapshot!(
            visitor.inner.strings,
            @r#"
        [
            "pre_visit Projection",
            "pre_visit Filter",
            "pre_visit TableScan",
            "post_visit TableScan",
        ]
        "#
        );
    }
4663
    #[derive(Debug, Default)]
    /// Visitor that returns an error after some number of visits
    struct ErrorVisitor {
        inner: OkVisitor,
        /// When the counter reaches Some(0), `f_down` returns an error
        return_error_from_pre_in: OptionalCounter,
        /// When the counter reaches Some(0), `f_up` returns an error
        return_error_from_post_in: OptionalCounter,
    }
4673
4674    impl<'n> TreeNodeVisitor<'n> for ErrorVisitor {
4675        type Node = LogicalPlan;
4676
4677        fn f_down(&mut self, plan: &'n LogicalPlan) -> Result<TreeNodeRecursion> {
4678            if self.return_error_from_pre_in.dec() {
4679                return not_impl_err!("Error in pre_visit");
4680            }
4681
4682            self.inner.f_down(plan)
4683        }
4684
4685        fn f_up(&mut self, plan: &'n LogicalPlan) -> Result<TreeNodeRecursion> {
4686            if self.return_error_from_post_in.dec() {
4687                return not_impl_err!("Error in post_visit");
4688            }
4689
4690            self.inner.f_up(plan)
4691        }
4692    }
4693
    #[test]
    fn error_pre_visit() {
        // Error on the second pre-visit: traversal aborts with the error and
        // only the first two pre-visit events are recorded.
        let mut visitor = ErrorVisitor {
            return_error_from_pre_in: OptionalCounter::new(2),
            ..Default::default()
        };
        let plan = test_plan();
        let res = plan.visit_with_subqueries(&mut visitor).unwrap_err();
        assert_snapshot!(
            res.strip_backtrace(),
            @"This feature is not implemented: Error in pre_visit"
        );
        assert_debug_snapshot!(
            visitor.inner.strings,
            @r#"
        [
            "pre_visit Projection",
            "pre_visit Filter",
        ]
        "#
        );
    }
4716
    #[test]
    fn error_post_visit() {
        // Error on the second post-visit: everything up to and including the
        // TableScan post-visit is recorded before the traversal aborts.
        let mut visitor = ErrorVisitor {
            return_error_from_post_in: OptionalCounter::new(1),
            ..Default::default()
        };
        let plan = test_plan();
        let res = plan.visit_with_subqueries(&mut visitor).unwrap_err();
        assert_snapshot!(
            res.strip_backtrace(),
            @"This feature is not implemented: Error in post_visit"
        );
        assert_debug_snapshot!(
            visitor.inner.strings,
            @r#"
        [
            "pre_visit Projection",
            "pre_visit Filter",
            "pre_visit TableScan",
            "post_visit TableScan",
        ]
        "#
        );
    }
4741
    #[test]
    fn test_partial_eq_hash_and_partial_ord() {
        // Shared input for both Window nodes under comparison.
        let empty_values = Arc::new(LogicalPlan::EmptyRelation(EmptyRelation {
            produce_one_row: true,
            schema: Arc::new(DFSchema::empty()),
        }));

        // Builds a Window node with a single COUNT window expression over the
        // shared input, using the caller-provided output schema.
        let count_window_function = |schema| {
            Window::try_new_with_schema(
                vec![Expr::WindowFunction(Box::new(WindowFunction::new(
                    WindowFunctionDefinition::AggregateUDF(count_udaf()),
                    vec![],
                )))],
                Arc::clone(&empty_values),
                Arc::new(schema),
            )
            .unwrap()
        };

        // Identical single-field schemas that differ only in schema-level
        // metadata.
        let schema_without_metadata = || {
            DFSchema::from_unqualified_fields(
                vec![Field::new("count", DataType::Int64, false)].into(),
                HashMap::new(),
            )
            .unwrap()
        };

        let schema_with_metadata = || {
            DFSchema::from_unqualified_fields(
                vec![Field::new("count", DataType::Int64, false)].into(),
                [("key".to_string(), "value".to_string())].into(),
            )
            .unwrap()
        };

        // A Window
        let f = count_window_function(schema_without_metadata());

        // Same like `f`, different instance: equal, same hash, and ordered
        // equal.
        let f2 = count_window_function(schema_without_metadata());
        assert_eq!(f, f2);
        assert_eq!(hash(&f), hash(&f2));
        assert_eq!(f.partial_cmp(&f2), Some(Ordering::Equal));

        // Same like `f`, except for schema metadata: unequal, and unordered
        // (partial_cmp yields None).
        let o = count_window_function(schema_with_metadata());
        assert_ne!(f, o);
        assert_ne!(hash(&f), hash(&o)); // hash can collide for different values but does not collide in this test
        assert_eq!(f.partial_cmp(&o), None);
    }
4792
4793    fn hash<T: Hash>(value: &T) -> u64 {
4794        let hasher = &mut DefaultHasher::new();
4795        value.hash(hasher);
4796        hasher.finish()
4797    }
4798
    #[test]
    fn projection_expr_schema_mismatch() -> Result<()> {
        // One projection expression paired with a zero-field schema must be
        // rejected with a planning error.
        let empty_schema = Arc::new(DFSchema::empty());
        let p = Projection::try_new_with_schema(
            vec![col("a")],
            Arc::new(LogicalPlan::EmptyRelation(EmptyRelation {
                produce_one_row: false,
                schema: Arc::clone(&empty_schema),
            })),
            empty_schema,
        );
        assert_snapshot!(p.unwrap_err().strip_backtrace(), @"Error during planning: Projection has mismatch between number of expressions (1) and number of fields in schema (0)");
        Ok(())
    }
4813
4814    fn test_plan() -> LogicalPlan {
4815        let schema = Schema::new(vec![
4816            Field::new("id", DataType::Int32, false),
4817            Field::new("state", DataType::Utf8, false),
4818        ]);
4819
4820        table_scan(TableReference::none(), &schema, Some(vec![0, 1]))
4821            .unwrap()
4822            .filter(col("state").eq(lit("CO")))
4823            .unwrap()
4824            .project(vec![col("id")])
4825            .unwrap()
4826            .build()
4827            .unwrap()
4828    }
4829
4830    #[test]
4831    fn test_replace_invalid_placeholder() {
4832        // test empty placeholder
4833        let schema = Schema::new(vec![Field::new("id", DataType::Int32, false)]);
4834
4835        let plan = table_scan(TableReference::none(), &schema, None)
4836            .unwrap()
4837            .filter(col("id").eq(placeholder("")))
4838            .unwrap()
4839            .build()
4840            .unwrap();
4841
4842        let param_values = vec![ScalarValue::Int32(Some(42))];
4843        plan.replace_params_with_values(&param_values.clone().into())
4844            .expect_err("unexpectedly succeeded to replace an invalid placeholder");
4845
4846        // test $0 placeholder
4847        let schema = Schema::new(vec![Field::new("id", DataType::Int32, false)]);
4848
4849        let plan = table_scan(TableReference::none(), &schema, None)
4850            .unwrap()
4851            .filter(col("id").eq(placeholder("$0")))
4852            .unwrap()
4853            .build()
4854            .unwrap();
4855
4856        plan.replace_params_with_values(&param_values.clone().into())
4857            .expect_err("unexpectedly succeeded to replace an invalid placeholder");
4858
4859        // test $00 placeholder
4860        let schema = Schema::new(vec![Field::new("id", DataType::Int32, false)]);
4861
4862        let plan = table_scan(TableReference::none(), &schema, None)
4863            .unwrap()
4864            .filter(col("id").eq(placeholder("$00")))
4865            .unwrap()
4866            .build()
4867            .unwrap();
4868
4869        plan.replace_params_with_values(&param_values.into())
4870            .expect_err("unexpectedly succeeded to replace an invalid placeholder");
4871    }
4872
4873    #[test]
4874    fn test_replace_placeholder_mismatched_metadata() {
4875        let schema = Schema::new(vec![Field::new("id", DataType::Int32, false)]);
4876
4877        // Create a prepared statement with explicit fields that do not have metadata
4878        let plan = table_scan(TableReference::none(), &schema, None)
4879            .unwrap()
4880            .filter(col("id").eq(placeholder("$1")))
4881            .unwrap()
4882            .build()
4883            .unwrap();
4884        let prepared_builder = LogicalPlanBuilder::new(plan)
4885            .prepare(
4886                "".to_string(),
4887                vec![Field::new("", DataType::Int32, true).into()],
4888            )
4889            .unwrap();
4890
4891        // Attempt to bind a parameter with metadata
4892        let mut scalar_meta = HashMap::new();
4893        scalar_meta.insert("some_key".to_string(), "some_value".to_string());
4894        let param_values = ParamValues::List(vec![ScalarAndMetadata::new(
4895            ScalarValue::Int32(Some(42)),
4896            Some(scalar_meta.into()),
4897        )]);
4898        prepared_builder
4899            .plan()
4900            .clone()
4901            .with_param_values(param_values)
4902            .expect_err("prepared field metadata mismatch unexpectedly succeeded");
4903    }
4904
    #[test]
    fn test_replace_placeholder_empty_relation_valid_schema() {
        // SELECT $1, $2;
        let plan = LogicalPlanBuilder::empty(false)
            .project(vec![
                SelectExpr::from(placeholder("$1")),
                SelectExpr::from(placeholder("$2")),
            ])
            .unwrap()
            .build()
            .unwrap();

        // original: unbound placeholders have a nullable Null type
        assert_snapshot!(plan.display_indent_schema(), @r"
        Projection: $1, $2 [$1:Null;N, $2:Null;N]
          EmptyRelation: rows=0 []
        ");

        let plan = plan
            .with_param_values(vec![ScalarValue::from(1i32), ScalarValue::from("s")])
            .unwrap();

        // replaced: values are substituted (aliased to the placeholder names)
        // and the schema picks up the concrete types
        assert_snapshot!(plan.display_indent_schema(), @r#"
        Projection: Int32(1) AS $1, Utf8("s") AS $2 [$1:Int32, $2:Utf8]
          EmptyRelation: rows=0 []
        "#);
    }
4933
4934    #[test]
4935    fn test_nullable_schema_after_grouping_set() {
4936        let schema = Schema::new(vec![
4937            Field::new("foo", DataType::Int32, false),
4938            Field::new("bar", DataType::Int32, false),
4939        ]);
4940
4941        let plan = table_scan(TableReference::none(), &schema, None)
4942            .unwrap()
4943            .aggregate(
4944                vec![Expr::GroupingSet(GroupingSet::GroupingSets(vec![
4945                    vec![col("foo")],
4946                    vec![col("bar")],
4947                ]))],
4948                vec![count(lit(true))],
4949            )
4950            .unwrap()
4951            .build()
4952            .unwrap();
4953
4954        let output_schema = plan.schema();
4955
4956        assert!(
4957            output_schema
4958                .field_with_name(None, "foo")
4959                .unwrap()
4960                .is_nullable(),
4961        );
4962        assert!(
4963            output_schema
4964                .field_with_name(None, "bar")
4965                .unwrap()
4966                .is_nullable()
4967        );
4968    }
4969
    #[test]
    fn test_filter_is_scalar() {
        // test empty placeholder
        let schema =
            Arc::new(Schema::new(vec![Field::new("id", DataType::Int32, false)]));

        let source = Arc::new(LogicalTableSource::new(schema));
        let schema = Arc::new(
            DFSchema::try_from_qualified_schema(
                TableReference::bare("tab"),
                &source.schema(),
            )
            .unwrap(),
        );
        // Scan over a schema with no declared constraints.
        let scan = Arc::new(LogicalPlan::TableScan(TableScan {
            table_name: TableReference::bare("tab"),
            source: Arc::clone(&source) as Arc<dyn TableSource>,
            projection: None,
            projected_schema: Arc::clone(&schema),
            filters: vec![],
            fetch: None,
        }));
        let col = schema.field_names()[0].clone();

        // Without a uniqueness guarantee on `id`, an equality filter does not
        // pin the result to at most one row.
        let filter = Filter::try_new(
            Expr::Column(col.into()).eq(Expr::Literal(ScalarValue::Int32(Some(1)), None)),
            scan,
        )
        .unwrap();
        assert!(!filter.is_scalar());
        // Same schema, but now carrying a Unique constraint on column 0 as a
        // functional dependency.
        let unique_schema = Arc::new(
            schema
                .as_ref()
                .clone()
                .with_functional_dependencies(
                    FunctionalDependencies::new_from_constraints(
                        Some(&Constraints::new_unverified(vec![Constraint::Unique(
                            vec![0],
                        )])),
                        1,
                    ),
                )
                .unwrap(),
        );
        let scan = Arc::new(LogicalPlan::TableScan(TableScan {
            table_name: TableReference::bare("tab"),
            source,
            projection: None,
            projected_schema: Arc::clone(&unique_schema),
            filters: vec![],
            fetch: None,
        }));
        let col = schema.field_names()[0].clone();

        // Equality on the unique column guarantees at most one row, so the
        // filter is scalar.
        let filter =
            Filter::try_new(Expr::Column(col.into()).eq(lit(1i32)), scan).unwrap();
        assert!(filter.is_scalar());
    }
5028
    #[test]
    fn test_transform_explain() {
        let schema = Schema::new(vec![
            Field::new("foo", DataType::Int32, false),
            Field::new("bar", DataType::Int32, false),
        ]);

        // Wrap the scan in an Explain node.
        let plan = table_scan(TableReference::none(), &schema, None)
            .unwrap()
            .explain(false, false)
            .unwrap()
            .build()
            .unwrap();

        let external_filter = col("foo").eq(lit(true));

        // after transformation, because plan is not the same anymore,
        // the parent plan is built again with call to LogicalPlan::with_new_inputs -> with_new_exprs
        let plan = plan
            .transform(|plan| match plan {
                // Replace each TableScan with Filter(TableScan).
                LogicalPlan::TableScan(table) => {
                    let filter = Filter::try_new(
                        external_filter.clone(),
                        Arc::new(LogicalPlan::TableScan(table)),
                    )
                    .unwrap();
                    Ok(Transformed::yes(LogicalPlan::Filter(filter)))
                }
                x => Ok(Transformed::no(x)),
            })
            .data()
            .unwrap();

        // The Explain node survives and now wraps the rewritten child.
        let actual = format!("{}", plan.display_indent());
        assert_snapshot!(actual, @r"
        Explain
          Filter: foo = Boolean(true)
            TableScan: ?table?
        ")
    }
5069
5070    #[test]
5071    fn test_plan_partial_ord() {
5072        let empty_relation = LogicalPlan::EmptyRelation(EmptyRelation {
5073            produce_one_row: false,
5074            schema: Arc::new(DFSchema::empty()),
5075        });
5076
5077        let describe_table = LogicalPlan::DescribeTable(DescribeTable {
5078            schema: Arc::new(Schema::new(vec![Field::new(
5079                "foo",
5080                DataType::Int32,
5081                false,
5082            )])),
5083            output_schema: DFSchemaRef::new(DFSchema::empty()),
5084        });
5085
5086        let describe_table_clone = LogicalPlan::DescribeTable(DescribeTable {
5087            schema: Arc::new(Schema::new(vec![Field::new(
5088                "foo",
5089                DataType::Int32,
5090                false,
5091            )])),
5092            output_schema: DFSchemaRef::new(DFSchema::empty()),
5093        });
5094
5095        assert_eq!(
5096            empty_relation.partial_cmp(&describe_table),
5097            Some(Ordering::Less)
5098        );
5099        assert_eq!(
5100            describe_table.partial_cmp(&empty_relation),
5101            Some(Ordering::Greater)
5102        );
5103        assert_eq!(describe_table.partial_cmp(&describe_table_clone), None);
5104    }
5105
    #[test]
    fn test_limit_with_new_children() {
        let input = Arc::new(LogicalPlan::Values(Values {
            schema: Arc::new(DFSchema::empty()),
            values: vec![vec![]],
        }));
        // All four skip/fetch combinations: neither, fetch only, skip only,
        // both.
        let cases = [
            LogicalPlan::Limit(Limit {
                skip: None,
                fetch: None,
                input: Arc::clone(&input),
            }),
            LogicalPlan::Limit(Limit {
                skip: None,
                fetch: Some(Box::new(Expr::Literal(
                    ScalarValue::new_ten(&DataType::UInt32).unwrap(),
                    None,
                ))),
                input: Arc::clone(&input),
            }),
            LogicalPlan::Limit(Limit {
                skip: Some(Box::new(Expr::Literal(
                    ScalarValue::new_ten(&DataType::UInt32).unwrap(),
                    None,
                ))),
                fetch: None,
                input: Arc::clone(&input),
            }),
            LogicalPlan::Limit(Limit {
                skip: Some(Box::new(Expr::Literal(
                    ScalarValue::new_one(&DataType::UInt32).unwrap(),
                    None,
                ))),
                fetch: Some(Box::new(Expr::Literal(
                    ScalarValue::new_ten(&DataType::UInt32).unwrap(),
                    None,
                ))),
                input,
            }),
        ];

        // Round-tripping a Limit through with_new_exprs with its own
        // expressions and inputs must reproduce an identical node.
        for limit in cases {
            let new_limit = limit
                .with_new_exprs(
                    limit.expressions(),
                    limit.inputs().into_iter().cloned().collect(),
                )
                .unwrap();
            assert_eq!(limit, new_limit);
        }
    }
5157
    #[test]
    fn test_with_subqueries_jump() {
        // The test plan contains a `Project` node above a `Filter` node, and the
        // `Project` node contains a subquery plan with a `Filter` root node, so returning
        // `TreeNodeRecursion::Jump` on `Project` should cause not visiting any of the
        // `Filter`s.
        let subquery_schema =
            Schema::new(vec![Field::new("sub_id", DataType::Int32, false)]);

        let subquery_plan =
            table_scan(TableReference::none(), &subquery_schema, Some(vec![0]))
                .unwrap()
                .filter(col("sub_id").eq(lit(0)))
                .unwrap()
                .build()
                .unwrap();

        let schema = Schema::new(vec![Field::new("id", DataType::Int32, false)]);

        let plan = table_scan(TableReference::none(), &schema, Some(vec![0]))
            .unwrap()
            .filter(col("id").eq(lit(0)))
            .unwrap()
            .project(vec![col("id"), scalar_subquery(Arc::new(subquery_plan))])
            .unwrap()
            .build()
            .unwrap();

        // 1) Closure-based apply: Jump on Projection skips both Filters.
        let mut filter_found = false;
        plan.apply_with_subqueries(|plan| {
            match plan {
                LogicalPlan::Projection(..) => return Ok(TreeNodeRecursion::Jump),
                LogicalPlan::Filter(..) => filter_found = true,
                _ => {}
            }
            Ok(TreeNodeRecursion::Continue)
        })
        .unwrap();
        assert!(!filter_found);

        // 2) Same check through the TreeNodeVisitor trait.
        struct ProjectJumpVisitor {
            filter_found: bool,
        }

        impl ProjectJumpVisitor {
            fn new() -> Self {
                Self {
                    filter_found: false,
                }
            }
        }

        impl<'n> TreeNodeVisitor<'n> for ProjectJumpVisitor {
            type Node = LogicalPlan;

            fn f_down(&mut self, node: &'n Self::Node) -> Result<TreeNodeRecursion> {
                match node {
                    LogicalPlan::Projection(..) => return Ok(TreeNodeRecursion::Jump),
                    LogicalPlan::Filter(..) => self.filter_found = true,
                    _ => {}
                }
                Ok(TreeNodeRecursion::Continue)
            }
        }

        let mut visitor = ProjectJumpVisitor::new();
        plan.visit_with_subqueries(&mut visitor).unwrap();
        assert!(!visitor.filter_found);

        // 3) Same check via transform_down_with_subqueries.
        let mut filter_found = false;
        plan.clone()
            .transform_down_with_subqueries(|plan| {
                match plan {
                    LogicalPlan::Projection(..) => {
                        return Ok(Transformed::new(
                            plan,
                            false,
                            TreeNodeRecursion::Jump,
                        ));
                    }
                    LogicalPlan::Filter(..) => filter_found = true,
                    _ => {}
                }
                Ok(Transformed::no(plan))
            })
            .unwrap();
        assert!(!filter_found);

        // 4) Same check via transform_down_up_with_subqueries (up pass is a
        // no-op).
        let mut filter_found = false;
        plan.clone()
            .transform_down_up_with_subqueries(
                |plan| {
                    match plan {
                        LogicalPlan::Projection(..) => {
                            return Ok(Transformed::new(
                                plan,
                                false,
                                TreeNodeRecursion::Jump,
                            ));
                        }
                        LogicalPlan::Filter(..) => filter_found = true,
                        _ => {}
                    }
                    Ok(Transformed::no(plan))
                },
                |plan| Ok(Transformed::no(plan)),
            )
            .unwrap();
        assert!(!filter_found);

        // 5) Same check through the TreeNodeRewriter trait.
        struct ProjectJumpRewriter {
            filter_found: bool,
        }

        impl ProjectJumpRewriter {
            fn new() -> Self {
                Self {
                    filter_found: false,
                }
            }
        }

        impl TreeNodeRewriter for ProjectJumpRewriter {
            type Node = LogicalPlan;

            fn f_down(&mut self, node: Self::Node) -> Result<Transformed<Self::Node>> {
                match node {
                    LogicalPlan::Projection(..) => {
                        return Ok(Transformed::new(
                            node,
                            false,
                            TreeNodeRecursion::Jump,
                        ));
                    }
                    LogicalPlan::Filter(..) => self.filter_found = true,
                    _ => {}
                }
                Ok(Transformed::no(node))
            }
        }

        let mut rewriter = ProjectJumpRewriter::new();
        plan.rewrite_with_subqueries(&mut rewriter).unwrap();
        assert!(!rewriter.filter_found);
    }
5303
5304    #[test]
5305    fn test_with_unresolved_placeholders() {
5306        let field_name = "id";
5307        let placeholder_value = "$1";
5308        let schema = Schema::new(vec![Field::new(field_name, DataType::Int32, false)]);
5309
5310        let plan = table_scan(TableReference::none(), &schema, None)
5311            .unwrap()
5312            .filter(col(field_name).eq(placeholder(placeholder_value)))
5313            .unwrap()
5314            .build()
5315            .unwrap();
5316
5317        // Check that the placeholder parameters have not received a DataType.
5318        let params = plan.get_parameter_fields().unwrap();
5319        assert_eq!(params.len(), 1);
5320
5321        let parameter_type = params.clone().get(placeholder_value).unwrap().clone();
5322        assert_eq!(parameter_type, None);
5323    }
5324
5325    #[test]
5326    fn test_join_with_new_exprs() -> Result<()> {
5327        fn create_test_join(
5328            on: Vec<(Expr, Expr)>,
5329            filter: Option<Expr>,
5330        ) -> Result<LogicalPlan> {
5331            let schema = Schema::new(vec![
5332                Field::new("a", DataType::Int32, false),
5333                Field::new("b", DataType::Int32, false),
5334            ]);
5335
5336            let left_schema = DFSchema::try_from_qualified_schema("t1", &schema)?;
5337            let right_schema = DFSchema::try_from_qualified_schema("t2", &schema)?;
5338
5339            Ok(LogicalPlan::Join(Join {
5340                left: Arc::new(
5341                    table_scan(Some("t1"), left_schema.as_arrow(), None)?.build()?,
5342                ),
5343                right: Arc::new(
5344                    table_scan(Some("t2"), right_schema.as_arrow(), None)?.build()?,
5345                ),
5346                on,
5347                filter,
5348                join_type: JoinType::Inner,
5349                join_constraint: JoinConstraint::On,
5350                schema: Arc::new(left_schema.join(&right_schema)?),
5351                null_equality: NullEquality::NullEqualsNothing,
5352                null_aware: false,
5353            }))
5354        }
5355
5356        {
5357            let join = create_test_join(vec![(col("t1.a"), (col("t2.a")))], None)?;
5358            let LogicalPlan::Join(join) = join.with_new_exprs(
5359                join.expressions(),
5360                join.inputs().into_iter().cloned().collect(),
5361            )?
5362            else {
5363                unreachable!()
5364            };
5365            assert_eq!(join.on, vec![(col("t1.a"), (col("t2.a")))]);
5366            assert_eq!(join.filter, None);
5367        }
5368
5369        {
5370            let join = create_test_join(vec![], Some(col("t1.a").gt(col("t2.a"))))?;
5371            let LogicalPlan::Join(join) = join.with_new_exprs(
5372                join.expressions(),
5373                join.inputs().into_iter().cloned().collect(),
5374            )?
5375            else {
5376                unreachable!()
5377            };
5378            assert_eq!(join.on, vec![]);
5379            assert_eq!(join.filter, Some(col("t1.a").gt(col("t2.a"))));
5380        }
5381
5382        {
5383            let join = create_test_join(
5384                vec![(col("t1.a"), (col("t2.a")))],
5385                Some(col("t1.b").gt(col("t2.b"))),
5386            )?;
5387            let LogicalPlan::Join(join) = join.with_new_exprs(
5388                join.expressions(),
5389                join.inputs().into_iter().cloned().collect(),
5390            )?
5391            else {
5392                unreachable!()
5393            };
5394            assert_eq!(join.on, vec![(col("t1.a"), (col("t2.a")))]);
5395            assert_eq!(join.filter, Some(col("t1.b").gt(col("t2.b"))));
5396        }
5397
5398        {
5399            let join = create_test_join(
5400                vec![(col("t1.a"), (col("t2.a"))), (col("t1.b"), (col("t2.b")))],
5401                None,
5402            )?;
5403            let LogicalPlan::Join(join) = join.with_new_exprs(
5404                vec![
5405                    binary_expr(col("t1.a"), Operator::Plus, lit(1)),
5406                    binary_expr(col("t2.a"), Operator::Plus, lit(2)),
5407                    col("t1.b"),
5408                    col("t2.b"),
5409                    lit(true),
5410                ],
5411                join.inputs().into_iter().cloned().collect(),
5412            )?
5413            else {
5414                unreachable!()
5415            };
5416            assert_eq!(
5417                join.on,
5418                vec![
5419                    (
5420                        binary_expr(col("t1.a"), Operator::Plus, lit(1)),
5421                        binary_expr(col("t2.a"), Operator::Plus, lit(2))
5422                    ),
5423                    (col("t1.b"), (col("t2.b")))
5424                ]
5425            );
5426            assert_eq!(join.filter, Some(lit(true)));
5427        }
5428
5429        Ok(())
5430    }
5431
    #[test]
    fn test_join_try_new() -> Result<()> {
        // Both inputs share the same column names (`a`, `b`) so the joined
        // schema contains duplicate names qualified by `t1`/`t2`.
        let schema = Schema::new(vec![
            Field::new("a", DataType::Int32, false),
            Field::new("b", DataType::Int32, false),
        ]);

        let left_scan = table_scan(Some("t1"), &schema, None)?.build()?;

        let right_scan = table_scan(Some("t2"), &schema, None)?.build()?;

        // Exercise `Join::try_new` across every supported join type and
        // verify the schema it computes (field set, order, nullability).
        let join_types = vec![
            JoinType::Inner,
            JoinType::Left,
            JoinType::Right,
            JoinType::Full,
            JoinType::LeftSemi,
            JoinType::LeftAnti,
            JoinType::RightSemi,
            JoinType::RightAnti,
            JoinType::LeftMark,
        ];

        for join_type in join_types {
            let join = Join::try_new(
                Arc::new(left_scan.clone()),
                Arc::new(right_scan.clone()),
                vec![(col("t1.a"), col("t2.a"))],
                Some(col("t1.b").gt(col("t2.b"))),
                join_type,
                JoinConstraint::On,
                NullEquality::NullEqualsNothing,
                false,
            )?;

            match join_type {
                // Left semi/anti joins emit only the left input's columns.
                JoinType::LeftSemi | JoinType::LeftAnti => {
                    assert_eq!(join.schema.fields().len(), 2);

                    let fields = join.schema.fields();
                    assert_eq!(
                        fields[0].name(),
                        "a",
                        "First field should be 'a' from left table"
                    );
                    assert_eq!(
                        fields[1].name(),
                        "b",
                        "Second field should be 'b' from left table"
                    );
                }
                // Right semi/anti joins emit only the right input's columns.
                JoinType::RightSemi | JoinType::RightAnti => {
                    assert_eq!(join.schema.fields().len(), 2);

                    let fields = join.schema.fields();
                    assert_eq!(
                        fields[0].name(),
                        "a",
                        "First field should be 'a' from right table"
                    );
                    assert_eq!(
                        fields[1].name(),
                        "b",
                        "Second field should be 'b' from right table"
                    );
                }
                // A left-mark join emits the left columns plus a non-nullable
                // synthetic `mark` column.
                JoinType::LeftMark => {
                    assert_eq!(join.schema.fields().len(), 3);

                    let fields = join.schema.fields();
                    assert_eq!(
                        fields[0].name(),
                        "a",
                        "First field should be 'a' from left table"
                    );
                    assert_eq!(
                        fields[1].name(),
                        "b",
                        "Second field should be 'b' from left table"
                    );
                    assert_eq!(
                        fields[2].name(),
                        "mark",
                        "Third field should be the mark column"
                    );

                    assert!(!fields[0].is_nullable());
                    assert!(!fields[1].is_nullable());
                    assert!(!fields[2].is_nullable());
                }
                // Inner/Left/Right/Full: all four columns, left side first.
                _ => {
                    assert_eq!(join.schema.fields().len(), 4);

                    let fields = join.schema.fields();
                    assert_eq!(
                        fields[0].name(),
                        "a",
                        "First field should be 'a' from left table"
                    );
                    assert_eq!(
                        fields[1].name(),
                        "b",
                        "Second field should be 'b' from left table"
                    );
                    assert_eq!(
                        fields[2].name(),
                        "a",
                        "Third field should be 'a' from right table"
                    );
                    assert_eq!(
                        fields[3].name(),
                        "b",
                        "Fourth field should be 'b' from right table"
                    );

                    // Outer joins force the non-preserved side's fields to be
                    // nullable, even though both source schemas are non-null.
                    if join_type == JoinType::Left {
                        // Left side fields (first two) shouldn't be nullable
                        assert!(!fields[0].is_nullable());
                        assert!(!fields[1].is_nullable());
                        // Right side fields (third and fourth) should be nullable
                        assert!(fields[2].is_nullable());
                        assert!(fields[3].is_nullable());
                    } else if join_type == JoinType::Right {
                        // Left side fields (first two) should be nullable
                        assert!(fields[0].is_nullable());
                        assert!(fields[1].is_nullable());
                        // Right side fields (third and fourth) shouldn't be nullable
                        assert!(!fields[2].is_nullable());
                        assert!(!fields[3].is_nullable());
                    } else if join_type == JoinType::Full {
                        assert!(fields[0].is_nullable());
                        assert!(fields[1].is_nullable());
                        assert!(fields[2].is_nullable());
                        assert!(fields[3].is_nullable());
                    }
                }
            }

            // Regardless of join type, the join condition and metadata passed
            // to `try_new` must be stored unchanged.
            assert_eq!(join.on, vec![(col("t1.a"), col("t2.a"))]);
            assert_eq!(join.filter, Some(col("t1.b").gt(col("t2.b"))));
            assert_eq!(join.join_type, join_type);
            assert_eq!(join.join_constraint, JoinConstraint::On);
            assert_eq!(join.null_equality, NullEquality::NullEqualsNothing);
        }

        Ok(())
    }
5579
5580    #[test]
5581    fn test_join_try_new_with_using_constraint_and_overlapping_columns() -> Result<()> {
5582        let left_schema = Schema::new(vec![
5583            Field::new("id", DataType::Int32, false), // Common column in both tables
5584            Field::new("name", DataType::Utf8, false), // Unique to left
5585            Field::new("value", DataType::Int32, false), // Common column, different meaning
5586        ]);
5587
5588        let right_schema = Schema::new(vec![
5589            Field::new("id", DataType::Int32, false), // Common column in both tables
5590            Field::new("category", DataType::Utf8, false), // Unique to right
5591            Field::new("value", DataType::Float64, true), // Common column, different meaning
5592        ]);
5593
5594        let left_plan = table_scan(Some("t1"), &left_schema, None)?.build()?;
5595
5596        let right_plan = table_scan(Some("t2"), &right_schema, None)?.build()?;
5597
5598        // Test 1: USING constraint with a common column
5599        {
5600            // In the logical plan, both copies of the `id` column are preserved
5601            // The USING constraint is handled later during physical execution, where the common column appears once
5602            let join = Join::try_new(
5603                Arc::new(left_plan.clone()),
5604                Arc::new(right_plan.clone()),
5605                vec![(col("t1.id"), col("t2.id"))],
5606                None,
5607                JoinType::Inner,
5608                JoinConstraint::Using,
5609                NullEquality::NullEqualsNothing,
5610                false,
5611            )?;
5612
5613            let fields = join.schema.fields();
5614
5615            assert_eq!(fields.len(), 6);
5616
5617            assert_eq!(
5618                fields[0].name(),
5619                "id",
5620                "First field should be 'id' from left table"
5621            );
5622            assert_eq!(
5623                fields[1].name(),
5624                "name",
5625                "Second field should be 'name' from left table"
5626            );
5627            assert_eq!(
5628                fields[2].name(),
5629                "value",
5630                "Third field should be 'value' from left table"
5631            );
5632            assert_eq!(
5633                fields[3].name(),
5634                "id",
5635                "Fourth field should be 'id' from right table"
5636            );
5637            assert_eq!(
5638                fields[4].name(),
5639                "category",
5640                "Fifth field should be 'category' from right table"
5641            );
5642            assert_eq!(
5643                fields[5].name(),
5644                "value",
5645                "Sixth field should be 'value' from right table"
5646            );
5647
5648            assert_eq!(join.join_constraint, JoinConstraint::Using);
5649        }
5650
5651        // Test 2: Complex join condition with expressions
5652        {
5653            // Complex condition: join on id equality AND where left.value < right.value
5654            let join = Join::try_new(
5655                Arc::new(left_plan.clone()),
5656                Arc::new(right_plan.clone()),
5657                vec![(col("t1.id"), col("t2.id"))], // Equijoin condition
5658                Some(col("t1.value").lt(col("t2.value"))), // Non-equi filter condition
5659                JoinType::Inner,
5660                JoinConstraint::On,
5661                NullEquality::NullEqualsNothing,
5662                false,
5663            )?;
5664
5665            let fields = join.schema.fields();
5666            assert_eq!(fields.len(), 6);
5667
5668            assert_eq!(
5669                fields[0].name(),
5670                "id",
5671                "First field should be 'id' from left table"
5672            );
5673            assert_eq!(
5674                fields[1].name(),
5675                "name",
5676                "Second field should be 'name' from left table"
5677            );
5678            assert_eq!(
5679                fields[2].name(),
5680                "value",
5681                "Third field should be 'value' from left table"
5682            );
5683            assert_eq!(
5684                fields[3].name(),
5685                "id",
5686                "Fourth field should be 'id' from right table"
5687            );
5688            assert_eq!(
5689                fields[4].name(),
5690                "category",
5691                "Fifth field should be 'category' from right table"
5692            );
5693            assert_eq!(
5694                fields[5].name(),
5695                "value",
5696                "Sixth field should be 'value' from right table"
5697            );
5698
5699            assert_eq!(join.filter, Some(col("t1.value").lt(col("t2.value"))));
5700        }
5701
5702        // Test 3: Join with null equality behavior set to true
5703        {
5704            let join = Join::try_new(
5705                Arc::new(left_plan.clone()),
5706                Arc::new(right_plan.clone()),
5707                vec![(col("t1.id"), col("t2.id"))],
5708                None,
5709                JoinType::Inner,
5710                JoinConstraint::On,
5711                NullEquality::NullEqualsNull,
5712                false,
5713            )?;
5714
5715            assert_eq!(join.null_equality, NullEquality::NullEqualsNull);
5716        }
5717
5718        Ok(())
5719    }
5720
5721    #[test]
5722    fn test_join_try_new_schema_validation() -> Result<()> {
5723        let left_schema = Schema::new(vec![
5724            Field::new("id", DataType::Int32, false),
5725            Field::new("name", DataType::Utf8, false),
5726            Field::new("value", DataType::Float64, true),
5727        ]);
5728
5729        let right_schema = Schema::new(vec![
5730            Field::new("id", DataType::Int32, false),
5731            Field::new("category", DataType::Utf8, true),
5732            Field::new("code", DataType::Int16, false),
5733        ]);
5734
5735        let left_plan = table_scan(Some("t1"), &left_schema, None)?.build()?;
5736
5737        let right_plan = table_scan(Some("t2"), &right_schema, None)?.build()?;
5738
5739        let join_types = vec![
5740            JoinType::Inner,
5741            JoinType::Left,
5742            JoinType::Right,
5743            JoinType::Full,
5744        ];
5745
5746        for join_type in join_types {
5747            let join = Join::try_new(
5748                Arc::new(left_plan.clone()),
5749                Arc::new(right_plan.clone()),
5750                vec![(col("t1.id"), col("t2.id"))],
5751                Some(col("t1.value").gt(lit(5.0))),
5752                join_type,
5753                JoinConstraint::On,
5754                NullEquality::NullEqualsNothing,
5755                false,
5756            )?;
5757
5758            let fields = join.schema.fields();
5759            assert_eq!(fields.len(), 6, "Expected 6 fields for {join_type} join");
5760
5761            for (i, field) in fields.iter().enumerate() {
5762                let expected_nullable = match (i, &join_type) {
5763                    // Left table fields (indices 0, 1, 2)
5764                    (0, JoinType::Right | JoinType::Full) => true, // id becomes nullable in RIGHT/FULL
5765                    (1, JoinType::Right | JoinType::Full) => true, // name becomes nullable in RIGHT/FULL
5766                    (2, _) => true, // value is already nullable
5767
5768                    // Right table fields (indices 3, 4, 5)
5769                    (3, JoinType::Left | JoinType::Full) => true, // id becomes nullable in LEFT/FULL
5770                    (4, _) => true, // category is already nullable
5771                    (5, JoinType::Left | JoinType::Full) => true, // code becomes nullable in LEFT/FULL
5772
5773                    _ => false,
5774                };
5775
5776                assert_eq!(
5777                    field.is_nullable(),
5778                    expected_nullable,
5779                    "Field {} ({}) nullability incorrect for {:?} join",
5780                    i,
5781                    field.name(),
5782                    join_type
5783                );
5784            }
5785        }
5786
5787        let using_join = Join::try_new(
5788            Arc::new(left_plan.clone()),
5789            Arc::new(right_plan.clone()),
5790            vec![(col("t1.id"), col("t2.id"))],
5791            None,
5792            JoinType::Inner,
5793            JoinConstraint::Using,
5794            NullEquality::NullEqualsNothing,
5795            false,
5796        )?;
5797
5798        assert_eq!(
5799            using_join.schema.fields().len(),
5800            6,
5801            "USING join should have all fields"
5802        );
5803        assert_eq!(using_join.join_constraint, JoinConstraint::Using);
5804
5805        Ok(())
5806    }
5807}