datafusion_expr/logical_plan/
plan.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! Logical plan types
19
20use std::cmp::Ordering;
21use std::collections::{HashMap, HashSet};
22use std::fmt::{self, Debug, Display, Formatter};
23use std::hash::{Hash, Hasher};
24use std::str::FromStr;
25use std::sync::{Arc, LazyLock};
26
27use super::dml::CopyTo;
28use super::invariants::{
29    assert_always_invariants_at_current_node, assert_executable_invariants,
30    InvariantLevel,
31};
32use super::DdlStatement;
33use crate::builder::{unique_field_aliases, unnest_with_options};
34use crate::expr::{
35    intersect_metadata_for_union, Alias, Placeholder, Sort as SortExpr, WindowFunction,
36    WindowFunctionParams,
37};
38use crate::expr_rewriter::{
39    create_col_from_scalar_expr, normalize_cols, normalize_sorts, NamePreserver,
40};
41use crate::logical_plan::display::{GraphvizVisitor, IndentVisitor};
42use crate::logical_plan::extension::UserDefinedLogicalNode;
43use crate::logical_plan::{DmlStatement, Statement};
44use crate::utils::{
45    enumerate_grouping_sets, exprlist_to_fields, find_out_reference_exprs,
46    grouping_set_expr_count, grouping_set_to_exprlist, split_conjunction,
47};
48use crate::{
49    build_join_schema, expr_vec_fmt, requalify_sides_if_needed, BinaryExpr,
50    CreateMemoryTable, CreateView, Execute, Expr, ExprSchemable, LogicalPlanBuilder,
51    Operator, Prepare, TableProviderFilterPushDown, TableSource,
52    WindowFunctionDefinition,
53};
54
55use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
56use datafusion_common::cse::{NormalizeEq, Normalizeable};
57use datafusion_common::tree_node::{
58    Transformed, TreeNode, TreeNodeContainer, TreeNodeRecursion,
59};
60use datafusion_common::{
61    aggregate_functional_dependencies, internal_err, plan_err, Column, Constraints,
62    DFSchema, DFSchemaRef, DataFusionError, Dependency, FunctionalDependence,
63    FunctionalDependencies, NullEquality, ParamValues, Result, ScalarValue, Spans,
64    TableReference, UnnestOptions,
65};
66use indexmap::IndexSet;
67
68// backwards compatibility
69use crate::display::PgJsonVisitor;
70pub use datafusion_common::display::{PlanType, StringifiedPlan, ToStringifiedPlan};
71pub use datafusion_common::{JoinConstraint, JoinType};
72
73/// A `LogicalPlan` is a node in a tree of relational operators (such as
74/// Projection or Filter).
75///
76/// Represents transforming an input relation (table) to an output relation
77/// (table) with a potentially different schema. Plans form a dataflow tree
78/// where data flows from leaves up to the root to produce the query result.
79///
80/// `LogicalPlan`s can be created by the SQL query planner, the DataFrame API,
81/// or programmatically (for example custom query languages).
82///
83/// # See also:
84/// * [`Expr`]: For the expressions that are evaluated by the plan
85/// * [`LogicalPlanBuilder`]: For building `LogicalPlan`s
86/// * [`tree_node`]: To inspect and rewrite `LogicalPlan`s
87///
88/// [`tree_node`]: crate::logical_plan::tree_node
89///
90/// # Examples
91///
92/// ## Creating a LogicalPlan from SQL:
93///
94/// See [`SessionContext::sql`](https://docs.rs/datafusion/latest/datafusion/execution/context/struct.SessionContext.html#method.sql)
95///
96/// ## Creating a LogicalPlan from the DataFrame API:
97///
98/// See [`DataFrame::logical_plan`](https://docs.rs/datafusion/latest/datafusion/dataframe/struct.DataFrame.html#method.logical_plan)
99///
100/// ## Creating a LogicalPlan programmatically:
101///
102/// See [`LogicalPlanBuilder`]
103///
104/// # Visiting and Rewriting `LogicalPlan`s
105///
106/// Using the [`tree_node`] API, you can recursively walk all nodes in a
107/// `LogicalPlan`. For example, to find all column references in a plan:
108///
109/// ```
110/// # use std::collections::HashSet;
111/// # use arrow::datatypes::{DataType, Field, Schema};
112/// # use datafusion_expr::{Expr, col, lit, LogicalPlan, LogicalPlanBuilder, table_scan};
113/// # use datafusion_common::tree_node::{TreeNodeRecursion, TreeNode};
114/// # use datafusion_common::{Column, Result};
115/// # fn employee_schema() -> Schema {
116/// #    Schema::new(vec![
117/// #           Field::new("name", DataType::Utf8, false),
118/// #           Field::new("salary", DataType::Int32, false),
119/// #       ])
120/// #   }
/// // Projection(name)
122/// //   Filter(salary > 1000)
123/// //     TableScan(employee)
124/// # fn main() -> Result<()> {
125/// let plan = table_scan(Some("employee"), &employee_schema(), None)?
126///  .filter(col("salary").gt(lit(1000)))?
127///  .project(vec![col("name")])?
128///  .build()?;
129///
130/// // use apply to walk the plan and collect all expressions
131/// let mut expressions = HashSet::new();
132/// plan.apply(|node| {
133///   // collect all expressions in the plan
134///   node.apply_expressions(|expr| {
135///    expressions.insert(expr.clone());
136///    Ok(TreeNodeRecursion::Continue) // control walk of expressions
137///   })?;
138///   Ok(TreeNodeRecursion::Continue) // control walk of plan nodes
139/// }).unwrap();
140///
141/// // we found the expression in projection and filter
142/// assert_eq!(expressions.len(), 2);
143/// println!("Found expressions: {:?}", expressions);
144/// // found predicate in the Filter: employee.salary > 1000
145/// let salary = Expr::Column(Column::new(Some("employee"), "salary"));
146/// assert!(expressions.contains(&salary.gt(lit(1000))));
147/// // found projection in the Projection: employee.name
148/// let name = Expr::Column(Column::new(Some("employee"), "name"));
149/// assert!(expressions.contains(&name));
150/// # Ok(())
151/// # }
152/// ```
153///
154/// You can also rewrite plans using the [`tree_node`] API. For example, to
155/// replace the filter predicate in a plan:
156///
157/// ```
158/// # use std::collections::HashSet;
159/// # use arrow::datatypes::{DataType, Field, Schema};
160/// # use datafusion_expr::{Expr, col, lit, LogicalPlan, LogicalPlanBuilder, table_scan};
161/// # use datafusion_common::tree_node::{TreeNodeRecursion, TreeNode};
162/// # use datafusion_common::{Column, Result};
163/// # fn employee_schema() -> Schema {
164/// #    Schema::new(vec![
165/// #           Field::new("name", DataType::Utf8, false),
166/// #           Field::new("salary", DataType::Int32, false),
167/// #       ])
168/// #   }
/// // Projection(name)
170/// //   Filter(salary > 1000)
171/// //     TableScan(employee)
172/// # fn main() -> Result<()> {
173/// use datafusion_common::tree_node::Transformed;
174/// let plan = table_scan(Some("employee"), &employee_schema(), None)?
175///  .filter(col("salary").gt(lit(1000)))?
176///  .project(vec![col("name")])?
177///  .build()?;
178///
179/// // use transform to rewrite the plan
180/// let transformed_result = plan.transform(|node| {
181///   // when we see the filter node
182///   if let LogicalPlan::Filter(mut filter) = node {
183///     // replace predicate with salary < 2000
184///     filter.predicate = Expr::Column(Column::new(Some("employee"), "salary")).lt(lit(2000));
185///     let new_plan = LogicalPlan::Filter(filter);
186///     return Ok(Transformed::yes(new_plan)); // communicate the node was changed
187///   }
188///   // return the node unchanged
189///   Ok(Transformed::no(node))
190/// }).unwrap();
191///
192/// // Transformed result contains rewritten plan and information about
193/// // whether the plan was changed
194/// assert!(transformed_result.transformed);
195/// let rewritten_plan = transformed_result.data;
196///
197/// // we found the filter
198/// assert_eq!(rewritten_plan.display_indent().to_string(),
199/// "Projection: employee.name\
200/// \n  Filter: employee.salary < Int32(2000)\
201/// \n    TableScan: employee");
202/// # Ok(())
203/// # }
204/// ```
205///
// NOTE: `PartialOrd` and `Hash` are derived, so the declaration order of the
// variants below is significant; do not reorder them casually.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Hash)]
pub enum LogicalPlan {
    /// Evaluates an arbitrary list of expressions (essentially a
    /// SELECT with an expression list) on its input.
    Projection(Projection),
    /// Filters rows from its input that do not match an
    /// expression (essentially a WHERE clause with a predicate
    /// expression).
    ///
    /// Semantically, `<predicate>` is evaluated for each row of the
    /// input; if the value of `<predicate>` is true, the input row is
    /// passed to the output. If the value of `<predicate>` is false
    /// (or null), the row is discarded.
    Filter(Filter),
    /// Windows input based on a set of window spec and window
    /// function (e.g. SUM or RANK).  This is used to implement SQL
    /// window functions, and the `OVER` clause.
    ///
    /// See [`Window`] for more details
    Window(Window),
    /// Aggregates its input based on a set of grouping and aggregate
    /// expressions (e.g. SUM). This is used to implement SQL aggregates
    /// and `GROUP BY`.
    ///
    /// See [`Aggregate`] for more details
    Aggregate(Aggregate),
    /// Sorts its input according to a list of sort expressions. This
    /// is used to implement SQL `ORDER BY`
    Sort(Sort),
    /// Join two logical plans on one or more join columns.
    /// This is used to implement SQL `JOIN`
    Join(Join),
    /// Repartitions the input based on a partitioning scheme. This is
    /// used to add parallelism and is sometimes referred to as an
    /// "exchange" operator in other systems
    Repartition(Repartition),
    /// Union multiple inputs with the same schema into a single
    /// output stream. This is used to implement SQL `UNION [ALL]` and
    /// `INTERSECT [ALL]`.
    Union(Union),
    /// Produces rows from a [`TableSource`], used to implement SQL
    /// `FROM` tables or views.
    TableScan(TableScan),
    /// An empty relation with an empty schema that produces either
    /// 0 rows or 1 row (see the `produce_one_row` field of
    /// [`EmptyRelation`]). This is used to implement SQL `SELECT`
    /// that has no values in the `FROM` clause.
    EmptyRelation(EmptyRelation),
    /// Produces the output of running another query.  This is used to
    /// implement SQL subqueries
    Subquery(Subquery),
    /// Aliased relation provides, or changes, the name of a relation.
    SubqueryAlias(SubqueryAlias),
    /// Skip some number of rows, and then fetch some number of rows.
    Limit(Limit),
    /// A DataFusion [`Statement`] such as `SET VARIABLE` or `START TRANSACTION`
    Statement(Statement),
    /// Values expression. See
    /// [Postgres VALUES](https://www.postgresql.org/docs/current/queries-values.html)
    /// documentation for more details. This is used to implement SQL such as
    /// `VALUES (1, 2), (3, 4)`
    Values(Values),
    /// Produces a relation with string representations of
    /// various parts of the plan. This is used to implement SQL `EXPLAIN`.
    Explain(Explain),
    /// Runs the input, and prints annotated physical plan as a string
    /// with execution metric. This is used to implement SQL
    /// `EXPLAIN ANALYZE`.
    Analyze(Analyze),
    /// Extension operator defined outside of DataFusion. This is used
    /// to extend DataFusion with custom relational operations.
    Extension(Extension),
    /// Remove duplicate rows from the input. This is used to
    /// implement SQL `SELECT DISTINCT ...`.
    Distinct(Distinct),
    /// Data Manipulation Language (DML): Insert / Update / Delete
    Dml(DmlStatement),
    /// Data Definition Language (DDL): CREATE / DROP TABLES / VIEWS / SCHEMAS
    Ddl(DdlStatement),
    /// `COPY TO` for writing plan results to files
    Copy(CopyTo),
    /// Describe the schema of the table. This is used to implement the
    /// SQL `DESCRIBE` command from MySQL.
    DescribeTable(DescribeTable),
    /// Unnest a column that contains a nested list type such as an
    /// ARRAY. This is used to implement SQL `UNNEST`
    Unnest(Unnest),
    /// A variadic query (e.g. "Recursive CTEs")
    RecursiveQuery(RecursiveQuery),
}
295
296impl Default for LogicalPlan {
297    fn default() -> Self {
298        LogicalPlan::EmptyRelation(EmptyRelation {
299            produce_one_row: false,
300            schema: Arc::new(DFSchema::empty()),
301        })
302    }
303}
304
305impl<'a> TreeNodeContainer<'a, Self> for LogicalPlan {
306    fn apply_elements<F: FnMut(&'a Self) -> Result<TreeNodeRecursion>>(
307        &'a self,
308        mut f: F,
309    ) -> Result<TreeNodeRecursion> {
310        f(self)
311    }
312
313    fn map_elements<F: FnMut(Self) -> Result<Transformed<Self>>>(
314        self,
315        mut f: F,
316    ) -> Result<Transformed<Self>> {
317        f(self)
318    }
319}
320
321impl LogicalPlan {
322    /// Get a reference to the logical plan's schema
323    pub fn schema(&self) -> &DFSchemaRef {
324        match self {
325            LogicalPlan::EmptyRelation(EmptyRelation { schema, .. }) => schema,
326            LogicalPlan::Values(Values { schema, .. }) => schema,
327            LogicalPlan::TableScan(TableScan {
328                projected_schema, ..
329            }) => projected_schema,
330            LogicalPlan::Projection(Projection { schema, .. }) => schema,
331            LogicalPlan::Filter(Filter { input, .. }) => input.schema(),
332            LogicalPlan::Distinct(Distinct::All(input)) => input.schema(),
333            LogicalPlan::Distinct(Distinct::On(DistinctOn { schema, .. })) => schema,
334            LogicalPlan::Window(Window { schema, .. }) => schema,
335            LogicalPlan::Aggregate(Aggregate { schema, .. }) => schema,
336            LogicalPlan::Sort(Sort { input, .. }) => input.schema(),
337            LogicalPlan::Join(Join { schema, .. }) => schema,
338            LogicalPlan::Repartition(Repartition { input, .. }) => input.schema(),
339            LogicalPlan::Limit(Limit { input, .. }) => input.schema(),
340            LogicalPlan::Statement(statement) => statement.schema(),
341            LogicalPlan::Subquery(Subquery { subquery, .. }) => subquery.schema(),
342            LogicalPlan::SubqueryAlias(SubqueryAlias { schema, .. }) => schema,
343            LogicalPlan::Explain(explain) => &explain.schema,
344            LogicalPlan::Analyze(analyze) => &analyze.schema,
345            LogicalPlan::Extension(extension) => extension.node.schema(),
346            LogicalPlan::Union(Union { schema, .. }) => schema,
347            LogicalPlan::DescribeTable(DescribeTable { output_schema, .. }) => {
348                output_schema
349            }
350            LogicalPlan::Dml(DmlStatement { output_schema, .. }) => output_schema,
351            LogicalPlan::Copy(CopyTo { output_schema, .. }) => output_schema,
352            LogicalPlan::Ddl(ddl) => ddl.schema(),
353            LogicalPlan::Unnest(Unnest { schema, .. }) => schema,
354            LogicalPlan::RecursiveQuery(RecursiveQuery { static_term, .. }) => {
355                // we take the schema of the static term as the schema of the entire recursive query
356                static_term.schema()
357            }
358        }
359    }
360
361    /// Used for normalizing columns, as the fallback schemas to the main schema
362    /// of the plan.
363    pub fn fallback_normalize_schemas(&self) -> Vec<&DFSchema> {
364        match self {
365            LogicalPlan::Window(_)
366            | LogicalPlan::Projection(_)
367            | LogicalPlan::Aggregate(_)
368            | LogicalPlan::Unnest(_)
369            | LogicalPlan::Join(_) => self
370                .inputs()
371                .iter()
372                .map(|input| input.schema().as_ref())
373                .collect(),
374            _ => vec![],
375        }
376    }
377
378    /// Returns the (fixed) output schema for explain plans
379    pub fn explain_schema() -> SchemaRef {
380        SchemaRef::new(Schema::new(vec![
381            Field::new("plan_type", DataType::Utf8, false),
382            Field::new("plan", DataType::Utf8, false),
383        ]))
384    }
385
386    /// Returns the (fixed) output schema for `DESCRIBE` plans
387    pub fn describe_schema() -> Schema {
388        Schema::new(vec![
389            Field::new("column_name", DataType::Utf8, false),
390            Field::new("data_type", DataType::Utf8, false),
391            Field::new("is_nullable", DataType::Utf8, false),
392        ])
393    }
394
395    /// Returns all expressions (non-recursively) evaluated by the current
396    /// logical plan node. This does not include expressions in any children.
397    ///
398    /// Note this method `clone`s all the expressions. When possible, the
399    /// [`tree_node`] API should be used instead of this API.
400    ///
401    /// The returned expressions do not necessarily represent or even
402    /// contributed to the output schema of this node. For example,
403    /// `LogicalPlan::Filter` returns the filter expression even though the
404    /// output of a Filter has the same columns as the input.
405    ///
406    /// The expressions do contain all the columns that are used by this plan,
407    /// so if there are columns not referenced by these expressions then
408    /// DataFusion's optimizer attempts to optimize them away.
409    ///
410    /// [`tree_node`]: crate::logical_plan::tree_node
411    pub fn expressions(self: &LogicalPlan) -> Vec<Expr> {
412        let mut exprs = vec![];
413        self.apply_expressions(|e| {
414            exprs.push(e.clone());
415            Ok(TreeNodeRecursion::Continue)
416        })
417        // closure always returns OK
418        .unwrap();
419        exprs
420    }
421
422    /// Returns all the out reference(correlated) expressions (recursively) in the current
423    /// logical plan nodes and all its descendant nodes.
424    pub fn all_out_ref_exprs(self: &LogicalPlan) -> Vec<Expr> {
425        let mut exprs = vec![];
426        self.apply_expressions(|e| {
427            find_out_reference_exprs(e).into_iter().for_each(|e| {
428                if !exprs.contains(&e) {
429                    exprs.push(e)
430                }
431            });
432            Ok(TreeNodeRecursion::Continue)
433        })
434        // closure always returns OK
435        .unwrap();
436        self.inputs()
437            .into_iter()
438            .flat_map(|child| child.all_out_ref_exprs())
439            .for_each(|e| {
440                if !exprs.contains(&e) {
441                    exprs.push(e)
442                }
443            });
444        exprs
445    }
446
447    /// Returns all inputs / children of this `LogicalPlan` node.
448    ///
449    /// Note does not include inputs to inputs, or subqueries.
450    pub fn inputs(&self) -> Vec<&LogicalPlan> {
451        match self {
452            LogicalPlan::Projection(Projection { input, .. }) => vec![input],
453            LogicalPlan::Filter(Filter { input, .. }) => vec![input],
454            LogicalPlan::Repartition(Repartition { input, .. }) => vec![input],
455            LogicalPlan::Window(Window { input, .. }) => vec![input],
456            LogicalPlan::Aggregate(Aggregate { input, .. }) => vec![input],
457            LogicalPlan::Sort(Sort { input, .. }) => vec![input],
458            LogicalPlan::Join(Join { left, right, .. }) => vec![left, right],
459            LogicalPlan::Limit(Limit { input, .. }) => vec![input],
460            LogicalPlan::Subquery(Subquery { subquery, .. }) => vec![subquery],
461            LogicalPlan::SubqueryAlias(SubqueryAlias { input, .. }) => vec![input],
462            LogicalPlan::Extension(extension) => extension.node.inputs(),
463            LogicalPlan::Union(Union { inputs, .. }) => {
464                inputs.iter().map(|arc| arc.as_ref()).collect()
465            }
466            LogicalPlan::Distinct(
467                Distinct::All(input) | Distinct::On(DistinctOn { input, .. }),
468            ) => vec![input],
469            LogicalPlan::Explain(explain) => vec![&explain.plan],
470            LogicalPlan::Analyze(analyze) => vec![&analyze.input],
471            LogicalPlan::Dml(write) => vec![&write.input],
472            LogicalPlan::Copy(copy) => vec![&copy.input],
473            LogicalPlan::Ddl(ddl) => ddl.inputs(),
474            LogicalPlan::Unnest(Unnest { input, .. }) => vec![input],
475            LogicalPlan::RecursiveQuery(RecursiveQuery {
476                static_term,
477                recursive_term,
478                ..
479            }) => vec![static_term, recursive_term],
480            LogicalPlan::Statement(stmt) => stmt.inputs(),
481            // plans without inputs
482            LogicalPlan::TableScan { .. }
483            | LogicalPlan::EmptyRelation { .. }
484            | LogicalPlan::Values { .. }
485            | LogicalPlan::DescribeTable(_) => vec![],
486        }
487    }
488
489    /// returns all `Using` join columns in a logical plan
490    pub fn using_columns(&self) -> Result<Vec<HashSet<Column>>, DataFusionError> {
491        let mut using_columns: Vec<HashSet<Column>> = vec![];
492
493        self.apply_with_subqueries(|plan| {
494            if let LogicalPlan::Join(Join {
495                join_constraint: JoinConstraint::Using,
496                on,
497                ..
498            }) = plan
499            {
500                // The join keys in using-join must be columns.
501                let columns =
502                    on.iter().try_fold(HashSet::new(), |mut accumu, (l, r)| {
503                        let Some(l) = l.get_as_join_column() else {
504                            return internal_err!(
505                                "Invalid join key. Expected column, found {l:?}"
506                            );
507                        };
508                        let Some(r) = r.get_as_join_column() else {
509                            return internal_err!(
510                                "Invalid join key. Expected column, found {r:?}"
511                            );
512                        };
513                        accumu.insert(l.to_owned());
514                        accumu.insert(r.to_owned());
515                        Result::<_, DataFusionError>::Ok(accumu)
516                    })?;
517                using_columns.push(columns);
518            }
519            Ok(TreeNodeRecursion::Continue)
520        })?;
521
522        Ok(using_columns)
523    }
524
525    /// returns the first output expression of this `LogicalPlan` node.
526    pub fn head_output_expr(&self) -> Result<Option<Expr>> {
527        match self {
528            LogicalPlan::Projection(projection) => {
529                Ok(Some(projection.expr.as_slice()[0].clone()))
530            }
531            LogicalPlan::Aggregate(agg) => {
532                if agg.group_expr.is_empty() {
533                    Ok(Some(agg.aggr_expr.as_slice()[0].clone()))
534                } else {
535                    Ok(Some(agg.group_expr.as_slice()[0].clone()))
536                }
537            }
538            LogicalPlan::Distinct(Distinct::On(DistinctOn { select_expr, .. })) => {
539                Ok(Some(select_expr[0].clone()))
540            }
541            LogicalPlan::Filter(Filter { input, .. })
542            | LogicalPlan::Distinct(Distinct::All(input))
543            | LogicalPlan::Sort(Sort { input, .. })
544            | LogicalPlan::Limit(Limit { input, .. })
545            | LogicalPlan::Repartition(Repartition { input, .. })
546            | LogicalPlan::Window(Window { input, .. }) => input.head_output_expr(),
547            LogicalPlan::Join(Join {
548                left,
549                right,
550                join_type,
551                ..
552            }) => match join_type {
553                JoinType::Inner | JoinType::Left | JoinType::Right | JoinType::Full => {
554                    if left.schema().fields().is_empty() {
555                        right.head_output_expr()
556                    } else {
557                        left.head_output_expr()
558                    }
559                }
560                JoinType::LeftSemi | JoinType::LeftAnti | JoinType::LeftMark => {
561                    left.head_output_expr()
562                }
563                JoinType::RightSemi | JoinType::RightAnti | JoinType::RightMark => {
564                    right.head_output_expr()
565                }
566            },
567            LogicalPlan::RecursiveQuery(RecursiveQuery { static_term, .. }) => {
568                static_term.head_output_expr()
569            }
570            LogicalPlan::Union(union) => Ok(Some(Expr::Column(Column::from(
571                union.schema.qualified_field(0),
572            )))),
573            LogicalPlan::TableScan(table) => Ok(Some(Expr::Column(Column::from(
574                table.projected_schema.qualified_field(0),
575            )))),
576            LogicalPlan::SubqueryAlias(subquery_alias) => {
577                let expr_opt = subquery_alias.input.head_output_expr()?;
578                expr_opt
579                    .map(|expr| {
580                        Ok(Expr::Column(create_col_from_scalar_expr(
581                            &expr,
582                            subquery_alias.alias.to_string(),
583                        )?))
584                    })
585                    .map_or(Ok(None), |v| v.map(Some))
586            }
587            LogicalPlan::Subquery(_) => Ok(None),
588            LogicalPlan::EmptyRelation(_)
589            | LogicalPlan::Statement(_)
590            | LogicalPlan::Values(_)
591            | LogicalPlan::Explain(_)
592            | LogicalPlan::Analyze(_)
593            | LogicalPlan::Extension(_)
594            | LogicalPlan::Dml(_)
595            | LogicalPlan::Copy(_)
596            | LogicalPlan::Ddl(_)
597            | LogicalPlan::DescribeTable(_)
598            | LogicalPlan::Unnest(_) => Ok(None),
599        }
600    }
601
602    /// Recomputes schema and type information for this LogicalPlan if needed.
603    ///
604    /// Some `LogicalPlan`s may need to recompute their schema if the number or
605    /// type of expressions have been changed (for example due to type
606    /// coercion). For example [`LogicalPlan::Projection`]s schema depends on
607    /// its expressions.
608    ///
609    /// Some `LogicalPlan`s schema is unaffected by any changes to their
610    /// expressions. For example [`LogicalPlan::Filter`] schema is always the
611    /// same as its input schema.
612    ///
613    /// This is useful after modifying a plans `Expr`s (or input plans) via
614    /// methods such as [Self::map_children] and [Self::map_expressions]. Unlike
615    /// [Self::with_new_exprs], this method does not require a new set of
616    /// expressions or inputs plans.
617    ///
618    /// # Return value
619    /// Returns an error if there is some issue recomputing the schema.
620    ///
621    /// # Notes
622    ///
623    /// * Does not recursively recompute schema for input (child) plans.
624    pub fn recompute_schema(self) -> Result<Self> {
625        match self {
626            // Since expr may be different than the previous expr, schema of the projection
627            // may change. We need to use try_new method instead of try_new_with_schema method.
628            LogicalPlan::Projection(Projection {
629                expr,
630                input,
631                schema: _,
632            }) => Projection::try_new(expr, input).map(LogicalPlan::Projection),
633            LogicalPlan::Dml(_) => Ok(self),
634            LogicalPlan::Copy(_) => Ok(self),
635            LogicalPlan::Values(Values { schema, values }) => {
636                // todo it isn't clear why the schema is not recomputed here
637                Ok(LogicalPlan::Values(Values { schema, values }))
638            }
639            LogicalPlan::Filter(Filter { predicate, input }) => {
640                Filter::try_new(predicate, input).map(LogicalPlan::Filter)
641            }
642            LogicalPlan::Repartition(_) => Ok(self),
643            LogicalPlan::Window(Window {
644                input,
645                window_expr,
646                schema: _,
647            }) => Window::try_new(window_expr, input).map(LogicalPlan::Window),
648            LogicalPlan::Aggregate(Aggregate {
649                input,
650                group_expr,
651                aggr_expr,
652                schema: _,
653            }) => Aggregate::try_new(input, group_expr, aggr_expr)
654                .map(LogicalPlan::Aggregate),
655            LogicalPlan::Sort(_) => Ok(self),
656            LogicalPlan::Join(Join {
657                left,
658                right,
659                filter,
660                join_type,
661                join_constraint,
662                on,
663                schema: _,
664                null_equality,
665            }) => {
666                let schema =
667                    build_join_schema(left.schema(), right.schema(), &join_type)?;
668
669                let new_on: Vec<_> = on
670                    .into_iter()
671                    .map(|equi_expr| {
672                        // SimplifyExpression rule may add alias to the equi_expr.
673                        (equi_expr.0.unalias(), equi_expr.1.unalias())
674                    })
675                    .collect();
676
677                Ok(LogicalPlan::Join(Join {
678                    left,
679                    right,
680                    join_type,
681                    join_constraint,
682                    on: new_on,
683                    filter,
684                    schema: DFSchemaRef::new(schema),
685                    null_equality,
686                }))
687            }
688            LogicalPlan::Subquery(_) => Ok(self),
689            LogicalPlan::SubqueryAlias(SubqueryAlias {
690                input,
691                alias,
692                schema: _,
693            }) => SubqueryAlias::try_new(input, alias).map(LogicalPlan::SubqueryAlias),
694            LogicalPlan::Limit(_) => Ok(self),
695            LogicalPlan::Ddl(_) => Ok(self),
696            LogicalPlan::Extension(Extension { node }) => {
697                // todo make an API that does not require cloning
698                // This requires a copy of the extension nodes expressions and inputs
699                let expr = node.expressions();
700                let inputs: Vec<_> = node.inputs().into_iter().cloned().collect();
701                Ok(LogicalPlan::Extension(Extension {
702                    node: node.with_exprs_and_inputs(expr, inputs)?,
703                }))
704            }
705            LogicalPlan::Union(Union { inputs, schema }) => {
706                let first_input_schema = inputs[0].schema();
707                if schema.fields().len() == first_input_schema.fields().len() {
708                    // If inputs are not pruned do not change schema
709                    Ok(LogicalPlan::Union(Union { inputs, schema }))
710                } else {
711                    // A note on `Union`s constructed via `try_new_by_name`:
712                    //
713                    // At this point, the schema for each input should have
714                    // the same width. Thus, we do not need to save whether a
715                    // `Union` was created `BY NAME`, and can safely rely on the
716                    // `try_new` initializer to derive the new schema based on
717                    // column positions.
718                    Ok(LogicalPlan::Union(Union::try_new(inputs)?))
719                }
720            }
721            LogicalPlan::Distinct(distinct) => {
722                let distinct = match distinct {
723                    Distinct::All(input) => Distinct::All(input),
724                    Distinct::On(DistinctOn {
725                        on_expr,
726                        select_expr,
727                        sort_expr,
728                        input,
729                        schema: _,
730                    }) => Distinct::On(DistinctOn::try_new(
731                        on_expr,
732                        select_expr,
733                        sort_expr,
734                        input,
735                    )?),
736                };
737                Ok(LogicalPlan::Distinct(distinct))
738            }
739            LogicalPlan::RecursiveQuery(_) => Ok(self),
740            LogicalPlan::Analyze(_) => Ok(self),
741            LogicalPlan::Explain(_) => Ok(self),
742            LogicalPlan::TableScan(_) => Ok(self),
743            LogicalPlan::EmptyRelation(_) => Ok(self),
744            LogicalPlan::Statement(_) => Ok(self),
745            LogicalPlan::DescribeTable(_) => Ok(self),
746            LogicalPlan::Unnest(Unnest {
747                input,
748                exec_columns,
749                options,
750                ..
751            }) => {
752                // Update schema with unnested column type.
753                unnest_with_options(Arc::unwrap_or_clone(input), exec_columns, options)
754            }
755        }
756    }
757
758    /// Returns a new `LogicalPlan` based on `self` with inputs and
759    /// expressions replaced.
760    ///
761    /// Note this method creates an entirely new node, which requires a large
762    /// amount of clone'ing. When possible, the [`tree_node`] API should be used
763    /// instead of this API.
764    ///
765    /// The exprs correspond to the same order of expressions returned
766    /// by [`Self::expressions`]. This function is used by optimizers
767    /// to rewrite plans using the following pattern:
768    ///
769    /// [`tree_node`]: crate::logical_plan::tree_node
770    ///
771    /// ```text
772    /// let new_inputs = optimize_children(..., plan, props);
773    ///
774    /// // get the plans expressions to optimize
775    /// let exprs = plan.expressions();
776    ///
777    /// // potentially rewrite plan expressions
778    /// let rewritten_exprs = rewrite_exprs(exprs);
779    ///
780    /// // create new plan using rewritten_exprs in same position
781    /// let new_plan = plan.new_with_exprs(rewritten_exprs, new_inputs);
782    /// ```
783    pub fn with_new_exprs(
784        &self,
785        mut expr: Vec<Expr>,
786        inputs: Vec<LogicalPlan>,
787    ) -> Result<LogicalPlan> {
788        match self {
789            // Since expr may be different than the previous expr, schema of the projection
790            // may change. We need to use try_new method instead of try_new_with_schema method.
791            LogicalPlan::Projection(Projection { .. }) => {
792                let input = self.only_input(inputs)?;
793                Projection::try_new(expr, Arc::new(input)).map(LogicalPlan::Projection)
794            }
795            LogicalPlan::Dml(DmlStatement {
796                table_name,
797                target,
798                op,
799                ..
800            }) => {
801                self.assert_no_expressions(expr)?;
802                let input = self.only_input(inputs)?;
803                Ok(LogicalPlan::Dml(DmlStatement::new(
804                    table_name.clone(),
805                    Arc::clone(target),
806                    op.clone(),
807                    Arc::new(input),
808                )))
809            }
810            LogicalPlan::Copy(CopyTo {
811                input: _,
812                output_url,
813                file_type,
814                options,
815                partition_by,
816                output_schema: _,
817            }) => {
818                self.assert_no_expressions(expr)?;
819                let input = self.only_input(inputs)?;
820                Ok(LogicalPlan::Copy(CopyTo::new(
821                    Arc::new(input),
822                    output_url.clone(),
823                    partition_by.clone(),
824                    Arc::clone(file_type),
825                    options.clone(),
826                )))
827            }
828            LogicalPlan::Values(Values { schema, .. }) => {
829                self.assert_no_inputs(inputs)?;
830                Ok(LogicalPlan::Values(Values {
831                    schema: Arc::clone(schema),
832                    values: expr
833                        .chunks_exact(schema.fields().len())
834                        .map(|s| s.to_vec())
835                        .collect(),
836                }))
837            }
838            LogicalPlan::Filter { .. } => {
839                let predicate = self.only_expr(expr)?;
840                let input = self.only_input(inputs)?;
841
842                Filter::try_new(predicate, Arc::new(input)).map(LogicalPlan::Filter)
843            }
844            LogicalPlan::Repartition(Repartition {
845                partitioning_scheme,
846                ..
847            }) => match partitioning_scheme {
848                Partitioning::RoundRobinBatch(n) => {
849                    self.assert_no_expressions(expr)?;
850                    let input = self.only_input(inputs)?;
851                    Ok(LogicalPlan::Repartition(Repartition {
852                        partitioning_scheme: Partitioning::RoundRobinBatch(*n),
853                        input: Arc::new(input),
854                    }))
855                }
856                Partitioning::Hash(_, n) => {
857                    let input = self.only_input(inputs)?;
858                    Ok(LogicalPlan::Repartition(Repartition {
859                        partitioning_scheme: Partitioning::Hash(expr, *n),
860                        input: Arc::new(input),
861                    }))
862                }
863                Partitioning::DistributeBy(_) => {
864                    let input = self.only_input(inputs)?;
865                    Ok(LogicalPlan::Repartition(Repartition {
866                        partitioning_scheme: Partitioning::DistributeBy(expr),
867                        input: Arc::new(input),
868                    }))
869                }
870            },
871            LogicalPlan::Window(Window { window_expr, .. }) => {
872                assert_eq!(window_expr.len(), expr.len());
873                let input = self.only_input(inputs)?;
874                Window::try_new(expr, Arc::new(input)).map(LogicalPlan::Window)
875            }
876            LogicalPlan::Aggregate(Aggregate { group_expr, .. }) => {
877                let input = self.only_input(inputs)?;
878                // group exprs are the first expressions
879                let agg_expr = expr.split_off(group_expr.len());
880
881                Aggregate::try_new(Arc::new(input), expr, agg_expr)
882                    .map(LogicalPlan::Aggregate)
883            }
884            LogicalPlan::Sort(Sort {
885                expr: sort_expr,
886                fetch,
887                ..
888            }) => {
889                let input = self.only_input(inputs)?;
890                Ok(LogicalPlan::Sort(Sort {
891                    expr: expr
892                        .into_iter()
893                        .zip(sort_expr.iter())
894                        .map(|(expr, sort)| sort.with_expr(expr))
895                        .collect(),
896                    input: Arc::new(input),
897                    fetch: *fetch,
898                }))
899            }
900            LogicalPlan::Join(Join {
901                join_type,
902                join_constraint,
903                on,
904                null_equality,
905                ..
906            }) => {
907                let (left, right) = self.only_two_inputs(inputs)?;
908                let schema = build_join_schema(left.schema(), right.schema(), join_type)?;
909
910                let equi_expr_count = on.len() * 2;
911                assert!(expr.len() >= equi_expr_count);
912
913                // Assume that the last expr, if any,
914                // is the filter_expr (non equality predicate from ON clause)
915                let filter_expr = if expr.len() > equi_expr_count {
916                    expr.pop()
917                } else {
918                    None
919                };
920
921                // The first part of expr is equi-exprs,
922                // and the struct of each equi-expr is like `left-expr = right-expr`.
923                assert_eq!(expr.len(), equi_expr_count);
924                let mut new_on = Vec::with_capacity(on.len());
925                let mut iter = expr.into_iter();
926                while let Some(left) = iter.next() {
927                    let Some(right) = iter.next() else {
928                        internal_err!("Expected a pair of expressions to construct the join on expression")?
929                    };
930
931                    // SimplifyExpression rule may add alias to the equi_expr.
932                    new_on.push((left.unalias(), right.unalias()));
933                }
934
935                Ok(LogicalPlan::Join(Join {
936                    left: Arc::new(left),
937                    right: Arc::new(right),
938                    join_type: *join_type,
939                    join_constraint: *join_constraint,
940                    on: new_on,
941                    filter: filter_expr,
942                    schema: DFSchemaRef::new(schema),
943                    null_equality: *null_equality,
944                }))
945            }
946            LogicalPlan::Subquery(Subquery {
947                outer_ref_columns,
948                spans,
949                ..
950            }) => {
951                self.assert_no_expressions(expr)?;
952                let input = self.only_input(inputs)?;
953                let subquery = LogicalPlanBuilder::from(input).build()?;
954                Ok(LogicalPlan::Subquery(Subquery {
955                    subquery: Arc::new(subquery),
956                    outer_ref_columns: outer_ref_columns.clone(),
957                    spans: spans.clone(),
958                }))
959            }
960            LogicalPlan::SubqueryAlias(SubqueryAlias { alias, .. }) => {
961                self.assert_no_expressions(expr)?;
962                let input = self.only_input(inputs)?;
963                SubqueryAlias::try_new(Arc::new(input), alias.clone())
964                    .map(LogicalPlan::SubqueryAlias)
965            }
966            LogicalPlan::Limit(Limit { skip, fetch, .. }) => {
967                let old_expr_len = skip.iter().chain(fetch.iter()).count();
968                if old_expr_len != expr.len() {
969                    return internal_err!(
970                        "Invalid number of new Limit expressions: expected {}, got {}",
971                        old_expr_len,
972                        expr.len()
973                    );
974                }
975                // `LogicalPlan::expressions()` returns in [skip, fetch] order, so we can pop from the end.
976                let new_fetch = fetch.as_ref().and_then(|_| expr.pop());
977                let new_skip = skip.as_ref().and_then(|_| expr.pop());
978                let input = self.only_input(inputs)?;
979                Ok(LogicalPlan::Limit(Limit {
980                    skip: new_skip.map(Box::new),
981                    fetch: new_fetch.map(Box::new),
982                    input: Arc::new(input),
983                }))
984            }
985            LogicalPlan::Ddl(DdlStatement::CreateMemoryTable(CreateMemoryTable {
986                name,
987                if_not_exists,
988                or_replace,
989                column_defaults,
990                temporary,
991                ..
992            })) => {
993                self.assert_no_expressions(expr)?;
994                let input = self.only_input(inputs)?;
995                Ok(LogicalPlan::Ddl(DdlStatement::CreateMemoryTable(
996                    CreateMemoryTable {
997                        input: Arc::new(input),
998                        constraints: Constraints::default(),
999                        name: name.clone(),
1000                        if_not_exists: *if_not_exists,
1001                        or_replace: *or_replace,
1002                        column_defaults: column_defaults.clone(),
1003                        temporary: *temporary,
1004                    },
1005                )))
1006            }
1007            LogicalPlan::Ddl(DdlStatement::CreateView(CreateView {
1008                name,
1009                or_replace,
1010                definition,
1011                temporary,
1012                ..
1013            })) => {
1014                self.assert_no_expressions(expr)?;
1015                let input = self.only_input(inputs)?;
1016                Ok(LogicalPlan::Ddl(DdlStatement::CreateView(CreateView {
1017                    input: Arc::new(input),
1018                    name: name.clone(),
1019                    or_replace: *or_replace,
1020                    temporary: *temporary,
1021                    definition: definition.clone(),
1022                })))
1023            }
1024            LogicalPlan::Extension(e) => Ok(LogicalPlan::Extension(Extension {
1025                node: e.node.with_exprs_and_inputs(expr, inputs)?,
1026            })),
1027            LogicalPlan::Union(Union { schema, .. }) => {
1028                self.assert_no_expressions(expr)?;
1029                let input_schema = inputs[0].schema();
1030                // If inputs are not pruned do not change schema.
1031                let schema = if schema.fields().len() == input_schema.fields().len() {
1032                    Arc::clone(schema)
1033                } else {
1034                    Arc::clone(input_schema)
1035                };
1036                Ok(LogicalPlan::Union(Union {
1037                    inputs: inputs.into_iter().map(Arc::new).collect(),
1038                    schema,
1039                }))
1040            }
1041            LogicalPlan::Distinct(distinct) => {
1042                let distinct = match distinct {
1043                    Distinct::All(_) => {
1044                        self.assert_no_expressions(expr)?;
1045                        let input = self.only_input(inputs)?;
1046                        Distinct::All(Arc::new(input))
1047                    }
1048                    Distinct::On(DistinctOn {
1049                        on_expr,
1050                        select_expr,
1051                        ..
1052                    }) => {
1053                        let input = self.only_input(inputs)?;
1054                        let sort_expr = expr.split_off(on_expr.len() + select_expr.len());
1055                        let select_expr = expr.split_off(on_expr.len());
1056                        assert!(sort_expr.is_empty(), "with_new_exprs for Distinct does not support sort expressions");
1057                        Distinct::On(DistinctOn::try_new(
1058                            expr,
1059                            select_expr,
1060                            None, // no sort expressions accepted
1061                            Arc::new(input),
1062                        )?)
1063                    }
1064                };
1065                Ok(LogicalPlan::Distinct(distinct))
1066            }
1067            LogicalPlan::RecursiveQuery(RecursiveQuery {
1068                name, is_distinct, ..
1069            }) => {
1070                self.assert_no_expressions(expr)?;
1071                let (static_term, recursive_term) = self.only_two_inputs(inputs)?;
1072                Ok(LogicalPlan::RecursiveQuery(RecursiveQuery {
1073                    name: name.clone(),
1074                    static_term: Arc::new(static_term),
1075                    recursive_term: Arc::new(recursive_term),
1076                    is_distinct: *is_distinct,
1077                }))
1078            }
1079            LogicalPlan::Analyze(a) => {
1080                self.assert_no_expressions(expr)?;
1081                let input = self.only_input(inputs)?;
1082                Ok(LogicalPlan::Analyze(Analyze {
1083                    verbose: a.verbose,
1084                    schema: Arc::clone(&a.schema),
1085                    input: Arc::new(input),
1086                }))
1087            }
1088            LogicalPlan::Explain(e) => {
1089                self.assert_no_expressions(expr)?;
1090                let input = self.only_input(inputs)?;
1091                Ok(LogicalPlan::Explain(Explain {
1092                    verbose: e.verbose,
1093                    plan: Arc::new(input),
1094                    explain_format: e.explain_format.clone(),
1095                    stringified_plans: e.stringified_plans.clone(),
1096                    schema: Arc::clone(&e.schema),
1097                    logical_optimization_succeeded: e.logical_optimization_succeeded,
1098                }))
1099            }
1100            LogicalPlan::Statement(Statement::Prepare(Prepare {
1101                name,
1102                data_types,
1103                ..
1104            })) => {
1105                self.assert_no_expressions(expr)?;
1106                let input = self.only_input(inputs)?;
1107                Ok(LogicalPlan::Statement(Statement::Prepare(Prepare {
1108                    name: name.clone(),
1109                    data_types: data_types.clone(),
1110                    input: Arc::new(input),
1111                })))
1112            }
1113            LogicalPlan::Statement(Statement::Execute(Execute { name, .. })) => {
1114                self.assert_no_inputs(inputs)?;
1115                Ok(LogicalPlan::Statement(Statement::Execute(Execute {
1116                    name: name.clone(),
1117                    parameters: expr,
1118                })))
1119            }
1120            LogicalPlan::TableScan(ts) => {
1121                self.assert_no_inputs(inputs)?;
1122                Ok(LogicalPlan::TableScan(TableScan {
1123                    filters: expr,
1124                    ..ts.clone()
1125                }))
1126            }
1127            LogicalPlan::EmptyRelation(_)
1128            | LogicalPlan::Ddl(_)
1129            | LogicalPlan::Statement(_)
1130            | LogicalPlan::DescribeTable(_) => {
1131                // All of these plan types have no inputs / exprs so should not be called
1132                self.assert_no_expressions(expr)?;
1133                self.assert_no_inputs(inputs)?;
1134                Ok(self.clone())
1135            }
1136            LogicalPlan::Unnest(Unnest {
1137                exec_columns: columns,
1138                options,
1139                ..
1140            }) => {
1141                self.assert_no_expressions(expr)?;
1142                let input = self.only_input(inputs)?;
1143                // Update schema with unnested column type.
1144                let new_plan =
1145                    unnest_with_options(input, columns.clone(), options.clone())?;
1146                Ok(new_plan)
1147            }
1148        }
1149    }
1150
    /// Checks that the plan conforms to the requested [`InvariantLevel`],
    /// returning an error if it does not.
    ///
    /// `InvariantLevel::Always` is checked with
    /// `assert_always_invariants_at_current_node` and
    /// `InvariantLevel::Executable` with `assert_executable_invariants`.
    pub fn check_invariants(&self, check: InvariantLevel) -> Result<()> {
        match check {
            InvariantLevel::Always => assert_always_invariants_at_current_node(self),
            InvariantLevel::Executable => assert_executable_invariants(self),
        }
    }
1158
1159    /// Helper for [Self::with_new_exprs] to use when no expressions are expected.
1160    #[inline]
1161    #[allow(clippy::needless_pass_by_value)] // expr is moved intentionally to ensure it's not used again
1162    fn assert_no_expressions(&self, expr: Vec<Expr>) -> Result<()> {
1163        if !expr.is_empty() {
1164            return internal_err!("{self:?} should have no exprs, got {:?}", expr);
1165        }
1166        Ok(())
1167    }
1168
1169    /// Helper for [Self::with_new_exprs] to use when no inputs are expected.
1170    #[inline]
1171    #[allow(clippy::needless_pass_by_value)] // inputs is moved intentionally to ensure it's not used again
1172    fn assert_no_inputs(&self, inputs: Vec<LogicalPlan>) -> Result<()> {
1173        if !inputs.is_empty() {
1174            return internal_err!("{self:?} should have no inputs, got: {:?}", inputs);
1175        }
1176        Ok(())
1177    }
1178
1179    /// Helper for [Self::with_new_exprs] to use when exactly one expression is expected.
1180    #[inline]
1181    fn only_expr(&self, mut expr: Vec<Expr>) -> Result<Expr> {
1182        if expr.len() != 1 {
1183            return internal_err!(
1184                "{self:?} should have exactly one expr, got {:?}",
1185                expr
1186            );
1187        }
1188        Ok(expr.remove(0))
1189    }
1190
1191    /// Helper for [Self::with_new_exprs] to use when exactly one input is expected.
1192    #[inline]
1193    fn only_input(&self, mut inputs: Vec<LogicalPlan>) -> Result<LogicalPlan> {
1194        if inputs.len() != 1 {
1195            return internal_err!(
1196                "{self:?} should have exactly one input, got {:?}",
1197                inputs
1198            );
1199        }
1200        Ok(inputs.remove(0))
1201    }
1202
1203    /// Helper for [Self::with_new_exprs] to use when exactly two inputs are expected.
1204    #[inline]
1205    fn only_two_inputs(
1206        &self,
1207        mut inputs: Vec<LogicalPlan>,
1208    ) -> Result<(LogicalPlan, LogicalPlan)> {
1209        if inputs.len() != 2 {
1210            return internal_err!(
1211                "{self:?} should have exactly two inputs, got {:?}",
1212                inputs
1213            );
1214        }
1215        let right = inputs.remove(1);
1216        let left = inputs.remove(0);
1217        Ok((left, right))
1218    }
1219
1220    /// Replaces placeholder param values (like `$1`, `$2`) in [`LogicalPlan`]
1221    /// with the specified `param_values`.
1222    ///
1223    /// [`Prepare`] statements are converted to
1224    /// their inner logical plan for execution.
1225    ///
1226    /// # Example
1227    /// ```
1228    /// # use arrow::datatypes::{Field, Schema, DataType};
1229    /// use datafusion_common::ScalarValue;
1230    /// # use datafusion_expr::{lit, col, LogicalPlanBuilder, logical_plan::table_scan, placeholder};
1231    /// # let schema = Schema::new(vec![
1232    /// #     Field::new("id", DataType::Int32, false),
1233    /// # ]);
1234    /// // Build SELECT * FROM t1 WHERE id = $1
1235    /// let plan = table_scan(Some("t1"), &schema, None).unwrap()
1236    ///     .filter(col("id").eq(placeholder("$1"))).unwrap()
1237    ///     .build().unwrap();
1238    ///
1239    /// assert_eq!(
1240    ///   "Filter: t1.id = $1\
1241    ///   \n  TableScan: t1",
1242    ///   plan.display_indent().to_string()
1243    /// );
1244    ///
1245    /// // Fill in the parameter $1 with a literal 3
1246    /// let plan = plan.with_param_values(vec![
1247    ///   ScalarValue::from(3i32) // value at index 0 --> $1
1248    /// ]).unwrap();
1249    ///
1250    /// assert_eq!(
1251    ///    "Filter: t1.id = Int32(3)\
1252    ///    \n  TableScan: t1",
1253    ///    plan.display_indent().to_string()
1254    ///  );
1255    ///
1256    /// // Note you can also used named parameters
1257    /// // Build SELECT * FROM t1 WHERE id = $my_param
1258    /// let plan = table_scan(Some("t1"), &schema, None).unwrap()
1259    ///     .filter(col("id").eq(placeholder("$my_param"))).unwrap()
1260    ///     .build().unwrap()
1261    ///     // Fill in the parameter $my_param with a literal 3
1262    ///     .with_param_values(vec![
1263    ///       ("my_param", ScalarValue::from(3i32)),
1264    ///     ]).unwrap();
1265    ///
1266    /// assert_eq!(
1267    ///    "Filter: t1.id = Int32(3)\
1268    ///    \n  TableScan: t1",
1269    ///    plan.display_indent().to_string()
1270    ///  );
1271    ///
1272    /// ```
1273    pub fn with_param_values(
1274        self,
1275        param_values: impl Into<ParamValues>,
1276    ) -> Result<LogicalPlan> {
1277        let param_values = param_values.into();
1278        let plan_with_values = self.replace_params_with_values(&param_values)?;
1279
1280        // unwrap Prepare
1281        Ok(
1282            if let LogicalPlan::Statement(Statement::Prepare(prepare_lp)) =
1283                plan_with_values
1284            {
1285                param_values.verify(&prepare_lp.data_types)?;
1286                // try and take ownership of the input if is not shared, clone otherwise
1287                Arc::unwrap_or_clone(prepare_lp.input)
1288            } else {
1289                plan_with_values
1290            },
1291        )
1292    }
1293
1294    /// Returns the maximum number of rows that this plan can output, if known.
1295    ///
1296    /// If `None`, the plan can return any number of rows.
1297    /// If `Some(n)` then the plan can return at most `n` rows but may return fewer.
1298    pub fn max_rows(self: &LogicalPlan) -> Option<usize> {
1299        match self {
1300            LogicalPlan::Projection(Projection { input, .. }) => input.max_rows(),
1301            LogicalPlan::Filter(filter) => {
1302                if filter.is_scalar() {
1303                    Some(1)
1304                } else {
1305                    filter.input.max_rows()
1306                }
1307            }
1308            LogicalPlan::Window(Window { input, .. }) => input.max_rows(),
1309            LogicalPlan::Aggregate(Aggregate {
1310                input, group_expr, ..
1311            }) => {
1312                // Empty group_expr will return Some(1)
1313                if group_expr
1314                    .iter()
1315                    .all(|expr| matches!(expr, Expr::Literal(_, _)))
1316                {
1317                    Some(1)
1318                } else {
1319                    input.max_rows()
1320                }
1321            }
1322            LogicalPlan::Sort(Sort { input, fetch, .. }) => {
1323                match (fetch, input.max_rows()) {
1324                    (Some(fetch_limit), Some(input_max)) => {
1325                        Some(input_max.min(*fetch_limit))
1326                    }
1327                    (Some(fetch_limit), None) => Some(*fetch_limit),
1328                    (None, Some(input_max)) => Some(input_max),
1329                    (None, None) => None,
1330                }
1331            }
1332            LogicalPlan::Join(Join {
1333                left,
1334                right,
1335                join_type,
1336                ..
1337            }) => match join_type {
1338                JoinType::Inner => Some(left.max_rows()? * right.max_rows()?),
1339                JoinType::Left | JoinType::Right | JoinType::Full => {
1340                    match (left.max_rows()?, right.max_rows()?, join_type) {
1341                        (0, 0, _) => Some(0),
1342                        (max_rows, 0, JoinType::Left | JoinType::Full) => Some(max_rows),
1343                        (0, max_rows, JoinType::Right | JoinType::Full) => Some(max_rows),
1344                        (left_max, right_max, _) => Some(left_max * right_max),
1345                    }
1346                }
1347                JoinType::LeftSemi | JoinType::LeftAnti | JoinType::LeftMark => {
1348                    left.max_rows()
1349                }
1350                JoinType::RightSemi | JoinType::RightAnti | JoinType::RightMark => {
1351                    right.max_rows()
1352                }
1353            },
1354            LogicalPlan::Repartition(Repartition { input, .. }) => input.max_rows(),
1355            LogicalPlan::Union(Union { inputs, .. }) => {
1356                inputs.iter().try_fold(0usize, |mut acc, plan| {
1357                    acc += plan.max_rows()?;
1358                    Some(acc)
1359                })
1360            }
1361            LogicalPlan::TableScan(TableScan { fetch, .. }) => *fetch,
1362            LogicalPlan::EmptyRelation(_) => Some(0),
1363            LogicalPlan::RecursiveQuery(_) => None,
1364            LogicalPlan::Subquery(_) => None,
1365            LogicalPlan::SubqueryAlias(SubqueryAlias { input, .. }) => input.max_rows(),
1366            LogicalPlan::Limit(limit) => match limit.get_fetch_type() {
1367                Ok(FetchType::Literal(s)) => s,
1368                _ => None,
1369            },
1370            LogicalPlan::Distinct(
1371                Distinct::All(input) | Distinct::On(DistinctOn { input, .. }),
1372            ) => input.max_rows(),
1373            LogicalPlan::Values(v) => Some(v.values.len()),
1374            LogicalPlan::Unnest(_) => None,
1375            LogicalPlan::Ddl(_)
1376            | LogicalPlan::Explain(_)
1377            | LogicalPlan::Analyze(_)
1378            | LogicalPlan::Dml(_)
1379            | LogicalPlan::Copy(_)
1380            | LogicalPlan::DescribeTable(_)
1381            | LogicalPlan::Statement(_)
1382            | LogicalPlan::Extension(_) => None,
1383        }
1384    }
1385
1386    /// If this node's expressions contains any references to an outer subquery
1387    pub fn contains_outer_reference(&self) -> bool {
1388        let mut contains = false;
1389        self.apply_expressions(|expr| {
1390            Ok(if expr.contains_outer() {
1391                contains = true;
1392                TreeNodeRecursion::Stop
1393            } else {
1394                TreeNodeRecursion::Continue
1395            })
1396        })
1397        .unwrap();
1398        contains
1399    }
1400
1401    /// Get the output expressions and their corresponding columns.
1402    ///
1403    /// The parent node may reference the output columns of the plan by expressions, such as
1404    /// projection over aggregate or window functions. This method helps to convert the
1405    /// referenced expressions into columns.
1406    ///
1407    /// See also: [`crate::utils::columnize_expr`]
1408    pub fn columnized_output_exprs(&self) -> Result<Vec<(&Expr, Column)>> {
1409        match self {
1410            LogicalPlan::Aggregate(aggregate) => Ok(aggregate
1411                .output_expressions()?
1412                .into_iter()
1413                .zip(self.schema().columns())
1414                .collect()),
1415            LogicalPlan::Window(Window {
1416                window_expr,
1417                input,
1418                schema,
1419            }) => {
1420                // The input could be another Window, so the result should also include the input's. For Example:
1421                // `EXPLAIN SELECT RANK() OVER (PARTITION BY a ORDER BY b), SUM(b) OVER (PARTITION BY a) FROM t`
1422                // Its plan is:
1423                // Projection: RANK() PARTITION BY [t.a] ORDER BY [t.b ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW, SUM(t.b) PARTITION BY [t.a] ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING
1424                //   WindowAggr: windowExpr=[[SUM(CAST(t.b AS Int64)) PARTITION BY [t.a] ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING]]
1425                //     WindowAggr: windowExpr=[[RANK() PARTITION BY [t.a] ORDER BY [t.b ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW]]/
1426                //       TableScan: t projection=[a, b]
1427                let mut output_exprs = input.columnized_output_exprs()?;
1428                let input_len = input.schema().fields().len();
1429                output_exprs.extend(
1430                    window_expr
1431                        .iter()
1432                        .zip(schema.columns().into_iter().skip(input_len)),
1433                );
1434                Ok(output_exprs)
1435            }
1436            _ => Ok(vec![]),
1437        }
1438    }
1439}
1440
1441impl LogicalPlan {
1442    /// Return a `LogicalPlan` with all placeholders (e.g $1 $2,
1443    /// ...) replaced with corresponding values provided in
1444    /// `params_values`
1445    ///
1446    /// See [`Self::with_param_values`] for examples and usage with an owned
1447    /// `ParamValues`
1448    pub fn replace_params_with_values(
1449        self,
1450        param_values: &ParamValues,
1451    ) -> Result<LogicalPlan> {
1452        self.transform_up_with_subqueries(|plan| {
1453            let schema = Arc::clone(plan.schema());
1454            let name_preserver = NamePreserver::new(&plan);
1455            plan.map_expressions(|e| {
1456                let (e, has_placeholder) = e.infer_placeholder_types(&schema)?;
1457                if !has_placeholder {
1458                    // Performance optimization:
1459                    // avoid NamePreserver copy and second pass over expression
1460                    // if no placeholders.
1461                    Ok(Transformed::no(e))
1462                } else {
1463                    let original_name = name_preserver.save(&e);
1464                    let transformed_expr = e.transform_up(|e| {
1465                        if let Expr::Placeholder(Placeholder { id, .. }) = e {
1466                            let value = param_values.get_placeholders_with_values(&id)?;
1467                            Ok(Transformed::yes(Expr::Literal(value, None)))
1468                        } else {
1469                            Ok(Transformed::no(e))
1470                        }
1471                    })?;
1472                    // Preserve name to avoid breaking column references to this expression
1473                    Ok(transformed_expr.update_data(|expr| original_name.restore(expr)))
1474                }
1475            })
1476        })
1477        .map(|res| res.data)
1478    }
1479
1480    /// Walk the logical plan, find any `Placeholder` tokens, and return a set of their names.
1481    pub fn get_parameter_names(&self) -> Result<HashSet<String>> {
1482        let mut param_names = HashSet::new();
1483        self.apply_with_subqueries(|plan| {
1484            plan.apply_expressions(|expr| {
1485                expr.apply(|expr| {
1486                    if let Expr::Placeholder(Placeholder { id, .. }) = expr {
1487                        param_names.insert(id.clone());
1488                    }
1489                    Ok(TreeNodeRecursion::Continue)
1490                })
1491            })
1492        })
1493        .map(|_| param_names)
1494    }
1495
1496    /// Walk the logical plan, find any `Placeholder` tokens, and return a map of their IDs and DataTypes
1497    pub fn get_parameter_types(
1498        &self,
1499    ) -> Result<HashMap<String, Option<DataType>>, DataFusionError> {
1500        let mut param_types: HashMap<String, Option<DataType>> = HashMap::new();
1501
1502        self.apply_with_subqueries(|plan| {
1503            plan.apply_expressions(|expr| {
1504                expr.apply(|expr| {
1505                    if let Expr::Placeholder(Placeholder { id, data_type }) = expr {
1506                        let prev = param_types.get(id);
1507                        match (prev, data_type) {
1508                            (Some(Some(prev)), Some(dt)) => {
1509                                if prev != dt {
1510                                    plan_err!("Conflicting types for {id}")?;
1511                                }
1512                            }
1513                            (_, Some(dt)) => {
1514                                param_types.insert(id.clone(), Some(dt.clone()));
1515                            }
1516                            _ => {
1517                                param_types.insert(id.clone(), None);
1518                            }
1519                        }
1520                    }
1521                    Ok(TreeNodeRecursion::Continue)
1522                })
1523            })
1524        })
1525        .map(|_| param_types)
1526    }
1527
1528    // ------------
1529    // Various implementations for printing out LogicalPlans
1530    // ------------
1531
1532    /// Return a `format`able structure that produces a single line
1533    /// per node.
1534    ///
1535    /// # Example
1536    ///
1537    /// ```text
1538    /// Projection: employee.id
1539    ///    Filter: employee.state Eq Utf8(\"CO\")\
1540    ///       CsvScan: employee projection=Some([0, 3])
1541    /// ```
1542    ///
1543    /// ```
1544    /// use arrow::datatypes::{Field, Schema, DataType};
1545    /// use datafusion_expr::{lit, col, LogicalPlanBuilder, logical_plan::table_scan};
1546    /// let schema = Schema::new(vec![
1547    ///     Field::new("id", DataType::Int32, false),
1548    /// ]);
1549    /// let plan = table_scan(Some("t1"), &schema, None).unwrap()
1550    ///     .filter(col("id").eq(lit(5))).unwrap()
1551    ///     .build().unwrap();
1552    ///
1553    /// // Format using display_indent
1554    /// let display_string = format!("{}", plan.display_indent());
1555    ///
1556    /// assert_eq!("Filter: t1.id = Int32(5)\n  TableScan: t1",
1557    ///             display_string);
1558    /// ```
1559    pub fn display_indent(&self) -> impl Display + '_ {
1560        // Boilerplate structure to wrap LogicalPlan with something
1561        // that that can be formatted
1562        struct Wrapper<'a>(&'a LogicalPlan);
1563        impl Display for Wrapper<'_> {
1564            fn fmt(&self, f: &mut Formatter) -> fmt::Result {
1565                let with_schema = false;
1566                let mut visitor = IndentVisitor::new(f, with_schema);
1567                match self.0.visit_with_subqueries(&mut visitor) {
1568                    Ok(_) => Ok(()),
1569                    Err(_) => Err(fmt::Error),
1570                }
1571            }
1572        }
1573        Wrapper(self)
1574    }
1575
1576    /// Return a `format`able structure that produces a single line
1577    /// per node that includes the output schema. For example:
1578    ///
1579    /// ```text
1580    /// Projection: employee.id [id:Int32]\
1581    ///    Filter: employee.state = Utf8(\"CO\") [id:Int32, state:Utf8]\
1582    ///      TableScan: employee projection=[0, 3] [id:Int32, state:Utf8]";
1583    /// ```
1584    ///
1585    /// ```
1586    /// use arrow::datatypes::{Field, Schema, DataType};
1587    /// use datafusion_expr::{lit, col, LogicalPlanBuilder, logical_plan::table_scan};
1588    /// let schema = Schema::new(vec![
1589    ///     Field::new("id", DataType::Int32, false),
1590    /// ]);
1591    /// let plan = table_scan(Some("t1"), &schema, None).unwrap()
1592    ///     .filter(col("id").eq(lit(5))).unwrap()
1593    ///     .build().unwrap();
1594    ///
1595    /// // Format using display_indent_schema
1596    /// let display_string = format!("{}", plan.display_indent_schema());
1597    ///
1598    /// assert_eq!("Filter: t1.id = Int32(5) [id:Int32]\
1599    ///             \n  TableScan: t1 [id:Int32]",
1600    ///             display_string);
1601    /// ```
1602    pub fn display_indent_schema(&self) -> impl Display + '_ {
1603        // Boilerplate structure to wrap LogicalPlan with something
1604        // that that can be formatted
1605        struct Wrapper<'a>(&'a LogicalPlan);
1606        impl Display for Wrapper<'_> {
1607            fn fmt(&self, f: &mut Formatter) -> fmt::Result {
1608                let with_schema = true;
1609                let mut visitor = IndentVisitor::new(f, with_schema);
1610                match self.0.visit_with_subqueries(&mut visitor) {
1611                    Ok(_) => Ok(()),
1612                    Err(_) => Err(fmt::Error),
1613                }
1614            }
1615        }
1616        Wrapper(self)
1617    }
1618
1619    /// Return a displayable structure that produces plan in postgresql JSON format.
1620    ///
1621    /// Users can use this format to visualize the plan in existing plan visualization tools, for example [dalibo](https://explain.dalibo.com/)
1622    pub fn display_pg_json(&self) -> impl Display + '_ {
1623        // Boilerplate structure to wrap LogicalPlan with something
1624        // that that can be formatted
1625        struct Wrapper<'a>(&'a LogicalPlan);
1626        impl Display for Wrapper<'_> {
1627            fn fmt(&self, f: &mut Formatter) -> fmt::Result {
1628                let mut visitor = PgJsonVisitor::new(f);
1629                visitor.with_schema(true);
1630                match self.0.visit_with_subqueries(&mut visitor) {
1631                    Ok(_) => Ok(()),
1632                    Err(_) => Err(fmt::Error),
1633                }
1634            }
1635        }
1636        Wrapper(self)
1637    }
1638
1639    /// Return a `format`able structure that produces lines meant for
1640    /// graphical display using the `DOT` language. This format can be
1641    /// visualized using software from
1642    /// [`graphviz`](https://graphviz.org/)
1643    ///
1644    /// This currently produces two graphs -- one with the basic
1645    /// structure, and one with additional details such as schema.
1646    ///
1647    /// ```
1648    /// use arrow::datatypes::{Field, Schema, DataType};
1649    /// use datafusion_expr::{lit, col, LogicalPlanBuilder, logical_plan::table_scan};
1650    /// let schema = Schema::new(vec![
1651    ///     Field::new("id", DataType::Int32, false),
1652    /// ]);
1653    /// let plan = table_scan(Some("t1"), &schema, None).unwrap()
1654    ///     .filter(col("id").eq(lit(5))).unwrap()
1655    ///     .build().unwrap();
1656    ///
1657    /// // Format using display_graphviz
1658    /// let graphviz_string = format!("{}", plan.display_graphviz());
1659    /// ```
1660    ///
1661    /// If graphviz string is saved to a file such as `/tmp/example.dot`, the following
1662    /// commands can be used to render it as a pdf:
1663    ///
1664    /// ```bash
1665    ///   dot -Tpdf < /tmp/example.dot  > /tmp/example.pdf
1666    /// ```
1667    ///
1668    pub fn display_graphviz(&self) -> impl Display + '_ {
1669        // Boilerplate structure to wrap LogicalPlan with something
1670        // that that can be formatted
1671        struct Wrapper<'a>(&'a LogicalPlan);
1672        impl Display for Wrapper<'_> {
1673            fn fmt(&self, f: &mut Formatter) -> fmt::Result {
1674                let mut visitor = GraphvizVisitor::new(f);
1675
1676                visitor.start_graph()?;
1677
1678                visitor.pre_visit_plan("LogicalPlan")?;
1679                self.0
1680                    .visit_with_subqueries(&mut visitor)
1681                    .map_err(|_| fmt::Error)?;
1682                visitor.post_visit_plan()?;
1683
1684                visitor.set_with_schema(true);
1685                visitor.pre_visit_plan("Detailed LogicalPlan")?;
1686                self.0
1687                    .visit_with_subqueries(&mut visitor)
1688                    .map_err(|_| fmt::Error)?;
1689                visitor.post_visit_plan()?;
1690
1691                visitor.end_graph()?;
1692                Ok(())
1693            }
1694        }
1695        Wrapper(self)
1696    }
1697
1698    /// Return a `format`able structure with the a human readable
1699    /// description of this LogicalPlan node per node, not including
1700    /// children. For example:
1701    ///
1702    /// ```text
1703    /// Projection: id
1704    /// ```
1705    /// ```
1706    /// use arrow::datatypes::{Field, Schema, DataType};
1707    /// use datafusion_expr::{lit, col, LogicalPlanBuilder, logical_plan::table_scan};
1708    /// let schema = Schema::new(vec![
1709    ///     Field::new("id", DataType::Int32, false),
1710    /// ]);
1711    /// let plan = table_scan(Some("t1"), &schema, None).unwrap()
1712    ///     .build().unwrap();
1713    ///
1714    /// // Format using display
1715    /// let display_string = format!("{}", plan.display());
1716    ///
1717    /// assert_eq!("TableScan: t1", display_string);
1718    /// ```
1719    pub fn display(&self) -> impl Display + '_ {
1720        // Boilerplate structure to wrap LogicalPlan with something
1721        // that that can be formatted
1722        struct Wrapper<'a>(&'a LogicalPlan);
1723        impl Display for Wrapper<'_> {
1724            fn fmt(&self, f: &mut Formatter) -> fmt::Result {
1725                match self.0 {
1726                    LogicalPlan::EmptyRelation(EmptyRelation { produce_one_row, schema: _ }) => {
1727                        let rows = if *produce_one_row { 1 } else { 0 };
1728                        write!(f, "EmptyRelation: rows={rows}")
1729                    },
1730                    LogicalPlan::RecursiveQuery(RecursiveQuery {
1731                        is_distinct, ..
1732                    }) => {
1733                        write!(f, "RecursiveQuery: is_distinct={is_distinct}")
1734                    }
1735                    LogicalPlan::Values(Values { ref values, .. }) => {
1736                        let str_values: Vec<_> = values
1737                            .iter()
1738                            // limit to only 5 values to avoid horrible display
1739                            .take(5)
1740                            .map(|row| {
1741                                let item = row
1742                                    .iter()
1743                                    .map(|expr| expr.to_string())
1744                                    .collect::<Vec<_>>()
1745                                    .join(", ");
1746                                format!("({item})")
1747                            })
1748                            .collect();
1749
1750                        let eclipse = if values.len() > 5 { "..." } else { "" };
1751                        write!(f, "Values: {}{}", str_values.join(", "), eclipse)
1752                    }
1753
1754                    LogicalPlan::TableScan(TableScan {
1755                        ref source,
1756                        ref table_name,
1757                        ref projection,
1758                        ref filters,
1759                        ref fetch,
1760                        ..
1761                    }) => {
1762                        let projected_fields = match projection {
1763                            Some(indices) => {
1764                                let schema = source.schema();
1765                                let names: Vec<&str> = indices
1766                                    .iter()
1767                                    .map(|i| schema.field(*i).name().as_str())
1768                                    .collect();
1769                                format!(" projection=[{}]", names.join(", "))
1770                            }
1771                            _ => "".to_string(),
1772                        };
1773
1774                        write!(f, "TableScan: {table_name}{projected_fields}")?;
1775
1776                        if !filters.is_empty() {
1777                            let mut full_filter = vec![];
1778                            let mut partial_filter = vec![];
1779                            let mut unsupported_filters = vec![];
1780                            let filters: Vec<&Expr> = filters.iter().collect();
1781
1782                            if let Ok(results) =
1783                                source.supports_filters_pushdown(&filters)
1784                            {
1785                                filters.iter().zip(results.iter()).for_each(
1786                                    |(x, res)| match res {
1787                                        TableProviderFilterPushDown::Exact => {
1788                                            full_filter.push(x)
1789                                        }
1790                                        TableProviderFilterPushDown::Inexact => {
1791                                            partial_filter.push(x)
1792                                        }
1793                                        TableProviderFilterPushDown::Unsupported => {
1794                                            unsupported_filters.push(x)
1795                                        }
1796                                    },
1797                                );
1798                            }
1799
1800                            if !full_filter.is_empty() {
1801                                write!(
1802                                    f,
1803                                    ", full_filters=[{}]",
1804                                    expr_vec_fmt!(full_filter)
1805                                )?;
1806                            };
1807                            if !partial_filter.is_empty() {
1808                                write!(
1809                                    f,
1810                                    ", partial_filters=[{}]",
1811                                    expr_vec_fmt!(partial_filter)
1812                                )?;
1813                            }
1814                            if !unsupported_filters.is_empty() {
1815                                write!(
1816                                    f,
1817                                    ", unsupported_filters=[{}]",
1818                                    expr_vec_fmt!(unsupported_filters)
1819                                )?;
1820                            }
1821                        }
1822
1823                        if let Some(n) = fetch {
1824                            write!(f, ", fetch={n}")?;
1825                        }
1826
1827                        Ok(())
1828                    }
1829                    LogicalPlan::Projection(Projection { ref expr, .. }) => {
1830                        write!(f, "Projection:")?;
1831                        for (i, expr_item) in expr.iter().enumerate() {
1832                            if i > 0 {
1833                                write!(f, ",")?;
1834                            }
1835                            write!(f, " {expr_item}")?;
1836                        }
1837                        Ok(())
1838                    }
1839                    LogicalPlan::Dml(DmlStatement { table_name, op, .. }) => {
1840                        write!(f, "Dml: op=[{op}] table=[{table_name}]")
1841                    }
1842                    LogicalPlan::Copy(CopyTo {
1843                        input: _,
1844                        output_url,
1845                        file_type,
1846                        options,
1847                        ..
1848                    }) => {
1849                        let op_str = options
1850                            .iter()
1851                            .map(|(k, v)| format!("{k} {v}"))
1852                            .collect::<Vec<String>>()
1853                            .join(", ");
1854
1855                        write!(f, "CopyTo: format={} output_url={output_url} options: ({op_str})", file_type.get_ext())
1856                    }
1857                    LogicalPlan::Ddl(ddl) => {
1858                        write!(f, "{}", ddl.display())
1859                    }
1860                    LogicalPlan::Filter(Filter {
1861                        predicate: ref expr,
1862                        ..
1863                    }) => write!(f, "Filter: {expr}"),
1864                    LogicalPlan::Window(Window {
1865                        ref window_expr, ..
1866                    }) => {
1867                        write!(
1868                            f,
1869                            "WindowAggr: windowExpr=[[{}]]",
1870                            expr_vec_fmt!(window_expr)
1871                        )
1872                    }
1873                    LogicalPlan::Aggregate(Aggregate {
1874                        ref group_expr,
1875                        ref aggr_expr,
1876                        ..
1877                    }) => write!(
1878                        f,
1879                        "Aggregate: groupBy=[[{}]], aggr=[[{}]]",
1880                        expr_vec_fmt!(group_expr),
1881                        expr_vec_fmt!(aggr_expr)
1882                    ),
1883                    LogicalPlan::Sort(Sort { expr, fetch, .. }) => {
1884                        write!(f, "Sort: ")?;
1885                        for (i, expr_item) in expr.iter().enumerate() {
1886                            if i > 0 {
1887                                write!(f, ", ")?;
1888                            }
1889                            write!(f, "{expr_item}")?;
1890                        }
1891                        if let Some(a) = fetch {
1892                            write!(f, ", fetch={a}")?;
1893                        }
1894
1895                        Ok(())
1896                    }
1897                    LogicalPlan::Join(Join {
1898                        on: ref keys,
1899                        filter,
1900                        join_constraint,
1901                        join_type,
1902                        ..
1903                    }) => {
1904                        let join_expr: Vec<String> =
1905                            keys.iter().map(|(l, r)| format!("{l} = {r}")).collect();
1906                        let filter_expr = filter
1907                            .as_ref()
1908                            .map(|expr| format!(" Filter: {expr}"))
1909                            .unwrap_or_else(|| "".to_string());
1910                        let join_type = if filter.is_none() && keys.is_empty() && matches!(join_type, JoinType::Inner) {
1911                            "Cross".to_string()
1912                        } else {
1913                            join_type.to_string()
1914                        };
1915                        match join_constraint {
1916                            JoinConstraint::On => {
1917                                write!(
1918                                    f,
1919                                    "{} Join: {}{}",
1920                                    join_type,
1921                                    join_expr.join(", "),
1922                                    filter_expr
1923                                )
1924                            }
1925                            JoinConstraint::Using => {
1926                                write!(
1927                                    f,
1928                                    "{} Join: Using {}{}",
1929                                    join_type,
1930                                    join_expr.join(", "),
1931                                    filter_expr,
1932                                )
1933                            }
1934                        }
1935                    }
1936                    LogicalPlan::Repartition(Repartition {
1937                        partitioning_scheme,
1938                        ..
1939                    }) => match partitioning_scheme {
1940                        Partitioning::RoundRobinBatch(n) => {
1941                            write!(f, "Repartition: RoundRobinBatch partition_count={n}")
1942                        }
1943                        Partitioning::Hash(expr, n) => {
1944                            let hash_expr: Vec<String> =
1945                                expr.iter().map(|e| format!("{e}")).collect();
1946                            write!(
1947                                f,
1948                                "Repartition: Hash({}) partition_count={}",
1949                                hash_expr.join(", "),
1950                                n
1951                            )
1952                        }
1953                        Partitioning::DistributeBy(expr) => {
1954                            let dist_by_expr: Vec<String> =
1955                                expr.iter().map(|e| format!("{e}")).collect();
1956                            write!(
1957                                f,
1958                                "Repartition: DistributeBy({})",
1959                                dist_by_expr.join(", "),
1960                            )
1961                        }
1962                    },
1963                    LogicalPlan::Limit(limit) => {
1964                        // Attempt to display `skip` and `fetch` as literals if possible, otherwise as expressions.
1965                        let skip_str = match limit.get_skip_type() {
1966                            Ok(SkipType::Literal(n)) => n.to_string(),
1967                            _ => limit.skip.as_ref().map_or_else(|| "None".to_string(), |x| x.to_string()),
1968                        };
1969                        let fetch_str = match limit.get_fetch_type() {
1970                            Ok(FetchType::Literal(Some(n))) => n.to_string(),
1971                            Ok(FetchType::Literal(None)) => "None".to_string(),
1972                            _ => limit.fetch.as_ref().map_or_else(|| "None".to_string(), |x| x.to_string())
1973                        };
1974                        write!(
1975                            f,
1976                            "Limit: skip={skip_str}, fetch={fetch_str}",
1977                        )
1978                    }
1979                    LogicalPlan::Subquery(Subquery { .. }) => {
1980                        write!(f, "Subquery:")
1981                    }
1982                    LogicalPlan::SubqueryAlias(SubqueryAlias { ref alias, .. }) => {
1983                        write!(f, "SubqueryAlias: {alias}")
1984                    }
1985                    LogicalPlan::Statement(statement) => {
1986                        write!(f, "{}", statement.display())
1987                    }
1988                    LogicalPlan::Distinct(distinct) => match distinct {
1989                        Distinct::All(_) => write!(f, "Distinct:"),
1990                        Distinct::On(DistinctOn {
1991                            on_expr,
1992                            select_expr,
1993                            sort_expr,
1994                            ..
1995                        }) => write!(
1996                            f,
1997                            "DistinctOn: on_expr=[[{}]], select_expr=[[{}]], sort_expr=[[{}]]",
1998                            expr_vec_fmt!(on_expr),
1999                            expr_vec_fmt!(select_expr),
2000                            if let Some(sort_expr) = sort_expr { expr_vec_fmt!(sort_expr) } else { "".to_string() },
2001                        ),
2002                    },
2003                    LogicalPlan::Explain { .. } => write!(f, "Explain"),
2004                    LogicalPlan::Analyze { .. } => write!(f, "Analyze"),
2005                    LogicalPlan::Union(_) => write!(f, "Union"),
2006                    LogicalPlan::Extension(e) => e.node.fmt_for_explain(f),
2007                    LogicalPlan::DescribeTable(DescribeTable { .. }) => {
2008                        write!(f, "DescribeTable")
2009                    }
2010                    LogicalPlan::Unnest(Unnest {
2011                        input: plan,
2012                        list_type_columns: list_col_indices,
2013                        struct_type_columns: struct_col_indices, .. }) => {
2014                        let input_columns = plan.schema().columns();
2015                        let list_type_columns = list_col_indices
2016                            .iter()
2017                            .map(|(i,unnest_info)|
2018                                format!("{}|depth={}", &input_columns[*i].to_string(),
2019                                unnest_info.depth))
2020                            .collect::<Vec<String>>();
2021                        let struct_type_columns = struct_col_indices
2022                            .iter()
2023                            .map(|i| &input_columns[*i])
2024                            .collect::<Vec<&Column>>();
2025                        // get items from input_columns indexed by list_col_indices
2026                        write!(f, "Unnest: lists[{}] structs[{}]",
2027                        expr_vec_fmt!(list_type_columns),
2028                        expr_vec_fmt!(struct_type_columns))
2029                    }
2030                }
2031            }
2032        }
2033        Wrapper(self)
2034    }
2035}
2036
2037impl Display for LogicalPlan {
2038    fn fmt(&self, f: &mut Formatter) -> fmt::Result {
2039        self.display_indent().fmt(f)
2040    }
2041}
2042
2043impl ToStringifiedPlan for LogicalPlan {
2044    fn to_stringified(&self, plan_type: PlanType) -> StringifiedPlan {
2045        StringifiedPlan::new(plan_type, self.display_indent().to_string())
2046    }
2047}
2048
/// Relation that produces 0 or 1 placeholder rows with the specified
/// output schema.
///
/// In most cases the output schema for `EmptyRelation` is empty;
/// however, it can be non-empty, typically when produced by optimizer rules.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct EmptyRelation {
    /// Whether to produce a placeholder row
    pub produce_one_row: bool,
    /// The schema description of the output
    pub schema: DFSchemaRef,
}
2059
2060// Manual implementation needed because of `schema` field. Comparison excludes this field.
2061impl PartialOrd for EmptyRelation {
2062    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
2063        self.produce_one_row.partial_cmp(&other.produce_one_row)
2064    }
2065}
2066
/// A variadic query operation: a recursive CTE.
///
/// # Recursive Query Evaluation
///
/// From the [Postgres Docs]:
///
/// 1. Evaluate the non-recursive term. For `UNION` (but not `UNION ALL`),
///    discard duplicate rows. Include all remaining rows in the result of the
///    recursive query, and also place them in a temporary working table.
///
/// 2. So long as the working table is not empty, repeat these steps:
///
/// * Evaluate the recursive term, substituting the current contents of the
///   working table for the recursive self-reference. For `UNION` (but not `UNION
///   ALL`), discard duplicate rows and rows that duplicate any previous result
///   row. Include all remaining rows in the result of the recursive query, and
///   also place them in a temporary intermediate table.
///
/// * Replace the contents of the working table with the contents of the
///   intermediate table, then empty the intermediate table.
///
/// [Postgres Docs]: https://www.postgresql.org/docs/current/queries-with.html#QUERIES-WITH-RECURSIVE
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Hash)]
pub struct RecursiveQuery {
    /// Name of the query
    pub name: String,
    /// The static term (initial contents of the working table)
    pub static_term: Arc<LogicalPlan>,
    /// The recursive term (evaluated on the contents of the working table until
    /// it returns an empty set)
    pub recursive_term: Arc<LogicalPlan>,
    /// Whether the output of the recursive term should be deduplicated
    /// (`UNION`) or not (`UNION ALL`).
    pub is_distinct: bool,
}
2102
/// A `VALUES` expression: a list of rows, each given as a list of
/// expressions. See the
/// [Postgres VALUES](https://www.postgresql.org/docs/current/queries-values.html)
/// documentation for more details.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct Values {
    /// The table schema
    pub schema: DFSchemaRef,
    /// The rows; each inner `Vec` holds the expressions of one row
    pub values: Vec<Vec<Expr>>,
}
2113
2114// Manual implementation needed because of `schema` field. Comparison excludes this field.
2115impl PartialOrd for Values {
2116    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
2117        self.values.partial_cmp(&other.values)
2118    }
2119}
2120
2121/// Evaluates an arbitrary list of expressions (essentially a
2122/// SELECT with an expression list) on its input.
2123#[derive(Clone, PartialEq, Eq, Hash, Debug)]
2124// mark non_exhaustive to encourage use of try_new/new()
2125#[non_exhaustive]
2126pub struct Projection {
2127    /// The list of expressions
2128    pub expr: Vec<Expr>,
2129    /// The incoming logical plan
2130    pub input: Arc<LogicalPlan>,
2131    /// The schema description of the output
2132    pub schema: DFSchemaRef,
2133}
2134
2135// Manual implementation needed because of `schema` field. Comparison excludes this field.
2136impl PartialOrd for Projection {
2137    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
2138        match self.expr.partial_cmp(&other.expr) {
2139            Some(Ordering::Equal) => self.input.partial_cmp(&other.input),
2140            cmp => cmp,
2141        }
2142    }
2143}
2144
2145impl Projection {
2146    /// Create a new Projection
2147    pub fn try_new(expr: Vec<Expr>, input: Arc<LogicalPlan>) -> Result<Self> {
2148        let projection_schema = projection_schema(&input, &expr)?;
2149        Self::try_new_with_schema(expr, input, projection_schema)
2150    }
2151
2152    /// Create a new Projection using the specified output schema
2153    pub fn try_new_with_schema(
2154        expr: Vec<Expr>,
2155        input: Arc<LogicalPlan>,
2156        schema: DFSchemaRef,
2157    ) -> Result<Self> {
2158        #[expect(deprecated)]
2159        if !expr.iter().any(|e| matches!(e, Expr::Wildcard { .. }))
2160            && expr.len() != schema.fields().len()
2161        {
2162            return plan_err!("Projection has mismatch between number of expressions ({}) and number of fields in schema ({})", expr.len(), schema.fields().len());
2163        }
2164        Ok(Self {
2165            expr,
2166            input,
2167            schema,
2168        })
2169    }
2170
2171    /// Create a new Projection using the specified output schema
2172    pub fn new_from_schema(input: Arc<LogicalPlan>, schema: DFSchemaRef) -> Self {
2173        let expr: Vec<Expr> = schema.columns().into_iter().map(Expr::Column).collect();
2174        Self {
2175            expr,
2176            input,
2177            schema,
2178        }
2179    }
2180}
2181
2182/// Computes the schema of the result produced by applying a projection to the input logical plan.
2183///
2184/// # Arguments
2185///
2186/// * `input`: A reference to the input `LogicalPlan` for which the projection schema
2187///   will be computed.
2188/// * `exprs`: A slice of `Expr` expressions representing the projection operation to apply.
2189///
2190/// # Returns
2191///
2192/// A `Result` containing an `Arc<DFSchema>` representing the schema of the result
2193/// produced by the projection operation. If the schema computation is successful,
2194/// the `Result` will contain the schema; otherwise, it will contain an error.
2195pub fn projection_schema(input: &LogicalPlan, exprs: &[Expr]) -> Result<Arc<DFSchema>> {
2196    let metadata = input.schema().metadata().clone();
2197
2198    let schema =
2199        DFSchema::new_with_metadata(exprlist_to_fields(exprs, input)?, metadata)?
2200            .with_functional_dependencies(calc_func_dependencies_for_project(
2201                exprs, input,
2202            )?)?;
2203
2204    Ok(Arc::new(schema))
2205}
2206
2207/// Aliased subquery
2208#[derive(Debug, Clone, PartialEq, Eq, Hash)]
2209// mark non_exhaustive to encourage use of try_new/new()
2210#[non_exhaustive]
2211pub struct SubqueryAlias {
2212    /// The incoming logical plan
2213    pub input: Arc<LogicalPlan>,
2214    /// The alias for the input relation
2215    pub alias: TableReference,
2216    /// The schema with qualified field names
2217    pub schema: DFSchemaRef,
2218}
2219
2220impl SubqueryAlias {
2221    pub fn try_new(
2222        plan: Arc<LogicalPlan>,
2223        alias: impl Into<TableReference>,
2224    ) -> Result<Self> {
2225        let alias = alias.into();
2226
2227        // Since SubqueryAlias will replace all field qualification for the output schema of `plan`,
2228        // no field must share the same column name as this would lead to ambiguity when referencing
2229        // columns in parent logical nodes.
2230
2231        // Compute unique aliases, if any, for each column of the input's schema.
2232        let aliases = unique_field_aliases(plan.schema().fields());
2233        let is_projection_needed = aliases.iter().any(Option::is_some);
2234
2235        // Insert a projection node, if needed, to make sure aliases are applied.
2236        let plan = if is_projection_needed {
2237            let projection_expressions = aliases
2238                .iter()
2239                .zip(plan.schema().iter())
2240                .map(|(alias, (qualifier, field))| {
2241                    let column =
2242                        Expr::Column(Column::new(qualifier.cloned(), field.name()));
2243                    match alias {
2244                        None => column,
2245                        Some(alias) => {
2246                            Expr::Alias(Alias::new(column, qualifier.cloned(), alias))
2247                        }
2248                    }
2249                })
2250                .collect();
2251            let projection = Projection::try_new(projection_expressions, plan)?;
2252            Arc::new(LogicalPlan::Projection(projection))
2253        } else {
2254            plan
2255        };
2256
2257        // Requalify fields with the new `alias`.
2258        let fields = plan.schema().fields().clone();
2259        let meta_data = plan.schema().metadata().clone();
2260        let func_dependencies = plan.schema().functional_dependencies().clone();
2261
2262        let schema = DFSchema::from_unqualified_fields(fields, meta_data)?;
2263        let schema = Schema::from(schema);
2264
2265        let schema = DFSchemaRef::new(
2266            DFSchema::try_from_qualified_schema(alias.clone(), &schema)?
2267                .with_functional_dependencies(func_dependencies)?,
2268        );
2269        Ok(SubqueryAlias {
2270            input: plan,
2271            alias,
2272            schema,
2273        })
2274    }
2275}
2276
2277// Manual implementation needed because of `schema` field. Comparison excludes this field.
2278impl PartialOrd for SubqueryAlias {
2279    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
2280        match self.input.partial_cmp(&other.input) {
2281            Some(Ordering::Equal) => self.alias.partial_cmp(&other.alias),
2282            cmp => cmp,
2283        }
2284    }
2285}
2286
2287/// Filters rows from its input that do not match an
2288/// expression (essentially a WHERE clause with a predicate
2289/// expression).
2290///
2291/// Semantically, `<predicate>` is evaluated for each row of the input;
2292/// If the value of `<predicate>` is true, the input row is passed to
2293/// the output. If the value of `<predicate>` is false, the row is
2294/// discarded.
2295///
2296/// Filter should not be created directly but instead use `try_new()`
2297/// and that these fields are only pub to support pattern matching
2298#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Hash)]
2299#[non_exhaustive]
2300pub struct Filter {
2301    /// The predicate expression, which must have Boolean type.
2302    pub predicate: Expr,
2303    /// The incoming logical plan
2304    pub input: Arc<LogicalPlan>,
2305}
2306
2307impl Filter {
2308    /// Create a new filter operator.
2309    ///
2310    /// Notes: as Aliases have no effect on the output of a filter operator,
2311    /// they are removed from the predicate expression.
2312    pub fn try_new(predicate: Expr, input: Arc<LogicalPlan>) -> Result<Self> {
2313        Self::try_new_internal(predicate, input)
2314    }
2315
2316    /// Create a new filter operator for a having clause.
2317    /// This is similar to a filter, but its having flag is set to true.
2318    #[deprecated(since = "48.0.0", note = "Use `try_new` instead")]
2319    pub fn try_new_with_having(predicate: Expr, input: Arc<LogicalPlan>) -> Result<Self> {
2320        Self::try_new_internal(predicate, input)
2321    }
2322
2323    fn is_allowed_filter_type(data_type: &DataType) -> bool {
2324        match data_type {
2325            // Interpret NULL as a missing boolean value.
2326            DataType::Boolean | DataType::Null => true,
2327            DataType::Dictionary(_, value_type) => {
2328                Filter::is_allowed_filter_type(value_type.as_ref())
2329            }
2330            _ => false,
2331        }
2332    }
2333
2334    fn try_new_internal(predicate: Expr, input: Arc<LogicalPlan>) -> Result<Self> {
2335        // Filter predicates must return a boolean value so we try and validate that here.
2336        // Note that it is not always possible to resolve the predicate expression during plan
2337        // construction (such as with correlated subqueries) so we make a best effort here and
2338        // ignore errors resolving the expression against the schema.
2339        if let Ok(predicate_type) = predicate.get_type(input.schema()) {
2340            if !Filter::is_allowed_filter_type(&predicate_type) {
2341                return plan_err!(
2342                    "Cannot create filter with non-boolean predicate '{predicate}' returning {predicate_type}"
2343                );
2344            }
2345        }
2346
2347        Ok(Self {
2348            predicate: predicate.unalias_nested().data,
2349            input,
2350        })
2351    }
2352
2353    /// Is this filter guaranteed to return 0 or 1 row in a given instantiation?
2354    ///
2355    /// This function will return `true` if its predicate contains a conjunction of
2356    /// `col(a) = <expr>`, where its schema has a unique filter that is covered
2357    /// by this conjunction.
2358    ///
2359    /// For example, for the table:
2360    /// ```sql
2361    /// CREATE TABLE t (a INTEGER PRIMARY KEY, b INTEGER);
2362    /// ```
2363    /// `Filter(a = 2).is_scalar() == true`
2364    /// , whereas
2365    /// `Filter(b = 2).is_scalar() == false`
2366    /// and
2367    /// `Filter(a = 2 OR b = 2).is_scalar() == false`
2368    fn is_scalar(&self) -> bool {
2369        let schema = self.input.schema();
2370
2371        let functional_dependencies = self.input.schema().functional_dependencies();
2372        let unique_keys = functional_dependencies.iter().filter(|dep| {
2373            let nullable = dep.nullable
2374                && dep
2375                    .source_indices
2376                    .iter()
2377                    .any(|&source| schema.field(source).is_nullable());
2378            !nullable
2379                && dep.mode == Dependency::Single
2380                && dep.target_indices.len() == schema.fields().len()
2381        });
2382
2383        let exprs = split_conjunction(&self.predicate);
2384        let eq_pred_cols: HashSet<_> = exprs
2385            .iter()
2386            .filter_map(|expr| {
2387                let Expr::BinaryExpr(BinaryExpr {
2388                    left,
2389                    op: Operator::Eq,
2390                    right,
2391                }) = expr
2392                else {
2393                    return None;
2394                };
2395                // This is a no-op filter expression
2396                if left == right {
2397                    return None;
2398                }
2399
2400                match (left.as_ref(), right.as_ref()) {
2401                    (Expr::Column(_), Expr::Column(_)) => None,
2402                    (Expr::Column(c), _) | (_, Expr::Column(c)) => {
2403                        Some(schema.index_of_column(c).unwrap())
2404                    }
2405                    _ => None,
2406                }
2407            })
2408            .collect();
2409
2410        // If we have a functional dependence that is a subset of our predicate,
2411        // this filter is scalar
2412        for key in unique_keys {
2413            if key.source_indices.iter().all(|c| eq_pred_cols.contains(c)) {
2414                return true;
2415            }
2416        }
2417        false
2418    }
2419}
2420
2421/// Window its input based on a set of window spec and window function (e.g. SUM or RANK)
2422///
2423/// # Output Schema
2424///
2425/// The output schema is the input schema followed by the window function
2426/// expressions, in order.
2427///
2428/// For example, given the input schema `"A", "B", "C"` and the window function
2429/// `SUM(A) OVER (PARTITION BY B+1 ORDER BY C)`, the output schema will be `"A",
2430/// "B", "C", "SUM(A) OVER ..."` where `"SUM(A) OVER ..."` is the name of the
2431/// output column.
2432///
2433/// Note that the `PARTITION BY` expression "B+1" is not produced in the output
2434/// schema.
2435#[derive(Debug, Clone, PartialEq, Eq, Hash)]
2436pub struct Window {
2437    /// The incoming logical plan
2438    pub input: Arc<LogicalPlan>,
2439    /// The window function expression
2440    pub window_expr: Vec<Expr>,
2441    /// The schema description of the window output
2442    pub schema: DFSchemaRef,
2443}
2444
2445impl Window {
2446    /// Create a new window operator.
2447    pub fn try_new(window_expr: Vec<Expr>, input: Arc<LogicalPlan>) -> Result<Self> {
2448        let fields: Vec<(Option<TableReference>, Arc<Field>)> = input
2449            .schema()
2450            .iter()
2451            .map(|(q, f)| (q.cloned(), Arc::clone(f)))
2452            .collect();
2453        let input_len = fields.len();
2454        let mut window_fields = fields;
2455        let expr_fields = exprlist_to_fields(window_expr.as_slice(), &input)?;
2456        window_fields.extend_from_slice(expr_fields.as_slice());
2457        let metadata = input.schema().metadata().clone();
2458
2459        // Update functional dependencies for window:
2460        let mut window_func_dependencies =
2461            input.schema().functional_dependencies().clone();
2462        window_func_dependencies.extend_target_indices(window_fields.len());
2463
2464        // Since we know that ROW_NUMBER outputs will be unique (i.e. it consists
2465        // of consecutive numbers per partition), we can represent this fact with
2466        // functional dependencies.
2467        let mut new_dependencies = window_expr
2468            .iter()
2469            .enumerate()
2470            .filter_map(|(idx, expr)| {
2471                let Expr::WindowFunction(window_fun) = expr else {
2472                    return None;
2473                };
2474                let WindowFunction {
2475                    fun: WindowFunctionDefinition::WindowUDF(udwf),
2476                    params: WindowFunctionParams { partition_by, .. },
2477                } = window_fun.as_ref()
2478                else {
2479                    return None;
2480                };
2481                // When there is no PARTITION BY, row number will be unique
2482                // across the entire table.
2483                if udwf.name() == "row_number" && partition_by.is_empty() {
2484                    Some(idx + input_len)
2485                } else {
2486                    None
2487                }
2488            })
2489            .map(|idx| {
2490                FunctionalDependence::new(vec![idx], vec![], false)
2491                    .with_mode(Dependency::Single)
2492            })
2493            .collect::<Vec<_>>();
2494
2495        if !new_dependencies.is_empty() {
2496            for dependence in new_dependencies.iter_mut() {
2497                dependence.target_indices = (0..window_fields.len()).collect();
2498            }
2499            // Add the dependency introduced because of ROW_NUMBER window function to the functional dependency
2500            let new_deps = FunctionalDependencies::new(new_dependencies);
2501            window_func_dependencies.extend(new_deps);
2502        }
2503
2504        // Validate that FILTER clauses are only used with aggregate window functions
2505        if let Some(e) = window_expr.iter().find(|e| {
2506            matches!(
2507                e,
2508                Expr::WindowFunction(wf)
2509                    if !matches!(wf.fun, WindowFunctionDefinition::AggregateUDF(_))
2510                        && wf.params.filter.is_some()
2511            )
2512        }) {
2513            return plan_err!(
2514                "FILTER clause can only be used with aggregate window functions. Found in '{e}'"
2515            );
2516        }
2517
2518        Self::try_new_with_schema(
2519            window_expr,
2520            input,
2521            Arc::new(
2522                DFSchema::new_with_metadata(window_fields, metadata)?
2523                    .with_functional_dependencies(window_func_dependencies)?,
2524            ),
2525        )
2526    }
2527
2528    pub fn try_new_with_schema(
2529        window_expr: Vec<Expr>,
2530        input: Arc<LogicalPlan>,
2531        schema: DFSchemaRef,
2532    ) -> Result<Self> {
2533        if window_expr.len() != schema.fields().len() - input.schema().fields().len() {
2534            return plan_err!(
2535                "Window has mismatch between number of expressions ({}) and number of fields in schema ({})",
2536                window_expr.len(),
2537                schema.fields().len() - input.schema().fields().len()
2538            );
2539        }
2540
2541        Ok(Window {
2542            input,
2543            window_expr,
2544            schema,
2545        })
2546    }
2547}
2548
2549// Manual implementation needed because of `schema` field. Comparison excludes this field.
2550impl PartialOrd for Window {
2551    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
2552        match self.input.partial_cmp(&other.input)? {
2553            Ordering::Equal => {} // continue
2554            not_equal => return Some(not_equal),
2555        }
2556
2557        match self.window_expr.partial_cmp(&other.window_expr)? {
2558            Ordering::Equal => {} // continue
2559            not_equal => return Some(not_equal),
2560        }
2561
2562        // Contract for PartialOrd and PartialEq consistency requires that
2563        // a == b if and only if partial_cmp(a, b) == Some(Equal).
2564        if self == other {
2565            Some(Ordering::Equal)
2566        } else {
2567            None
2568        }
2569    }
2570}
2571
2572/// Produces rows from a table provider by reference or from the context
2573#[derive(Clone)]
2574pub struct TableScan {
2575    /// The name of the table
2576    pub table_name: TableReference,
2577    /// The source of the table
2578    pub source: Arc<dyn TableSource>,
2579    /// Optional column indices to use as a projection
2580    pub projection: Option<Vec<usize>>,
2581    /// The schema description of the output
2582    pub projected_schema: DFSchemaRef,
2583    /// Optional expressions to be used as filters by the table provider
2584    pub filters: Vec<Expr>,
2585    /// Optional number of rows to read
2586    pub fetch: Option<usize>,
2587}
2588
2589impl Debug for TableScan {
2590    fn fmt(&self, f: &mut Formatter) -> fmt::Result {
2591        f.debug_struct("TableScan")
2592            .field("table_name", &self.table_name)
2593            .field("source", &"...")
2594            .field("projection", &self.projection)
2595            .field("projected_schema", &self.projected_schema)
2596            .field("filters", &self.filters)
2597            .field("fetch", &self.fetch)
2598            .finish_non_exhaustive()
2599    }
2600}
2601
2602impl PartialEq for TableScan {
2603    fn eq(&self, other: &Self) -> bool {
2604        self.table_name == other.table_name
2605            && self.projection == other.projection
2606            && self.projected_schema == other.projected_schema
2607            && self.filters == other.filters
2608            && self.fetch == other.fetch
2609    }
2610}
2611
2612impl Eq for TableScan {}
2613
2614// Manual implementation needed because of `source` and `projected_schema` fields.
2615// Comparison excludes these field.
2616impl PartialOrd for TableScan {
2617    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
2618        #[derive(PartialEq, PartialOrd)]
2619        struct ComparableTableScan<'a> {
2620            /// The name of the table
2621            pub table_name: &'a TableReference,
2622            /// Optional column indices to use as a projection
2623            pub projection: &'a Option<Vec<usize>>,
2624            /// Optional expressions to be used as filters by the table provider
2625            pub filters: &'a Vec<Expr>,
2626            /// Optional number of rows to read
2627            pub fetch: &'a Option<usize>,
2628        }
2629        let comparable_self = ComparableTableScan {
2630            table_name: &self.table_name,
2631            projection: &self.projection,
2632            filters: &self.filters,
2633            fetch: &self.fetch,
2634        };
2635        let comparable_other = ComparableTableScan {
2636            table_name: &other.table_name,
2637            projection: &other.projection,
2638            filters: &other.filters,
2639            fetch: &other.fetch,
2640        };
2641        comparable_self.partial_cmp(&comparable_other)
2642    }
2643}
2644
2645impl Hash for TableScan {
2646    fn hash<H: Hasher>(&self, state: &mut H) {
2647        self.table_name.hash(state);
2648        self.projection.hash(state);
2649        self.projected_schema.hash(state);
2650        self.filters.hash(state);
2651        self.fetch.hash(state);
2652    }
2653}
2654
2655impl TableScan {
2656    /// Initialize TableScan with appropriate schema from the given
2657    /// arguments.
2658    pub fn try_new(
2659        table_name: impl Into<TableReference>,
2660        table_source: Arc<dyn TableSource>,
2661        projection: Option<Vec<usize>>,
2662        filters: Vec<Expr>,
2663        fetch: Option<usize>,
2664    ) -> Result<Self> {
2665        let table_name = table_name.into();
2666
2667        if table_name.table().is_empty() {
2668            return plan_err!("table_name cannot be empty");
2669        }
2670        let schema = table_source.schema();
2671        let func_dependencies = FunctionalDependencies::new_from_constraints(
2672            table_source.constraints(),
2673            schema.fields.len(),
2674        );
2675        let projected_schema = projection
2676            .as_ref()
2677            .map(|p| {
2678                let projected_func_dependencies =
2679                    func_dependencies.project_functional_dependencies(p, p.len());
2680
2681                let df_schema = DFSchema::new_with_metadata(
2682                    p.iter()
2683                        .map(|i| {
2684                            (Some(table_name.clone()), Arc::new(schema.field(*i).clone()))
2685                        })
2686                        .collect(),
2687                    schema.metadata.clone(),
2688                )?;
2689                df_schema.with_functional_dependencies(projected_func_dependencies)
2690            })
2691            .unwrap_or_else(|| {
2692                let df_schema =
2693                    DFSchema::try_from_qualified_schema(table_name.clone(), &schema)?;
2694                df_schema.with_functional_dependencies(func_dependencies)
2695            })?;
2696        let projected_schema = Arc::new(projected_schema);
2697
2698        Ok(Self {
2699            table_name,
2700            source: table_source,
2701            projection,
2702            projected_schema,
2703            filters,
2704            fetch,
2705        })
2706    }
2707}
2708
2709// Repartition the plan based on a partitioning scheme.
2710#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Hash)]
2711pub struct Repartition {
2712    /// The incoming logical plan
2713    pub input: Arc<LogicalPlan>,
2714    /// The partitioning scheme
2715    pub partitioning_scheme: Partitioning,
2716}
2717
2718/// Union multiple inputs
2719#[derive(Debug, Clone, PartialEq, Eq, Hash)]
2720pub struct Union {
2721    /// Inputs to merge
2722    pub inputs: Vec<Arc<LogicalPlan>>,
2723    /// Union schema. Should be the same for all inputs.
2724    pub schema: DFSchemaRef,
2725}
2726
2727impl Union {
2728    /// Constructs new Union instance deriving schema from inputs.
2729    fn try_new(inputs: Vec<Arc<LogicalPlan>>) -> Result<Self> {
2730        let schema = Self::derive_schema_from_inputs(&inputs, false, false)?;
2731        Ok(Union { inputs, schema })
2732    }
2733
2734    /// Constructs new Union instance deriving schema from inputs.
2735    /// Inputs do not have to have matching types and produced schema will
2736    /// take type from the first input.
2737    // TODO (https://github.com/apache/datafusion/issues/14380): Avoid creating uncoerced union at all.
2738    pub fn try_new_with_loose_types(inputs: Vec<Arc<LogicalPlan>>) -> Result<Self> {
2739        let schema = Self::derive_schema_from_inputs(&inputs, true, false)?;
2740        Ok(Union { inputs, schema })
2741    }
2742
2743    /// Constructs a new Union instance that combines rows from different tables by name,
2744    /// instead of by position. This means that the specified inputs need not have schemas
2745    /// that are all the same width.
2746    pub fn try_new_by_name(inputs: Vec<Arc<LogicalPlan>>) -> Result<Self> {
2747        let schema = Self::derive_schema_from_inputs(&inputs, true, true)?;
2748        let inputs = Self::rewrite_inputs_from_schema(&schema, inputs)?;
2749
2750        Ok(Union { inputs, schema })
2751    }
2752
2753    /// When constructing a `UNION BY NAME`, we need to wrap inputs
2754    /// in an additional `Projection` to account for absence of columns
2755    /// in input schemas or differing projection orders.
2756    fn rewrite_inputs_from_schema(
2757        schema: &Arc<DFSchema>,
2758        inputs: Vec<Arc<LogicalPlan>>,
2759    ) -> Result<Vec<Arc<LogicalPlan>>> {
2760        let schema_width = schema.iter().count();
2761        let mut wrapped_inputs = Vec::with_capacity(inputs.len());
2762        for input in inputs {
2763            // Any columns that exist within the derived schema but do not exist
2764            // within an input's schema should be replaced with `NULL` aliased
2765            // to the appropriate column in the derived schema.
2766            let mut expr = Vec::with_capacity(schema_width);
2767            for column in schema.columns() {
2768                if input
2769                    .schema()
2770                    .has_column_with_unqualified_name(column.name())
2771                {
2772                    expr.push(Expr::Column(column));
2773                } else {
2774                    expr.push(
2775                        Expr::Literal(ScalarValue::Null, None).alias(column.name()),
2776                    );
2777                }
2778            }
2779            wrapped_inputs.push(Arc::new(LogicalPlan::Projection(
2780                Projection::try_new_with_schema(expr, input, Arc::clone(schema))?,
2781            )));
2782        }
2783
2784        Ok(wrapped_inputs)
2785    }
2786
2787    /// Constructs new Union instance deriving schema from inputs.
2788    ///
2789    /// If `loose_types` is true, inputs do not need to have matching types and
2790    /// the produced schema will use the type from the first input.
2791    /// TODO (<https://github.com/apache/datafusion/issues/14380>): This is not necessarily reasonable behavior.
2792    ///
2793    /// If `by_name` is `true`, input schemas need not be the same width. That is,
2794    /// the constructed schema follows `UNION BY NAME` semantics.
2795    fn derive_schema_from_inputs(
2796        inputs: &[Arc<LogicalPlan>],
2797        loose_types: bool,
2798        by_name: bool,
2799    ) -> Result<DFSchemaRef> {
2800        if inputs.len() < 2 {
2801            return plan_err!("UNION requires at least two inputs");
2802        }
2803
2804        if by_name {
2805            Self::derive_schema_from_inputs_by_name(inputs, loose_types)
2806        } else {
2807            Self::derive_schema_from_inputs_by_position(inputs, loose_types)
2808        }
2809    }
2810
2811    fn derive_schema_from_inputs_by_name(
2812        inputs: &[Arc<LogicalPlan>],
2813        loose_types: bool,
2814    ) -> Result<DFSchemaRef> {
2815        type FieldData<'a> =
2816            (&'a DataType, bool, Vec<&'a HashMap<String, String>>, usize);
2817        let mut cols: Vec<(&str, FieldData)> = Vec::new();
2818        for input in inputs.iter() {
2819            for field in input.schema().fields() {
2820                if let Some((_, (data_type, is_nullable, metadata, occurrences))) =
2821                    cols.iter_mut().find(|(name, _)| name == field.name())
2822                {
2823                    if !loose_types && *data_type != field.data_type() {
2824                        return plan_err!(
2825                            "Found different types for field {}",
2826                            field.name()
2827                        );
2828                    }
2829
2830                    metadata.push(field.metadata());
2831                    // If the field is nullable in any one of the inputs,
2832                    // then the field in the final schema is also nullable.
2833                    *is_nullable |= field.is_nullable();
2834                    *occurrences += 1;
2835                } else {
2836                    cols.push((
2837                        field.name(),
2838                        (
2839                            field.data_type(),
2840                            field.is_nullable(),
2841                            vec![field.metadata()],
2842                            1,
2843                        ),
2844                    ));
2845                }
2846            }
2847        }
2848
2849        let union_fields = cols
2850            .into_iter()
2851            .map(
2852                |(name, (data_type, is_nullable, unmerged_metadata, occurrences))| {
2853                    // If the final number of occurrences of the field is less
2854                    // than the number of inputs (i.e. the field is missing from
2855                    // one or more inputs), then it must be treated as nullable.
2856                    let final_is_nullable = if occurrences == inputs.len() {
2857                        is_nullable
2858                    } else {
2859                        true
2860                    };
2861
2862                    let mut field =
2863                        Field::new(name, data_type.clone(), final_is_nullable);
2864                    field.set_metadata(intersect_metadata_for_union(unmerged_metadata));
2865
2866                    (None, Arc::new(field))
2867                },
2868            )
2869            .collect::<Vec<(Option<TableReference>, _)>>();
2870
2871        let union_schema_metadata = intersect_metadata_for_union(
2872            inputs.iter().map(|input| input.schema().metadata()),
2873        );
2874
2875        // Functional Dependencies are not preserved after UNION operation
2876        let schema = DFSchema::new_with_metadata(union_fields, union_schema_metadata)?;
2877        let schema = Arc::new(schema);
2878
2879        Ok(schema)
2880    }
2881
2882    fn derive_schema_from_inputs_by_position(
2883        inputs: &[Arc<LogicalPlan>],
2884        loose_types: bool,
2885    ) -> Result<DFSchemaRef> {
2886        let first_schema = inputs[0].schema();
2887        let fields_count = first_schema.fields().len();
2888        for input in inputs.iter().skip(1) {
2889            if fields_count != input.schema().fields().len() {
2890                return plan_err!(
2891                    "UNION queries have different number of columns: \
2892                    left has {} columns whereas right has {} columns",
2893                    fields_count,
2894                    input.schema().fields().len()
2895                );
2896            }
2897        }
2898
2899        let mut name_counts: HashMap<String, usize> = HashMap::new();
2900        let union_fields = (0..fields_count)
2901            .map(|i| {
2902                let fields = inputs
2903                    .iter()
2904                    .map(|input| input.schema().field(i))
2905                    .collect::<Vec<_>>();
2906                let first_field = fields[0];
2907                let base_name = first_field.name().to_string();
2908
2909                let data_type = if loose_types {
2910                    // TODO apply type coercion here, or document why it's better to defer
2911                    // temporarily use the data type from the left input and later rely on the analyzer to
2912                    // coerce the two schemas into a common one.
2913                    first_field.data_type()
2914                } else {
2915                    fields.iter().skip(1).try_fold(
2916                        first_field.data_type(),
2917                        |acc, field| {
2918                            if acc != field.data_type() {
2919                                return plan_err!(
2920                                    "UNION field {i} have different type in inputs: \
2921                                    left has {} whereas right has {}",
2922                                    first_field.data_type(),
2923                                    field.data_type()
2924                                );
2925                            }
2926                            Ok(acc)
2927                        },
2928                    )?
2929                };
2930                let nullable = fields.iter().any(|field| field.is_nullable());
2931
2932                // Generate unique field name
2933                let name = if let Some(count) = name_counts.get_mut(&base_name) {
2934                    *count += 1;
2935                    format!("{base_name}_{count}")
2936                } else {
2937                    name_counts.insert(base_name.clone(), 0);
2938                    base_name
2939                };
2940
2941                let mut field = Field::new(&name, data_type.clone(), nullable);
2942                let field_metadata = intersect_metadata_for_union(
2943                    fields.iter().map(|field| field.metadata()),
2944                );
2945                field.set_metadata(field_metadata);
2946                Ok((None, Arc::new(field)))
2947            })
2948            .collect::<Result<_>>()?;
2949        let union_schema_metadata = intersect_metadata_for_union(
2950            inputs.iter().map(|input| input.schema().metadata()),
2951        );
2952
2953        // Functional Dependencies are not preserved after UNION operation
2954        let schema = DFSchema::new_with_metadata(union_fields, union_schema_metadata)?;
2955        let schema = Arc::new(schema);
2956
2957        Ok(schema)
2958    }
2959}
2960
2961// Manual implementation needed because of `schema` field. Comparison excludes this field.
2962impl PartialOrd for Union {
2963    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
2964        self.inputs.partial_cmp(&other.inputs)
2965    }
2966}
2967
/// Describe the schema of a table
///
/// The node carries two schemas: the schema of the table being described
/// (`schema`) and the schema of the relation this node outputs
/// (`output_schema`).
///
/// # Example output:
///
/// ```sql
/// > describe traces;
/// +--------------------+-----------------------------+-------------+
/// | column_name        | data_type                   | is_nullable |
/// +--------------------+-----------------------------+-------------+
/// | attributes         | Utf8                        | YES         |
/// | duration_nano      | Int64                       | YES         |
/// | end_time_unix_nano | Int64                       | YES         |
/// | service.name       | Dictionary(Int32, Utf8)     | YES         |
/// | span.kind          | Utf8                        | YES         |
/// | span.name          | Utf8                        | YES         |
/// | span_id            | Dictionary(Int32, Utf8)     | YES         |
/// | time               | Timestamp(Nanosecond, None) | NO          |
/// | trace_id           | Dictionary(Int32, Utf8)     | YES         |
/// | otel.status_code   | Utf8                        | YES         |
/// | parent_span_id     | Utf8                        | YES         |
/// +--------------------+-----------------------------+-------------+
/// ```
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct DescribeTable {
    /// Schema of the table being described
    pub schema: Arc<Schema>,
    /// Schema of the `DESCRIBE` output relation
    pub output_schema: DFSchemaRef,
}
2997
2998// Manual implementation of `PartialOrd`, returning none since there are no comparable types in
2999// `DescribeTable`. This allows `LogicalPlan` to derive `PartialOrd`.
3000impl PartialOrd for DescribeTable {
3001    fn partial_cmp(&self, _other: &Self) -> Option<Ordering> {
3002        // There is no relevant comparison for schemas
3003        None
3004    }
3005}
3006
/// Output formats for `EXPLAIN` plans
///
/// Each variant is selected with `EXPLAIN FORMAT <keyword>`, as shown in the
/// per-variant examples below.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum ExplainFormat {
    /// Indent mode (keyword `indent`)
    ///
    /// Example:
    /// ```text
    /// > explain format indent select x from values (1) t(x);
    /// +---------------+-----------------------------------------------------+
    /// | plan_type     | plan                                                |
    /// +---------------+-----------------------------------------------------+
    /// | logical_plan  | SubqueryAlias: t                                    |
    /// |               |   Projection: column1 AS x                          |
    /// |               |     Values: (Int64(1))                              |
    /// | physical_plan | ProjectionExec: expr=[column1@0 as x]               |
    /// |               |   DataSourceExec: partitions=1, partition_sizes=[1] |
    /// |               |                                                     |
    /// +---------------+-----------------------------------------------------+
    /// ```
    Indent,
    /// Tree mode (keyword `tree`)
    ///
    /// Example:
    /// ```text
    /// > explain format tree select x from values (1) t(x);
    /// +---------------+-------------------------------+
    /// | plan_type     | plan                          |
    /// +---------------+-------------------------------+
    /// | physical_plan | ┌───────────────────────────┐ |
    /// |               | │       ProjectionExec      │ |
    /// |               | │    --------------------   │ |
    /// |               | │        x: column1@0       │ |
    /// |               | └─────────────┬─────────────┘ |
    /// |               | ┌─────────────┴─────────────┐ |
    /// |               | │       DataSourceExec      │ |
    /// |               | │    --------------------   │ |
    /// |               | │         bytes: 128        │ |
    /// |               | │       format: memory      │ |
    /// |               | │          rows: 1          │ |
    /// |               | └───────────────────────────┘ |
    /// |               |                               |
    /// +---------------+-------------------------------+
    /// ```
    Tree,
    /// Postgres JSON mode (keyword `pgjson`)
    ///
    /// A displayable structure that produces the plan in PostgreSQL JSON format.
    ///
    /// Users can use this format to visualize the plan in existing plan
    /// visualization tools, for example [dalibo](https://explain.dalibo.com/)
    ///
    /// Example:
    /// ```text
    /// > explain format pgjson select x from values (1) t(x);
    /// +--------------+--------------------------------------+
    /// | plan_type    | plan                                 |
    /// +--------------+--------------------------------------+
    /// | logical_plan | [                                    |
    /// |              |   {                                  |
    /// |              |     "Plan": {                        |
    /// |              |       "Alias": "t",                  |
    /// |              |       "Node Type": "Subquery",       |
    /// |              |       "Output": [                    |
    /// |              |         "x"                          |
    /// |              |       ],                             |
    /// |              |       "Plans": [                     |
    /// |              |         {                            |
    /// |              |           "Expressions": [           |
    /// |              |             "column1 AS x"           |
    /// |              |           ],                         |
    /// |              |           "Node Type": "Projection", |
    /// |              |           "Output": [                |
    /// |              |             "x"                      |
    /// |              |           ],                         |
    /// |              |           "Plans": [                 |
    /// |              |             {                        |
    /// |              |               "Node Type": "Values", |
    /// |              |               "Output": [            |
    /// |              |                 "column1"            |
    /// |              |               ],                     |
    /// |              |               "Plans": [],           |
    /// |              |               "Values": "(Int64(1))" |
    /// |              |             }                        |
    /// |              |           ]                          |
    /// |              |         }                            |
    /// |              |       ]                              |
    /// |              |     }                                |
    /// |              |   }                                  |
    /// |              | ]                                    |
    /// +--------------+--------------------------------------+
    /// ```
    PostgresJSON,
    /// Graphviz mode (keyword `graphviz`)
    ///
    /// Example:
    /// ```text
    /// > explain format graphviz select x from values (1) t(x);
    /// +--------------+------------------------------------------------------------------------+
    /// | plan_type    | plan                                                                   |
    /// +--------------+------------------------------------------------------------------------+
    /// | logical_plan |                                                                        |
    /// |              | // Begin DataFusion GraphViz Plan,                                     |
    /// |              | // display it online here: https://dreampuf.github.io/GraphvizOnline   |
    /// |              |                                                                        |
    /// |              | digraph {                                                              |
    /// |              |   subgraph cluster_1                                                   |
    /// |              |   {                                                                    |
    /// |              |     graph[label="LogicalPlan"]                                         |
    /// |              |     2[shape=box label="SubqueryAlias: t"]                              |
    /// |              |     3[shape=box label="Projection: column1 AS x"]                      |
    /// |              |     2 -> 3 [arrowhead=none, arrowtail=normal, dir=back]                |
    /// |              |     4[shape=box label="Values: (Int64(1))"]                            |
    /// |              |     3 -> 4 [arrowhead=none, arrowtail=normal, dir=back]                |
    /// |              |   }                                                                    |
    /// |              |   subgraph cluster_5                                                   |
    /// |              |   {                                                                    |
    /// |              |     graph[label="Detailed LogicalPlan"]                                |
    /// |              |     6[shape=box label="SubqueryAlias: t\nSchema: [x:Int64;N]"]         |
    /// |              |     7[shape=box label="Projection: column1 AS x\nSchema: [x:Int64;N]"] |
    /// |              |     6 -> 7 [arrowhead=none, arrowtail=normal, dir=back]                |
    /// |              |     8[shape=box label="Values: (Int64(1))\nSchema: [column1:Int64;N]"] |
    /// |              |     7 -> 8 [arrowhead=none, arrowtail=normal, dir=back]                |
    /// |              |   }                                                                    |
    /// |              | }                                                                      |
    /// |              | // End DataFusion GraphViz Plan                                        |
    /// |              |                                                                        |
    /// +--------------+------------------------------------------------------------------------+
    /// ```
    Graphviz,
}
3137
3138/// Implement  parsing strings to `ExplainFormat`
3139impl FromStr for ExplainFormat {
3140    type Err = DataFusionError;
3141
3142    fn from_str(format: &str) -> std::result::Result<Self, Self::Err> {
3143        match format.to_lowercase().as_str() {
3144            "indent" => Ok(ExplainFormat::Indent),
3145            "tree" => Ok(ExplainFormat::Tree),
3146            "pgjson" => Ok(ExplainFormat::PostgresJSON),
3147            "graphviz" => Ok(ExplainFormat::Graphviz),
3148            _ => {
3149                plan_err!("Invalid explain format. Expected 'indent', 'tree', 'pgjson' or 'graphviz'. Got '{format}'")
3150            }
3151        }
3152    }
3153}
3154
/// Options for EXPLAIN
///
/// Groups the three settings of an `EXPLAIN` statement: verbosity, whether
/// the plan is executed, and the output format.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct ExplainOption {
    /// Include detailed debug info
    pub verbose: bool,
    /// Actually execute the plan and report metrics
    pub analyze: bool,
    /// Output syntax/format
    pub format: ExplainFormat,
}
3165
3166impl Default for ExplainOption {
3167    fn default() -> Self {
3168        ExplainOption {
3169            verbose: false,
3170            analyze: false,
3171            format: ExplainFormat::Indent,
3172        }
3173    }
3174}
3175
3176impl ExplainOption {
3177    /// Builder‐style setter for `verbose`
3178    pub fn with_verbose(mut self, verbose: bool) -> Self {
3179        self.verbose = verbose;
3180        self
3181    }
3182
3183    /// Builder‐style setter for `analyze`
3184    pub fn with_analyze(mut self, analyze: bool) -> Self {
3185        self.analyze = analyze;
3186        self
3187    }
3188
3189    /// Builder‐style setter for `format`
3190    pub fn with_format(mut self, format: ExplainFormat) -> Self {
3191        self.format = format;
3192        self
3193    }
3194}
3195
/// Produces a relation with string representations of
/// various parts of the plan
///
/// See [the documentation] for more information
///
/// [the documentation]: https://datafusion.apache.org/user-guide/sql/explain.html
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct Explain {
    /// Should extra (detailed, intermediate plans) be included?
    pub verbose: bool,
    /// Output format used to render the explained plan
    /// (see [`ExplainFormat`] for the available formats)
    pub explain_format: ExplainFormat,
    /// The logical plan that is being EXPLAIN'd
    pub plan: Arc<LogicalPlan>,
    /// Represent the various stages plans have gone through
    pub stringified_plans: Vec<StringifiedPlan>,
    /// The output schema of the explain (2 columns of text)
    pub schema: DFSchemaRef,
    /// Used by physical planner to check if should proceed with planning
    pub logical_optimization_succeeded: bool,
}
3218
3219// Manual implementation needed because of `schema` field. Comparison excludes this field.
3220impl PartialOrd for Explain {
3221    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
3222        #[derive(PartialEq, PartialOrd)]
3223        struct ComparableExplain<'a> {
3224            /// Should extra (detailed, intermediate plans) be included?
3225            pub verbose: &'a bool,
3226            /// The logical plan that is being EXPLAIN'd
3227            pub plan: &'a Arc<LogicalPlan>,
3228            /// Represent the various stages plans have gone through
3229            pub stringified_plans: &'a Vec<StringifiedPlan>,
3230            /// Used by physical planner to check if should proceed with planning
3231            pub logical_optimization_succeeded: &'a bool,
3232        }
3233        let comparable_self = ComparableExplain {
3234            verbose: &self.verbose,
3235            plan: &self.plan,
3236            stringified_plans: &self.stringified_plans,
3237            logical_optimization_succeeded: &self.logical_optimization_succeeded,
3238        };
3239        let comparable_other = ComparableExplain {
3240            verbose: &other.verbose,
3241            plan: &other.plan,
3242            stringified_plans: &other.stringified_plans,
3243            logical_optimization_succeeded: &other.logical_optimization_succeeded,
3244        };
3245        comparable_self.partial_cmp(&comparable_other)
3246    }
3247}
3248
/// Runs the actual plan, and then prints the physical plan
/// with execution metrics.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct Analyze {
    /// Should extra detail be included?
    pub verbose: bool,
    /// The logical plan that is being EXPLAIN ANALYZE'd
    pub input: Arc<LogicalPlan>,
    /// The output schema of the explain (2 columns of text)
    pub schema: DFSchemaRef,
}
3260
3261// Manual implementation needed because of `schema` field. Comparison excludes this field.
3262impl PartialOrd for Analyze {
3263    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
3264        match self.verbose.partial_cmp(&other.verbose) {
3265            Some(Ordering::Equal) => self.input.partial_cmp(&other.input),
3266            cmp => cmp,
3267        }
3268    }
3269}
3270
/// Extension operator defined outside of DataFusion
///
/// Wraps a shared [`UserDefinedLogicalNode`] trait object; equality and
/// ordering of `Extension` are delegated to that node.
// TODO(clippy): This clippy `allow` should be removed if
// the manual `PartialEq` is removed in favor of a derive.
// (see `PartialEq` the impl for details.)
#[allow(clippy::derived_hash_with_manual_eq)]
#[derive(Debug, Clone, Eq, Hash)]
pub struct Extension {
    /// The runtime extension operator
    pub node: Arc<dyn UserDefinedLogicalNode>,
}
3281
// `PartialEq` cannot be derived for types containing `Arc<dyn Trait>`.
// This manual implementation should be removed if
// https://github.com/rust-lang/rust/issues/39128 is fixed.
//
// Two `Extension`s are equal when their wrapped `node`s compare equal.
impl PartialEq for Extension {
    fn eq(&self, other: &Self) -> bool {
        // Keep the explicit method-call form; see the compiler issue linked above.
        self.node.eq(&other.node)
    }
}
3290
3291impl PartialOrd for Extension {
3292    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
3293        self.node.partial_cmp(&other.node)
3294    }
3295}
3296
/// Produces the first `n` tuples from its input and discards the rest.
///
/// Both `skip` and `fetch` are stored as expressions. Use
/// [`Limit::get_skip_type`] and [`Limit::get_fetch_type`] to interpret them
/// as literal row counts where possible.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Hash)]
pub struct Limit {
    /// Number of rows to skip before fetch
    pub skip: Option<Box<Expr>>,
    /// Maximum number of rows to fetch,
    /// None means fetching all rows
    pub fetch: Option<Box<Expr>>,
    /// The logical plan
    pub input: Arc<LogicalPlan>,
}
3308
/// Different types of skip expression in Limit plan.
///
/// Derives `Debug`, `Clone`, `PartialEq` and `Eq` so that values can be
/// logged, copied and compared in assertions.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum SkipType {
    /// The skip expression is a literal value.
    Literal(usize),
    /// Currently only supports expressions that can be folded into constants.
    UnsupportedExpr,
}
3316
/// Different types of fetch expression in Limit plan.
///
/// Derives `Debug`, `Clone`, `PartialEq` and `Eq` so that values can be
/// logged, copied and compared in assertions.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum FetchType {
    /// The fetch expression is a literal value.
    /// `Literal(None)` means the fetch expression is not provided.
    Literal(Option<usize>),
    /// Currently only supports expressions that can be folded into constants.
    UnsupportedExpr,
}
3325
3326impl Limit {
3327    /// Get the skip type from the limit plan.
3328    pub fn get_skip_type(&self) -> Result<SkipType> {
3329        match self.skip.as_deref() {
3330            Some(expr) => match *expr {
3331                Expr::Literal(ScalarValue::Int64(s), _) => {
3332                    // `skip = NULL` is equivalent to `skip = 0`
3333                    let s = s.unwrap_or(0);
3334                    if s >= 0 {
3335                        Ok(SkipType::Literal(s as usize))
3336                    } else {
3337                        plan_err!("OFFSET must be >=0, '{}' was provided", s)
3338                    }
3339                }
3340                _ => Ok(SkipType::UnsupportedExpr),
3341            },
3342            // `skip = None` is equivalent to `skip = 0`
3343            None => Ok(SkipType::Literal(0)),
3344        }
3345    }
3346
3347    /// Get the fetch type from the limit plan.
3348    pub fn get_fetch_type(&self) -> Result<FetchType> {
3349        match self.fetch.as_deref() {
3350            Some(expr) => match *expr {
3351                Expr::Literal(ScalarValue::Int64(Some(s)), _) => {
3352                    if s >= 0 {
3353                        Ok(FetchType::Literal(Some(s as usize)))
3354                    } else {
3355                        plan_err!("LIMIT must be >= 0, '{}' was provided", s)
3356                    }
3357                }
3358                Expr::Literal(ScalarValue::Int64(None), _) => {
3359                    Ok(FetchType::Literal(None))
3360                }
3361                _ => Ok(FetchType::UnsupportedExpr),
3362            },
3363            None => Ok(FetchType::Literal(None)),
3364        }
3365    }
3366}
3367
/// Removes duplicate rows from the input
///
/// Use [`Distinct::input`] to access the input plan regardless of the variant.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Hash)]
pub enum Distinct {
    /// Plain `DISTINCT` referencing all selection expressions
    All(Arc<LogicalPlan>),
    /// The `Postgres` addition, allowing separate control over DISTINCT'd and selected columns
    On(DistinctOn),
}
3376
3377impl Distinct {
3378    /// return a reference to the nodes input
3379    pub fn input(&self) -> &Arc<LogicalPlan> {
3380        match self {
3381            Distinct::All(input) => input,
3382            Distinct::On(DistinctOn { input, .. }) => input,
3383        }
3384    }
3385}
3386
/// Removes duplicate rows from the input (`DISTINCT ON`)
///
/// The expressions of the `ON` clause (`on_expr`) are kept separately from
/// the projected expressions (`select_expr`). Prefer [`DistinctOn::try_new`]
/// for construction, since it validates and normalizes the expressions and
/// computes `schema`.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct DistinctOn {
    /// The `DISTINCT ON` clause expression list
    pub on_expr: Vec<Expr>,
    /// The selected projection expression list
    pub select_expr: Vec<Expr>,
    /// The `ORDER BY` clause, whose initial expressions must match those of the `ON` clause when
    /// present. Note that those matching expressions actually wrap the `ON` expressions with
    /// additional info pertaining to the sorting procedure (i.e. ASC/DESC, and NULLS FIRST/LAST).
    pub sort_expr: Option<Vec<SortExpr>>,
    /// The logical plan that is being DISTINCT'd
    pub input: Arc<LogicalPlan>,
    /// The schema description of the DISTINCT ON output
    pub schema: DFSchemaRef,
}
3403
3404impl DistinctOn {
3405    /// Create a new `DistinctOn` struct.
3406    pub fn try_new(
3407        on_expr: Vec<Expr>,
3408        select_expr: Vec<Expr>,
3409        sort_expr: Option<Vec<SortExpr>>,
3410        input: Arc<LogicalPlan>,
3411    ) -> Result<Self> {
3412        if on_expr.is_empty() {
3413            return plan_err!("No `ON` expressions provided");
3414        }
3415
3416        let on_expr = normalize_cols(on_expr, input.as_ref())?;
3417        let qualified_fields = exprlist_to_fields(select_expr.as_slice(), &input)?
3418            .into_iter()
3419            .collect();
3420
3421        let dfschema = DFSchema::new_with_metadata(
3422            qualified_fields,
3423            input.schema().metadata().clone(),
3424        )?;
3425
3426        let mut distinct_on = DistinctOn {
3427            on_expr,
3428            select_expr,
3429            sort_expr: None,
3430            input,
3431            schema: Arc::new(dfschema),
3432        };
3433
3434        if let Some(sort_expr) = sort_expr {
3435            distinct_on = distinct_on.with_sort_expr(sort_expr)?;
3436        }
3437
3438        Ok(distinct_on)
3439    }
3440
3441    /// Try to update `self` with a new sort expressions.
3442    ///
3443    /// Validates that the sort expressions are a super-set of the `ON` expressions.
3444    pub fn with_sort_expr(mut self, sort_expr: Vec<SortExpr>) -> Result<Self> {
3445        let sort_expr = normalize_sorts(sort_expr, self.input.as_ref())?;
3446
3447        // Check that the left-most sort expressions are the same as the `ON` expressions.
3448        let mut matched = true;
3449        for (on, sort) in self.on_expr.iter().zip(sort_expr.iter()) {
3450            if on != &sort.expr {
3451                matched = false;
3452                break;
3453            }
3454        }
3455
3456        if self.on_expr.len() > sort_expr.len() || !matched {
3457            return plan_err!(
3458                "SELECT DISTINCT ON expressions must match initial ORDER BY expressions"
3459            );
3460        }
3461
3462        self.sort_expr = Some(sort_expr);
3463        Ok(self)
3464    }
3465}
3466
3467// Manual implementation needed because of `schema` field. Comparison excludes this field.
3468impl PartialOrd for DistinctOn {
3469    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
3470        #[derive(PartialEq, PartialOrd)]
3471        struct ComparableDistinctOn<'a> {
3472            /// The `DISTINCT ON` clause expression list
3473            pub on_expr: &'a Vec<Expr>,
3474            /// The selected projection expression list
3475            pub select_expr: &'a Vec<Expr>,
3476            /// The `ORDER BY` clause, whose initial expressions must match those of the `ON` clause when
3477            /// present. Note that those matching expressions actually wrap the `ON` expressions with
3478            /// additional info pertaining to the sorting procedure (i.e. ASC/DESC, and NULLS FIRST/LAST).
3479            pub sort_expr: &'a Option<Vec<SortExpr>>,
3480            /// The logical plan that is being DISTINCT'd
3481            pub input: &'a Arc<LogicalPlan>,
3482        }
3483        let comparable_self = ComparableDistinctOn {
3484            on_expr: &self.on_expr,
3485            select_expr: &self.select_expr,
3486            sort_expr: &self.sort_expr,
3487            input: &self.input,
3488        };
3489        let comparable_other = ComparableDistinctOn {
3490            on_expr: &other.on_expr,
3491            select_expr: &other.select_expr,
3492            sort_expr: &other.sort_expr,
3493            input: &other.input,
3494        };
3495        comparable_self.partial_cmp(&comparable_other)
3496    }
3497}
3498
/// Aggregates its input based on a set of grouping and aggregate
/// expressions (e.g. SUM).
///
/// # Output Schema
///
/// The output schema is the group expressions followed by the aggregate
/// expressions in order.
///
/// For example, given the input schema `"A", "B", "C"` and the aggregate
/// `SUM(A) GROUP BY C+B`, the output schema will be `"C+B", "SUM(A)"` where
/// "C+B" and "SUM(A)" are the names of the output columns. Note that "C+B" is a
/// single new column.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
// mark non_exhaustive to encourage use of try_new/new()
#[non_exhaustive]
pub struct Aggregate {
    /// The incoming logical plan
    pub input: Arc<LogicalPlan>,
    /// Grouping expressions
    pub group_expr: Vec<Expr>,
    /// Aggregate expressions
    pub aggr_expr: Vec<Expr>,
    /// The schema description of the aggregate output
    pub schema: DFSchemaRef,
}
3524
3525impl Aggregate {
3526    /// Create a new aggregate operator.
3527    pub fn try_new(
3528        input: Arc<LogicalPlan>,
3529        group_expr: Vec<Expr>,
3530        aggr_expr: Vec<Expr>,
3531    ) -> Result<Self> {
3532        let group_expr = enumerate_grouping_sets(group_expr)?;
3533
3534        let is_grouping_set = matches!(group_expr.as_slice(), [Expr::GroupingSet(_)]);
3535
3536        let grouping_expr: Vec<&Expr> = grouping_set_to_exprlist(group_expr.as_slice())?;
3537
3538        let mut qualified_fields = exprlist_to_fields(grouping_expr, &input)?;
3539
3540        // Even columns that cannot be null will become nullable when used in a grouping set.
3541        if is_grouping_set {
3542            qualified_fields = qualified_fields
3543                .into_iter()
3544                .map(|(q, f)| (q, f.as_ref().clone().with_nullable(true).into()))
3545                .collect::<Vec<_>>();
3546            qualified_fields.push((
3547                None,
3548                Field::new(
3549                    Self::INTERNAL_GROUPING_ID,
3550                    Self::grouping_id_type(qualified_fields.len()),
3551                    false,
3552                )
3553                .into(),
3554            ));
3555        }
3556
3557        qualified_fields.extend(exprlist_to_fields(aggr_expr.as_slice(), &input)?);
3558
3559        let schema = DFSchema::new_with_metadata(
3560            qualified_fields,
3561            input.schema().metadata().clone(),
3562        )?;
3563
3564        Self::try_new_with_schema(input, group_expr, aggr_expr, Arc::new(schema))
3565    }
3566
3567    /// Create a new aggregate operator using the provided schema to avoid the overhead of
3568    /// building the schema again when the schema is already known.
3569    ///
3570    /// This method should only be called when you are absolutely sure that the schema being
3571    /// provided is correct for the aggregate. If in doubt, call [try_new](Self::try_new) instead.
3572    pub fn try_new_with_schema(
3573        input: Arc<LogicalPlan>,
3574        group_expr: Vec<Expr>,
3575        aggr_expr: Vec<Expr>,
3576        schema: DFSchemaRef,
3577    ) -> Result<Self> {
3578        if group_expr.is_empty() && aggr_expr.is_empty() {
3579            return plan_err!(
3580                "Aggregate requires at least one grouping or aggregate expression. \
3581                Aggregate without grouping expressions nor aggregate expressions is \
3582                logically equivalent to, but less efficient than, VALUES producing \
3583                single row. Please use VALUES instead."
3584            );
3585        }
3586        let group_expr_count = grouping_set_expr_count(&group_expr)?;
3587        if schema.fields().len() != group_expr_count + aggr_expr.len() {
3588            return plan_err!(
3589                "Aggregate schema has wrong number of fields. Expected {} got {}",
3590                group_expr_count + aggr_expr.len(),
3591                schema.fields().len()
3592            );
3593        }
3594
3595        let aggregate_func_dependencies =
3596            calc_func_dependencies_for_aggregate(&group_expr, &input, &schema)?;
3597        let new_schema = schema.as_ref().clone();
3598        let schema = Arc::new(
3599            new_schema.with_functional_dependencies(aggregate_func_dependencies)?,
3600        );
3601        Ok(Self {
3602            input,
3603            group_expr,
3604            aggr_expr,
3605            schema,
3606        })
3607    }
3608
3609    fn is_grouping_set(&self) -> bool {
3610        matches!(self.group_expr.as_slice(), [Expr::GroupingSet(_)])
3611    }
3612
3613    /// Get the output expressions.
3614    fn output_expressions(&self) -> Result<Vec<&Expr>> {
3615        static INTERNAL_ID_EXPR: LazyLock<Expr> = LazyLock::new(|| {
3616            Expr::Column(Column::from_name(Aggregate::INTERNAL_GROUPING_ID))
3617        });
3618        let mut exprs = grouping_set_to_exprlist(self.group_expr.as_slice())?;
3619        if self.is_grouping_set() {
3620            exprs.push(&INTERNAL_ID_EXPR);
3621        }
3622        exprs.extend(self.aggr_expr.iter());
3623        debug_assert!(exprs.len() == self.schema.fields().len());
3624        Ok(exprs)
3625    }
3626
    /// Get the length of the group by expression in the output schema.
    ///
    /// This is not simply the length of `group_expr`: an expression may be a
    /// `GroupingSet`, etc. In these cases the inner expression lengths are what
    /// count. The computation is delegated to `grouping_set_expr_count`, and any
    /// error it returns is propagated.
    pub fn group_expr_len(&self) -> Result<usize> {
        grouping_set_expr_count(&self.group_expr)
    }
3633
3634    /// Returns the data type of the grouping id.
3635    /// The grouping ID value is a bitmask where each set bit
3636    /// indicates that the corresponding grouping expression is
3637    /// null
3638    pub fn grouping_id_type(group_exprs: usize) -> DataType {
3639        if group_exprs <= 8 {
3640            DataType::UInt8
3641        } else if group_exprs <= 16 {
3642            DataType::UInt16
3643        } else if group_exprs <= 32 {
3644            DataType::UInt32
3645        } else {
3646            DataType::UInt64
3647        }
3648    }
3649
    /// Name of the internal column used when the aggregation is a grouping set.
    ///
    /// This column contains a bitmask where each bit represents a grouping
    /// expression. The least significant bit corresponds to the rightmost
    /// grouping expression. A bit value of 0 indicates that the corresponding
    /// column is included in the grouping set, while a value of 1 means it is excluded.
    ///
    /// For example, for the grouping expressions CUBE(a, b), the grouping ID
    /// column will have the following values:
    ///     0b00: Both `a` and `b` are included
    ///     0b01: `b` is excluded
    ///     0b10: `a` is excluded
    ///     0b11: Both `a` and `b` are excluded
    ///
    /// This internal column is necessary because excluded columns are replaced
    /// with `NULL` values. To handle these cases correctly, we must distinguish
    /// between an actual `NULL` value in a column and a column being excluded from the set.
    ///
    /// The column's data type is selected by [grouping_id_type](Self::grouping_id_type).
    pub const INTERNAL_GROUPING_ID: &'static str = "__grouping_id";
3668}
3669
3670// Manual implementation needed because of `schema` field. Comparison excludes this field.
3671impl PartialOrd for Aggregate {
3672    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
3673        match self.input.partial_cmp(&other.input) {
3674            Some(Ordering::Equal) => {
3675                match self.group_expr.partial_cmp(&other.group_expr) {
3676                    Some(Ordering::Equal) => self.aggr_expr.partial_cmp(&other.aggr_expr),
3677                    cmp => cmp,
3678                }
3679            }
3680            cmp => cmp,
3681        }
3682    }
3683}
3684
3685/// Checks whether any expression in `group_expr` contains `Expr::GroupingSet`.
3686fn contains_grouping_set(group_expr: &[Expr]) -> bool {
3687    group_expr
3688        .iter()
3689        .any(|expr| matches!(expr, Expr::GroupingSet(_)))
3690}
3691
3692/// Calculates functional dependencies for aggregate expressions.
3693fn calc_func_dependencies_for_aggregate(
3694    // Expressions in the GROUP BY clause:
3695    group_expr: &[Expr],
3696    // Input plan of the aggregate:
3697    input: &LogicalPlan,
3698    // Aggregate schema
3699    aggr_schema: &DFSchema,
3700) -> Result<FunctionalDependencies> {
3701    // We can do a case analysis on how to propagate functional dependencies based on
3702    // whether the GROUP BY in question contains a grouping set expression:
3703    // - If so, the functional dependencies will be empty because we cannot guarantee
3704    //   that GROUP BY expression results will be unique.
3705    // - Otherwise, it may be possible to propagate functional dependencies.
3706    if !contains_grouping_set(group_expr) {
3707        let group_by_expr_names = group_expr
3708            .iter()
3709            .map(|item| item.schema_name().to_string())
3710            .collect::<IndexSet<_>>()
3711            .into_iter()
3712            .collect::<Vec<_>>();
3713        let aggregate_func_dependencies = aggregate_functional_dependencies(
3714            input.schema(),
3715            &group_by_expr_names,
3716            aggr_schema,
3717        );
3718        Ok(aggregate_func_dependencies)
3719    } else {
3720        Ok(FunctionalDependencies::empty())
3721    }
3722}
3723
3724/// This function projects functional dependencies of the `input` plan according
3725/// to projection expressions `exprs`.
3726fn calc_func_dependencies_for_project(
3727    exprs: &[Expr],
3728    input: &LogicalPlan,
3729) -> Result<FunctionalDependencies> {
3730    let input_fields = input.schema().field_names();
3731    // Calculate expression indices (if present) in the input schema.
3732    let proj_indices = exprs
3733        .iter()
3734        .map(|expr| match expr {
3735            #[expect(deprecated)]
3736            Expr::Wildcard { qualifier, options } => {
3737                let wildcard_fields = exprlist_to_fields(
3738                    vec![&Expr::Wildcard {
3739                        qualifier: qualifier.clone(),
3740                        options: options.clone(),
3741                    }],
3742                    input,
3743                )?;
3744                Ok::<_, DataFusionError>(
3745                    wildcard_fields
3746                        .into_iter()
3747                        .filter_map(|(qualifier, f)| {
3748                            let flat_name = qualifier
3749                                .map(|t| format!("{}.{}", t, f.name()))
3750                                .unwrap_or_else(|| f.name().clone());
3751                            input_fields.iter().position(|item| *item == flat_name)
3752                        })
3753                        .collect::<Vec<_>>(),
3754                )
3755            }
3756            Expr::Alias(alias) => {
3757                let name = format!("{}", alias.expr);
3758                Ok(input_fields
3759                    .iter()
3760                    .position(|item| *item == name)
3761                    .map(|i| vec![i])
3762                    .unwrap_or(vec![]))
3763            }
3764            _ => {
3765                let name = format!("{expr}");
3766                Ok(input_fields
3767                    .iter()
3768                    .position(|item| *item == name)
3769                    .map(|i| vec![i])
3770                    .unwrap_or(vec![]))
3771            }
3772        })
3773        .collect::<Result<Vec<_>>>()?
3774        .into_iter()
3775        .flatten()
3776        .collect::<Vec<_>>();
3777
3778    Ok(input
3779        .schema()
3780        .functional_dependencies()
3781        .project_functional_dependencies(&proj_indices, exprs.len()))
3782}
3783
/// Sorts its input according to a list of sort expressions.
///
/// `PartialOrd` is derived, so values are ordered field by field in declaration
/// order: `expr`, then `input`, then `fetch`.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Hash)]
pub struct Sort {
    /// The sort expressions
    pub expr: Vec<SortExpr>,
    /// The incoming logical plan
    pub input: Arc<LogicalPlan>,
    /// Optional fetch limit (`None` when no limit is set)
    pub fetch: Option<usize>,
}
3794
/// Join two logical plans on one or more join columns
///
/// `PartialOrd` is implemented manually rather than derived, because of the
/// `schema` field.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct Join {
    /// Left input
    pub left: Arc<LogicalPlan>,
    /// Right input
    pub right: Arc<LogicalPlan>,
    /// Equijoin clause expressed as pairs of (left, right) join expressions
    pub on: Vec<(Expr, Expr)>,
    /// Filters applied during join (non-equi conditions)
    pub filter: Option<Expr>,
    /// Join type
    pub join_type: JoinType,
    /// Join constraint
    pub join_constraint: JoinConstraint,
    /// The output schema, containing fields from the left and right inputs
    pub schema: DFSchemaRef,
    /// Defines the null equality for the join.
    pub null_equality: NullEquality,
}
3815
3816impl Join {
3817    /// Creates a new Join operator with automatically computed schema.
3818    ///
3819    /// This constructor computes the schema based on the join type and inputs,
3820    /// removing the need to manually specify the schema or call `recompute_schema`.
3821    ///
3822    /// # Arguments
3823    ///
3824    /// * `left` - Left input plan
3825    /// * `right` - Right input plan
3826    /// * `on` - Join condition as a vector of (left_expr, right_expr) pairs
3827    /// * `filter` - Optional filter expression (for non-equijoin conditions)
3828    /// * `join_type` - Type of join (Inner, Left, Right, etc.)
3829    /// * `join_constraint` - Join constraint (On, Using)
3830    /// * `null_equality` - How to handle nulls in join comparisons
3831    ///
3832    /// # Returns
3833    ///
3834    /// A new Join operator with the computed schema
3835    pub fn try_new(
3836        left: Arc<LogicalPlan>,
3837        right: Arc<LogicalPlan>,
3838        on: Vec<(Expr, Expr)>,
3839        filter: Option<Expr>,
3840        join_type: JoinType,
3841        join_constraint: JoinConstraint,
3842        null_equality: NullEquality,
3843    ) -> Result<Self> {
3844        let join_schema = build_join_schema(left.schema(), right.schema(), &join_type)?;
3845
3846        Ok(Join {
3847            left,
3848            right,
3849            on,
3850            filter,
3851            join_type,
3852            join_constraint,
3853            schema: Arc::new(join_schema),
3854            null_equality,
3855        })
3856    }
3857
3858    /// Create Join with input which wrapped with projection, this method is used in physical planning only to help
3859    /// create the physical join.
3860    pub fn try_new_with_project_input(
3861        original: &LogicalPlan,
3862        left: Arc<LogicalPlan>,
3863        right: Arc<LogicalPlan>,
3864        column_on: (Vec<Column>, Vec<Column>),
3865    ) -> Result<(Self, bool)> {
3866        let original_join = match original {
3867            LogicalPlan::Join(join) => join,
3868            _ => return plan_err!("Could not create join with project input"),
3869        };
3870
3871        let mut left_sch = LogicalPlanBuilder::from(Arc::clone(&left));
3872        let mut right_sch = LogicalPlanBuilder::from(Arc::clone(&right));
3873
3874        let mut requalified = false;
3875
3876        // By definition, the resulting schema of an inner/left/right & full join will have first the left side fields and then the right,
3877        // potentially having duplicate field names. Note this will only qualify fields if they have not been qualified before.
3878        if original_join.join_type == JoinType::Inner
3879            || original_join.join_type == JoinType::Left
3880            || original_join.join_type == JoinType::Right
3881            || original_join.join_type == JoinType::Full
3882        {
3883            (left_sch, right_sch, requalified) =
3884                requalify_sides_if_needed(left_sch.clone(), right_sch.clone())?;
3885        }
3886
3887        let on: Vec<(Expr, Expr)> = column_on
3888            .0
3889            .into_iter()
3890            .zip(column_on.1)
3891            .map(|(l, r)| (Expr::Column(l), Expr::Column(r)))
3892            .collect();
3893
3894        let join_schema = build_join_schema(
3895            left_sch.schema(),
3896            right_sch.schema(),
3897            &original_join.join_type,
3898        )?;
3899
3900        Ok((
3901            Join {
3902                left,
3903                right,
3904                on,
3905                filter: original_join.filter.clone(),
3906                join_type: original_join.join_type,
3907                join_constraint: original_join.join_constraint,
3908                schema: Arc::new(join_schema),
3909                null_equality: original_join.null_equality,
3910            },
3911            requalified,
3912        ))
3913    }
3914}
3915
3916// Manual implementation needed because of `schema` field. Comparison excludes this field.
3917impl PartialOrd for Join {
3918    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
3919        #[derive(PartialEq, PartialOrd)]
3920        struct ComparableJoin<'a> {
3921            /// Left input
3922            pub left: &'a Arc<LogicalPlan>,
3923            /// Right input
3924            pub right: &'a Arc<LogicalPlan>,
3925            /// Equijoin clause expressed as pairs of (left, right) join expressions
3926            pub on: &'a Vec<(Expr, Expr)>,
3927            /// Filters applied during join (non-equi conditions)
3928            pub filter: &'a Option<Expr>,
3929            /// Join type
3930            pub join_type: &'a JoinType,
3931            /// Join constraint
3932            pub join_constraint: &'a JoinConstraint,
3933            /// The null handling behavior for equalities
3934            pub null_equality: &'a NullEquality,
3935        }
3936        let comparable_self = ComparableJoin {
3937            left: &self.left,
3938            right: &self.right,
3939            on: &self.on,
3940            filter: &self.filter,
3941            join_type: &self.join_type,
3942            join_constraint: &self.join_constraint,
3943            null_equality: &self.null_equality,
3944        };
3945        let comparable_other = ComparableJoin {
3946            left: &other.left,
3947            right: &other.right,
3948            on: &other.on,
3949            filter: &other.filter,
3950            join_type: &other.join_type,
3951            join_constraint: &other.join_constraint,
3952            null_equality: &other.null_equality,
3953        };
3954        comparable_self.partial_cmp(&comparable_other)
3955    }
3956}
3957
/// Subquery
///
/// A logical plan used as a subquery, together with the outer references it
/// uses. `Debug` is implemented manually for this type instead of being derived.
#[derive(Clone, PartialEq, Eq, PartialOrd, Hash)]
pub struct Subquery {
    /// The subquery
    pub subquery: Arc<LogicalPlan>,
    /// The outer references used in the subquery
    pub outer_ref_columns: Vec<Expr>,
    /// Span information for subquery projection columns
    pub spans: Spans,
}
3968
impl Normalizeable for Subquery {
    /// Always returns `false`: a subquery reports that it cannot be normalized.
    fn can_normalize(&self) -> bool {
        false
    }
}
3974
3975impl NormalizeEq for Subquery {
3976    fn normalize_eq(&self, other: &Self) -> bool {
3977        // TODO: may be implement NormalizeEq for LogicalPlan?
3978        *self.subquery == *other.subquery
3979            && self.outer_ref_columns.len() == other.outer_ref_columns.len()
3980            && self
3981                .outer_ref_columns
3982                .iter()
3983                .zip(other.outer_ref_columns.iter())
3984                .all(|(a, b)| a.normalize_eq(b))
3985    }
3986}
3987
3988impl Subquery {
3989    pub fn try_from_expr(plan: &Expr) -> Result<&Subquery> {
3990        match plan {
3991            Expr::ScalarSubquery(it) => Ok(it),
3992            Expr::Cast(cast) => Subquery::try_from_expr(cast.expr.as_ref()),
3993            _ => plan_err!("Could not coerce into ScalarSubquery!"),
3994        }
3995    }
3996
3997    pub fn with_plan(&self, plan: Arc<LogicalPlan>) -> Subquery {
3998        Subquery {
3999            subquery: plan,
4000            outer_ref_columns: self.outer_ref_columns.clone(),
4001            spans: Spans::new(),
4002        }
4003    }
4004}
4005
4006impl Debug for Subquery {
4007    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
4008        write!(f, "<subquery>")
4009    }
4010}
4011
/// Logical partitioning schemes supported by [`LogicalPlan::Repartition`]
///
/// See [`Partitioning`] for more details on partitioning
///
/// [`Partitioning`]: https://docs.rs/datafusion/latest/datafusion/physical_expr/enum.Partitioning.html#
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Hash)]
pub enum Partitioning {
    /// Allocate batches using a round-robin algorithm and the specified number of partitions
    RoundRobinBatch(usize),
    /// Allocate rows based on a hash of one or more expressions and the specified number
    /// of partitions.
    Hash(Vec<Expr>, usize),
    /// The DISTRIBUTE BY clause is used to repartition the data based on the input expressions
    DistributeBy(Vec<Expr>),
}
4027
/// Represents the unnesting operation on a list column: the recursion depth and
/// the output column name after unnesting
///
/// Example: given `ColumnUnnestList { output_column: "output_name", depth: 2 }`
///
/// ```text
///   input             output_name
///  ┌─────────┐      ┌─────────┐
///  │{{1,2}}  │      │ 1       │
///  ├─────────┼─────►├─────────┤
///  │{{3}}    │      │ 2       │
///  ├─────────┤      ├─────────┤
///  │{{4},{5}}│      │ 3       │
///  └─────────┘      ├─────────┤
///                   │ 4       │
///                   ├─────────┤
///                   │ 5       │
///                   └─────────┘
/// ```
#[derive(Debug, Clone, PartialEq, Eq, Hash, PartialOrd)]
pub struct ColumnUnnestList {
    /// The output column after unnesting
    pub output_column: Column,
    /// The recursion depth of the unnest (number of list levels removed)
    pub depth: usize,
}
4052
4053impl Display for ColumnUnnestList {
4054    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
4055        write!(f, "{}|depth={}", self.output_column, self.depth)
4056    }
4057}
4058
/// Unnest a column that contains a nested list type. See
/// [`UnnestOptions`] for more details.
///
/// `PartialOrd` is implemented manually rather than derived, because of the
/// `schema` field.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct Unnest {
    /// The incoming logical plan
    pub input: Arc<LogicalPlan>,
    /// Columns to run unnest on, can be a list of (List/Struct) columns
    pub exec_columns: Vec<Column>,
    /// Indices (in the input schema) of the list-typed columns to unnest,
    /// each paired with the output column and depth of that unnest
    pub list_type_columns: Vec<(usize, ColumnUnnestList)>,
    /// Indices (in the input schema) of the struct-typed columns to unnest
    pub struct_type_columns: Vec<usize>,
    /// One item per output column, giving the index of the input schema column
    /// that the output column depends on
    pub dependency_indices: Vec<usize>,
    /// The output schema, containing the unnested field column.
    pub schema: DFSchemaRef,
    /// Options
    pub options: UnnestOptions,
}
4081
4082// Manual implementation needed because of `schema` field. Comparison excludes this field.
4083impl PartialOrd for Unnest {
4084    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
4085        #[derive(PartialEq, PartialOrd)]
4086        struct ComparableUnnest<'a> {
4087            /// The incoming logical plan
4088            pub input: &'a Arc<LogicalPlan>,
4089            /// Columns to run unnest on, can be a list of (List/Struct) columns
4090            pub exec_columns: &'a Vec<Column>,
4091            /// refer to the indices(in the input schema) of columns
4092            /// that have type list to run unnest on
4093            pub list_type_columns: &'a Vec<(usize, ColumnUnnestList)>,
4094            /// refer to the indices (in the input schema) of columns
4095            /// that have type struct to run unnest on
4096            pub struct_type_columns: &'a Vec<usize>,
4097            /// Having items aligned with the output columns
4098            /// representing which column in the input schema each output column depends on
4099            pub dependency_indices: &'a Vec<usize>,
4100            /// Options
4101            pub options: &'a UnnestOptions,
4102        }
4103        let comparable_self = ComparableUnnest {
4104            input: &self.input,
4105            exec_columns: &self.exec_columns,
4106            list_type_columns: &self.list_type_columns,
4107            struct_type_columns: &self.struct_type_columns,
4108            dependency_indices: &self.dependency_indices,
4109            options: &self.options,
4110        };
4111        let comparable_other = ComparableUnnest {
4112            input: &other.input,
4113            exec_columns: &other.exec_columns,
4114            list_type_columns: &other.list_type_columns,
4115            struct_type_columns: &other.struct_type_columns,
4116            dependency_indices: &other.dependency_indices,
4117            options: &other.options,
4118        };
4119        comparable_self.partial_cmp(&comparable_other)
4120    }
4121}
4122
impl Unnest {
    /// Creates an [`Unnest`] node over `input` for the columns in `exec_columns`,
    /// computing the output schema and the bookkeeping vectors
    /// (`list_type_columns`, `struct_type_columns`, `dependency_indices`).
    ///
    /// For every column to unnest, the entries of `options.recursions` whose
    /// `input_column` equals that column each produce one output column. When
    /// there is no such entry, the column is unnested once (depth 1) under its
    /// own name. Columns that are not unnested are passed through unchanged.
    ///
    /// # Errors
    ///
    /// Returns a plan error when `exec_columns` is empty. Errors from resolving a
    /// column in the input schema, from computing the unnested columns, and from
    /// building the output schema are propagated.
    ///
    /// # Panics
    ///
    /// Panics if a recursion entry yields no unnested column (see the `unwrap`
    /// below). NOTE(review): recursions are presumably only specified for list
    /// columns, for which exactly one column is produced - confirm.
    pub fn try_new(
        input: Arc<LogicalPlan>,
        exec_columns: Vec<Column>,
        options: UnnestOptions,
    ) -> Result<Self> {
        if exec_columns.is_empty() {
            return plan_err!("unnest plan requires at least 1 column to unnest");
        }

        let mut list_columns: Vec<(usize, ColumnUnnestList)> = vec![];
        let mut struct_columns = vec![];
        // Map from input schema index to the column to unnest at that index.
        let indices_to_unnest = exec_columns
            .iter()
            .map(|c| Ok((input.schema().index_of_column(c)?, c)))
            .collect::<Result<HashMap<usize, &Column>>>()?;

        let input_schema = input.schema();

        let mut dependency_indices = vec![];
        // Transform input schema into new schema
        // Given this comprehensive example
        //
        // input schema:
        // 1.col1_unnest_placeholder: list[list[int]],
        // 2.col1: list[list[int]]
        // 3.col2: list[int]
        // with unnest on unnest(col1,depth=2), unnest(col1,depth=1) and unnest(col2,depth=1)
        // output schema:
        // 1.unnest_col1_depth_2: int
        // 2.unnest_col1_depth_1: list[int]
        // 3.col1: list[list[int]]
        // 4.unnest_col2_depth_1: int
        // Meaning the placeholder column will be replaced by its unnested variation(s), note
        // the plural.
        let fields = input_schema
            .iter()
            .enumerate()
            .map(|(index, (original_qualifier, original_field))| {
                match indices_to_unnest.get(&index) {
                    Some(column_to_unnest) => {
                        // Recursion options that target this particular column.
                        let recursions_on_column = options
                            .recursions
                            .iter()
                            .filter(|p| -> bool { &p.input_column == *column_to_unnest })
                            .collect::<Vec<_>>();
                        // One output column per recursion option; each one is also
                        // recorded in `list_columns`.
                        let mut transformed_columns = recursions_on_column
                            .iter()
                            .map(|r| {
                                list_columns.push((
                                    index,
                                    ColumnUnnestList {
                                        output_column: r.output_column.clone(),
                                        depth: r.depth,
                                    },
                                ));
                                Ok(get_unnested_columns(
                                    &r.output_column.name,
                                    original_field.data_type(),
                                    r.depth,
                                )?
                                .into_iter()
                                .next()
                                .unwrap()) // unnesting a list column always results in exactly one column
                            })
                            .collect::<Result<Vec<(Column, Arc<Field>)>>>()?;
                        // No recursion option for this column: unnest it once under
                        // its own name and record whether it is a struct or a list.
                        if transformed_columns.is_empty() {
                            transformed_columns = get_unnested_columns(
                                &column_to_unnest.name,
                                original_field.data_type(),
                                1,
                            )?;
                            match original_field.data_type() {
                                DataType::Struct(_) => {
                                    struct_columns.push(index);
                                }
                                DataType::List(_)
                                | DataType::FixedSizeList(_, _)
                                | DataType::LargeList(_) => {
                                    list_columns.push((
                                        index,
                                        ColumnUnnestList {
                                            output_column: Column::from_name(
                                                &column_to_unnest.name,
                                            ),
                                            depth: 1,
                                        },
                                    ));
                                }
                                _ => {}
                            };
                        }

                        // new columns dependent on the same original index
                        dependency_indices.extend(std::iter::repeat_n(
                            index,
                            transformed_columns.len(),
                        ));
                        Ok(transformed_columns
                            .iter()
                            .map(|(col, field)| {
                                (col.relation.to_owned(), field.to_owned())
                            })
                            .collect())
                    }
                    None => {
                        // Not unnested: the field is passed through as-is.
                        dependency_indices.push(index);
                        Ok(vec![(
                            original_qualifier.cloned(),
                            Arc::clone(original_field),
                        )])
                    }
                }
            })
            .collect::<Result<Vec<_>>>()?
            .into_iter()
            .flatten()
            .collect::<Vec<_>>();

        let metadata = input_schema.metadata().clone();
        let df_schema = DFSchema::new_with_metadata(fields, metadata)?;
        // We can use the existing functional dependencies:
        let deps = input_schema.functional_dependencies().clone();
        let schema = Arc::new(df_schema.with_functional_dependencies(deps)?);

        Ok(Unnest {
            input,
            exec_columns,
            list_type_columns: list_columns,
            struct_type_columns: struct_columns,
            dependency_indices,
            schema,
            options,
        })
    }
}
4259
4260// Based on data type, either struct or a variant of list
4261// return a set of columns as the result of unnesting
4262// the input columns.
4263// For example, given a column with name "a",
4264// - List(Element) returns ["a"] with data type Element
4265// - Struct(field1, field2) returns ["a.field1","a.field2"]
4266// For list data type, an argument depth is used to specify
4267// the recursion level
4268fn get_unnested_columns(
4269    col_name: &String,
4270    data_type: &DataType,
4271    depth: usize,
4272) -> Result<Vec<(Column, Arc<Field>)>> {
4273    let mut qualified_columns = Vec::with_capacity(1);
4274
4275    match data_type {
4276        DataType::List(_) | DataType::FixedSizeList(_, _) | DataType::LargeList(_) => {
4277            let data_type = get_unnested_list_datatype_recursive(data_type, depth)?;
4278            let new_field = Arc::new(Field::new(
4279                col_name, data_type,
4280                // Unnesting may produce NULLs even if the list is not null.
4281                // For example: unnest([1], []) -> 1, null
4282                true,
4283            ));
4284            let column = Column::from_name(col_name);
4285            // let column = Column::from((None, &new_field));
4286            qualified_columns.push((column, new_field));
4287        }
4288        DataType::Struct(fields) => {
4289            qualified_columns.extend(fields.iter().map(|f| {
4290                let new_name = format!("{}.{}", col_name, f.name());
4291                let column = Column::from_name(&new_name);
4292                let new_field = f.as_ref().clone().with_name(new_name);
4293                // let column = Column::from((None, &f));
4294                (column, Arc::new(new_field))
4295            }))
4296        }
4297        _ => {
4298            return internal_err!(
4299                "trying to unnest on invalid data type {:?}",
4300                data_type
4301            );
4302        }
4303    };
4304    Ok(qualified_columns)
4305}
4306
4307// Get the data type of a multi-dimensional type after unnesting it
4308// with a given depth
4309fn get_unnested_list_datatype_recursive(
4310    data_type: &DataType,
4311    depth: usize,
4312) -> Result<DataType> {
4313    match data_type {
4314        DataType::List(field)
4315        | DataType::FixedSizeList(field, _)
4316        | DataType::LargeList(field) => {
4317            if depth == 1 {
4318                return Ok(field.data_type().clone());
4319            }
4320            return get_unnested_list_datatype_recursive(field.data_type(), depth - 1);
4321        }
4322        _ => {}
4323    };
4324
4325    internal_err!("trying to unnest on invalid data type {:?}", data_type)
4326}
4327
4328#[cfg(test)]
4329mod tests {
4330    use super::*;
4331    use crate::builder::LogicalTableSource;
4332    use crate::logical_plan::table_scan;
4333    use crate::test::function_stub::{count, count_udaf};
4334    use crate::{
4335        binary_expr, col, exists, in_subquery, lit, placeholder, scalar_subquery,
4336        GroupingSet,
4337    };
4338    use datafusion_common::tree_node::{
4339        TransformedResult, TreeNodeRewriter, TreeNodeVisitor,
4340    };
4341    use datafusion_common::{not_impl_err, Constraint, ScalarValue};
4342    use insta::{assert_debug_snapshot, assert_snapshot};
4343    use std::hash::DefaultHasher;
4344
4345    fn employee_schema() -> Schema {
4346        Schema::new(vec![
4347            Field::new("id", DataType::Int32, false),
4348            Field::new("first_name", DataType::Utf8, false),
4349            Field::new("last_name", DataType::Utf8, false),
4350            Field::new("state", DataType::Utf8, false),
4351            Field::new("salary", DataType::Int32, false),
4352        ])
4353    }
4354
4355    fn display_plan() -> Result<LogicalPlan> {
4356        let plan1 = table_scan(Some("employee_csv"), &employee_schema(), Some(vec![3]))?
4357            .build()?;
4358
4359        table_scan(Some("employee_csv"), &employee_schema(), Some(vec![0, 3]))?
4360            .filter(in_subquery(col("state"), Arc::new(plan1)))?
4361            .project(vec![col("id")])?
4362            .build()
4363    }
4364
4365    #[test]
4366    fn test_display_indent() -> Result<()> {
4367        let plan = display_plan()?;
4368
4369        assert_snapshot!(plan.display_indent(), @r"
4370        Projection: employee_csv.id
4371          Filter: employee_csv.state IN (<subquery>)
4372            Subquery:
4373              TableScan: employee_csv projection=[state]
4374            TableScan: employee_csv projection=[id, state]
4375        ");
4376        Ok(())
4377    }
4378
4379    #[test]
4380    fn test_display_indent_schema() -> Result<()> {
4381        let plan = display_plan()?;
4382
4383        assert_snapshot!(plan.display_indent_schema(), @r"
4384        Projection: employee_csv.id [id:Int32]
4385          Filter: employee_csv.state IN (<subquery>) [id:Int32, state:Utf8]
4386            Subquery: [state:Utf8]
4387              TableScan: employee_csv projection=[state] [state:Utf8]
4388            TableScan: employee_csv projection=[id, state] [id:Int32, state:Utf8]
4389        ");
4390        Ok(())
4391    }
4392
4393    #[test]
4394    fn test_display_subquery_alias() -> Result<()> {
4395        let plan1 = table_scan(Some("employee_csv"), &employee_schema(), Some(vec![3]))?
4396            .build()?;
4397        let plan1 = Arc::new(plan1);
4398
4399        let plan =
4400            table_scan(Some("employee_csv"), &employee_schema(), Some(vec![0, 3]))?
4401                .project(vec![col("id"), exists(plan1).alias("exists")])?
4402                .build();
4403
4404        assert_snapshot!(plan?.display_indent(), @r"
4405        Projection: employee_csv.id, EXISTS (<subquery>) AS exists
4406          Subquery:
4407            TableScan: employee_csv projection=[state]
4408          TableScan: employee_csv projection=[id, state]
4409        ");
4410        Ok(())
4411    }
4412
4413    #[test]
4414    fn test_display_graphviz() -> Result<()> {
4415        let plan = display_plan()?;
4416
4417        // just test for a few key lines in the output rather than the
4418        // whole thing to make test maintenance easier.
4419        assert_snapshot!(plan.display_graphviz(), @r#"
4420        // Begin DataFusion GraphViz Plan,
4421        // display it online here: https://dreampuf.github.io/GraphvizOnline
4422
4423        digraph {
4424          subgraph cluster_1
4425          {
4426            graph[label="LogicalPlan"]
4427            2[shape=box label="Projection: employee_csv.id"]
4428            3[shape=box label="Filter: employee_csv.state IN (<subquery>)"]
4429            2 -> 3 [arrowhead=none, arrowtail=normal, dir=back]
4430            4[shape=box label="Subquery:"]
4431            3 -> 4 [arrowhead=none, arrowtail=normal, dir=back]
4432            5[shape=box label="TableScan: employee_csv projection=[state]"]
4433            4 -> 5 [arrowhead=none, arrowtail=normal, dir=back]
4434            6[shape=box label="TableScan: employee_csv projection=[id, state]"]
4435            3 -> 6 [arrowhead=none, arrowtail=normal, dir=back]
4436          }
4437          subgraph cluster_7
4438          {
4439            graph[label="Detailed LogicalPlan"]
4440            8[shape=box label="Projection: employee_csv.id\nSchema: [id:Int32]"]
4441            9[shape=box label="Filter: employee_csv.state IN (<subquery>)\nSchema: [id:Int32, state:Utf8]"]
4442            8 -> 9 [arrowhead=none, arrowtail=normal, dir=back]
4443            10[shape=box label="Subquery:\nSchema: [state:Utf8]"]
4444            9 -> 10 [arrowhead=none, arrowtail=normal, dir=back]
4445            11[shape=box label="TableScan: employee_csv projection=[state]\nSchema: [state:Utf8]"]
4446            10 -> 11 [arrowhead=none, arrowtail=normal, dir=back]
4447            12[shape=box label="TableScan: employee_csv projection=[id, state]\nSchema: [id:Int32, state:Utf8]"]
4448            9 -> 12 [arrowhead=none, arrowtail=normal, dir=back]
4449          }
4450        }
4451        // End DataFusion GraphViz Plan
4452        "#);
4453        Ok(())
4454    }
4455
4456    #[test]
4457    fn test_display_pg_json() -> Result<()> {
4458        let plan = display_plan()?;
4459
4460        assert_snapshot!(plan.display_pg_json(), @r#"
4461        [
4462          {
4463            "Plan": {
4464              "Expressions": [
4465                "employee_csv.id"
4466              ],
4467              "Node Type": "Projection",
4468              "Output": [
4469                "id"
4470              ],
4471              "Plans": [
4472                {
4473                  "Condition": "employee_csv.state IN (<subquery>)",
4474                  "Node Type": "Filter",
4475                  "Output": [
4476                    "id",
4477                    "state"
4478                  ],
4479                  "Plans": [
4480                    {
4481                      "Node Type": "Subquery",
4482                      "Output": [
4483                        "state"
4484                      ],
4485                      "Plans": [
4486                        {
4487                          "Node Type": "TableScan",
4488                          "Output": [
4489                            "state"
4490                          ],
4491                          "Plans": [],
4492                          "Relation Name": "employee_csv"
4493                        }
4494                      ]
4495                    },
4496                    {
4497                      "Node Type": "TableScan",
4498                      "Output": [
4499                        "id",
4500                        "state"
4501                      ],
4502                      "Plans": [],
4503                      "Relation Name": "employee_csv"
4504                    }
4505                  ]
4506                }
4507              ]
4508            }
4509          }
4510        ]
4511        "#);
4512        Ok(())
4513    }
4514
4515    /// Tests for the Visitor trait and walking logical plan nodes
4516    #[derive(Debug, Default)]
4517    struct OkVisitor {
4518        strings: Vec<String>,
4519    }
4520
4521    impl<'n> TreeNodeVisitor<'n> for OkVisitor {
4522        type Node = LogicalPlan;
4523
4524        fn f_down(&mut self, plan: &'n LogicalPlan) -> Result<TreeNodeRecursion> {
4525            let s = match plan {
4526                LogicalPlan::Projection { .. } => "pre_visit Projection",
4527                LogicalPlan::Filter { .. } => "pre_visit Filter",
4528                LogicalPlan::TableScan { .. } => "pre_visit TableScan",
4529                _ => {
4530                    return not_impl_err!("unknown plan type");
4531                }
4532            };
4533
4534            self.strings.push(s.into());
4535            Ok(TreeNodeRecursion::Continue)
4536        }
4537
4538        fn f_up(&mut self, plan: &'n LogicalPlan) -> Result<TreeNodeRecursion> {
4539            let s = match plan {
4540                LogicalPlan::Projection { .. } => "post_visit Projection",
4541                LogicalPlan::Filter { .. } => "post_visit Filter",
4542                LogicalPlan::TableScan { .. } => "post_visit TableScan",
4543                _ => {
4544                    return not_impl_err!("unknown plan type");
4545                }
4546            };
4547
4548            self.strings.push(s.into());
4549            Ok(TreeNodeRecursion::Continue)
4550        }
4551    }
4552
4553    #[test]
4554    fn visit_order() {
4555        let mut visitor = OkVisitor::default();
4556        let plan = test_plan();
4557        let res = plan.visit_with_subqueries(&mut visitor);
4558        assert!(res.is_ok());
4559
4560        assert_debug_snapshot!(visitor.strings, @r#"
4561        [
4562            "pre_visit Projection",
4563            "pre_visit Filter",
4564            "pre_visit TableScan",
4565            "post_visit TableScan",
4566            "post_visit Filter",
4567            "post_visit Projection",
4568        ]
4569        "#);
4570    }
4571
4572    #[derive(Debug, Default)]
4573    /// Counter than counts to zero and returns true when it gets there
4574    struct OptionalCounter {
4575        val: Option<usize>,
4576    }
4577
4578    impl OptionalCounter {
4579        fn new(val: usize) -> Self {
4580            Self { val: Some(val) }
4581        }
4582        // Decrements the counter by 1, if any, returning true if it hits zero
4583        fn dec(&mut self) -> bool {
4584            if Some(0) == self.val {
4585                true
4586            } else {
4587                self.val = self.val.take().map(|i| i - 1);
4588                false
4589            }
4590        }
4591    }
4592
4593    #[derive(Debug, Default)]
4594    /// Visitor that returns false after some number of visits
4595    struct StoppingVisitor {
4596        inner: OkVisitor,
4597        /// When Some(0) returns false from pre_visit
4598        return_false_from_pre_in: OptionalCounter,
4599        /// When Some(0) returns false from post_visit
4600        return_false_from_post_in: OptionalCounter,
4601    }
4602
4603    impl<'n> TreeNodeVisitor<'n> for StoppingVisitor {
4604        type Node = LogicalPlan;
4605
4606        fn f_down(&mut self, plan: &'n LogicalPlan) -> Result<TreeNodeRecursion> {
4607            if self.return_false_from_pre_in.dec() {
4608                return Ok(TreeNodeRecursion::Stop);
4609            }
4610            self.inner.f_down(plan)?;
4611
4612            Ok(TreeNodeRecursion::Continue)
4613        }
4614
4615        fn f_up(&mut self, plan: &'n LogicalPlan) -> Result<TreeNodeRecursion> {
4616            if self.return_false_from_post_in.dec() {
4617                return Ok(TreeNodeRecursion::Stop);
4618            }
4619
4620            self.inner.f_up(plan)
4621        }
4622    }
4623
4624    /// test early stopping in pre-visit
4625    #[test]
4626    fn early_stopping_pre_visit() {
4627        let mut visitor = StoppingVisitor {
4628            return_false_from_pre_in: OptionalCounter::new(2),
4629            ..Default::default()
4630        };
4631        let plan = test_plan();
4632        let res = plan.visit_with_subqueries(&mut visitor);
4633        assert!(res.is_ok());
4634
4635        assert_debug_snapshot!(
4636            visitor.inner.strings,
4637            @r#"
4638        [
4639            "pre_visit Projection",
4640            "pre_visit Filter",
4641        ]
4642        "#
4643        );
4644    }
4645
4646    #[test]
4647    fn early_stopping_post_visit() {
4648        let mut visitor = StoppingVisitor {
4649            return_false_from_post_in: OptionalCounter::new(1),
4650            ..Default::default()
4651        };
4652        let plan = test_plan();
4653        let res = plan.visit_with_subqueries(&mut visitor);
4654        assert!(res.is_ok());
4655
4656        assert_debug_snapshot!(
4657            visitor.inner.strings,
4658            @r#"
4659        [
4660            "pre_visit Projection",
4661            "pre_visit Filter",
4662            "pre_visit TableScan",
4663            "post_visit TableScan",
4664        ]
4665        "#
4666        );
4667    }
4668
4669    #[derive(Debug, Default)]
4670    /// Visitor that returns an error after some number of visits
4671    struct ErrorVisitor {
4672        inner: OkVisitor,
4673        /// When Some(0) returns false from pre_visit
4674        return_error_from_pre_in: OptionalCounter,
4675        /// When Some(0) returns false from post_visit
4676        return_error_from_post_in: OptionalCounter,
4677    }
4678
4679    impl<'n> TreeNodeVisitor<'n> for ErrorVisitor {
4680        type Node = LogicalPlan;
4681
4682        fn f_down(&mut self, plan: &'n LogicalPlan) -> Result<TreeNodeRecursion> {
4683            if self.return_error_from_pre_in.dec() {
4684                return not_impl_err!("Error in pre_visit");
4685            }
4686
4687            self.inner.f_down(plan)
4688        }
4689
4690        fn f_up(&mut self, plan: &'n LogicalPlan) -> Result<TreeNodeRecursion> {
4691            if self.return_error_from_post_in.dec() {
4692                return not_impl_err!("Error in post_visit");
4693            }
4694
4695            self.inner.f_up(plan)
4696        }
4697    }
4698
4699    #[test]
4700    fn error_pre_visit() {
4701        let mut visitor = ErrorVisitor {
4702            return_error_from_pre_in: OptionalCounter::new(2),
4703            ..Default::default()
4704        };
4705        let plan = test_plan();
4706        let res = plan.visit_with_subqueries(&mut visitor).unwrap_err();
4707        assert_snapshot!(
4708            res.strip_backtrace(),
4709            @"This feature is not implemented: Error in pre_visit"
4710        );
4711        assert_debug_snapshot!(
4712            visitor.inner.strings,
4713            @r#"
4714        [
4715            "pre_visit Projection",
4716            "pre_visit Filter",
4717        ]
4718        "#
4719        );
4720    }
4721
4722    #[test]
4723    fn error_post_visit() {
4724        let mut visitor = ErrorVisitor {
4725            return_error_from_post_in: OptionalCounter::new(1),
4726            ..Default::default()
4727        };
4728        let plan = test_plan();
4729        let res = plan.visit_with_subqueries(&mut visitor).unwrap_err();
4730        assert_snapshot!(
4731            res.strip_backtrace(),
4732            @"This feature is not implemented: Error in post_visit"
4733        );
4734        assert_debug_snapshot!(
4735            visitor.inner.strings,
4736            @r#"
4737        [
4738            "pre_visit Projection",
4739            "pre_visit Filter",
4740            "pre_visit TableScan",
4741            "post_visit TableScan",
4742        ]
4743        "#
4744        );
4745    }
4746
4747    #[test]
4748    fn test_partial_eq_hash_and_partial_ord() {
4749        let empty_values = Arc::new(LogicalPlan::EmptyRelation(EmptyRelation {
4750            produce_one_row: true,
4751            schema: Arc::new(DFSchema::empty()),
4752        }));
4753
4754        let count_window_function = |schema| {
4755            Window::try_new_with_schema(
4756                vec![Expr::WindowFunction(Box::new(WindowFunction::new(
4757                    WindowFunctionDefinition::AggregateUDF(count_udaf()),
4758                    vec![],
4759                )))],
4760                Arc::clone(&empty_values),
4761                Arc::new(schema),
4762            )
4763            .unwrap()
4764        };
4765
4766        let schema_without_metadata = || {
4767            DFSchema::from_unqualified_fields(
4768                vec![Field::new("count", DataType::Int64, false)].into(),
4769                HashMap::new(),
4770            )
4771            .unwrap()
4772        };
4773
4774        let schema_with_metadata = || {
4775            DFSchema::from_unqualified_fields(
4776                vec![Field::new("count", DataType::Int64, false)].into(),
4777                [("key".to_string(), "value".to_string())].into(),
4778            )
4779            .unwrap()
4780        };
4781
4782        // A Window
4783        let f = count_window_function(schema_without_metadata());
4784
4785        // Same like `f`, different instance
4786        let f2 = count_window_function(schema_without_metadata());
4787        assert_eq!(f, f2);
4788        assert_eq!(hash(&f), hash(&f2));
4789        assert_eq!(f.partial_cmp(&f2), Some(Ordering::Equal));
4790
4791        // Same like `f`, except for schema metadata
4792        let o = count_window_function(schema_with_metadata());
4793        assert_ne!(f, o);
4794        assert_ne!(hash(&f), hash(&o)); // hash can collide for different values but does not collide in this test
4795        assert_eq!(f.partial_cmp(&o), None);
4796    }
4797
4798    fn hash<T: Hash>(value: &T) -> u64 {
4799        let hasher = &mut DefaultHasher::new();
4800        value.hash(hasher);
4801        hasher.finish()
4802    }
4803
4804    #[test]
4805    fn projection_expr_schema_mismatch() -> Result<()> {
4806        let empty_schema = Arc::new(DFSchema::empty());
4807        let p = Projection::try_new_with_schema(
4808            vec![col("a")],
4809            Arc::new(LogicalPlan::EmptyRelation(EmptyRelation {
4810                produce_one_row: false,
4811                schema: Arc::clone(&empty_schema),
4812            })),
4813            empty_schema,
4814        );
4815        assert_snapshot!(p.unwrap_err().strip_backtrace(), @"Error during planning: Projection has mismatch between number of expressions (1) and number of fields in schema (0)");
4816        Ok(())
4817    }
4818
4819    fn test_plan() -> LogicalPlan {
4820        let schema = Schema::new(vec![
4821            Field::new("id", DataType::Int32, false),
4822            Field::new("state", DataType::Utf8, false),
4823        ]);
4824
4825        table_scan(TableReference::none(), &schema, Some(vec![0, 1]))
4826            .unwrap()
4827            .filter(col("state").eq(lit("CO")))
4828            .unwrap()
4829            .project(vec![col("id")])
4830            .unwrap()
4831            .build()
4832            .unwrap()
4833    }
4834
4835    #[test]
4836    fn test_replace_invalid_placeholder() {
4837        // test empty placeholder
4838        let schema = Schema::new(vec![Field::new("id", DataType::Int32, false)]);
4839
4840        let plan = table_scan(TableReference::none(), &schema, None)
4841            .unwrap()
4842            .filter(col("id").eq(placeholder("")))
4843            .unwrap()
4844            .build()
4845            .unwrap();
4846
4847        let param_values = vec![ScalarValue::Int32(Some(42))];
4848        plan.replace_params_with_values(&param_values.clone().into())
4849            .expect_err("unexpectedly succeeded to replace an invalid placeholder");
4850
4851        // test $0 placeholder
4852        let schema = Schema::new(vec![Field::new("id", DataType::Int32, false)]);
4853
4854        let plan = table_scan(TableReference::none(), &schema, None)
4855            .unwrap()
4856            .filter(col("id").eq(placeholder("$0")))
4857            .unwrap()
4858            .build()
4859            .unwrap();
4860
4861        plan.replace_params_with_values(&param_values.clone().into())
4862            .expect_err("unexpectedly succeeded to replace an invalid placeholder");
4863
4864        // test $00 placeholder
4865        let schema = Schema::new(vec![Field::new("id", DataType::Int32, false)]);
4866
4867        let plan = table_scan(TableReference::none(), &schema, None)
4868            .unwrap()
4869            .filter(col("id").eq(placeholder("$00")))
4870            .unwrap()
4871            .build()
4872            .unwrap();
4873
4874        plan.replace_params_with_values(&param_values.into())
4875            .expect_err("unexpectedly succeeded to replace an invalid placeholder");
4876    }
4877
4878    #[test]
4879    fn test_nullable_schema_after_grouping_set() {
4880        let schema = Schema::new(vec![
4881            Field::new("foo", DataType::Int32, false),
4882            Field::new("bar", DataType::Int32, false),
4883        ]);
4884
4885        let plan = table_scan(TableReference::none(), &schema, None)
4886            .unwrap()
4887            .aggregate(
4888                vec![Expr::GroupingSet(GroupingSet::GroupingSets(vec![
4889                    vec![col("foo")],
4890                    vec![col("bar")],
4891                ]))],
4892                vec![count(lit(true))],
4893            )
4894            .unwrap()
4895            .build()
4896            .unwrap();
4897
4898        let output_schema = plan.schema();
4899
4900        assert!(output_schema
4901            .field_with_name(None, "foo")
4902            .unwrap()
4903            .is_nullable(),);
4904        assert!(output_schema
4905            .field_with_name(None, "bar")
4906            .unwrap()
4907            .is_nullable());
4908    }
4909
4910    #[test]
4911    fn test_filter_is_scalar() {
4912        // test empty placeholder
4913        let schema =
4914            Arc::new(Schema::new(vec![Field::new("id", DataType::Int32, false)]));
4915
4916        let source = Arc::new(LogicalTableSource::new(schema));
4917        let schema = Arc::new(
4918            DFSchema::try_from_qualified_schema(
4919                TableReference::bare("tab"),
4920                &source.schema(),
4921            )
4922            .unwrap(),
4923        );
4924        let scan = Arc::new(LogicalPlan::TableScan(TableScan {
4925            table_name: TableReference::bare("tab"),
4926            source: Arc::clone(&source) as Arc<dyn TableSource>,
4927            projection: None,
4928            projected_schema: Arc::clone(&schema),
4929            filters: vec![],
4930            fetch: None,
4931        }));
4932        let col = schema.field_names()[0].clone();
4933
4934        let filter = Filter::try_new(
4935            Expr::Column(col.into()).eq(Expr::Literal(ScalarValue::Int32(Some(1)), None)),
4936            scan,
4937        )
4938        .unwrap();
4939        assert!(!filter.is_scalar());
4940        let unique_schema = Arc::new(
4941            schema
4942                .as_ref()
4943                .clone()
4944                .with_functional_dependencies(
4945                    FunctionalDependencies::new_from_constraints(
4946                        Some(&Constraints::new_unverified(vec![Constraint::Unique(
4947                            vec![0],
4948                        )])),
4949                        1,
4950                    ),
4951                )
4952                .unwrap(),
4953        );
4954        let scan = Arc::new(LogicalPlan::TableScan(TableScan {
4955            table_name: TableReference::bare("tab"),
4956            source,
4957            projection: None,
4958            projected_schema: Arc::clone(&unique_schema),
4959            filters: vec![],
4960            fetch: None,
4961        }));
4962        let col = schema.field_names()[0].clone();
4963
4964        let filter =
4965            Filter::try_new(Expr::Column(col.into()).eq(lit(1i32)), scan).unwrap();
4966        assert!(filter.is_scalar());
4967    }
4968
4969    #[test]
4970    fn test_transform_explain() {
4971        let schema = Schema::new(vec![
4972            Field::new("foo", DataType::Int32, false),
4973            Field::new("bar", DataType::Int32, false),
4974        ]);
4975
4976        let plan = table_scan(TableReference::none(), &schema, None)
4977            .unwrap()
4978            .explain(false, false)
4979            .unwrap()
4980            .build()
4981            .unwrap();
4982
4983        let external_filter = col("foo").eq(lit(true));
4984
4985        // after transformation, because plan is not the same anymore,
4986        // the parent plan is built again with call to LogicalPlan::with_new_inputs -> with_new_exprs
4987        let plan = plan
4988            .transform(|plan| match plan {
4989                LogicalPlan::TableScan(table) => {
4990                    let filter = Filter::try_new(
4991                        external_filter.clone(),
4992                        Arc::new(LogicalPlan::TableScan(table)),
4993                    )
4994                    .unwrap();
4995                    Ok(Transformed::yes(LogicalPlan::Filter(filter)))
4996                }
4997                x => Ok(Transformed::no(x)),
4998            })
4999            .data()
5000            .unwrap();
5001
5002        let actual = format!("{}", plan.display_indent());
5003        assert_snapshot!(actual, @r"
5004        Explain
5005          Filter: foo = Boolean(true)
5006            TableScan: ?table?
5007        ")
5008    }
5009
5010    #[test]
5011    fn test_plan_partial_ord() {
5012        let empty_relation = LogicalPlan::EmptyRelation(EmptyRelation {
5013            produce_one_row: false,
5014            schema: Arc::new(DFSchema::empty()),
5015        });
5016
5017        let describe_table = LogicalPlan::DescribeTable(DescribeTable {
5018            schema: Arc::new(Schema::new(vec![Field::new(
5019                "foo",
5020                DataType::Int32,
5021                false,
5022            )])),
5023            output_schema: DFSchemaRef::new(DFSchema::empty()),
5024        });
5025
5026        let describe_table_clone = LogicalPlan::DescribeTable(DescribeTable {
5027            schema: Arc::new(Schema::new(vec![Field::new(
5028                "foo",
5029                DataType::Int32,
5030                false,
5031            )])),
5032            output_schema: DFSchemaRef::new(DFSchema::empty()),
5033        });
5034
5035        assert_eq!(
5036            empty_relation.partial_cmp(&describe_table),
5037            Some(Ordering::Less)
5038        );
5039        assert_eq!(
5040            describe_table.partial_cmp(&empty_relation),
5041            Some(Ordering::Greater)
5042        );
5043        assert_eq!(describe_table.partial_cmp(&describe_table_clone), None);
5044    }
5045
5046    #[test]
5047    fn test_limit_with_new_children() {
5048        let input = Arc::new(LogicalPlan::Values(Values {
5049            schema: Arc::new(DFSchema::empty()),
5050            values: vec![vec![]],
5051        }));
5052        let cases = [
5053            LogicalPlan::Limit(Limit {
5054                skip: None,
5055                fetch: None,
5056                input: Arc::clone(&input),
5057            }),
5058            LogicalPlan::Limit(Limit {
5059                skip: None,
5060                fetch: Some(Box::new(Expr::Literal(
5061                    ScalarValue::new_ten(&DataType::UInt32).unwrap(),
5062                    None,
5063                ))),
5064                input: Arc::clone(&input),
5065            }),
5066            LogicalPlan::Limit(Limit {
5067                skip: Some(Box::new(Expr::Literal(
5068                    ScalarValue::new_ten(&DataType::UInt32).unwrap(),
5069                    None,
5070                ))),
5071                fetch: None,
5072                input: Arc::clone(&input),
5073            }),
5074            LogicalPlan::Limit(Limit {
5075                skip: Some(Box::new(Expr::Literal(
5076                    ScalarValue::new_one(&DataType::UInt32).unwrap(),
5077                    None,
5078                ))),
5079                fetch: Some(Box::new(Expr::Literal(
5080                    ScalarValue::new_ten(&DataType::UInt32).unwrap(),
5081                    None,
5082                ))),
5083                input,
5084            }),
5085        ];
5086
5087        for limit in cases {
5088            let new_limit = limit
5089                .with_new_exprs(
5090                    limit.expressions(),
5091                    limit.inputs().into_iter().cloned().collect(),
5092                )
5093                .unwrap();
5094            assert_eq!(limit, new_limit);
5095        }
5096    }
5097
5098    #[test]
5099    fn test_with_subqueries_jump() {
5100        // The test plan contains a `Project` node above a `Filter` node, and the
5101        // `Project` node contains a subquery plan with a `Filter` root node, so returning
5102        // `TreeNodeRecursion::Jump` on `Project` should cause not visiting any of the
5103        // `Filter`s.
5104        let subquery_schema =
5105            Schema::new(vec![Field::new("sub_id", DataType::Int32, false)]);
5106
5107        let subquery_plan =
5108            table_scan(TableReference::none(), &subquery_schema, Some(vec![0]))
5109                .unwrap()
5110                .filter(col("sub_id").eq(lit(0)))
5111                .unwrap()
5112                .build()
5113                .unwrap();
5114
5115        let schema = Schema::new(vec![Field::new("id", DataType::Int32, false)]);
5116
5117        let plan = table_scan(TableReference::none(), &schema, Some(vec![0]))
5118            .unwrap()
5119            .filter(col("id").eq(lit(0)))
5120            .unwrap()
5121            .project(vec![col("id"), scalar_subquery(Arc::new(subquery_plan))])
5122            .unwrap()
5123            .build()
5124            .unwrap();
5125
5126        let mut filter_found = false;
5127        plan.apply_with_subqueries(|plan| {
5128            match plan {
5129                LogicalPlan::Projection(..) => return Ok(TreeNodeRecursion::Jump),
5130                LogicalPlan::Filter(..) => filter_found = true,
5131                _ => {}
5132            }
5133            Ok(TreeNodeRecursion::Continue)
5134        })
5135        .unwrap();
5136        assert!(!filter_found);
5137
5138        struct ProjectJumpVisitor {
5139            filter_found: bool,
5140        }
5141
5142        impl ProjectJumpVisitor {
5143            fn new() -> Self {
5144                Self {
5145                    filter_found: false,
5146                }
5147            }
5148        }
5149
5150        impl<'n> TreeNodeVisitor<'n> for ProjectJumpVisitor {
5151            type Node = LogicalPlan;
5152
5153            fn f_down(&mut self, node: &'n Self::Node) -> Result<TreeNodeRecursion> {
5154                match node {
5155                    LogicalPlan::Projection(..) => return Ok(TreeNodeRecursion::Jump),
5156                    LogicalPlan::Filter(..) => self.filter_found = true,
5157                    _ => {}
5158                }
5159                Ok(TreeNodeRecursion::Continue)
5160            }
5161        }
5162
5163        let mut visitor = ProjectJumpVisitor::new();
5164        plan.visit_with_subqueries(&mut visitor).unwrap();
5165        assert!(!visitor.filter_found);
5166
5167        let mut filter_found = false;
5168        plan.clone()
5169            .transform_down_with_subqueries(|plan| {
5170                match plan {
5171                    LogicalPlan::Projection(..) => {
5172                        return Ok(Transformed::new(plan, false, TreeNodeRecursion::Jump))
5173                    }
5174                    LogicalPlan::Filter(..) => filter_found = true,
5175                    _ => {}
5176                }
5177                Ok(Transformed::no(plan))
5178            })
5179            .unwrap();
5180        assert!(!filter_found);
5181
5182        let mut filter_found = false;
5183        plan.clone()
5184            .transform_down_up_with_subqueries(
5185                |plan| {
5186                    match plan {
5187                        LogicalPlan::Projection(..) => {
5188                            return Ok(Transformed::new(
5189                                plan,
5190                                false,
5191                                TreeNodeRecursion::Jump,
5192                            ))
5193                        }
5194                        LogicalPlan::Filter(..) => filter_found = true,
5195                        _ => {}
5196                    }
5197                    Ok(Transformed::no(plan))
5198                },
5199                |plan| Ok(Transformed::no(plan)),
5200            )
5201            .unwrap();
5202        assert!(!filter_found);
5203
5204        struct ProjectJumpRewriter {
5205            filter_found: bool,
5206        }
5207
5208        impl ProjectJumpRewriter {
5209            fn new() -> Self {
5210                Self {
5211                    filter_found: false,
5212                }
5213            }
5214        }
5215
5216        impl TreeNodeRewriter for ProjectJumpRewriter {
5217            type Node = LogicalPlan;
5218
5219            fn f_down(&mut self, node: Self::Node) -> Result<Transformed<Self::Node>> {
5220                match node {
5221                    LogicalPlan::Projection(..) => {
5222                        return Ok(Transformed::new(node, false, TreeNodeRecursion::Jump))
5223                    }
5224                    LogicalPlan::Filter(..) => self.filter_found = true,
5225                    _ => {}
5226                }
5227                Ok(Transformed::no(node))
5228            }
5229        }
5230
5231        let mut rewriter = ProjectJumpRewriter::new();
5232        plan.rewrite_with_subqueries(&mut rewriter).unwrap();
5233        assert!(!rewriter.filter_found);
5234    }
5235
5236    #[test]
5237    fn test_with_unresolved_placeholders() {
5238        let field_name = "id";
5239        let placeholder_value = "$1";
5240        let schema = Schema::new(vec![Field::new(field_name, DataType::Int32, false)]);
5241
5242        let plan = table_scan(TableReference::none(), &schema, None)
5243            .unwrap()
5244            .filter(col(field_name).eq(placeholder(placeholder_value)))
5245            .unwrap()
5246            .build()
5247            .unwrap();
5248
5249        // Check that the placeholder parameters have not received a DataType.
5250        let params = plan.get_parameter_types().unwrap();
5251        assert_eq!(params.len(), 1);
5252
5253        let parameter_type = params.clone().get(placeholder_value).unwrap().clone();
5254        assert_eq!(parameter_type, None);
5255    }
5256
5257    #[test]
5258    fn test_join_with_new_exprs() -> Result<()> {
5259        fn create_test_join(
5260            on: Vec<(Expr, Expr)>,
5261            filter: Option<Expr>,
5262        ) -> Result<LogicalPlan> {
5263            let schema = Schema::new(vec![
5264                Field::new("a", DataType::Int32, false),
5265                Field::new("b", DataType::Int32, false),
5266            ]);
5267
5268            let left_schema = DFSchema::try_from_qualified_schema("t1", &schema)?;
5269            let right_schema = DFSchema::try_from_qualified_schema("t2", &schema)?;
5270
5271            Ok(LogicalPlan::Join(Join {
5272                left: Arc::new(
5273                    table_scan(Some("t1"), left_schema.as_arrow(), None)?.build()?,
5274                ),
5275                right: Arc::new(
5276                    table_scan(Some("t2"), right_schema.as_arrow(), None)?.build()?,
5277                ),
5278                on,
5279                filter,
5280                join_type: JoinType::Inner,
5281                join_constraint: JoinConstraint::On,
5282                schema: Arc::new(left_schema.join(&right_schema)?),
5283                null_equality: NullEquality::NullEqualsNothing,
5284            }))
5285        }
5286
5287        {
5288            let join = create_test_join(vec![(col("t1.a"), (col("t2.a")))], None)?;
5289            let LogicalPlan::Join(join) = join.with_new_exprs(
5290                join.expressions(),
5291                join.inputs().into_iter().cloned().collect(),
5292            )?
5293            else {
5294                unreachable!()
5295            };
5296            assert_eq!(join.on, vec![(col("t1.a"), (col("t2.a")))]);
5297            assert_eq!(join.filter, None);
5298        }
5299
5300        {
5301            let join = create_test_join(vec![], Some(col("t1.a").gt(col("t2.a"))))?;
5302            let LogicalPlan::Join(join) = join.with_new_exprs(
5303                join.expressions(),
5304                join.inputs().into_iter().cloned().collect(),
5305            )?
5306            else {
5307                unreachable!()
5308            };
5309            assert_eq!(join.on, vec![]);
5310            assert_eq!(join.filter, Some(col("t1.a").gt(col("t2.a"))));
5311        }
5312
5313        {
5314            let join = create_test_join(
5315                vec![(col("t1.a"), (col("t2.a")))],
5316                Some(col("t1.b").gt(col("t2.b"))),
5317            )?;
5318            let LogicalPlan::Join(join) = join.with_new_exprs(
5319                join.expressions(),
5320                join.inputs().into_iter().cloned().collect(),
5321            )?
5322            else {
5323                unreachable!()
5324            };
5325            assert_eq!(join.on, vec![(col("t1.a"), (col("t2.a")))]);
5326            assert_eq!(join.filter, Some(col("t1.b").gt(col("t2.b"))));
5327        }
5328
5329        {
5330            let join = create_test_join(
5331                vec![(col("t1.a"), (col("t2.a"))), (col("t1.b"), (col("t2.b")))],
5332                None,
5333            )?;
5334            let LogicalPlan::Join(join) = join.with_new_exprs(
5335                vec![
5336                    binary_expr(col("t1.a"), Operator::Plus, lit(1)),
5337                    binary_expr(col("t2.a"), Operator::Plus, lit(2)),
5338                    col("t1.b"),
5339                    col("t2.b"),
5340                    lit(true),
5341                ],
5342                join.inputs().into_iter().cloned().collect(),
5343            )?
5344            else {
5345                unreachable!()
5346            };
5347            assert_eq!(
5348                join.on,
5349                vec![
5350                    (
5351                        binary_expr(col("t1.a"), Operator::Plus, lit(1)),
5352                        binary_expr(col("t2.a"), Operator::Plus, lit(2))
5353                    ),
5354                    (col("t1.b"), (col("t2.b")))
5355                ]
5356            );
5357            assert_eq!(join.filter, Some(lit(true)));
5358        }
5359
5360        Ok(())
5361    }
5362
5363    #[test]
5364    fn test_join_try_new() -> Result<()> {
5365        let schema = Schema::new(vec![
5366            Field::new("a", DataType::Int32, false),
5367            Field::new("b", DataType::Int32, false),
5368        ]);
5369
5370        let left_scan = table_scan(Some("t1"), &schema, None)?.build()?;
5371
5372        let right_scan = table_scan(Some("t2"), &schema, None)?.build()?;
5373
5374        let join_types = vec![
5375            JoinType::Inner,
5376            JoinType::Left,
5377            JoinType::Right,
5378            JoinType::Full,
5379            JoinType::LeftSemi,
5380            JoinType::LeftAnti,
5381            JoinType::RightSemi,
5382            JoinType::RightAnti,
5383            JoinType::LeftMark,
5384        ];
5385
5386        for join_type in join_types {
5387            let join = Join::try_new(
5388                Arc::new(left_scan.clone()),
5389                Arc::new(right_scan.clone()),
5390                vec![(col("t1.a"), col("t2.a"))],
5391                Some(col("t1.b").gt(col("t2.b"))),
5392                join_type,
5393                JoinConstraint::On,
5394                NullEquality::NullEqualsNothing,
5395            )?;
5396
5397            match join_type {
5398                JoinType::LeftSemi | JoinType::LeftAnti => {
5399                    assert_eq!(join.schema.fields().len(), 2);
5400
5401                    let fields = join.schema.fields();
5402                    assert_eq!(
5403                        fields[0].name(),
5404                        "a",
5405                        "First field should be 'a' from left table"
5406                    );
5407                    assert_eq!(
5408                        fields[1].name(),
5409                        "b",
5410                        "Second field should be 'b' from left table"
5411                    );
5412                }
5413                JoinType::RightSemi | JoinType::RightAnti => {
5414                    assert_eq!(join.schema.fields().len(), 2);
5415
5416                    let fields = join.schema.fields();
5417                    assert_eq!(
5418                        fields[0].name(),
5419                        "a",
5420                        "First field should be 'a' from right table"
5421                    );
5422                    assert_eq!(
5423                        fields[1].name(),
5424                        "b",
5425                        "Second field should be 'b' from right table"
5426                    );
5427                }
5428                JoinType::LeftMark => {
5429                    assert_eq!(join.schema.fields().len(), 3);
5430
5431                    let fields = join.schema.fields();
5432                    assert_eq!(
5433                        fields[0].name(),
5434                        "a",
5435                        "First field should be 'a' from left table"
5436                    );
5437                    assert_eq!(
5438                        fields[1].name(),
5439                        "b",
5440                        "Second field should be 'b' from left table"
5441                    );
5442                    assert_eq!(
5443                        fields[2].name(),
5444                        "mark",
5445                        "Third field should be the mark column"
5446                    );
5447
5448                    assert!(!fields[0].is_nullable());
5449                    assert!(!fields[1].is_nullable());
5450                    assert!(!fields[2].is_nullable());
5451                }
5452                _ => {
5453                    assert_eq!(join.schema.fields().len(), 4);
5454
5455                    let fields = join.schema.fields();
5456                    assert_eq!(
5457                        fields[0].name(),
5458                        "a",
5459                        "First field should be 'a' from left table"
5460                    );
5461                    assert_eq!(
5462                        fields[1].name(),
5463                        "b",
5464                        "Second field should be 'b' from left table"
5465                    );
5466                    assert_eq!(
5467                        fields[2].name(),
5468                        "a",
5469                        "Third field should be 'a' from right table"
5470                    );
5471                    assert_eq!(
5472                        fields[3].name(),
5473                        "b",
5474                        "Fourth field should be 'b' from right table"
5475                    );
5476
5477                    if join_type == JoinType::Left {
5478                        // Left side fields (first two) shouldn't be nullable
5479                        assert!(!fields[0].is_nullable());
5480                        assert!(!fields[1].is_nullable());
5481                        // Right side fields (third and fourth) should be nullable
5482                        assert!(fields[2].is_nullable());
5483                        assert!(fields[3].is_nullable());
5484                    } else if join_type == JoinType::Right {
5485                        // Left side fields (first two) should be nullable
5486                        assert!(fields[0].is_nullable());
5487                        assert!(fields[1].is_nullable());
5488                        // Right side fields (third and fourth) shouldn't be nullable
5489                        assert!(!fields[2].is_nullable());
5490                        assert!(!fields[3].is_nullable());
5491                    } else if join_type == JoinType::Full {
5492                        assert!(fields[0].is_nullable());
5493                        assert!(fields[1].is_nullable());
5494                        assert!(fields[2].is_nullable());
5495                        assert!(fields[3].is_nullable());
5496                    }
5497                }
5498            }
5499
5500            assert_eq!(join.on, vec![(col("t1.a"), col("t2.a"))]);
5501            assert_eq!(join.filter, Some(col("t1.b").gt(col("t2.b"))));
5502            assert_eq!(join.join_type, join_type);
5503            assert_eq!(join.join_constraint, JoinConstraint::On);
5504            assert_eq!(join.null_equality, NullEquality::NullEqualsNothing);
5505        }
5506
5507        Ok(())
5508    }
5509
5510    #[test]
5511    fn test_join_try_new_with_using_constraint_and_overlapping_columns() -> Result<()> {
5512        let left_schema = Schema::new(vec![
5513            Field::new("id", DataType::Int32, false), // Common column in both tables
5514            Field::new("name", DataType::Utf8, false), // Unique to left
5515            Field::new("value", DataType::Int32, false), // Common column, different meaning
5516        ]);
5517
5518        let right_schema = Schema::new(vec![
5519            Field::new("id", DataType::Int32, false), // Common column in both tables
5520            Field::new("category", DataType::Utf8, false), // Unique to right
5521            Field::new("value", DataType::Float64, true), // Common column, different meaning
5522        ]);
5523
5524        let left_plan = table_scan(Some("t1"), &left_schema, None)?.build()?;
5525
5526        let right_plan = table_scan(Some("t2"), &right_schema, None)?.build()?;
5527
5528        // Test 1: USING constraint with a common column
5529        {
5530            // In the logical plan, both copies of the `id` column are preserved
5531            // The USING constraint is handled later during physical execution, where the common column appears once
5532            let join = Join::try_new(
5533                Arc::new(left_plan.clone()),
5534                Arc::new(right_plan.clone()),
5535                vec![(col("t1.id"), col("t2.id"))],
5536                None,
5537                JoinType::Inner,
5538                JoinConstraint::Using,
5539                NullEquality::NullEqualsNothing,
5540            )?;
5541
5542            let fields = join.schema.fields();
5543
5544            assert_eq!(fields.len(), 6);
5545
5546            assert_eq!(
5547                fields[0].name(),
5548                "id",
5549                "First field should be 'id' from left table"
5550            );
5551            assert_eq!(
5552                fields[1].name(),
5553                "name",
5554                "Second field should be 'name' from left table"
5555            );
5556            assert_eq!(
5557                fields[2].name(),
5558                "value",
5559                "Third field should be 'value' from left table"
5560            );
5561            assert_eq!(
5562                fields[3].name(),
5563                "id",
5564                "Fourth field should be 'id' from right table"
5565            );
5566            assert_eq!(
5567                fields[4].name(),
5568                "category",
5569                "Fifth field should be 'category' from right table"
5570            );
5571            assert_eq!(
5572                fields[5].name(),
5573                "value",
5574                "Sixth field should be 'value' from right table"
5575            );
5576
5577            assert_eq!(join.join_constraint, JoinConstraint::Using);
5578        }
5579
5580        // Test 2: Complex join condition with expressions
5581        {
5582            // Complex condition: join on id equality AND where left.value < right.value
5583            let join = Join::try_new(
5584                Arc::new(left_plan.clone()),
5585                Arc::new(right_plan.clone()),
5586                vec![(col("t1.id"), col("t2.id"))], // Equijoin condition
5587                Some(col("t1.value").lt(col("t2.value"))), // Non-equi filter condition
5588                JoinType::Inner,
5589                JoinConstraint::On,
5590                NullEquality::NullEqualsNothing,
5591            )?;
5592
5593            let fields = join.schema.fields();
5594            assert_eq!(fields.len(), 6);
5595
5596            assert_eq!(
5597                fields[0].name(),
5598                "id",
5599                "First field should be 'id' from left table"
5600            );
5601            assert_eq!(
5602                fields[1].name(),
5603                "name",
5604                "Second field should be 'name' from left table"
5605            );
5606            assert_eq!(
5607                fields[2].name(),
5608                "value",
5609                "Third field should be 'value' from left table"
5610            );
5611            assert_eq!(
5612                fields[3].name(),
5613                "id",
5614                "Fourth field should be 'id' from right table"
5615            );
5616            assert_eq!(
5617                fields[4].name(),
5618                "category",
5619                "Fifth field should be 'category' from right table"
5620            );
5621            assert_eq!(
5622                fields[5].name(),
5623                "value",
5624                "Sixth field should be 'value' from right table"
5625            );
5626
5627            assert_eq!(join.filter, Some(col("t1.value").lt(col("t2.value"))));
5628        }
5629
5630        // Test 3: Join with null equality behavior set to true
5631        {
5632            let join = Join::try_new(
5633                Arc::new(left_plan.clone()),
5634                Arc::new(right_plan.clone()),
5635                vec![(col("t1.id"), col("t2.id"))],
5636                None,
5637                JoinType::Inner,
5638                JoinConstraint::On,
5639                NullEquality::NullEqualsNull,
5640            )?;
5641
5642            assert_eq!(join.null_equality, NullEquality::NullEqualsNull);
5643        }
5644
5645        Ok(())
5646    }
5647
5648    #[test]
5649    fn test_join_try_new_schema_validation() -> Result<()> {
5650        let left_schema = Schema::new(vec![
5651            Field::new("id", DataType::Int32, false),
5652            Field::new("name", DataType::Utf8, false),
5653            Field::new("value", DataType::Float64, true),
5654        ]);
5655
5656        let right_schema = Schema::new(vec![
5657            Field::new("id", DataType::Int32, false),
5658            Field::new("category", DataType::Utf8, true),
5659            Field::new("code", DataType::Int16, false),
5660        ]);
5661
5662        let left_plan = table_scan(Some("t1"), &left_schema, None)?.build()?;
5663
5664        let right_plan = table_scan(Some("t2"), &right_schema, None)?.build()?;
5665
5666        let join_types = vec![
5667            JoinType::Inner,
5668            JoinType::Left,
5669            JoinType::Right,
5670            JoinType::Full,
5671        ];
5672
5673        for join_type in join_types {
5674            let join = Join::try_new(
5675                Arc::new(left_plan.clone()),
5676                Arc::new(right_plan.clone()),
5677                vec![(col("t1.id"), col("t2.id"))],
5678                Some(col("t1.value").gt(lit(5.0))),
5679                join_type,
5680                JoinConstraint::On,
5681                NullEquality::NullEqualsNothing,
5682            )?;
5683
5684            let fields = join.schema.fields();
5685            assert_eq!(fields.len(), 6, "Expected 6 fields for {join_type:?} join");
5686
5687            for (i, field) in fields.iter().enumerate() {
5688                let expected_nullable = match (i, &join_type) {
5689                    // Left table fields (indices 0, 1, 2)
5690                    (0, JoinType::Right | JoinType::Full) => true, // id becomes nullable in RIGHT/FULL
5691                    (1, JoinType::Right | JoinType::Full) => true, // name becomes nullable in RIGHT/FULL
5692                    (2, _) => true, // value is already nullable
5693
5694                    // Right table fields (indices 3, 4, 5)
5695                    (3, JoinType::Left | JoinType::Full) => true, // id becomes nullable in LEFT/FULL
5696                    (4, _) => true, // category is already nullable
5697                    (5, JoinType::Left | JoinType::Full) => true, // code becomes nullable in LEFT/FULL
5698
5699                    _ => false,
5700                };
5701
5702                assert_eq!(
5703                    field.is_nullable(),
5704                    expected_nullable,
5705                    "Field {} ({}) nullability incorrect for {:?} join",
5706                    i,
5707                    field.name(),
5708                    join_type
5709                );
5710            }
5711        }
5712
5713        let using_join = Join::try_new(
5714            Arc::new(left_plan.clone()),
5715            Arc::new(right_plan.clone()),
5716            vec![(col("t1.id"), col("t2.id"))],
5717            None,
5718            JoinType::Inner,
5719            JoinConstraint::Using,
5720            NullEquality::NullEqualsNothing,
5721        )?;
5722
5723        assert_eq!(
5724            using_join.schema.fields().len(),
5725            6,
5726            "USING join should have all fields"
5727        );
5728        assert_eq!(using_join.join_constraint, JoinConstraint::Using);
5729
5730        Ok(())
5731    }
5732}