// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

//! Logical plan types

use std::cmp::Ordering;
use std::collections::{HashMap, HashSet};
use std::fmt::{self, Debug, Display, Formatter};
use std::hash::{Hash, Hasher};
use std::sync::{Arc, LazyLock};

use super::dml::CopyTo;
use super::invariants::{
    assert_always_invariants_at_current_node, assert_executable_invariants,
    InvariantLevel,
};
use super::DdlStatement;
use crate::builder::{unique_field_aliases, unnest_with_options};
use crate::expr::{
    intersect_metadata_for_union, Alias, Placeholder, Sort as SortExpr, WindowFunction,
    WindowFunctionParams,
};
use crate::expr_rewriter::{
    create_col_from_scalar_expr, normalize_cols, normalize_sorts, NamePreserver,
};
use crate::logical_plan::display::{GraphvizVisitor, IndentVisitor};
use crate::logical_plan::extension::UserDefinedLogicalNode;
use crate::logical_plan::{DmlStatement, Statement};
use crate::utils::{
    enumerate_grouping_sets, exprlist_to_fields, find_out_reference_exprs,
    grouping_set_expr_count, grouping_set_to_exprlist, split_conjunction,
};
use crate::{
    build_join_schema, expr_vec_fmt, requalify_sides_if_needed, BinaryExpr,
    CreateMemoryTable, CreateView, Execute, Expr, ExprSchemable, LogicalPlanBuilder,
    Operator, Prepare, TableProviderFilterPushDown, TableSource,
    WindowFunctionDefinition,
};

use arrow::datatypes::{DataType, Field, FieldRef, Schema, SchemaRef};
use datafusion_common::cse::{NormalizeEq, Normalizeable};
use datafusion_common::format::ExplainFormat;
use datafusion_common::metadata::check_metadata_with_storage_equal;
use datafusion_common::tree_node::{
    Transformed, TreeNode, TreeNodeContainer, TreeNodeRecursion,
};
use datafusion_common::{
    aggregate_functional_dependencies, internal_err, plan_err, Column, Constraints,
    DFSchema, DFSchemaRef, DataFusionError, Dependency, FunctionalDependence,
    FunctionalDependencies, NullEquality, ParamValues, Result, ScalarValue, Spans,
    TableReference, UnnestOptions,
};
use indexmap::IndexSet;

// backwards compatibility
use crate::display::PgJsonVisitor;
pub use datafusion_common::display::{PlanType, StringifiedPlan, ToStringifiedPlan};
pub use datafusion_common::{JoinConstraint, JoinType};

/// A `LogicalPlan` is a node in a tree of relational operators (such as
/// Projection or Filter).
///
/// Represents transforming an input relation (table) to an output relation
/// (table) with a potentially different schema. Plans form a dataflow tree
/// where data flows from leaves up to the root to produce the query result.
///
/// `LogicalPlan`s can be created by the SQL query planner, the DataFrame API,
/// or programmatically (for example custom query languages).
///
/// # See also:
/// * [`Expr`]: For the expressions that are evaluated by the plan
/// * [`LogicalPlanBuilder`]: For building `LogicalPlan`s
/// * [`tree_node`]: To inspect and rewrite `LogicalPlan`s
///
/// [`tree_node`]: crate::logical_plan::tree_node
///
/// # Examples
///
/// ## Creating a LogicalPlan from SQL:
///
/// See [`SessionContext::sql`](https://docs.rs/datafusion/latest/datafusion/execution/context/struct.SessionContext.html#method.sql)
///
/// ## Creating a LogicalPlan from the DataFrame API:
///
/// See [`DataFrame::logical_plan`](https://docs.rs/datafusion/latest/datafusion/dataframe/struct.DataFrame.html#method.logical_plan)
///
/// ## Creating a LogicalPlan programmatically:
///
/// See [`LogicalPlanBuilder`]
///
/// # Visiting and Rewriting `LogicalPlan`s
///
/// Using the [`tree_node`] API, you can recursively walk all nodes in a
/// `LogicalPlan`. For example, to find all column references in a plan:
///
/// ```
/// # use std::collections::HashSet;
/// # use arrow::datatypes::{DataType, Field, Schema};
/// # use datafusion_expr::{Expr, col, lit, LogicalPlan, LogicalPlanBuilder, table_scan};
/// # use datafusion_common::tree_node::{TreeNodeRecursion, TreeNode};
/// # use datafusion_common::{Column, Result};
/// # fn employee_schema() -> Schema {
/// #    Schema::new(vec![
/// #           Field::new("name", DataType::Utf8, false),
/// #           Field::new("salary", DataType::Int32, false),
/// #       ])
/// #   }
/// // Projection(name, salary)
/// //   Filter(salary > 1000)
/// //     TableScan(employee)
/// # fn main() -> Result<()> {
/// let plan = table_scan(Some("employee"), &employee_schema(), None)?
///  .filter(col("salary").gt(lit(1000)))?
///  .project(vec![col("name")])?
///  .build()?;
///
/// // use apply to walk the plan and collect all expressions
/// let mut expressions = HashSet::new();
/// plan.apply(|node| {
///   // collect all expressions in the plan
///   node.apply_expressions(|expr| {
///    expressions.insert(expr.clone());
///    Ok(TreeNodeRecursion::Continue) // control walk of expressions
///   })?;
///   Ok(TreeNodeRecursion::Continue) // control walk of plan nodes
/// }).unwrap();
///
/// // we found the expression in projection and filter
/// assert_eq!(expressions.len(), 2);
/// println!("Found expressions: {:?}", expressions);
/// // found predicate in the Filter: employee.salary > 1000
/// let salary = Expr::Column(Column::new(Some("employee"), "salary"));
/// assert!(expressions.contains(&salary.gt(lit(1000))));
/// // found projection in the Projection: employee.name
/// let name = Expr::Column(Column::new(Some("employee"), "name"));
/// assert!(expressions.contains(&name));
/// # Ok(())
/// # }
/// ```
///
/// You can also rewrite plans using the [`tree_node`] API. For example, to
/// replace the filter predicate in a plan:
///
/// ```
/// # use std::collections::HashSet;
/// # use arrow::datatypes::{DataType, Field, Schema};
/// # use datafusion_expr::{Expr, col, lit, LogicalPlan, LogicalPlanBuilder, table_scan};
/// # use datafusion_common::tree_node::{TreeNodeRecursion, TreeNode};
/// # use datafusion_common::{Column, Result};
/// # fn employee_schema() -> Schema {
/// #    Schema::new(vec![
/// #           Field::new("name", DataType::Utf8, false),
/// #           Field::new("salary", DataType::Int32, false),
/// #       ])
/// #   }
/// // Projection(name, salary)
/// //   Filter(salary > 1000)
/// //     TableScan(employee)
/// # fn main() -> Result<()> {
/// use datafusion_common::tree_node::Transformed;
/// let plan = table_scan(Some("employee"), &employee_schema(), None)?
///  .filter(col("salary").gt(lit(1000)))?
///  .project(vec![col("name")])?
///  .build()?;
///
/// // use transform to rewrite the plan
/// let transformed_result = plan.transform(|node| {
///   // when we see the filter node
///   if let LogicalPlan::Filter(mut filter) = node {
///     // replace predicate with salary < 2000
///     filter.predicate = Expr::Column(Column::new(Some("employee"), "salary")).lt(lit(2000));
///     let new_plan = LogicalPlan::Filter(filter);
///     return Ok(Transformed::yes(new_plan)); // communicate the node was changed
///   }
///   // return the node unchanged
///   Ok(Transformed::no(node))
/// }).unwrap();
///
/// // Transformed result contains rewritten plan and information about
/// // whether the plan was changed
/// assert!(transformed_result.transformed);
/// let rewritten_plan = transformed_result.data;
///
/// // we found the filter
/// assert_eq!(rewritten_plan.display_indent().to_string(),
/// "Projection: employee.name\
/// \n  Filter: employee.salary < Int32(2000)\
/// \n    TableScan: employee");
/// # Ok(())
/// # }
/// ```
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Hash)]
pub enum LogicalPlan {
    /// Evaluates an arbitrary list of expressions (essentially a
    /// SELECT with an expression list) on its input.
    Projection(Projection),
    /// Filters rows from its input that do not match an
    /// expression (essentially a WHERE clause with a predicate
    /// expression).
    ///
    /// Semantically, `<predicate>` is evaluated for each row of the
    /// input; If the value of `<predicate>` is true, the input row is
    /// passed to the output. If the value of `<predicate>` is false
    /// (or null), the row is discarded.
    Filter(Filter),
    /// Windows its input based on a set of window specs and window
    /// functions (e.g. SUM or RANK). This is used to implement SQL
    /// window functions and the `OVER` clause.
    ///
    /// See [`Window`] for more details
    Window(Window),
    /// Aggregates its input based on a set of grouping and aggregate
    /// expressions (e.g. SUM). This is used to implement SQL aggregates
    /// and `GROUP BY`.
    ///
    /// See [`Aggregate`] for more details
    Aggregate(Aggregate),
    /// Sorts its input according to a list of sort expressions. This
    /// is used to implement SQL `ORDER BY`
    Sort(Sort),
    /// Join two logical plans on one or more join columns.
    /// This is used to implement SQL `JOIN`
    Join(Join),
    /// Repartitions the input based on a partitioning scheme. This is
    /// used to add parallelism and is sometimes referred to as an
    /// "exchange" operator in other systems
    Repartition(Repartition),
    /// Union multiple inputs with the same schema into a single
    /// output stream. This is used to implement SQL `UNION [ALL]` and
    /// `INTERSECT [ALL]`.
    Union(Union),
    /// Produces rows from a [`TableSource`], used to implement SQL
    /// `FROM` tables or views.
    TableScan(TableScan),
    /// A relation with an empty schema that produces 0 or 1 row (depending
    /// on its `produce_one_row` flag). This is used to implement SQL `SELECT`
    /// with no values in the `FROM` clause.
    EmptyRelation(EmptyRelation),
    /// Produces the output of running another query.  This is used to
    /// implement SQL subqueries
    Subquery(Subquery),
    /// Aliased relation provides, or changes, the name of a relation.
    SubqueryAlias(SubqueryAlias),
    /// Skip some number of rows, and then fetch some number of rows.
    Limit(Limit),
    /// A DataFusion [`Statement`] such as `SET VARIABLE` or `START TRANSACTION`
    Statement(Statement),
    /// Values expression. See
    /// [Postgres VALUES](https://www.postgresql.org/docs/current/queries-values.html)
    /// documentation for more details. This is used to implement SQL such as
    /// `VALUES (1, 2), (3, 4)`
    Values(Values),
    /// Produces a relation with string representations of
    /// various parts of the plan. This is used to implement SQL `EXPLAIN`.
    Explain(Explain),
    /// Runs the input, and returns the annotated physical plan as a string
    /// with execution metrics. This is used to implement SQL
    /// `EXPLAIN ANALYZE`.
    Analyze(Analyze),
    /// Extension operator defined outside of DataFusion. This is used
    /// to extend DataFusion with custom relational operations.
    Extension(Extension),
    /// Remove duplicate rows from the input. This is used to
    /// implement SQL `SELECT DISTINCT ...`.
    Distinct(Distinct),
    /// Data Manipulation Language (DML): Insert / Update / Delete
    Dml(DmlStatement),
    /// Data Definition Language (DDL): CREATE / DROP TABLES / VIEWS / SCHEMAS
    Ddl(DdlStatement),
    /// `COPY TO` for writing plan results to files
    Copy(CopyTo),
    /// Describe the schema of the table. This is used to implement the
    /// SQL `DESCRIBE` command from MySQL.
    DescribeTable(DescribeTable),
    /// Unnest a column that contains a nested list type such as an
    /// ARRAY. This is used to implement SQL `UNNEST`
    Unnest(Unnest),
    /// A variadic query (e.g. "Recursive CTEs")
    RecursiveQuery(RecursiveQuery),
}

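/// The default `LogicalPlan` is an [`EmptyRelation`] that produces no rows,
/// which makes it a cheap placeholder; a minimal sketch:
///
/// ```
/// # use datafusion_expr::LogicalPlan;
/// let mut plan = LogicalPlan::default();
/// // `std::mem::take` swaps in another default (empty) plan
/// let owned = std::mem::take(&mut plan);
/// assert_eq!(plan, owned);
/// ```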
impl Default for LogicalPlan {
    fn default() -> Self {
        LogicalPlan::EmptyRelation(EmptyRelation {
            produce_one_row: false,
            schema: Arc::new(DFSchema::empty()),
        })
    }
}

impl<'a> TreeNodeContainer<'a, Self> for LogicalPlan {
    fn apply_elements<F: FnMut(&'a Self) -> Result<TreeNodeRecursion>>(
        &'a self,
        mut f: F,
    ) -> Result<TreeNodeRecursion> {
        f(self)
    }

    fn map_elements<F: FnMut(Self) -> Result<Transformed<Self>>>(
        self,
        mut f: F,
    ) -> Result<Transformed<Self>> {
        f(self)
    }
}

impl LogicalPlan {
    /// Get a reference to the logical plan's schema
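    ///
    /// # Example
    ///
    /// A minimal sketch of inspecting a plan's output schema (the table and
    /// field are illustrative):
    ///
    /// ```
    /// # use arrow::datatypes::{DataType, Field, Schema};
    /// # use datafusion_expr::logical_plan::table_scan;
    /// # use datafusion_common::Result;
    /// # fn main() -> Result<()> {
    /// let schema = Schema::new(vec![Field::new("id", DataType::Int32, false)]);
    /// let plan = table_scan(Some("t"), &schema, None)?.build()?;
    /// assert_eq!(plan.schema().field(0).name(), "id");
    /// # Ok(())
    /// # }
    /// ```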
    pub fn schema(&self) -> &DFSchemaRef {
        match self {
            LogicalPlan::EmptyRelation(EmptyRelation { schema, .. }) => schema,
            LogicalPlan::Values(Values { schema, .. }) => schema,
            LogicalPlan::TableScan(TableScan {
                projected_schema, ..
            }) => projected_schema,
            LogicalPlan::Projection(Projection { schema, .. }) => schema,
            LogicalPlan::Filter(Filter { input, .. }) => input.schema(),
            LogicalPlan::Distinct(Distinct::All(input)) => input.schema(),
            LogicalPlan::Distinct(Distinct::On(DistinctOn { schema, .. })) => schema,
            LogicalPlan::Window(Window { schema, .. }) => schema,
            LogicalPlan::Aggregate(Aggregate { schema, .. }) => schema,
            LogicalPlan::Sort(Sort { input, .. }) => input.schema(),
            LogicalPlan::Join(Join { schema, .. }) => schema,
            LogicalPlan::Repartition(Repartition { input, .. }) => input.schema(),
            LogicalPlan::Limit(Limit { input, .. }) => input.schema(),
            LogicalPlan::Statement(statement) => statement.schema(),
            LogicalPlan::Subquery(Subquery { subquery, .. }) => subquery.schema(),
            LogicalPlan::SubqueryAlias(SubqueryAlias { schema, .. }) => schema,
            LogicalPlan::Explain(explain) => &explain.schema,
            LogicalPlan::Analyze(analyze) => &analyze.schema,
            LogicalPlan::Extension(extension) => extension.node.schema(),
            LogicalPlan::Union(Union { schema, .. }) => schema,
            LogicalPlan::DescribeTable(DescribeTable { output_schema, .. }) => {
                output_schema
            }
            LogicalPlan::Dml(DmlStatement { output_schema, .. }) => output_schema,
            LogicalPlan::Copy(CopyTo { output_schema, .. }) => output_schema,
            LogicalPlan::Ddl(ddl) => ddl.schema(),
            LogicalPlan::Unnest(Unnest { schema, .. }) => schema,
            LogicalPlan::RecursiveQuery(RecursiveQuery { static_term, .. }) => {
                // we take the schema of the static term as the schema of the entire recursive query
                static_term.schema()
            }
        }
    }

    /// Returns the schemas to fall back on when normalizing columns, in
    /// addition to the plan's main schema.
    pub fn fallback_normalize_schemas(&self) -> Vec<&DFSchema> {
        match self {
            LogicalPlan::Window(_)
            | LogicalPlan::Projection(_)
            | LogicalPlan::Aggregate(_)
            | LogicalPlan::Unnest(_)
            | LogicalPlan::Join(_) => self
                .inputs()
                .iter()
                .map(|input| input.schema().as_ref())
                .collect(),
            _ => vec![],
        }
    }

    /// Returns the (fixed) output schema for explain plans
    pub fn explain_schema() -> SchemaRef {
        SchemaRef::new(Schema::new(vec![
            Field::new("plan_type", DataType::Utf8, false),
            Field::new("plan", DataType::Utf8, false),
        ]))
    }

    /// Returns the (fixed) output schema for `DESCRIBE` plans
    pub fn describe_schema() -> Schema {
        Schema::new(vec![
            Field::new("column_name", DataType::Utf8, false),
            Field::new("data_type", DataType::Utf8, false),
            Field::new("is_nullable", DataType::Utf8, false),
        ])
    }

    /// Returns all expressions (non-recursively) evaluated by the current
    /// logical plan node. This does not include expressions in any children.
    ///
    /// Note this method `clone`s all the expressions. When possible, the
    /// [`tree_node`] API should be used instead of this API.
    ///
    /// The returned expressions do not necessarily represent or even
    /// contribute to the output schema of this node. For example,
    /// `LogicalPlan::Filter` returns the filter expression even though the
    /// output of a Filter has the same columns as the input.
    ///
    /// The expressions do contain all the columns that are used by this plan,
    /// so any column not referenced by these expressions is unused, and
    /// DataFusion's optimizer attempts to optimize such columns away.
    ///
    /// [`tree_node`]: crate::logical_plan::tree_node
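    ///
    /// # Example
    ///
    /// A minimal sketch (the table and predicate are illustrative):
    ///
    /// ```
    /// # use arrow::datatypes::{DataType, Field, Schema};
    /// # use datafusion_expr::{col, lit, logical_plan::table_scan};
    /// # use datafusion_common::Result;
    /// # fn main() -> Result<()> {
    /// let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]);
    /// let plan = table_scan(Some("t"), &schema, None)?
    ///     .filter(col("a").gt(lit(5)))?
    ///     .build()?;
    /// // only the Filter's own predicate is returned (the builder qualifies
    /// // the column as `t.a`); the child TableScan's expressions are not
    /// assert_eq!(plan.expressions(), vec![col("t.a").gt(lit(5))]);
    /// # Ok(())
    /// # }
    /// ```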
    pub fn expressions(self: &LogicalPlan) -> Vec<Expr> {
        let mut exprs = vec![];
        self.apply_expressions(|e| {
            exprs.push(e.clone());
            Ok(TreeNodeRecursion::Continue)
        })
        // closure always returns OK
        .unwrap();
        exprs
    }

    /// Returns all the out reference (correlated) expressions (recursively) in
    /// the current logical plan node and all of its descendant nodes.
    pub fn all_out_ref_exprs(self: &LogicalPlan) -> Vec<Expr> {
        let mut exprs = vec![];
        self.apply_expressions(|e| {
            find_out_reference_exprs(e).into_iter().for_each(|e| {
                if !exprs.contains(&e) {
                    exprs.push(e)
                }
            });
            Ok(TreeNodeRecursion::Continue)
        })
        // closure always returns OK
        .unwrap();
        self.inputs()
            .into_iter()
            .flat_map(|child| child.all_out_ref_exprs())
            .for_each(|e| {
                if !exprs.contains(&e) {
                    exprs.push(e)
                }
            });
        exprs
    }

    /// Returns all inputs / children of this `LogicalPlan` node.
    ///
    /// Note this does not include inputs of inputs (grandchildren), or
    /// subqueries.
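    ///
    /// # Example
    ///
    /// A minimal sketch (the table and predicate are illustrative):
    ///
    /// ```
    /// # use arrow::datatypes::{DataType, Field, Schema};
    /// # use datafusion_expr::{col, lit, logical_plan::table_scan, LogicalPlan};
    /// # use datafusion_common::Result;
    /// # fn main() -> Result<()> {
    /// let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]);
    /// let plan = table_scan(Some("t"), &schema, None)?
    ///     .filter(col("a").gt(lit(5)))?
    ///     .build()?;
    /// // the Filter node has exactly one input: the TableScan
    /// let inputs = plan.inputs();
    /// assert_eq!(inputs.len(), 1);
    /// assert!(matches!(inputs[0], LogicalPlan::TableScan(_)));
    /// # Ok(())
    /// # }
    /// ```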
    pub fn inputs(&self) -> Vec<&LogicalPlan> {
        match self {
            LogicalPlan::Projection(Projection { input, .. }) => vec![input],
            LogicalPlan::Filter(Filter { input, .. }) => vec![input],
            LogicalPlan::Repartition(Repartition { input, .. }) => vec![input],
            LogicalPlan::Window(Window { input, .. }) => vec![input],
            LogicalPlan::Aggregate(Aggregate { input, .. }) => vec![input],
            LogicalPlan::Sort(Sort { input, .. }) => vec![input],
            LogicalPlan::Join(Join { left, right, .. }) => vec![left, right],
            LogicalPlan::Limit(Limit { input, .. }) => vec![input],
            LogicalPlan::Subquery(Subquery { subquery, .. }) => vec![subquery],
            LogicalPlan::SubqueryAlias(SubqueryAlias { input, .. }) => vec![input],
            LogicalPlan::Extension(extension) => extension.node.inputs(),
            LogicalPlan::Union(Union { inputs, .. }) => {
                inputs.iter().map(|arc| arc.as_ref()).collect()
            }
            LogicalPlan::Distinct(
                Distinct::All(input) | Distinct::On(DistinctOn { input, .. }),
            ) => vec![input],
            LogicalPlan::Explain(explain) => vec![&explain.plan],
            LogicalPlan::Analyze(analyze) => vec![&analyze.input],
            LogicalPlan::Dml(write) => vec![&write.input],
            LogicalPlan::Copy(copy) => vec![&copy.input],
            LogicalPlan::Ddl(ddl) => ddl.inputs(),
            LogicalPlan::Unnest(Unnest { input, .. }) => vec![input],
            LogicalPlan::RecursiveQuery(RecursiveQuery {
                static_term,
                recursive_term,
                ..
            }) => vec![static_term, recursive_term],
            LogicalPlan::Statement(stmt) => stmt.inputs(),
            // plans without inputs
            LogicalPlan::TableScan { .. }
            | LogicalPlan::EmptyRelation { .. }
            | LogicalPlan::Values { .. }
            | LogicalPlan::DescribeTable(_) => vec![],
        }
    }

    /// Returns all `Using` join columns in a logical plan.
    pub fn using_columns(&self) -> Result<Vec<HashSet<Column>>, DataFusionError> {
        let mut using_columns: Vec<HashSet<Column>> = vec![];

        self.apply_with_subqueries(|plan| {
            if let LogicalPlan::Join(Join {
                join_constraint: JoinConstraint::Using,
                on,
                ..
            }) = plan
            {
                // The join keys in using-join must be columns.
                let columns =
                    on.iter().try_fold(HashSet::new(), |mut accumu, (l, r)| {
                        let Some(l) = l.get_as_join_column() else {
                            return internal_err!(
                                "Invalid join key. Expected column, found {l:?}"
                            );
                        };
                        let Some(r) = r.get_as_join_column() else {
                            return internal_err!(
                                "Invalid join key. Expected column, found {r:?}"
                            );
                        };
                        accumu.insert(l.to_owned());
                        accumu.insert(r.to_owned());
                        Result::<_, DataFusionError>::Ok(accumu)
                    })?;
                using_columns.push(columns);
            }
            Ok(TreeNodeRecursion::Continue)
        })?;

        Ok(using_columns)
    }

    /// Returns the first output expression of this `LogicalPlan` node.
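    ///
    /// # Example
    ///
    /// A minimal sketch (the table and columns are illustrative):
    ///
    /// ```
    /// # use arrow::datatypes::{DataType, Field, Schema};
    /// # use datafusion_expr::{logical_plan::table_scan, Expr};
    /// # use datafusion_common::{Column, Result};
    /// # fn main() -> Result<()> {
    /// let schema = Schema::new(vec![
    ///     Field::new("a", DataType::Int32, false),
    ///     Field::new("b", DataType::Int32, false),
    /// ]);
    /// let plan = table_scan(Some("t"), &schema, None)?.build()?;
    /// // the first output expression of a scan is its first column
    /// assert_eq!(
    ///     plan.head_output_expr()?,
    ///     Some(Expr::Column(Column::new(Some("t"), "a")))
    /// );
    /// # Ok(())
    /// # }
    /// ```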
    pub fn head_output_expr(&self) -> Result<Option<Expr>> {
        match self {
            LogicalPlan::Projection(projection) => {
                Ok(Some(projection.expr.as_slice()[0].clone()))
            }
            LogicalPlan::Aggregate(agg) => {
                if agg.group_expr.is_empty() {
                    Ok(Some(agg.aggr_expr.as_slice()[0].clone()))
                } else {
                    Ok(Some(agg.group_expr.as_slice()[0].clone()))
                }
            }
            LogicalPlan::Distinct(Distinct::On(DistinctOn { select_expr, .. })) => {
                Ok(Some(select_expr[0].clone()))
            }
            LogicalPlan::Filter(Filter { input, .. })
            | LogicalPlan::Distinct(Distinct::All(input))
            | LogicalPlan::Sort(Sort { input, .. })
            | LogicalPlan::Limit(Limit { input, .. })
            | LogicalPlan::Repartition(Repartition { input, .. })
            | LogicalPlan::Window(Window { input, .. }) => input.head_output_expr(),
            LogicalPlan::Join(Join {
                left,
                right,
                join_type,
                ..
            }) => match join_type {
                JoinType::Inner | JoinType::Left | JoinType::Right | JoinType::Full => {
                    if left.schema().fields().is_empty() {
                        right.head_output_expr()
                    } else {
                        left.head_output_expr()
                    }
                }
                JoinType::LeftSemi | JoinType::LeftAnti | JoinType::LeftMark => {
                    left.head_output_expr()
                }
                JoinType::RightSemi | JoinType::RightAnti | JoinType::RightMark => {
                    right.head_output_expr()
                }
            },
            LogicalPlan::RecursiveQuery(RecursiveQuery { static_term, .. }) => {
                static_term.head_output_expr()
            }
            LogicalPlan::Union(union) => Ok(Some(Expr::Column(Column::from(
                union.schema.qualified_field(0),
            )))),
            LogicalPlan::TableScan(table) => Ok(Some(Expr::Column(Column::from(
                table.projected_schema.qualified_field(0),
            )))),
            LogicalPlan::SubqueryAlias(subquery_alias) => {
                let expr_opt = subquery_alias.input.head_output_expr()?;
                expr_opt
                    .map(|expr| {
                        Ok(Expr::Column(create_col_from_scalar_expr(
                            &expr,
                            subquery_alias.alias.to_string(),
                        )?))
                    })
                    .map_or(Ok(None), |v| v.map(Some))
            }
            LogicalPlan::Subquery(_) => Ok(None),
            LogicalPlan::EmptyRelation(_)
            | LogicalPlan::Statement(_)
            | LogicalPlan::Values(_)
            | LogicalPlan::Explain(_)
            | LogicalPlan::Analyze(_)
            | LogicalPlan::Extension(_)
            | LogicalPlan::Dml(_)
            | LogicalPlan::Copy(_)
            | LogicalPlan::Ddl(_)
            | LogicalPlan::DescribeTable(_)
            | LogicalPlan::Unnest(_) => Ok(None),
        }
    }

    /// Recomputes schema and type information for this LogicalPlan if needed.
    ///
    /// Some `LogicalPlan`s may need to recompute their schema if the number or
    /// type of expressions have been changed (for example due to type
    /// coercion). For example, [`LogicalPlan::Projection`]'s schema depends on
    /// its expressions.
    ///
    /// The schema of other `LogicalPlan`s is unaffected by any changes to their
    /// expressions. For example, [`LogicalPlan::Filter`]'s schema is always the
    /// same as its input schema.
    ///
    /// This is useful after modifying a plan's `Expr`s (or input plans) via
    /// methods such as [Self::map_children] and [Self::map_expressions]. Unlike
    /// [Self::with_new_exprs], this method does not require a new set of
    /// expressions or input plans.
    ///
    /// # Return value
    /// Returns an error if there is some issue recomputing the schema.
    ///
    /// # Notes
    ///
    /// * Does not recursively recompute schema for input (child) plans.
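    ///
    /// # Example
    ///
    /// A minimal sketch of rewriting a projection's expression in place and
    /// then refreshing the schema (the table and alias are illustrative):
    ///
    /// ```
    /// # use arrow::datatypes::{DataType, Field, Schema};
    /// # use datafusion_expr::{col, logical_plan::table_scan};
    /// # use datafusion_common::tree_node::Transformed;
    /// # use datafusion_common::Result;
    /// # fn main() -> Result<()> {
    /// let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]);
    /// let plan = table_scan(Some("t"), &schema, None)?
    ///     .project(vec![col("a")])?
    ///     .build()?;
    /// // rewrite the projection's expression in place...
    /// let plan = plan
    ///     .map_expressions(|e| Ok(Transformed::yes(e.alias("b"))))?
    ///     .data;
    /// // ...then recompute the schema so it reflects the new alias
    /// let plan = plan.recompute_schema()?;
    /// assert_eq!(plan.schema().field(0).name(), "b");
    /// # Ok(())
    /// # }
    /// ```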
    pub fn recompute_schema(self) -> Result<Self> {
        match self {
            // Since expr may be different than the previous expr, schema of the projection
            // may change. We need to use try_new method instead of try_new_with_schema method.
            LogicalPlan::Projection(Projection {
                expr,
                input,
                schema: _,
            }) => Projection::try_new(expr, input).map(LogicalPlan::Projection),
            LogicalPlan::Dml(_) => Ok(self),
            LogicalPlan::Copy(_) => Ok(self),
            LogicalPlan::Values(Values { schema, values }) => {
                // todo it isn't clear why the schema is not recomputed here
                Ok(LogicalPlan::Values(Values { schema, values }))
            }
            LogicalPlan::Filter(Filter { predicate, input }) => {
                Filter::try_new(predicate, input).map(LogicalPlan::Filter)
            }
            LogicalPlan::Repartition(_) => Ok(self),
            LogicalPlan::Window(Window {
                input,
                window_expr,
                schema: _,
            }) => Window::try_new(window_expr, input).map(LogicalPlan::Window),
            LogicalPlan::Aggregate(Aggregate {
                input,
                group_expr,
                aggr_expr,
                schema: _,
            }) => Aggregate::try_new(input, group_expr, aggr_expr)
                .map(LogicalPlan::Aggregate),
            LogicalPlan::Sort(_) => Ok(self),
            LogicalPlan::Join(Join {
                left,
                right,
                filter,
                join_type,
                join_constraint,
                on,
                schema: _,
                null_equality,
            }) => {
                let schema =
                    build_join_schema(left.schema(), right.schema(), &join_type)?;

                let new_on: Vec<_> = on
                    .into_iter()
                    .map(|equi_expr| {
                        // SimplifyExpression rule may add alias to the equi_expr.
                        (equi_expr.0.unalias(), equi_expr.1.unalias())
                    })
                    .collect();

                Ok(LogicalPlan::Join(Join {
                    left,
                    right,
                    join_type,
                    join_constraint,
                    on: new_on,
                    filter,
                    schema: DFSchemaRef::new(schema),
                    null_equality,
                }))
            }
            LogicalPlan::Subquery(_) => Ok(self),
            LogicalPlan::SubqueryAlias(SubqueryAlias {
                input,
                alias,
                schema: _,
            }) => SubqueryAlias::try_new(input, alias).map(LogicalPlan::SubqueryAlias),
            LogicalPlan::Limit(_) => Ok(self),
            LogicalPlan::Ddl(_) => Ok(self),
            LogicalPlan::Extension(Extension { node }) => {
                // todo make an API that does not require cloning
                // This requires a copy of the extension nodes expressions and inputs
                let expr = node.expressions();
                let inputs: Vec<_> = node.inputs().into_iter().cloned().collect();
                Ok(LogicalPlan::Extension(Extension {
                    node: node.with_exprs_and_inputs(expr, inputs)?,
                }))
            }
            LogicalPlan::Union(Union { inputs, schema }) => {
                let first_input_schema = inputs[0].schema();
                if schema.fields().len() == first_input_schema.fields().len() {
                    // If inputs are not pruned do not change schema
                    Ok(LogicalPlan::Union(Union { inputs, schema }))
                } else {
                    // A note on `Union`s constructed via `try_new_by_name`:
                    //
                    // At this point, the schema for each input should have
                    // the same width. Thus, we do not need to save whether a
                    // `Union` was created `BY NAME`, and can safely rely on the
                    // `try_new` initializer to derive the new schema based on
                    // column positions.
                    Ok(LogicalPlan::Union(Union::try_new(inputs)?))
                }
            }
            LogicalPlan::Distinct(distinct) => {
                let distinct = match distinct {
                    Distinct::All(input) => Distinct::All(input),
                    Distinct::On(DistinctOn {
                        on_expr,
                        select_expr,
                        sort_expr,
                        input,
                        schema: _,
                    }) => Distinct::On(DistinctOn::try_new(
                        on_expr,
                        select_expr,
                        sort_expr,
                        input,
                    )?),
                };
                Ok(LogicalPlan::Distinct(distinct))
            }
            LogicalPlan::RecursiveQuery(_) => Ok(self),
            LogicalPlan::Analyze(_) => Ok(self),
            LogicalPlan::Explain(_) => Ok(self),
            LogicalPlan::TableScan(_) => Ok(self),
            LogicalPlan::EmptyRelation(_) => Ok(self),
            LogicalPlan::Statement(_) => Ok(self),
            LogicalPlan::DescribeTable(_) => Ok(self),
            LogicalPlan::Unnest(Unnest {
                input,
                exec_columns,
                options,
                ..
            }) => {
                // Update schema with unnested column type.
                unnest_with_options(Arc::unwrap_or_clone(input), exec_columns, options)
            }
        }
    }

    /// Returns a new `LogicalPlan` based on `self` with inputs and
    /// expressions replaced.
    ///
    /// Note this method creates an entirely new node, which requires a large
    /// amount of clone'ing. When possible, the [`tree_node`] API should be used
    /// instead of this API.
    ///
    /// The exprs correspond to the same order of expressions returned
    /// by [`Self::expressions`]. This function is used by optimizers
    /// to rewrite plans using the following pattern:
    ///
    /// [`tree_node`]: crate::logical_plan::tree_node
    ///
    /// ```text
    /// let new_inputs = optimize_children(..., plan, props);
    ///
    /// // get the plan's expressions to optimize
    /// let exprs = plan.expressions();
    ///
    /// // potentially rewrite plan expressions
    /// let rewritten_exprs = rewrite_exprs(exprs);
    ///
    /// // create a new plan using rewritten_exprs in the same positions
    /// let new_plan = plan.with_new_exprs(rewritten_exprs, new_inputs);
    /// ```
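    ///
    /// A concrete, minimal sketch on a `Filter` node (the table and predicates
    /// are illustrative):
    ///
    /// ```
    /// # use arrow::datatypes::{DataType, Field, Schema};
    /// # use datafusion_expr::{col, lit, logical_plan::table_scan};
    /// # use datafusion_common::Result;
    /// # fn main() -> Result<()> {
    /// let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]);
    /// let plan = table_scan(Some("t"), &schema, None)?
    ///     .filter(col("a").gt(lit(1)))?
    ///     .build()?;
    /// // rebuild the Filter with a new predicate but the same input
    /// let inputs: Vec<_> = plan.inputs().into_iter().cloned().collect();
    /// let new_plan = plan.with_new_exprs(vec![col("t.a").gt(lit(100))], inputs)?;
    /// assert_eq!(
    ///     new_plan.display_indent().to_string(),
    ///     "Filter: t.a > Int32(100)\n  TableScan: t"
    /// );
    /// # Ok(())
    /// # }
    /// ```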
    pub fn with_new_exprs(
        &self,
        mut expr: Vec<Expr>,
        inputs: Vec<LogicalPlan>,
    ) -> Result<LogicalPlan> {
        match self {
            // Since expr may be different than the previous expr, schema of the projection
            // may change. We need to use try_new method instead of try_new_with_schema method.
            LogicalPlan::Projection(Projection { .. }) => {
                let input = self.only_input(inputs)?;
                Projection::try_new(expr, Arc::new(input)).map(LogicalPlan::Projection)
            }
            LogicalPlan::Dml(DmlStatement {
                table_name,
                target,
                op,
                ..
            }) => {
                self.assert_no_expressions(expr)?;
                let input = self.only_input(inputs)?;
                Ok(LogicalPlan::Dml(DmlStatement::new(
                    table_name.clone(),
                    Arc::clone(target),
                    op.clone(),
                    Arc::new(input),
                )))
            }
            LogicalPlan::Copy(CopyTo {
                input: _,
                output_url,
                file_type,
                options,
                partition_by,
                output_schema: _,
            }) => {
                self.assert_no_expressions(expr)?;
                let input = self.only_input(inputs)?;
                Ok(LogicalPlan::Copy(CopyTo::new(
                    Arc::new(input),
                    output_url.clone(),
                    partition_by.clone(),
                    Arc::clone(file_type),
                    options.clone(),
                )))
            }
            LogicalPlan::Values(Values { schema, .. }) => {
                self.assert_no_inputs(inputs)?;
                Ok(LogicalPlan::Values(Values {
                    schema: Arc::clone(schema),
                    values: expr
                        .chunks_exact(schema.fields().len())
                        .map(|s| s.to_vec())
                        .collect(),
                }))
            }
            LogicalPlan::Filter { .. } => {
                let predicate = self.only_expr(expr)?;
                let input = self.only_input(inputs)?;

                Filter::try_new(predicate, Arc::new(input)).map(LogicalPlan::Filter)
            }
            LogicalPlan::Repartition(Repartition {
                partitioning_scheme,
                ..
            }) => match partitioning_scheme {
                Partitioning::RoundRobinBatch(n) => {
                    self.assert_no_expressions(expr)?;
                    let input = self.only_input(inputs)?;
                    Ok(LogicalPlan::Repartition(Repartition {
                        partitioning_scheme: Partitioning::RoundRobinBatch(*n),
                        input: Arc::new(input),
                    }))
                }
                Partitioning::Hash(_, n) => {
                    let input = self.only_input(inputs)?;
                    Ok(LogicalPlan::Repartition(Repartition {
                        partitioning_scheme: Partitioning::Hash(expr, *n),
                        input: Arc::new(input),
                    }))
                }
                Partitioning::DistributeBy(_) => {
                    let input = self.only_input(inputs)?;
                    Ok(LogicalPlan::Repartition(Repartition {
                        partitioning_scheme: Partitioning::DistributeBy(expr),
                        input: Arc::new(input),
                    }))
                }
            },
            LogicalPlan::Window(Window { window_expr, .. }) => {
                assert_eq!(window_expr.len(), expr.len());
                let input = self.only_input(inputs)?;
                Window::try_new(expr, Arc::new(input)).map(LogicalPlan::Window)
            }
            LogicalPlan::Aggregate(Aggregate { group_expr, .. }) => {
                let input = self.only_input(inputs)?;
                // group exprs are the first expressions
                let agg_expr = expr.split_off(group_expr.len());

                Aggregate::try_new(Arc::new(input), expr, agg_expr)
                    .map(LogicalPlan::Aggregate)
            }
            LogicalPlan::Sort(Sort {
                expr: sort_expr,
                fetch,
                ..
            }) => {
                let input = self.only_input(inputs)?;
                Ok(LogicalPlan::Sort(Sort {
                    expr: expr
                        .into_iter()
                        .zip(sort_expr.iter())
                        .map(|(expr, sort)| sort.with_expr(expr))
                        .collect(),
                    input: Arc::new(input),
                    fetch: *fetch,
                }))
            }
            LogicalPlan::Join(Join {
                join_type,
                join_constraint,
                on,
                null_equality,
                ..
            }) => {
                let (left, right) = self.only_two_inputs(inputs)?;
                let schema = build_join_schema(left.schema(), right.schema(), join_type)?;

                let equi_expr_count = on.len() * 2;
                assert!(expr.len() >= equi_expr_count);

                // Assume that the last expr, if any,
                // is the filter_expr (non equality predicate from ON clause)
                let filter_expr = if expr.len() > equi_expr_count {
                    expr.pop()
                } else {
                    None
                };

                // The first part of expr is equi-exprs,
                // and the struct of each equi-expr is like `left-expr = right-expr`.
                assert_eq!(expr.len(), equi_expr_count);
                let mut new_on = Vec::with_capacity(on.len());
                let mut iter = expr.into_iter();
                while let Some(left) = iter.next() {
                    let Some(right) = iter.next() else {
                        internal_err!("Expected a pair of expressions to construct the join on expression")?
                    };

                    // SimplifyExpression rule may add alias to the equi_expr.
                    new_on.push((left.unalias(), right.unalias()));
                }

                Ok(LogicalPlan::Join(Join {
                    left: Arc::new(left),
                    right: Arc::new(right),
                    join_type: *join_type,
                    join_constraint: *join_constraint,
                    on: new_on,
                    filter: filter_expr,
                    schema: DFSchemaRef::new(schema),
                    null_equality: *null_equality,
                }))
            }
            LogicalPlan::Subquery(Subquery {
                outer_ref_columns,
                spans,
                ..
            }) => {
                self.assert_no_expressions(expr)?;
                let input = self.only_input(inputs)?;
                let subquery = LogicalPlanBuilder::from(input).build()?;
                Ok(LogicalPlan::Subquery(Subquery {
                    subquery: Arc::new(subquery),
                    outer_ref_columns: outer_ref_columns.clone(),
                    spans: spans.clone(),
                }))
            }
            LogicalPlan::SubqueryAlias(SubqueryAlias { alias, .. }) => {
                self.assert_no_expressions(expr)?;
                let input = self.only_input(inputs)?;
                SubqueryAlias::try_new(Arc::new(input), alias.clone())
                    .map(LogicalPlan::SubqueryAlias)
            }
            LogicalPlan::Limit(Limit { skip, fetch, .. }) => {
                let old_expr_len = skip.iter().chain(fetch.iter()).count();
                if old_expr_len != expr.len() {
                    return internal_err!(
                        "Invalid number of new Limit expressions: expected {}, got {}",
                        old_expr_len,
                        expr.len()
                    );
                }
                // `LogicalPlan::expressions()` returns in [skip, fetch] order, so we can pop from the end.
                let new_fetch = fetch.as_ref().and_then(|_| expr.pop());
                let new_skip = skip.as_ref().and_then(|_| expr.pop());
                let input = self.only_input(inputs)?;
                Ok(LogicalPlan::Limit(Limit {
                    skip: new_skip.map(Box::new),
                    fetch: new_fetch.map(Box::new),
                    input: Arc::new(input),
                }))
            }
            LogicalPlan::Ddl(DdlStatement::CreateMemoryTable(CreateMemoryTable {
                name,
                if_not_exists,
                or_replace,
                column_defaults,
                temporary,
                ..
            })) => {
                self.assert_no_expressions(expr)?;
                let input = self.only_input(inputs)?;
                Ok(LogicalPlan::Ddl(DdlStatement::CreateMemoryTable(
                    CreateMemoryTable {
                        input: Arc::new(input),
                        constraints: Constraints::default(),
                        name: name.clone(),
                        if_not_exists: *if_not_exists,
                        or_replace: *or_replace,
                        column_defaults: column_defaults.clone(),
                        temporary: *temporary,
                    },
                )))
            }
            LogicalPlan::Ddl(DdlStatement::CreateView(CreateView {
                name,
                or_replace,
                definition,
                temporary,
                ..
            })) => {
                self.assert_no_expressions(expr)?;
                let input = self.only_input(inputs)?;
                Ok(LogicalPlan::Ddl(DdlStatement::CreateView(CreateView {
                    input: Arc::new(input),
                    name: name.clone(),
                    or_replace: *or_replace,
                    temporary: *temporary,
                    definition: definition.clone(),
                })))
            }
            LogicalPlan::Extension(e) => Ok(LogicalPlan::Extension(Extension {
                node: e.node.with_exprs_and_inputs(expr, inputs)?,
            })),
            LogicalPlan::Union(Union { schema, .. }) => {
                self.assert_no_expressions(expr)?;
                let input_schema = inputs[0].schema();
                // If inputs are not pruned do not change schema.
                let schema = if schema.fields().len() == input_schema.fields().len() {
                    Arc::clone(schema)
                } else {
                    Arc::clone(input_schema)
                };
                Ok(LogicalPlan::Union(Union {
                    inputs: inputs.into_iter().map(Arc::new).collect(),
                    schema,
                }))
            }
            LogicalPlan::Distinct(distinct) => {
                let distinct = match distinct {
                    Distinct::All(_) => {
                        self.assert_no_expressions(expr)?;
                        let input = self.only_input(inputs)?;
                        Distinct::All(Arc::new(input))
                    }
                    Distinct::On(DistinctOn {
                        on_expr,
                        select_expr,
                        ..
                    }) => {
                        let input = self.only_input(inputs)?;
                        let sort_expr = expr.split_off(on_expr.len() + select_expr.len());
                        let select_expr = expr.split_off(on_expr.len());
                        assert!(sort_expr.is_empty(), "with_new_exprs for Distinct does not support sort expressions");
                        Distinct::On(DistinctOn::try_new(
                            expr,
                            select_expr,
                            None, // no sort expressions accepted
                            Arc::new(input),
                        )?)
                    }
                };
                Ok(LogicalPlan::Distinct(distinct))
            }
            LogicalPlan::RecursiveQuery(RecursiveQuery {
                name, is_distinct, ..
            }) => {
                self.assert_no_expressions(expr)?;
                let (static_term, recursive_term) = self.only_two_inputs(inputs)?;
                Ok(LogicalPlan::RecursiveQuery(RecursiveQuery {
                    name: name.clone(),
                    static_term: Arc::new(static_term),
                    recursive_term: Arc::new(recursive_term),
                    is_distinct: *is_distinct,
                }))
            }
            LogicalPlan::Analyze(a) => {
                self.assert_no_expressions(expr)?;
                let input = self.only_input(inputs)?;
                Ok(LogicalPlan::Analyze(Analyze {
                    verbose: a.verbose,
                    schema: Arc::clone(&a.schema),
                    input: Arc::new(input),
                }))
            }
            LogicalPlan::Explain(e) => {
                self.assert_no_expressions(expr)?;
                let input = self.only_input(inputs)?;
                Ok(LogicalPlan::Explain(Explain {
                    verbose: e.verbose,
                    plan: Arc::new(input),
                    explain_format: e.explain_format.clone(),
                    stringified_plans: e.stringified_plans.clone(),
                    schema: Arc::clone(&e.schema),
                    logical_optimization_succeeded: e.logical_optimization_succeeded,
                }))
            }
            LogicalPlan::Statement(Statement::Prepare(Prepare {
                name, fields, ..
            })) => {
                self.assert_no_expressions(expr)?;
                let input = self.only_input(inputs)?;
                Ok(LogicalPlan::Statement(Statement::Prepare(Prepare {
                    name: name.clone(),
                    fields: fields.clone(),
                    input: Arc::new(input),
                })))
            }
            LogicalPlan::Statement(Statement::Execute(Execute { name, .. })) => {
                self.assert_no_inputs(inputs)?;
                Ok(LogicalPlan::Statement(Statement::Execute(Execute {
                    name: name.clone(),
                    parameters: expr,
                })))
            }
            LogicalPlan::TableScan(ts) => {
                self.assert_no_inputs(inputs)?;
                Ok(LogicalPlan::TableScan(TableScan {
                    filters: expr,
                    ..ts.clone()
                }))
            }
            LogicalPlan::EmptyRelation(_)
            | LogicalPlan::Ddl(_)
            | LogicalPlan::Statement(_)
            | LogicalPlan::DescribeTable(_) => {
                // All of these plan types have no inputs / exprs so should not be called
                self.assert_no_expressions(expr)?;
                self.assert_no_inputs(inputs)?;
                Ok(self.clone())
            }
            LogicalPlan::Unnest(Unnest {
                exec_columns: columns,
                options,
                ..
            }) => {
                self.assert_no_expressions(expr)?;
                let input = self.only_input(inputs)?;
                // Update schema with unnested column type.
                let new_plan =
                    unnest_with_options(input, columns.clone(), options.clone())?;
                Ok(new_plan)
            }
        }
    }

    /// Checks that the plan conforms to the listed invariant level, returning an error if not.
    pub fn check_invariants(&self, check: InvariantLevel) -> Result<()> {
        match check {
            InvariantLevel::Always => assert_always_invariants_at_current_node(self),
            InvariantLevel::Executable => assert_executable_invariants(self),
        }
    }

    /// Helper for [Self::with_new_exprs] to use when no expressions are expected.
    #[inline]
    #[expect(clippy::needless_pass_by_value)] // expr is moved intentionally to ensure it's not used again
    fn assert_no_expressions(&self, expr: Vec<Expr>) -> Result<()> {
        if !expr.is_empty() {
            return internal_err!("{self:?} should have no exprs, got {:?}", expr);
        }
        Ok(())
    }

    /// Helper for [Self::with_new_exprs] to use when no inputs are expected.
    #[inline]
    #[expect(clippy::needless_pass_by_value)] // inputs is moved intentionally to ensure it's not used again
    fn assert_no_inputs(&self, inputs: Vec<LogicalPlan>) -> Result<()> {
        if !inputs.is_empty() {
            return internal_err!("{self:?} should have no inputs, got: {:?}", inputs);
        }
        Ok(())
    }

    /// Helper for [Self::with_new_exprs] to use when exactly one expression is expected.
    #[inline]
    fn only_expr(&self, mut expr: Vec<Expr>) -> Result<Expr> {
        if expr.len() != 1 {
            return internal_err!(
                "{self:?} should have exactly one expr, got {:?}",
                expr
            );
        }
        Ok(expr.remove(0))
    }

    /// Helper for [Self::with_new_exprs] to use when exactly one input is expected.
    #[inline]
    fn only_input(&self, mut inputs: Vec<LogicalPlan>) -> Result<LogicalPlan> {
        if inputs.len() != 1 {
            return internal_err!(
                "{self:?} should have exactly one input, got {:?}",
                inputs
            );
        }
        Ok(inputs.remove(0))
    }

    /// Helper for [Self::with_new_exprs] to use when exactly two inputs are expected.
    #[inline]
    fn only_two_inputs(
        &self,
        mut inputs: Vec<LogicalPlan>,
    ) -> Result<(LogicalPlan, LogicalPlan)> {
        if inputs.len() != 2 {
            return internal_err!(
                "{self:?} should have exactly two inputs, got {:?}",
                inputs
            );
        }
        let right = inputs.remove(1);
        let left = inputs.remove(0);
        Ok((left, right))
    }

    /// Replaces placeholder param values (like `$1`, `$2`) in [`LogicalPlan`]
    /// with the specified `param_values`.
    ///
    /// [`Prepare`] statements are converted to
    /// their inner logical plan for execution.
    ///
    /// # Example
    /// ```
    /// # use arrow::datatypes::{Field, Schema, DataType};
    /// use datafusion_common::ScalarValue;
    /// # use datafusion_expr::{lit, col, LogicalPlanBuilder, logical_plan::table_scan, placeholder};
    /// # let schema = Schema::new(vec![
    /// #     Field::new("id", DataType::Int32, false),
    /// # ]);
    /// // Build SELECT * FROM t1 WHERE id = $1
    /// let plan = table_scan(Some("t1"), &schema, None).unwrap()
    ///     .filter(col("id").eq(placeholder("$1"))).unwrap()
    ///     .build().unwrap();
    ///
    /// assert_eq!(
    ///   "Filter: t1.id = $1\
    ///   \n  TableScan: t1",
    ///   plan.display_indent().to_string()
    /// );
    ///
    /// // Fill in the parameter $1 with a literal 3
    /// let plan = plan.with_param_values(vec![
    ///   ScalarValue::from(3i32) // value at index 0 --> $1
    /// ]).unwrap();
    ///
    /// assert_eq!(
    ///    "Filter: t1.id = Int32(3)\
    ///    \n  TableScan: t1",
    ///    plan.display_indent().to_string()
    ///  );
    ///
    /// // Note you can also use named parameters
    /// // Build SELECT * FROM t1 WHERE id = $my_param
    /// let plan = table_scan(Some("t1"), &schema, None).unwrap()
    ///     .filter(col("id").eq(placeholder("$my_param"))).unwrap()
    ///     .build().unwrap()
    ///     // Fill in the parameter $my_param with a literal 3
    ///     .with_param_values(vec![
    ///       ("my_param", ScalarValue::from(3i32)),
    ///     ]).unwrap();
    ///
    /// assert_eq!(
    ///    "Filter: t1.id = Int32(3)\
    ///    \n  TableScan: t1",
    ///    plan.display_indent().to_string()
    ///  );
    /// ```
    pub fn with_param_values(
        self,
        param_values: impl Into<ParamValues>,
    ) -> Result<LogicalPlan> {
        let param_values = param_values.into();
        let plan_with_values = self.replace_params_with_values(&param_values)?;

        // unwrap Prepare
        Ok(
            if let LogicalPlan::Statement(Statement::Prepare(prepare_lp)) =
                plan_with_values
            {
                param_values.verify_fields(&prepare_lp.fields)?;
1283        // take ownership of the input if it is not shared, otherwise clone it
1284                Arc::unwrap_or_clone(prepare_lp.input)
1285            } else {
1286                plan_with_values
1287            },
1288        )
1289    }
1290
1291    /// Returns the maximum number of rows that this plan can output, if known.
1292    ///
1293    /// If `None`, the plan can return any number of rows.
1294    /// If `Some(n)` then the plan can return at most `n` rows but may return fewer.
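    ///
    /// # Example
    ///
    /// A minimal sketch; the `limit` call assumes the `LogicalPlanBuilder::limit`
    /// API, alongside the builder methods used in the other examples in these docs:
    ///
    /// ```
    /// # use arrow::datatypes::{DataType, Field, Schema};
    /// # use datafusion_expr::logical_plan::table_scan;
    /// let schema = Schema::new(vec![Field::new("id", DataType::Int32, false)]);
    /// // A bare TableScan has no known bound, but a literal fetch caps the output
    /// let plan = table_scan(Some("t1"), &schema, None).unwrap()
    ///     .limit(0, Some(10)).unwrap() // skip 0 rows, fetch at most 10
    ///     .build().unwrap();
    /// assert_eq!(plan.max_rows(), Some(10));
    /// ```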
1295    pub fn max_rows(self: &LogicalPlan) -> Option<usize> {
1296        match self {
1297            LogicalPlan::Projection(Projection { input, .. }) => input.max_rows(),
1298            LogicalPlan::Filter(filter) => {
1299                if filter.is_scalar() {
1300                    Some(1)
1301                } else {
1302                    filter.input.max_rows()
1303                }
1304            }
1305            LogicalPlan::Window(Window { input, .. }) => input.max_rows(),
1306            LogicalPlan::Aggregate(Aggregate {
1307                input, group_expr, ..
1308            }) => {
1309                // An empty (or all-literal) group_expr produces exactly one row
1310                if group_expr
1311                    .iter()
1312                    .all(|expr| matches!(expr, Expr::Literal(_, _)))
1313                {
1314                    Some(1)
1315                } else {
1316                    input.max_rows()
1317                }
1318            }
1319            LogicalPlan::Sort(Sort { input, fetch, .. }) => {
1320                match (fetch, input.max_rows()) {
1321                    (Some(fetch_limit), Some(input_max)) => {
1322                        Some(input_max.min(*fetch_limit))
1323                    }
1324                    (Some(fetch_limit), None) => Some(*fetch_limit),
1325                    (None, Some(input_max)) => Some(input_max),
1326                    (None, None) => None,
1327                }
1328            }
1329            LogicalPlan::Join(Join {
1330                left,
1331                right,
1332                join_type,
1333                ..
1334            }) => match join_type {
1335                JoinType::Inner => Some(left.max_rows()? * right.max_rows()?),
1336                JoinType::Left | JoinType::Right | JoinType::Full => {
1337                    match (left.max_rows()?, right.max_rows()?, join_type) {
1338                        (0, 0, _) => Some(0),
1339                        (max_rows, 0, JoinType::Left | JoinType::Full) => Some(max_rows),
1340                        (0, max_rows, JoinType::Right | JoinType::Full) => Some(max_rows),
1341                        (left_max, right_max, _) => Some(left_max * right_max),
1342                    }
1343                }
1344                JoinType::LeftSemi | JoinType::LeftAnti | JoinType::LeftMark => {
1345                    left.max_rows()
1346                }
1347                JoinType::RightSemi | JoinType::RightAnti | JoinType::RightMark => {
1348                    right.max_rows()
1349                }
1350            },
1351            LogicalPlan::Repartition(Repartition { input, .. }) => input.max_rows(),
1352            LogicalPlan::Union(Union { inputs, .. }) => {
1353                inputs.iter().try_fold(0usize, |mut acc, plan| {
1354                    acc += plan.max_rows()?;
1355                    Some(acc)
1356                })
1357            }
1358            LogicalPlan::TableScan(TableScan { fetch, .. }) => *fetch,
1359            LogicalPlan::EmptyRelation(EmptyRelation { produce_one_row, .. }) => Some(usize::from(*produce_one_row)),
1360            LogicalPlan::RecursiveQuery(_) => None,
1361            LogicalPlan::Subquery(_) => None,
1362            LogicalPlan::SubqueryAlias(SubqueryAlias { input, .. }) => input.max_rows(),
1363            LogicalPlan::Limit(limit) => match limit.get_fetch_type() {
1364                Ok(FetchType::Literal(s)) => s,
1365                _ => None,
1366            },
1367            LogicalPlan::Distinct(
1368                Distinct::All(input) | Distinct::On(DistinctOn { input, .. }),
1369            ) => input.max_rows(),
1370            LogicalPlan::Values(v) => Some(v.values.len()),
1371            LogicalPlan::Unnest(_) => None,
1372            LogicalPlan::Ddl(_)
1373            | LogicalPlan::Explain(_)
1374            | LogicalPlan::Analyze(_)
1375            | LogicalPlan::Dml(_)
1376            | LogicalPlan::Copy(_)
1377            | LogicalPlan::DescribeTable(_)
1378            | LogicalPlan::Statement(_)
1379            | LogicalPlan::Extension(_) => None,
1380        }
1381    }
1382
1383    /// Returns `true` if this node's expressions contain any references to an outer query
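    ///
    /// A minimal sketch, assuming the `out_ref_col` helper (which builds an outer
    /// reference column expression) is exported from `datafusion_expr`:
    ///
    /// ```
    /// # use arrow::datatypes::{DataType, Field, Schema};
    /// # use datafusion_expr::{col, out_ref_col, logical_plan::table_scan};
    /// let schema = Schema::new(vec![Field::new("id", DataType::Int32, false)]);
    /// // The predicate references `outer.id` from an enclosing query
    /// let plan = table_scan(Some("t1"), &schema, None).unwrap()
    ///     .filter(col("id").eq(out_ref_col(DataType::Int32, "outer.id"))).unwrap()
    ///     .build().unwrap();
    /// assert!(plan.contains_outer_reference());
    /// ```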
1384    pub fn contains_outer_reference(&self) -> bool {
1385        let mut contains = false;
1386        self.apply_expressions(|expr| {
1387            Ok(if expr.contains_outer() {
1388                contains = true;
1389                TreeNodeRecursion::Stop
1390            } else {
1391                TreeNodeRecursion::Continue
1392            })
1393        })
1394        .unwrap();
1395        contains
1396    }
1397
1398    /// Get the output expressions and their corresponding columns.
1399    ///
1400    /// The parent node may reference the output columns of the plan by expressions, such as
1401    /// projection over aggregate or window functions. This method helps to convert the
1402    /// referenced expressions into columns.
1403    ///
1404    /// See also: [`crate::utils::columnize_expr`]
1405    pub fn columnized_output_exprs(&self) -> Result<Vec<(&Expr, Column)>> {
1406        match self {
1407            LogicalPlan::Aggregate(aggregate) => Ok(aggregate
1408                .output_expressions()?
1409                .into_iter()
1410                .zip(self.schema().columns())
1411                .collect()),
1412            LogicalPlan::Window(Window {
1413                window_expr,
1414                input,
1415                schema,
1416            }) => {
1417                // The input could be another Window, so the result should also include the input's output expressions. For example:
1418                // `EXPLAIN SELECT RANK() OVER (PARTITION BY a ORDER BY b), SUM(b) OVER (PARTITION BY a) FROM t`
1419                // Its plan is:
1420                // Projection: RANK() PARTITION BY [t.a] ORDER BY [t.b ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW, SUM(t.b) PARTITION BY [t.a] ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING
1421                //   WindowAggr: windowExpr=[[SUM(CAST(t.b AS Int64)) PARTITION BY [t.a] ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING]]
1422                //     WindowAggr: windowExpr=[[RANK() PARTITION BY [t.a] ORDER BY [t.b ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW]]
1423                //       TableScan: t projection=[a, b]
1424                let mut output_exprs = input.columnized_output_exprs()?;
1425                let input_len = input.schema().fields().len();
1426                output_exprs.extend(
1427                    window_expr
1428                        .iter()
1429                        .zip(schema.columns().into_iter().skip(input_len)),
1430                );
1431                Ok(output_exprs)
1432            }
1433            _ => Ok(vec![]),
1434        }
1435    }
1436}
1437
1438impl LogicalPlan {
1439    /// Return a `LogicalPlan` with all placeholders (e.g. `$1`, `$2`,
1440    /// ...) replaced with the corresponding values provided in
1441    /// `param_values`
1442    ///
1443    /// See [`Self::with_param_values`] for examples and usage with an owned
1444    /// `ParamValues`
1445    pub fn replace_params_with_values(
1446        self,
1447        param_values: &ParamValues,
1448    ) -> Result<LogicalPlan> {
1449        self.transform_up_with_subqueries(|plan| {
1450            let schema = Arc::clone(plan.schema());
1451            let name_preserver = NamePreserver::new(&plan);
1452            plan.map_expressions(|e| {
1453                let (e, has_placeholder) = e.infer_placeholder_types(&schema)?;
1454                if !has_placeholder {
1455                    // Performance optimization:
1456                    // avoid NamePreserver copy and second pass over expression
1457                    // if no placeholders.
1458                    Ok(Transformed::no(e))
1459                } else {
1460                    let original_name = name_preserver.save(&e);
1461                    let transformed_expr = e.transform_up(|e| {
1462                        if let Expr::Placeholder(Placeholder { id, .. }) = e {
1463                            let (value, metadata) = param_values
1464                                .get_placeholders_with_values(&id)?
1465                                .into_inner();
1466                            Ok(Transformed::yes(Expr::Literal(value, metadata)))
1467                        } else {
1468                            Ok(Transformed::no(e))
1469                        }
1470                    })?;
1471                    // Preserve name to avoid breaking column references to this expression
1472                    Ok(transformed_expr.update_data(|expr| original_name.restore(expr)))
1473                }
1474            })
1475        })
1476        .map(|res| res.data)
1477    }
1478
1479    /// Walk the logical plan, find any `Placeholder` tokens, and return a set of their names.
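    ///
    /// A minimal sketch:
    ///
    /// ```
    /// # use arrow::datatypes::{DataType, Field, Schema};
    /// # use datafusion_expr::{col, placeholder, logical_plan::table_scan};
    /// // Build SELECT * FROM t1 WHERE id = $1
    /// let schema = Schema::new(vec![Field::new("id", DataType::Int32, false)]);
    /// let plan = table_scan(Some("t1"), &schema, None).unwrap()
    ///     .filter(col("id").eq(placeholder("$1"))).unwrap()
    ///     .build().unwrap();
    /// let names = plan.get_parameter_names().unwrap();
    /// assert!(names.contains("$1"));
    /// ```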
1480    pub fn get_parameter_names(&self) -> Result<HashSet<String>> {
1481        let mut param_names = HashSet::new();
1482        self.apply_with_subqueries(|plan| {
1483            plan.apply_expressions(|expr| {
1484                expr.apply(|expr| {
1485                    if let Expr::Placeholder(Placeholder { id, .. }) = expr {
1486                        param_names.insert(id.clone());
1487                    }
1488                    Ok(TreeNodeRecursion::Continue)
1489                })
1490            })
1491        })
1492        .map(|_| param_names)
1493    }
1494
1495    /// Walk the logical plan, find any `Placeholder` tokens, and return a map of their IDs and DataTypes
1496    ///
1497    /// Note that this will drop any extension or field metadata attached to parameters. Use
1498    /// [`LogicalPlan::get_parameter_fields`] to keep extension metadata.
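    ///
    /// A minimal sketch; whether a concrete `DataType` is recorded for a
    /// placeholder depends on whether its type could be inferred:
    ///
    /// ```
    /// # use arrow::datatypes::{DataType, Field, Schema};
    /// # use datafusion_expr::{col, placeholder, logical_plan::table_scan};
    /// let schema = Schema::new(vec![Field::new("id", DataType::Int32, false)]);
    /// let plan = table_scan(Some("t1"), &schema, None).unwrap()
    ///     .filter(col("id").eq(placeholder("$1"))).unwrap()
    ///     .build().unwrap();
    /// let types = plan.get_parameter_types().unwrap();
    /// // `$1` is reported, with `Some(DataType)` only if inference succeeded
    /// assert!(types.contains_key("$1"));
    /// ```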
1499    pub fn get_parameter_types(
1500        &self,
1501    ) -> Result<HashMap<String, Option<DataType>>, DataFusionError> {
1502        let mut parameter_fields = self.get_parameter_fields()?;
1503        Ok(parameter_fields
1504            .drain()
1505            .map(|(name, maybe_field)| {
1506                (name, maybe_field.map(|field| field.data_type().clone()))
1507            })
1508            .collect())
1509    }
1510
1511    /// Walk the logical plan, find any `Placeholder` tokens, and return a map of their IDs and FieldRefs
1512    pub fn get_parameter_fields(
1513        &self,
1514    ) -> Result<HashMap<String, Option<FieldRef>>, DataFusionError> {
1515        let mut param_types: HashMap<String, Option<FieldRef>> = HashMap::new();
1516
1517        self.apply_with_subqueries(|plan| {
1518            plan.apply_expressions(|expr| {
1519                expr.apply(|expr| {
1520                    if let Expr::Placeholder(Placeholder { id, field }) = expr {
1521                        let prev = param_types.get(id);
1522                        match (prev, field) {
1523                            (Some(Some(prev)), Some(field)) => {
1524                                check_metadata_with_storage_equal(
1525                                    (field.data_type(), Some(field.metadata())),
1526                                    (prev.data_type(), Some(prev.metadata())),
1527                                    "parameter",
1528                                    &format!(": Conflicting types for id {id}"),
1529                                )?;
1530                            }
1531                            (_, Some(field)) => {
1532                                param_types.insert(id.clone(), Some(Arc::clone(field)));
1533                            }
1534                            _ => {
1535                                param_types.insert(id.clone(), None);
1536                            }
1537                        }
1538                    }
1539                    Ok(TreeNodeRecursion::Continue)
1540                })
1541            })
1542        })
1543        .map(|_| param_types)
1544    }
1545
1546    // ------------
1547    // Various implementations for printing out LogicalPlans
1548    // ------------
1549
1550    /// Return a `format`able structure that produces a single line
1551    /// per node.
1552    ///
1553    /// # Example
1554    ///
1555    /// ```text
1556    /// Projection: employee.id
1557    ///    Filter: employee.state Eq Utf8("CO")
1558    ///       CsvScan: employee projection=Some([0, 3])
1559    /// ```
1560    ///
1561    /// ```
1562    /// use arrow::datatypes::{DataType, Field, Schema};
1563    /// use datafusion_expr::{col, lit, logical_plan::table_scan, LogicalPlanBuilder};
1564    /// let schema = Schema::new(vec![Field::new("id", DataType::Int32, false)]);
1565    /// let plan = table_scan(Some("t1"), &schema, None)
1566    ///     .unwrap()
1567    ///     .filter(col("id").eq(lit(5)))
1568    ///     .unwrap()
1569    ///     .build()
1570    ///     .unwrap();
1571    ///
1572    /// // Format using display_indent
1573    /// let display_string = format!("{}", plan.display_indent());
1574    ///
1575    /// assert_eq!("Filter: t1.id = Int32(5)\n  TableScan: t1", display_string);
1576    /// ```
1577    pub fn display_indent(&self) -> impl Display + '_ {
1578        // Boilerplate structure to wrap LogicalPlan with something
1579        // that can be formatted
1580        struct Wrapper<'a>(&'a LogicalPlan);
1581        impl Display for Wrapper<'_> {
1582            fn fmt(&self, f: &mut Formatter) -> fmt::Result {
1583                let with_schema = false;
1584                let mut visitor = IndentVisitor::new(f, with_schema);
1585                match self.0.visit_with_subqueries(&mut visitor) {
1586                    Ok(_) => Ok(()),
1587                    Err(_) => Err(fmt::Error),
1588                }
1589            }
1590        }
1591        Wrapper(self)
1592    }
1593
1594    /// Return a `format`able structure that produces a single line
1595    /// per node that includes the output schema. For example:
1596    ///
1597    /// ```text
1598    /// Projection: employee.id [id:Int32]
1599    ///    Filter: employee.state = Utf8("CO") [id:Int32, state:Utf8]
1600    ///      TableScan: employee projection=[0, 3] [id:Int32, state:Utf8]
1601    /// ```
1602    ///
1603    /// ```
1604    /// use arrow::datatypes::{DataType, Field, Schema};
1605    /// use datafusion_expr::{col, lit, logical_plan::table_scan, LogicalPlanBuilder};
1606    /// let schema = Schema::new(vec![Field::new("id", DataType::Int32, false)]);
1607    /// let plan = table_scan(Some("t1"), &schema, None)
1608    ///     .unwrap()
1609    ///     .filter(col("id").eq(lit(5)))
1610    ///     .unwrap()
1611    ///     .build()
1612    ///     .unwrap();
1613    ///
1614    /// // Format using display_indent_schema
1615    /// let display_string = format!("{}", plan.display_indent_schema());
1616    ///
1617    /// assert_eq!(
1618    ///     "Filter: t1.id = Int32(5) [id:Int32]\
1619    ///             \n  TableScan: t1 [id:Int32]",
1620    ///     display_string
1621    /// );
1622    /// ```
1623    pub fn display_indent_schema(&self) -> impl Display + '_ {
1624        // Boilerplate structure to wrap LogicalPlan with something
1625        // that can be formatted
1626        struct Wrapper<'a>(&'a LogicalPlan);
1627        impl Display for Wrapper<'_> {
1628            fn fmt(&self, f: &mut Formatter) -> fmt::Result {
1629                let with_schema = true;
1630                let mut visitor = IndentVisitor::new(f, with_schema);
1631                match self.0.visit_with_subqueries(&mut visitor) {
1632                    Ok(_) => Ok(()),
1633                    Err(_) => Err(fmt::Error),
1634                }
1635            }
1636        }
1637        Wrapper(self)
1638    }
1639
1640    /// Return a displayable structure that produces the plan in PostgreSQL JSON format.
1641    ///
1642    /// Users can use this format to visualize the plan in existing plan visualization tools, for example [dalibo](https://explain.dalibo.com/)
1643    pub fn display_pg_json(&self) -> impl Display + '_ {
1644        // Boilerplate structure to wrap LogicalPlan with something
1645        // that can be formatted
1646        struct Wrapper<'a>(&'a LogicalPlan);
1647        impl Display for Wrapper<'_> {
1648            fn fmt(&self, f: &mut Formatter) -> fmt::Result {
1649                let mut visitor = PgJsonVisitor::new(f);
1650                visitor.with_schema(true);
1651                match self.0.visit_with_subqueries(&mut visitor) {
1652                    Ok(_) => Ok(()),
1653                    Err(_) => Err(fmt::Error),
1654                }
1655            }
1656        }
1657        Wrapper(self)
1658    }
1659
1660    /// Return a `format`able structure that produces lines meant for
1661    /// graphical display using the `DOT` language. This format can be
1662    /// visualized using software from
1663    /// [`graphviz`](https://graphviz.org/)
1664    ///
1665    /// This currently produces two graphs -- one with the basic
1666    /// structure, and one with additional details such as schema.
1667    ///
1668    /// ```
1669    /// use arrow::datatypes::{DataType, Field, Schema};
1670    /// use datafusion_expr::{col, lit, logical_plan::table_scan, LogicalPlanBuilder};
1671    /// let schema = Schema::new(vec![Field::new("id", DataType::Int32, false)]);
1672    /// let plan = table_scan(Some("t1"), &schema, None)
1673    ///     .unwrap()
1674    ///     .filter(col("id").eq(lit(5)))
1675    ///     .unwrap()
1676    ///     .build()
1677    ///     .unwrap();
1678    ///
1679    /// // Format using display_graphviz
1680    /// let graphviz_string = format!("{}", plan.display_graphviz());
1681    /// ```
1682    ///
1683    /// If the graphviz string is saved to a file such as `/tmp/example.dot`, the following
1684    /// command can be used to render it as a PDF:
1685    ///
1686    /// ```bash
1687    ///   dot -Tpdf < /tmp/example.dot  > /tmp/example.pdf
1688    /// ```
1689    pub fn display_graphviz(&self) -> impl Display + '_ {
1690        // Boilerplate structure to wrap LogicalPlan with something
1691        // that can be formatted
1692        struct Wrapper<'a>(&'a LogicalPlan);
1693        impl Display for Wrapper<'_> {
1694            fn fmt(&self, f: &mut Formatter) -> fmt::Result {
1695                let mut visitor = GraphvizVisitor::new(f);
1696
1697                visitor.start_graph()?;
1698
1699                visitor.pre_visit_plan("LogicalPlan")?;
1700                self.0
1701                    .visit_with_subqueries(&mut visitor)
1702                    .map_err(|_| fmt::Error)?;
1703                visitor.post_visit_plan()?;
1704
1705                visitor.set_with_schema(true);
1706                visitor.pre_visit_plan("Detailed LogicalPlan")?;
1707                self.0
1708                    .visit_with_subqueries(&mut visitor)
1709                    .map_err(|_| fmt::Error)?;
1710                visitor.post_visit_plan()?;
1711
1712                visitor.end_graph()?;
1713                Ok(())
1714            }
1715        }
1716        Wrapper(self)
1717    }
1718
1719    /// Return a `format`able structure with a human-readable
1720    /// description of this LogicalPlan node, not including its
1721    /// children. For example:
1722    ///
1723    /// ```text
1724    /// Projection: id
1725    /// ```
1726    /// ```
1727    /// use arrow::datatypes::{DataType, Field, Schema};
1728    /// use datafusion_expr::{col, lit, logical_plan::table_scan, LogicalPlanBuilder};
1729    /// let schema = Schema::new(vec![Field::new("id", DataType::Int32, false)]);
1730    /// let plan = table_scan(Some("t1"), &schema, None)
1731    ///     .unwrap()
1732    ///     .build()
1733    ///     .unwrap();
1734    ///
1735    /// // Format using display
1736    /// let display_string = format!("{}", plan.display());
1737    ///
1738    /// assert_eq!("TableScan: t1", display_string);
1739    /// ```
1740    pub fn display(&self) -> impl Display + '_ {
1741        // Boilerplate structure to wrap LogicalPlan with something
1742        // that can be formatted
1743        struct Wrapper<'a>(&'a LogicalPlan);
1744        impl Display for Wrapper<'_> {
1745            fn fmt(&self, f: &mut Formatter) -> fmt::Result {
1746                match self.0 {
1747                    LogicalPlan::EmptyRelation(EmptyRelation { produce_one_row, schema: _ }) => {
1748                        let rows = if *produce_one_row { 1 } else { 0 };
1749                        write!(f, "EmptyRelation: rows={rows}")
1750                    },
1751                    LogicalPlan::RecursiveQuery(RecursiveQuery {
1752                        is_distinct, ..
1753                    }) => {
1754                        write!(f, "RecursiveQuery: is_distinct={is_distinct}")
1755                    }
1756                    LogicalPlan::Values(Values { ref values, .. }) => {
1757                        let str_values: Vec<_> = values
1758                            .iter()
1759                            // limit to only 5 values to avoid horrible display
1760                            .take(5)
1761                            .map(|row| {
1762                                let item = row
1763                                    .iter()
1764                                    .map(|expr| expr.to_string())
1765                                    .collect::<Vec<_>>()
1766                                    .join(", ");
1767                                format!("({item})")
1768                            })
1769                            .collect();
1770
1771                        let ellipsis = if values.len() > 5 { "..." } else { "" };
1772                        write!(f, "Values: {}{}", str_values.join(", "), ellipsis)
1773                    }
1774
1775                    LogicalPlan::TableScan(TableScan {
1776                        ref source,
1777                        ref table_name,
1778                        ref projection,
1779                        ref filters,
1780                        ref fetch,
1781                        ..
1782                    }) => {
1783                        let projected_fields = match projection {
1784                            Some(indices) => {
1785                                let schema = source.schema();
1786                                let names: Vec<&str> = indices
1787                                    .iter()
1788                                    .map(|i| schema.field(*i).name().as_str())
1789                                    .collect();
1790                                format!(" projection=[{}]", names.join(", "))
1791                            }
1792                            _ => "".to_string(),
1793                        };
1794
1795                        write!(f, "TableScan: {table_name}{projected_fields}")?;
1796
1797                        if !filters.is_empty() {
1798                            let mut full_filter = vec![];
1799                            let mut partial_filter = vec![];
1800                            let mut unsupported_filters = vec![];
1801                            let filters: Vec<&Expr> = filters.iter().collect();
1802
1803                            if let Ok(results) =
1804                                source.supports_filters_pushdown(&filters)
1805                            {
1806                                filters.iter().zip(results.iter()).for_each(
1807                                    |(x, res)| match res {
1808                                        TableProviderFilterPushDown::Exact => {
1809                                            full_filter.push(x)
1810                                        }
1811                                        TableProviderFilterPushDown::Inexact => {
1812                                            partial_filter.push(x)
1813                                        }
1814                                        TableProviderFilterPushDown::Unsupported => {
1815                                            unsupported_filters.push(x)
1816                                        }
1817                                    },
1818                                );
1819                            }
1820
1821                            if !full_filter.is_empty() {
1822                                write!(
1823                                    f,
1824                                    ", full_filters=[{}]",
1825                                    expr_vec_fmt!(full_filter)
1826                                )?;
1827                            };
1828                            if !partial_filter.is_empty() {
1829                                write!(
1830                                    f,
1831                                    ", partial_filters=[{}]",
1832                                    expr_vec_fmt!(partial_filter)
1833                                )?;
1834                            }
1835                            if !unsupported_filters.is_empty() {
1836                                write!(
1837                                    f,
1838                                    ", unsupported_filters=[{}]",
1839                                    expr_vec_fmt!(unsupported_filters)
1840                                )?;
1841                            }
1842                        }
1843
1844                        if let Some(n) = fetch {
1845                            write!(f, ", fetch={n}")?;
1846                        }
1847
1848                        Ok(())
1849                    }
1850                    LogicalPlan::Projection(Projection { ref expr, .. }) => {
1851                        write!(f, "Projection:")?;
1852                        for (i, expr_item) in expr.iter().enumerate() {
1853                            if i > 0 {
1854                                write!(f, ",")?;
1855                            }
1856                            write!(f, " {expr_item}")?;
1857                        }
1858                        Ok(())
1859                    }
1860                    LogicalPlan::Dml(DmlStatement { table_name, op, .. }) => {
1861                        write!(f, "Dml: op=[{op}] table=[{table_name}]")
1862                    }
1863                    LogicalPlan::Copy(CopyTo {
1864                        input: _,
1865                        output_url,
1866                        file_type,
1867                        options,
1868                        ..
1869                    }) => {
1870                        let op_str = options
1871                            .iter()
1872                            .map(|(k, v)| format!("{k} {v}"))
1873                            .collect::<Vec<String>>()
1874                            .join(", ");
1875
1876                        write!(f, "CopyTo: format={} output_url={output_url} options: ({op_str})", file_type.get_ext())
1877                    }
1878                    LogicalPlan::Ddl(ddl) => {
1879                        write!(f, "{}", ddl.display())
1880                    }
1881                    LogicalPlan::Filter(Filter {
1882                        predicate: ref expr,
1883                        ..
1884                    }) => write!(f, "Filter: {expr}"),
1885                    LogicalPlan::Window(Window {
1886                        ref window_expr, ..
1887                    }) => {
1888                        write!(
1889                            f,
1890                            "WindowAggr: windowExpr=[[{}]]",
1891                            expr_vec_fmt!(window_expr)
1892                        )
1893                    }
1894                    LogicalPlan::Aggregate(Aggregate {
1895                        ref group_expr,
1896                        ref aggr_expr,
1897                        ..
1898                    }) => write!(
1899                        f,
1900                        "Aggregate: groupBy=[[{}]], aggr=[[{}]]",
1901                        expr_vec_fmt!(group_expr),
1902                        expr_vec_fmt!(aggr_expr)
1903                    ),
1904                    LogicalPlan::Sort(Sort { expr, fetch, .. }) => {
1905                        write!(f, "Sort: ")?;
1906                        for (i, expr_item) in expr.iter().enumerate() {
1907                            if i > 0 {
1908                                write!(f, ", ")?;
1909                            }
1910                            write!(f, "{expr_item}")?;
1911                        }
1912                        if let Some(a) = fetch {
1913                            write!(f, ", fetch={a}")?;
1914                        }
1915
1916                        Ok(())
1917                    }
1918                    LogicalPlan::Join(Join {
1919                        on: ref keys,
1920                        filter,
1921                        join_constraint,
1922                        join_type,
1923                        ..
1924                    }) => {
1925                        let join_expr: Vec<String> =
1926                            keys.iter().map(|(l, r)| format!("{l} = {r}")).collect();
1927                        let filter_expr = filter
1928                            .as_ref()
1929                            .map(|expr| format!(" Filter: {expr}"))
1930                            .unwrap_or_else(|| "".to_string());
1931                        let join_type = if filter.is_none() && keys.is_empty() && matches!(join_type, JoinType::Inner) {
1932                            "Cross".to_string()
1933                        } else {
1934                            join_type.to_string()
1935                        };
1936                        match join_constraint {
1937                            JoinConstraint::On => {
1938                                write!(
1939                                    f,
1940                                    "{} Join: {}{}",
1941                                    join_type,
1942                                    join_expr.join(", "),
1943                                    filter_expr
1944                                )
1945                            }
1946                            JoinConstraint::Using => {
1947                                write!(
1948                                    f,
1949                                    "{} Join: Using {}{}",
1950                                    join_type,
1951                                    join_expr.join(", "),
1952                                    filter_expr,
1953                                )
1954                            }
1955                        }
1956                    }
1957                    LogicalPlan::Repartition(Repartition {
1958                        partitioning_scheme,
1959                        ..
1960                    }) => match partitioning_scheme {
1961                        Partitioning::RoundRobinBatch(n) => {
1962                            write!(f, "Repartition: RoundRobinBatch partition_count={n}")
1963                        }
1964                        Partitioning::Hash(expr, n) => {
1965                            let hash_expr: Vec<String> =
1966                                expr.iter().map(|e| format!("{e}")).collect();
1967                            write!(
1968                                f,
1969                                "Repartition: Hash({}) partition_count={}",
1970                                hash_expr.join(", "),
1971                                n
1972                            )
1973                        }
1974                        Partitioning::DistributeBy(expr) => {
1975                            let dist_by_expr: Vec<String> =
1976                                expr.iter().map(|e| format!("{e}")).collect();
1977                            write!(
1978                                f,
1979                                "Repartition: DistributeBy({})",
1980                                dist_by_expr.join(", "),
1981                            )
1982                        }
1983                    },
1984                    LogicalPlan::Limit(limit) => {
1985                        // Attempt to display `skip` and `fetch` as literals if possible, otherwise as expressions.
1986                        let skip_str = match limit.get_skip_type() {
1987                            Ok(SkipType::Literal(n)) => n.to_string(),
1988                            _ => limit.skip.as_ref().map_or_else(|| "None".to_string(), |x| x.to_string()),
1989                        };
1990                        let fetch_str = match limit.get_fetch_type() {
1991                            Ok(FetchType::Literal(Some(n))) => n.to_string(),
1992                            Ok(FetchType::Literal(None)) => "None".to_string(),
1993                            _ => limit.fetch.as_ref().map_or_else(|| "None".to_string(), |x| x.to_string())
1994                        };
1995                        write!(
1996                            f,
1997                            "Limit: skip={skip_str}, fetch={fetch_str}",
1998                        )
1999                    }
2000                    LogicalPlan::Subquery(Subquery { .. }) => {
2001                        write!(f, "Subquery:")
2002                    }
2003                    LogicalPlan::SubqueryAlias(SubqueryAlias { ref alias, .. }) => {
2004                        write!(f, "SubqueryAlias: {alias}")
2005                    }
2006                    LogicalPlan::Statement(statement) => {
2007                        write!(f, "{}", statement.display())
2008                    }
2009                    LogicalPlan::Distinct(distinct) => match distinct {
2010                        Distinct::All(_) => write!(f, "Distinct:"),
2011                        Distinct::On(DistinctOn {
2012                            on_expr,
2013                            select_expr,
2014                            sort_expr,
2015                            ..
2016                        }) => write!(
2017                            f,
2018                            "DistinctOn: on_expr=[[{}]], select_expr=[[{}]], sort_expr=[[{}]]",
2019                            expr_vec_fmt!(on_expr),
2020                            expr_vec_fmt!(select_expr),
2021                            if let Some(sort_expr) = sort_expr { expr_vec_fmt!(sort_expr) } else { "".to_string() },
2022                        ),
2023                    },
2024                    LogicalPlan::Explain { .. } => write!(f, "Explain"),
2025                    LogicalPlan::Analyze { .. } => write!(f, "Analyze"),
2026                    LogicalPlan::Union(_) => write!(f, "Union"),
2027                    LogicalPlan::Extension(e) => e.node.fmt_for_explain(f),
2028                    LogicalPlan::DescribeTable(DescribeTable { .. }) => {
2029                        write!(f, "DescribeTable")
2030                    }
2031                    LogicalPlan::Unnest(Unnest {
2032                        input: plan,
2033                        list_type_columns: list_col_indices,
2034                        struct_type_columns: struct_col_indices, .. }) => {
2035                        let input_columns = plan.schema().columns();
2036                        let list_type_columns = list_col_indices
2037                            .iter()
2038                            .map(|(i,unnest_info)|
2039                                format!("{}|depth={}", &input_columns[*i].to_string(),
2040                                unnest_info.depth))
2041                            .collect::<Vec<String>>();
2042                        let struct_type_columns = struct_col_indices
2043                            .iter()
2044                            .map(|i| &input_columns[*i])
2045                            .collect::<Vec<&Column>>();
2046                        // get items from input_columns indexed by list_col_indices
2047                        write!(f, "Unnest: lists[{}] structs[{}]",
2048                        expr_vec_fmt!(list_type_columns),
2049                        expr_vec_fmt!(struct_type_columns))
2050                    }
2051                }
2052            }
2053        }
2054        Wrapper(self)
2055    }
2056}
2057
2058impl Display for LogicalPlan {
2059    fn fmt(&self, f: &mut Formatter) -> fmt::Result {
2060        self.display_indent().fmt(f)
2061    }
2062}
2063
2064impl ToStringifiedPlan for LogicalPlan {
2065    fn to_stringified(&self, plan_type: PlanType) -> StringifiedPlan {
2066        StringifiedPlan::new(plan_type, self.display_indent().to_string())
2067    }
2068}
2069
2070/// A relation that produces 0 or 1 placeholder rows with the specified output schema.
2071/// In most cases the output schema of an `EmptyRelation` is empty; however, it can
2072/// be non-empty, typically when the node is created by optimizer rules.
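///
/// A minimal sketch:
///
/// ```
/// # use std::sync::Arc;
/// # use datafusion_common::DFSchema;
/// # use datafusion_expr::logical_plan::{EmptyRelation, LogicalPlan};
/// // A placeholder single-row relation with an empty schema, as used for
/// // queries like `SELECT 1` that read from no table
/// let plan = LogicalPlan::EmptyRelation(EmptyRelation {
///     produce_one_row: true,
///     schema: Arc::new(DFSchema::empty()),
/// });
/// assert_eq!("EmptyRelation: rows=1", format!("{}", plan.display()));
/// ```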
2073#[derive(Debug, Clone, PartialEq, Eq, Hash)]
2074pub struct EmptyRelation {
2075    /// Whether to produce a placeholder row
2076    pub produce_one_row: bool,
2077    /// The schema description of the output
2078    pub schema: DFSchemaRef,
2079}
2080
2081// Manual implementation needed because of `schema` field. Comparison excludes this field.
2082impl PartialOrd for EmptyRelation {
2083    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
2084        self.produce_one_row
2085            .partial_cmp(&other.produce_one_row)
2086            // TODO (https://github.com/apache/datafusion/issues/17477) avoid recomparing all fields
2087            .filter(|cmp| *cmp != Ordering::Equal || self == other)
2088    }
2089}
2090
2091/// A variadic query operation: a recursive CTE.
2092///
2093/// # Recursive Query Evaluation
2094///
2095/// From the [Postgres Docs]:
2096///
2097/// 1. Evaluate the non-recursive term. For `UNION` (but not `UNION ALL`),
2098///    discard duplicate rows. Include all remaining rows in the result of the
2099///    recursive query, and also place them in a temporary working table.
2100///
2101/// 2. So long as the working table is not empty, repeat these steps:
2102///
2103/// * Evaluate the recursive term, substituting the current contents of the
2104///   working table for the recursive self-reference. For `UNION` (but not `UNION
2105///   ALL`), discard duplicate rows and rows that duplicate any previous result
2106///   row. Include all remaining rows in the result of the recursive query, and
2107///   also place them in a temporary intermediate table.
2108///
2109/// * Replace the contents of the working table with the contents of the
2110///   intermediate table, then empty the intermediate table.
2111///
2112/// [Postgres Docs]: https://www.postgresql.org/docs/current/queries-with.html#QUERIES-WITH-RECURSIVE
2113#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Hash)]
2114pub struct RecursiveQuery {
2115    /// Name of the query
2116    pub name: String,
2117    /// The static term (initial contents of the working table)
2118    pub static_term: Arc<LogicalPlan>,
2119    /// The recursive term (evaluated on the contents of the working table until
2120    /// it returns an empty set)
2121    pub recursive_term: Arc<LogicalPlan>,
2122    /// Should the output of the recursive term be deduplicated (`UNION`) or
2123    /// not (`UNION ALL`).
2124    pub is_distinct: bool,
2125}
2126
2127/// Values expression. See
2128/// [Postgres VALUES](https://www.postgresql.org/docs/current/queries-values.html)
2129/// documentation for more details.
2130#[derive(Debug, Clone, PartialEq, Eq, Hash)]
2131pub struct Values {
2132    /// The table schema
2133    pub schema: DFSchemaRef,
2134    /// Values
2135    pub values: Vec<Vec<Expr>>,
2136}
2137
2138// Manual implementation needed because of `schema` field. Comparison excludes this field.
2139impl PartialOrd for Values {
2140    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
2141        self.values
2142            .partial_cmp(&other.values)
2143            // TODO (https://github.com/apache/datafusion/issues/17477) avoid recomparing all fields
2144            .filter(|cmp| *cmp != Ordering::Equal || self == other)
2145    }
2146}
2147
2148/// Evaluates an arbitrary list of expressions (essentially a
2149/// SELECT with an expression list) on its input.
2150#[derive(Clone, PartialEq, Eq, Hash, Debug)]
2151// mark non_exhaustive to encourage use of try_new/new()
2152#[non_exhaustive]
2153pub struct Projection {
2154    /// The list of expressions
2155    pub expr: Vec<Expr>,
2156    /// The incoming logical plan
2157    pub input: Arc<LogicalPlan>,
2158    /// The schema description of the output
2159    pub schema: DFSchemaRef,
2160}
2161
2162// Manual implementation needed because of `schema` field. Comparison excludes this field.
2163impl PartialOrd for Projection {
2164    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
2165        match self.expr.partial_cmp(&other.expr) {
2166            Some(Ordering::Equal) => self.input.partial_cmp(&other.input),
2167            cmp => cmp,
2168        }
2169        // TODO (https://github.com/apache/datafusion/issues/17477) avoid recomparing all fields
2170        .filter(|cmp| *cmp != Ordering::Equal || self == other)
2171    }
2172}
2173
2174impl Projection {
2175    /// Create a new Projection
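    ///
    /// A minimal sketch:
    ///
    /// ```
    /// # use std::sync::Arc;
    /// # use arrow::datatypes::{DataType, Field, Schema};
    /// # use datafusion_expr::{col, logical_plan::table_scan, logical_plan::Projection};
    /// let schema = Schema::new(vec![
    ///     Field::new("id", DataType::Int32, false),
    ///     Field::new("name", DataType::Utf8, false),
    /// ]);
    /// let scan = table_scan(Some("t1"), &schema, None).unwrap().build().unwrap();
    /// // Project only `id`; the output schema is derived from the expressions
    /// let projection = Projection::try_new(vec![col("id")], Arc::new(scan)).unwrap();
    /// assert_eq!(projection.schema.fields().len(), 1);
    /// ```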
2176    pub fn try_new(expr: Vec<Expr>, input: Arc<LogicalPlan>) -> Result<Self> {
2177        let projection_schema = projection_schema(&input, &expr)?;
2178        Self::try_new_with_schema(expr, input, projection_schema)
2179    }
2180
2181    /// Create a new Projection using the specified output schema
2182    pub fn try_new_with_schema(
2183        expr: Vec<Expr>,
2184        input: Arc<LogicalPlan>,
2185        schema: DFSchemaRef,
2186    ) -> Result<Self> {
2187        #[expect(deprecated)]
2188        if !expr.iter().any(|e| matches!(e, Expr::Wildcard { .. }))
2189            && expr.len() != schema.fields().len()
2190        {
2191            return plan_err!("Projection has mismatch between number of expressions ({}) and number of fields in schema ({})", expr.len(), schema.fields().len());
2192        }
2193        Ok(Self {
2194            expr,
2195            input,
2196            schema,
2197        })
2198    }
2199
2200    /// Create a new Projection from a schema, projecting all of its columns
2201    pub fn new_from_schema(input: Arc<LogicalPlan>, schema: DFSchemaRef) -> Self {
2202        let expr: Vec<Expr> = schema.columns().into_iter().map(Expr::Column).collect();
2203        Self {
2204            expr,
2205            input,
2206            schema,
2207        }
2208    }
2209}
2210
2211/// Computes the schema of the result produced by applying a projection to the input logical plan.
2212///
2213/// # Arguments
2214///
2215/// * `input`: A reference to the input `LogicalPlan` for which the projection schema
2216///   will be computed.
2217/// * `exprs`: A slice of `Expr` expressions representing the projection operation to apply.
2218///
2219/// # Metadata Handling
2220///
2221/// - **Schema-level metadata**: Passed through unchanged from the input schema
2222/// - **Field-level metadata**: Determined by each expression via [`exprlist_to_fields`], which
2223///   calls [`Expr::to_field`] to handle expression-specific metadata (literals, aliases, etc.)
2224///
2225/// # Returns
2226///
2227/// A `Result` containing an `Arc<DFSchema>` representing the schema of the result
2228/// produced by the projection operation. If the schema computation is successful,
2229/// the `Result` will contain the schema; otherwise, it will contain an error.
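///
/// # Example
///
/// A minimal sketch (assuming `projection_schema` is re-exported from
/// `datafusion_expr::logical_plan`):
///
/// ```
/// # use arrow::datatypes::{DataType, Field, Schema};
/// # use datafusion_expr::{col, logical_plan::table_scan, logical_plan::projection_schema};
/// let schema = Schema::new(vec![
///     Field::new("id", DataType::Int32, false),
///     Field::new("name", DataType::Utf8, false),
/// ]);
/// let scan = table_scan(Some("t1"), &schema, None).unwrap().build().unwrap();
/// let projected = projection_schema(&scan, &[col("name")]).unwrap();
/// assert_eq!(projected.fields().len(), 1);
/// ```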
2230pub fn projection_schema(input: &LogicalPlan, exprs: &[Expr]) -> Result<Arc<DFSchema>> {
2231    // Preserve input schema metadata at the schema level
2232    let metadata = input.schema().metadata().clone();
2233
2234    // Convert expressions to fields with Field properties determined by `Expr::to_field`
2235    let schema =
2236        DFSchema::new_with_metadata(exprlist_to_fields(exprs, input)?, metadata)?
2237            .with_functional_dependencies(calc_func_dependencies_for_project(
2238                exprs, input,
2239            )?)?;
2240
2241    Ok(Arc::new(schema))
2242}
2243
2244/// Aliased subquery
2245#[derive(Debug, Clone, PartialEq, Eq, Hash)]
2246// mark non_exhaustive to encourage use of try_new/new()
2247#[non_exhaustive]
2248pub struct SubqueryAlias {
2249    /// The incoming logical plan
2250    pub input: Arc<LogicalPlan>,
2251    /// The alias for the input relation
2252    pub alias: TableReference,
2253    /// The schema with qualified field names
2254    pub schema: DFSchemaRef,
2255}
2256
2257impl SubqueryAlias {
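    /// Create a new `SubqueryAlias`, re-qualifying the input's fields with `alias`.
    ///
    /// A minimal sketch:
    ///
    /// ```
    /// # use std::sync::Arc;
    /// # use arrow::datatypes::{DataType, Field, Schema};
    /// # use datafusion_expr::logical_plan::{table_scan, SubqueryAlias};
    /// let schema = Schema::new(vec![Field::new("id", DataType::Int32, false)]);
    /// let plan = table_scan(Some("t1"), &schema, None).unwrap().build().unwrap();
    /// // Columns are now referenced as `t2.id` rather than `t1.id`
    /// let aliased = SubqueryAlias::try_new(Arc::new(plan), "t2").unwrap();
    /// assert_eq!(aliased.alias.to_string(), "t2");
    /// ```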
2258    pub fn try_new(
2259        plan: Arc<LogicalPlan>,
2260        alias: impl Into<TableReference>,
2261    ) -> Result<Self> {
2262        let alias = alias.into();
2263
2264        // Since SubqueryAlias replaces all field qualifiers in the output schema of `plan`,
2265        // no two fields may share the same column name, as this would lead to ambiguity
2266        // when referencing columns in parent logical nodes.
2267
2268        // Compute unique aliases, if any, for each column of the input's schema.
2269        let aliases = unique_field_aliases(plan.schema().fields());
2270        let is_projection_needed = aliases.iter().any(Option::is_some);
2271
2272        // Insert a projection node, if needed, to make sure aliases are applied.
2273        let plan = if is_projection_needed {
2274            let projection_expressions = aliases
2275                .iter()
2276                .zip(plan.schema().iter())
2277                .map(|(alias, (qualifier, field))| {
2278                    let column =
2279                        Expr::Column(Column::new(qualifier.cloned(), field.name()));
2280                    match alias {
2281                        None => column,
2282                        Some(alias) => {
2283                            Expr::Alias(Alias::new(column, qualifier.cloned(), alias))
2284                        }
2285                    }
2286                })
2287                .collect();
2288            let projection = Projection::try_new(projection_expressions, plan)?;
2289            Arc::new(LogicalPlan::Projection(projection))
2290        } else {
2291            plan
2292        };
2293
2294        // Requalify fields with the new `alias`.
2295        let fields = plan.schema().fields().clone();
2296        let meta_data = plan.schema().metadata().clone();
2297        let func_dependencies = plan.schema().functional_dependencies().clone();
2298
2299        let schema = DFSchema::from_unqualified_fields(fields, meta_data)?;
2300        let schema = schema.as_arrow();
2301
2302        let schema = DFSchemaRef::new(
2303            DFSchema::try_from_qualified_schema(alias.clone(), schema)?
2304                .with_functional_dependencies(func_dependencies)?,
2305        );
2306        Ok(SubqueryAlias {
2307            input: plan,
2308            alias,
2309            schema,
2310        })
2311    }
2312}
2313
2314// Manual implementation needed because of `schema` field. Comparison excludes this field.
2315impl PartialOrd for SubqueryAlias {
2316    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
2317        match self.input.partial_cmp(&other.input) {
2318            Some(Ordering::Equal) => self.alias.partial_cmp(&other.alias),
2319            cmp => cmp,
2320        }
2321        // TODO (https://github.com/apache/datafusion/issues/17477) avoid recomparing all fields
2322        .filter(|cmp| *cmp != Ordering::Equal || self == other)
2323    }
2324}
2325
2326/// Filters rows from its input that do not match an
2327/// expression (essentially a WHERE clause with a predicate
2328/// expression).
2329///
2330/// Semantically, `<predicate>` is evaluated for each row of the input;
2331/// If the value of `<predicate>` is true, the input row is passed to
2332/// the output. If the value of `<predicate>` is false, the row is
2333/// discarded.
2334///
2335/// A Filter should not be created directly; use `try_new()` instead.
2336/// Its fields are only `pub` to support pattern matching.
2337#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Hash)]
2338#[non_exhaustive]
2339pub struct Filter {
2340    /// The predicate expression, which must have Boolean type.
2341    pub predicate: Expr,
2342    /// The incoming logical plan
2343    pub input: Arc<LogicalPlan>,
2344}
2345
2346impl Filter {
2347    /// Create a new filter operator.
2348    ///
2349    /// Note: since aliases have no effect on the output of a filter operator,
2350    /// they are removed from the predicate expression.
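    ///
    /// A minimal sketch:
    ///
    /// ```
    /// # use std::sync::Arc;
    /// # use arrow::datatypes::{DataType, Field, Schema};
    /// # use datafusion_expr::{col, lit, logical_plan::table_scan, logical_plan::Filter};
    /// let schema = Schema::new(vec![Field::new("id", DataType::Int32, false)]);
    /// let scan = table_scan(Some("t1"), &schema, None).unwrap().build().unwrap();
    /// // The predicate must evaluate to a boolean (or NULL) value
    /// let filter = Filter::try_new(col("id").eq(lit(5)), Arc::new(scan)).unwrap();
    /// assert_eq!(filter.predicate.to_string(), "id = Int32(5)");
    /// ```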
2351    pub fn try_new(predicate: Expr, input: Arc<LogicalPlan>) -> Result<Self> {
2352        Self::try_new_internal(predicate, input)
2353    }
2354
2355    /// Create a new filter operator for a having clause.
2356    /// This is similar to a filter, but its having flag is set to true.
2357    #[deprecated(since = "48.0.0", note = "Use `try_new` instead")]
2358    pub fn try_new_with_having(predicate: Expr, input: Arc<LogicalPlan>) -> Result<Self> {
2359        Self::try_new_internal(predicate, input)
2360    }
2361
2362    fn is_allowed_filter_type(data_type: &DataType) -> bool {
2363        match data_type {
2364            // Interpret NULL as a missing boolean value.
2365            DataType::Boolean | DataType::Null => true,
2366            DataType::Dictionary(_, value_type) => {
2367                Filter::is_allowed_filter_type(value_type.as_ref())
2368            }
2369            _ => false,
2370        }
2371    }
2372
2373    fn try_new_internal(predicate: Expr, input: Arc<LogicalPlan>) -> Result<Self> {
2374        // Filter predicates must return a boolean value so we try to validate that here.
2375        // Note that it is not always possible to resolve the predicate expression during plan
2376        // construction (such as with correlated subqueries) so we make a best effort here and
2377        // ignore errors resolving the expression against the schema.
2378        if let Ok(predicate_type) = predicate.get_type(input.schema()) {
2379            if !Filter::is_allowed_filter_type(&predicate_type) {
2380                return plan_err!(
2381                    "Cannot create filter with non-boolean predicate '{predicate}' returning {predicate_type}"
2382                );
2383            }
2384        }
2385
2386        Ok(Self {
2387            predicate: predicate.unalias_nested().data,
2388            input,
2389        })
2390    }
2391
2392    /// Is this filter guaranteed to return 0 or 1 row in a given instantiation?
2393    ///
2394    /// This function returns `true` if its predicate contains a conjunction of
2395    /// `col(a) = <expr>` terms, where the schema has a unique constraint (such as
2396    /// a primary key) that is covered by this conjunction.
2397    ///
2398    /// For example, for the table:
2399    /// ```sql
2400    /// CREATE TABLE t (a INTEGER PRIMARY KEY, b INTEGER);
2401    /// ```
2402    /// `Filter(a = 2).is_scalar() == true`,
2403    /// whereas
2404    /// `Filter(b = 2).is_scalar() == false`
2405    /// and
2406    /// `Filter(a = 2 OR b = 2).is_scalar() == false`
2407    fn is_scalar(&self) -> bool {
2408        let schema = self.input.schema();
2409
2410        let functional_dependencies = self.input.schema().functional_dependencies();
2411        let unique_keys = functional_dependencies.iter().filter(|dep| {
2412            let nullable = dep.nullable
2413                && dep
2414                    .source_indices
2415                    .iter()
2416                    .any(|&source| schema.field(source).is_nullable());
2417            !nullable
2418                && dep.mode == Dependency::Single
2419                && dep.target_indices.len() == schema.fields().len()
2420        });
2421
2422        let exprs = split_conjunction(&self.predicate);
2423        let eq_pred_cols: HashSet<_> = exprs
2424            .iter()
2425            .filter_map(|expr| {
2426                let Expr::BinaryExpr(BinaryExpr {
2427                    left,
2428                    op: Operator::Eq,
2429                    right,
2430                }) = expr
2431                else {
2432                    return None;
2433                };
2434                // `<expr> = <expr>` is trivially true and does not pin the key to a value
2435                if left == right {
2436                    return None;
2437                }
2438
2439                match (left.as_ref(), right.as_ref()) {
2440                    (Expr::Column(_), Expr::Column(_)) => None,
2441                    (Expr::Column(c), _) | (_, Expr::Column(c)) => {
2442                        Some(schema.index_of_column(c).unwrap())
2443                    }
2444                    _ => None,
2445                }
2446            })
2447            .collect();
2448
2449        // If we have a functional dependence that is a subset of our predicate,
2450        // this filter is scalar
2451        for key in unique_keys {
2452            if key.source_indices.iter().all(|c| eq_pred_cols.contains(c)) {
2453                return true;
2454            }
2455        }
2456        false
2457    }
2458}
2459
2460/// Windows its input based on a set of window specs and window functions (e.g. SUM or RANK)
2461///
2462/// # Output Schema
2463///
2464/// The output schema is the input schema followed by the window function
2465/// expressions, in order.
2466///
2467/// For example, given the input schema `"A", "B", "C"` and the window function
2468/// `SUM(A) OVER (PARTITION BY B+1 ORDER BY C)`, the output schema will be `"A",
2469/// "B", "C", "SUM(A) OVER ..."` where `"SUM(A) OVER ..."` is the name of the
2470/// output column.
2471///
2472/// Note that the `PARTITION BY` expression "B+1" is not produced in the output
2473/// schema.
2474#[derive(Debug, Clone, PartialEq, Eq, Hash)]
2475pub struct Window {
2476    /// The incoming logical plan
2477    pub input: Arc<LogicalPlan>,
2478    /// The window function expression
2479    pub window_expr: Vec<Expr>,
2480    /// The schema description of the window output
2481    pub schema: DFSchemaRef,
2482}
2483
2484impl Window {
2485    /// Create a new window operator.
2486    pub fn try_new(window_expr: Vec<Expr>, input: Arc<LogicalPlan>) -> Result<Self> {
2487        let fields: Vec<(Option<TableReference>, Arc<Field>)> = input
2488            .schema()
2489            .iter()
2490            .map(|(q, f)| (q.cloned(), Arc::clone(f)))
2491            .collect();
2492        let input_len = fields.len();
2493        let mut window_fields = fields;
2494        let expr_fields = exprlist_to_fields(window_expr.as_slice(), &input)?;
2495        window_fields.extend_from_slice(expr_fields.as_slice());
2496        let metadata = input.schema().metadata().clone();
2497
2498        // Update functional dependencies for window:
2499        let mut window_func_dependencies =
2500            input.schema().functional_dependencies().clone();
2501        window_func_dependencies.extend_target_indices(window_fields.len());
2502
2503        // Since we know that ROW_NUMBER outputs will be unique (i.e. it consists
2504        // of consecutive numbers per partition), we can represent this fact with
2505        // functional dependencies.
2506        let mut new_dependencies = window_expr
2507            .iter()
2508            .enumerate()
2509            .filter_map(|(idx, expr)| {
2510                let Expr::WindowFunction(window_fun) = expr else {
2511                    return None;
2512                };
2513                let WindowFunction {
2514                    fun: WindowFunctionDefinition::WindowUDF(udwf),
2515                    params: WindowFunctionParams { partition_by, .. },
2516                } = window_fun.as_ref()
2517                else {
2518                    return None;
2519                };
2520                // When there is no PARTITION BY, row number will be unique
2521                // across the entire table.
2522                if udwf.name() == "row_number" && partition_by.is_empty() {
2523                    Some(idx + input_len)
2524                } else {
2525                    None
2526                }
2527            })
2528            .map(|idx| {
2529                FunctionalDependence::new(vec![idx], vec![], false)
2530                    .with_mode(Dependency::Single)
2531            })
2532            .collect::<Vec<_>>();
2533
2534        if !new_dependencies.is_empty() {
2535            for dependence in new_dependencies.iter_mut() {
2536                dependence.target_indices = (0..window_fields.len()).collect();
2537            }
2538            // Add the dependencies introduced by the ROW_NUMBER window function to the functional dependencies
2539            let new_deps = FunctionalDependencies::new(new_dependencies);
2540            window_func_dependencies.extend(new_deps);
2541        }
2542
2543        // Validate that FILTER clauses are only used with aggregate window functions
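        // e.g. `SUM(a) FILTER (WHERE a > 0) OVER ()` is accepted, while
        // `ROW_NUMBER() FILTER (WHERE a > 0) OVER ()` is rejected here.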
2544        if let Some(e) = window_expr.iter().find(|e| {
2545            matches!(
2546                e,
2547                Expr::WindowFunction(wf)
2548                    if !matches!(wf.fun, WindowFunctionDefinition::AggregateUDF(_))
2549                        && wf.params.filter.is_some()
2550            )
2551        }) {
2552            return plan_err!(
2553                "FILTER clause can only be used with aggregate window functions. Found in '{e}'"
2554            );
2555        }
2556
2557        Self::try_new_with_schema(
2558            window_expr,
2559            input,
2560            Arc::new(
2561                DFSchema::new_with_metadata(window_fields, metadata)?
2562                    .with_functional_dependencies(window_func_dependencies)?,
2563            ),
2564        )
2565    }
2566
2567    /// Create a new window operator using the provided schema to avoid the overhead of
2568    /// building the schema again when the schema is already known.
2569    ///
2570    /// This method should only be called when you are absolutely sure that the schema being
2571    /// provided is correct for the window function. If in doubt, call [try_new](Self::try_new) instead.
2572    pub fn try_new_with_schema(
2573        window_expr: Vec<Expr>,
2574        input: Arc<LogicalPlan>,
2575        schema: DFSchemaRef,
2576    ) -> Result<Self> {
2577        let input_fields_count = input.schema().fields().len();
2578        if schema.fields().len() != input_fields_count + window_expr.len() {
2579            return plan_err!(
2580                "Window schema has wrong number of fields. Expected {} got {}",
2581                input_fields_count + window_expr.len(),
2582                schema.fields().len()
2583            );
2584        }
2585
2586        Ok(Window {
2587            input,
2588            window_expr,
2589            schema,
2590        })
2591    }
2592}
2593
2594// Manual implementation needed because of `schema` field. Comparison excludes this field.
2595impl PartialOrd for Window {
2596    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
2597        match self.input.partial_cmp(&other.input)? {
2598            Ordering::Equal => {} // continue
2599            not_equal => return Some(not_equal),
2600        }
2601
2602        match self.window_expr.partial_cmp(&other.window_expr)? {
2603            Ordering::Equal => {} // continue
2604            not_equal => return Some(not_equal),
2605        }
2606
2607        // Contract for PartialOrd and PartialEq consistency requires that
2608        // a == b if and only if partial_cmp(a, b) == Some(Equal).
2609        if self == other {
2610            Some(Ordering::Equal)
2611        } else {
2612            None
2613        }
2614    }
2615}
2616
2617/// Produces rows from a table provider by reference or from the context
2618#[derive(Clone)]
2619pub struct TableScan {
2620    /// The name of the table
2621    pub table_name: TableReference,
2622    /// The source of the table
2623    pub source: Arc<dyn TableSource>,
2624    /// Optional column indices to use as a projection
2625    pub projection: Option<Vec<usize>>,
2626    /// The schema description of the output
2627    pub projected_schema: DFSchemaRef,
2628    /// Optional expressions to be used as filters by the table provider
2629    pub filters: Vec<Expr>,
2630    /// Optional number of rows to read
2631    pub fetch: Option<usize>,
2632}
2633
2634impl Debug for TableScan {
2635    fn fmt(&self, f: &mut Formatter) -> fmt::Result {
2636        f.debug_struct("TableScan")
2637            .field("table_name", &self.table_name)
2638            .field("source", &"...")
2639            .field("projection", &self.projection)
2640            .field("projected_schema", &self.projected_schema)
2641            .field("filters", &self.filters)
2642            .field("fetch", &self.fetch)
2643            .finish_non_exhaustive()
2644    }
2645}
2646
2647impl PartialEq for TableScan {
2648    fn eq(&self, other: &Self) -> bool {
2649        self.table_name == other.table_name
2650            && self.projection == other.projection
2651            && self.projected_schema == other.projected_schema
2652            && self.filters == other.filters
2653            && self.fetch == other.fetch
2654    }
2655}
2656
2657impl Eq for TableScan {}
2658
2659// Manual implementation needed because of `source` and `projected_schema` fields.
2660// Comparison excludes these fields.
2661impl PartialOrd for TableScan {
2662    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
2663        #[derive(PartialEq, PartialOrd)]
2664        struct ComparableTableScan<'a> {
2665            /// The name of the table
2666            pub table_name: &'a TableReference,
2667            /// Optional column indices to use as a projection
2668            pub projection: &'a Option<Vec<usize>>,
2669            /// Optional expressions to be used as filters by the table provider
2670            pub filters: &'a Vec<Expr>,
2671            /// Optional number of rows to read
2672            pub fetch: &'a Option<usize>,
2673        }
2674        let comparable_self = ComparableTableScan {
2675            table_name: &self.table_name,
2676            projection: &self.projection,
2677            filters: &self.filters,
2678            fetch: &self.fetch,
2679        };
2680        let comparable_other = ComparableTableScan {
2681            table_name: &other.table_name,
2682            projection: &other.projection,
2683            filters: &other.filters,
2684            fetch: &other.fetch,
2685        };
2686        comparable_self
2687            .partial_cmp(&comparable_other)
2688            // TODO (https://github.com/apache/datafusion/issues/17477) avoid recomparing all fields
2689            .filter(|cmp| *cmp != Ordering::Equal || self == other)
2690    }
2691}
2692
2693impl Hash for TableScan {
2694    fn hash<H: Hasher>(&self, state: &mut H) {
2695        self.table_name.hash(state);
2696        self.projection.hash(state);
2697        self.projected_schema.hash(state);
2698        self.filters.hash(state);
2699        self.fetch.hash(state);
2700    }
2701}
2702
2703impl TableScan {
2704    /// Initialize TableScan with appropriate schema from the given
2705    /// arguments.
2706    pub fn try_new(
2707        table_name: impl Into<TableReference>,
2708        table_source: Arc<dyn TableSource>,
2709        projection: Option<Vec<usize>>,
2710        filters: Vec<Expr>,
2711        fetch: Option<usize>,
2712    ) -> Result<Self> {
2713        let table_name = table_name.into();
2714
2715        if table_name.table().is_empty() {
2716            return plan_err!("table_name cannot be empty");
2717        }
2718        let schema = table_source.schema();
2719        let func_dependencies = FunctionalDependencies::new_from_constraints(
2720            table_source.constraints(),
2721            schema.fields.len(),
2722        );
2723        let projected_schema = projection
2724            .as_ref()
2725            .map(|p| {
2726                let projected_func_dependencies =
2727                    func_dependencies.project_functional_dependencies(p, p.len());
2728
2729                let df_schema = DFSchema::new_with_metadata(
2730                    p.iter()
2731                        .map(|i| {
2732                            (Some(table_name.clone()), Arc::clone(&schema.fields()[*i]))
2733                        })
2734                        .collect(),
2735                    schema.metadata.clone(),
2736                )?;
2737                df_schema.with_functional_dependencies(projected_func_dependencies)
2738            })
2739            .unwrap_or_else(|| {
2740                let df_schema =
2741                    DFSchema::try_from_qualified_schema(table_name.clone(), &schema)?;
2742                df_schema.with_functional_dependencies(func_dependencies)
2743            })?;
2744        let projected_schema = Arc::new(projected_schema);
2745
2746        Ok(Self {
2747            table_name,
2748            source: table_source,
2749            projection,
2750            projected_schema,
2751            filters,
2752            fetch,
2753        })
2754    }
2755}
2756
2757/// Repartition the plan based on a partitioning scheme.
2758#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Hash)]
2759pub struct Repartition {
2760    /// The incoming logical plan
2761    pub input: Arc<LogicalPlan>,
2762    /// The partitioning scheme
2763    pub partitioning_scheme: Partitioning,
2764}
2765
2766/// Union multiple inputs
2767#[derive(Debug, Clone, PartialEq, Eq, Hash)]
2768pub struct Union {
2769    /// Inputs to merge
2770    pub inputs: Vec<Arc<LogicalPlan>>,
2771    /// Union schema. Should be the same for all inputs.
2772    pub schema: DFSchemaRef,
2773}
2774
2775impl Union {
2776    /// Constructs a new Union instance, deriving the schema from the inputs.
2777    /// Schema data types must match exactly.
2778    pub fn try_new(inputs: Vec<Arc<LogicalPlan>>) -> Result<Self> {
2779        let schema = Self::derive_schema_from_inputs(&inputs, false, false)?;
2780        Ok(Union { inputs, schema })
2781    }
2782
2783    /// Constructs a new Union instance, deriving the schema from the inputs.
2784    /// Inputs do not need to have matching types; the produced schema takes
2785    /// its types from the first input.
2786    // TODO (https://github.com/apache/datafusion/issues/14380): Avoid creating uncoerced union at all.
2787    pub fn try_new_with_loose_types(inputs: Vec<Arc<LogicalPlan>>) -> Result<Self> {
2788        let schema = Self::derive_schema_from_inputs(&inputs, true, false)?;
2789        Ok(Union { inputs, schema })
2790    }
2791
2792    /// Constructs a new Union instance that combines rows from different tables by name,
2793    /// instead of by position. This means that the specified inputs need not have schemas
2794    /// that are all the same width.
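    ///
    /// For example, `SELECT a, b FROM t1 UNION BY NAME SELECT b FROM t2`
    /// matches columns by name rather than by position; `t2` then contributes
    /// `NULL` for the missing column `a`.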
2795    pub fn try_new_by_name(inputs: Vec<Arc<LogicalPlan>>) -> Result<Self> {
2796        let schema = Self::derive_schema_from_inputs(&inputs, true, true)?;
2797        let inputs = Self::rewrite_inputs_from_schema(&schema, inputs)?;
2798
2799        Ok(Union { inputs, schema })
2800    }
2801
2802    /// When constructing a `UNION BY NAME`, we need to wrap inputs
2803    /// in an additional `Projection` to account for absence of columns
2804    /// in input schemas or differing projection orders.
2805    fn rewrite_inputs_from_schema(
2806        schema: &Arc<DFSchema>,
2807        inputs: Vec<Arc<LogicalPlan>>,
2808    ) -> Result<Vec<Arc<LogicalPlan>>> {
2809        let schema_width = schema.iter().count();
2810        let mut wrapped_inputs = Vec::with_capacity(inputs.len());
2811        for input in inputs {
2812            // Any columns that exist within the derived schema but do not exist
2813            // within an input's schema should be replaced with `NULL` aliased
2814            // to the appropriate column in the derived schema.
2815            let mut expr = Vec::with_capacity(schema_width);
2816            for column in schema.columns() {
2817                if input
2818                    .schema()
2819                    .has_column_with_unqualified_name(column.name())
2820                {
2821                    expr.push(Expr::Column(column));
2822                } else {
2823                    expr.push(
2824                        Expr::Literal(ScalarValue::Null, None).alias(column.name()),
2825                    );
2826                }
2827            }
2828            wrapped_inputs.push(Arc::new(LogicalPlan::Projection(
2829                Projection::try_new_with_schema(expr, input, Arc::clone(schema))?,
2830            )));
2831        }
2832
2833        Ok(wrapped_inputs)
2834    }
2835
2836    /// Derives the union schema from the inputs.
2837    ///
2838    /// If `loose_types` is true, inputs do not need to have matching types and
2839    /// the produced schema will use the type from the first input.
2840    /// TODO (<https://github.com/apache/datafusion/issues/14380>): This is not necessarily reasonable behavior.
2841    ///
2842    /// If `by_name` is `true`, input schemas need not be the same width. That is,
2843    /// the constructed schema follows `UNION BY NAME` semantics.
2844    fn derive_schema_from_inputs(
2845        inputs: &[Arc<LogicalPlan>],
2846        loose_types: bool,
2847        by_name: bool,
2848    ) -> Result<DFSchemaRef> {
2849        if inputs.len() < 2 {
2850            return plan_err!("UNION requires at least two inputs");
2851        }
2852
2853        if by_name {
2854            Self::derive_schema_from_inputs_by_name(inputs, loose_types)
2855        } else {
2856            Self::derive_schema_from_inputs_by_position(inputs, loose_types)
2857        }
2858    }
2859
2860    fn derive_schema_from_inputs_by_name(
2861        inputs: &[Arc<LogicalPlan>],
2862        loose_types: bool,
2863    ) -> Result<DFSchemaRef> {
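        // FieldData tracks (data type, nullability, per-input field metadata,
        // number of inputs in which the field occurs) for each output column.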
2864        type FieldData<'a> =
2865            (&'a DataType, bool, Vec<&'a HashMap<String, String>>, usize);
2866        let mut cols: Vec<(&str, FieldData)> = Vec::new();
2867        for input in inputs.iter() {
2868            for field in input.schema().fields() {
2869                if let Some((_, (data_type, is_nullable, metadata, occurrences))) =
2870                    cols.iter_mut().find(|(name, _)| name == field.name())
2871                {
2872                    if !loose_types && *data_type != field.data_type() {
2873                        return plan_err!(
2874                            "Found different types for field {}",
2875                            field.name()
2876                        );
2877                    }
2878
2879                    metadata.push(field.metadata());
2880                    // If the field is nullable in any one of the inputs,
2881                    // then the field in the final schema is also nullable.
2882                    *is_nullable |= field.is_nullable();
2883                    *occurrences += 1;
2884                } else {
2885                    cols.push((
2886                        field.name(),
2887                        (
2888                            field.data_type(),
2889                            field.is_nullable(),
2890                            vec![field.metadata()],
2891                            1,
2892                        ),
2893                    ));
2894                }
2895            }
2896        }
2897
2898        let union_fields = cols
2899            .into_iter()
2900            .map(
2901                |(name, (data_type, is_nullable, unmerged_metadata, occurrences))| {
2902                    // If the final number of occurrences of the field is less
2903                    // than the number of inputs (i.e. the field is missing from
2904                    // one or more inputs), then it must be treated as nullable.
2905                    let final_is_nullable = if occurrences == inputs.len() {
2906                        is_nullable
2907                    } else {
2908                        true
2909                    };
2910
2911                    let mut field =
2912                        Field::new(name, data_type.clone(), final_is_nullable);
2913                    field.set_metadata(intersect_metadata_for_union(unmerged_metadata));
2914
2915                    (None, Arc::new(field))
2916                },
2917            )
2918            .collect::<Vec<(Option<TableReference>, _)>>();
2919
2920        let union_schema_metadata = intersect_metadata_for_union(
2921            inputs.iter().map(|input| input.schema().metadata()),
2922        );
2923
2924        // Functional Dependencies are not preserved after UNION operation
2925        let schema = DFSchema::new_with_metadata(union_fields, union_schema_metadata)?;
2926        let schema = Arc::new(schema);
2927
2928        Ok(schema)
2929    }
2930
2931    fn derive_schema_from_inputs_by_position(
2932        inputs: &[Arc<LogicalPlan>],
2933        loose_types: bool,
2934    ) -> Result<DFSchemaRef> {
2935        let first_schema = inputs[0].schema();
2936        let fields_count = first_schema.fields().len();
2937        for input in inputs.iter().skip(1) {
2938            if fields_count != input.schema().fields().len() {
2939                return plan_err!(
2940                    "UNION queries have different number of columns: \
2941                    left has {} columns whereas right has {} columns",
2942                    fields_count,
2943                    input.schema().fields().len()
2944                );
2945            }
2946        }
2947
2948        let mut name_counts: HashMap<String, usize> = HashMap::new();
2949        let union_fields = (0..fields_count)
2950            .map(|i| {
2951                let fields = inputs
2952                    .iter()
2953                    .map(|input| input.schema().field(i))
2954                    .collect::<Vec<_>>();
2955                let first_field = fields[0];
2956                let base_name = first_field.name().to_string();
2957
2958                let data_type = if loose_types {
2959                    // TODO: apply type coercion here, or document why it's better to defer it.
2960                    // Temporarily use the data type from the first input and later rely on the
2961                    // analyzer to coerce the two schemas into a common one.
2962                    first_field.data_type()
2963                } else {
2964                    fields.iter().skip(1).try_fold(
2965                        first_field.data_type(),
2966                        |acc, field| {
2967                            if acc != field.data_type() {
2968                                return plan_err!(
2969                                    "UNION field {i} has different types in inputs: \
2970                                    left has {} whereas right has {}",
2971                                    first_field.data_type(),
2972                                    field.data_type()
2973                                );
2974                            }
2975                            Ok(acc)
2976                        },
2977                    )?
2978                };
2979                let nullable = fields.iter().any(|field| field.is_nullable());
2980
2981                // Generate unique field name
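                // e.g. duplicated input names `a`, `a`, `a` become `a`, `a_1`, `a_2`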
2982                let name = if let Some(count) = name_counts.get_mut(&base_name) {
2983                    *count += 1;
2984                    format!("{base_name}_{count}")
2985                } else {
2986                    name_counts.insert(base_name.clone(), 0);
2987                    base_name
2988                };
2989
2990                let mut field = Field::new(&name, data_type.clone(), nullable);
2991                let field_metadata = intersect_metadata_for_union(
2992                    fields.iter().map(|field| field.metadata()),
2993                );
2994                field.set_metadata(field_metadata);
2995                Ok((None, Arc::new(field)))
2996            })
2997            .collect::<Result<_>>()?;
2998        let union_schema_metadata = intersect_metadata_for_union(
2999            inputs.iter().map(|input| input.schema().metadata()),
3000        );
3001
3002        // Functional Dependencies are not preserved after UNION operation
3003        let schema = DFSchema::new_with_metadata(union_fields, union_schema_metadata)?;
3004        let schema = Arc::new(schema);
3005
3006        Ok(schema)
3007    }
3008}
3009
3010// Manual implementation needed because of `schema` field. Comparison excludes this field.
3011impl PartialOrd for Union {
3012    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
3013        self.inputs
3014            .partial_cmp(&other.inputs)
3015            // TODO (https://github.com/apache/datafusion/issues/17477) avoid recomparing all fields
3016            .filter(|cmp| *cmp != Ordering::Equal || self == other)
3017    }
3018}
3019
3020/// Describes the schema of a table
3021///
3022/// # Example output:
3023///
3024/// ```sql
3025/// > describe traces;
3026/// +--------------------+-----------------------------+-------------+
3027/// | column_name        | data_type                   | is_nullable |
3028/// +--------------------+-----------------------------+-------------+
3029/// | attributes         | Utf8                        | YES         |
3030/// | duration_nano      | Int64                       | YES         |
3031/// | end_time_unix_nano | Int64                       | YES         |
3032/// | service.name       | Dictionary(Int32, Utf8)     | YES         |
3033/// | span.kind          | Utf8                        | YES         |
3034/// | span.name          | Utf8                        | YES         |
3035/// | span_id            | Dictionary(Int32, Utf8)     | YES         |
3036/// | time               | Timestamp(Nanosecond, None) | NO          |
3037/// | trace_id           | Dictionary(Int32, Utf8)     | YES         |
3038/// | otel.status_code   | Utf8                        | YES         |
3039/// | parent_span_id     | Utf8                        | YES         |
3040/// +--------------------+-----------------------------+-------------+
3041/// ```
3042#[derive(Debug, Clone, PartialEq, Eq, Hash)]
3043pub struct DescribeTable {
3044    /// Table schema
3045    pub schema: Arc<Schema>,
3046    /// Schema of the describe table output
3047    pub output_schema: DFSchemaRef,
3048}
3049
3050// Manual implementation of `PartialOrd`, returning `None` since there are no comparable types in
3051// `DescribeTable`. This allows `LogicalPlan` to derive `PartialOrd`.
3052impl PartialOrd for DescribeTable {
3053    fn partial_cmp(&self, _other: &Self) -> Option<Ordering> {
3054        // There is no relevant comparison for schemas
3055        None
3056    }
3057}
3058
3059/// Options for EXPLAIN
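///
/// # Example
///
/// A minimal sketch of the builder-style usage (assuming the re-export path
/// `datafusion_expr::logical_plan::ExplainOption`):
///
/// ```
/// # use datafusion_expr::logical_plan::ExplainOption;
/// let options = ExplainOption::default().with_verbose(true);
/// assert!(options.verbose);
/// assert!(!options.analyze);
/// ```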
3060#[derive(Debug, Clone, PartialEq, Eq, Hash)]
3061pub struct ExplainOption {
3062    /// Include detailed debug info
3063    pub verbose: bool,
3064    /// Actually execute the plan and report metrics
3065    pub analyze: bool,
3066    /// Output syntax/format
3067    pub format: ExplainFormat,
3068}
3069
3070impl Default for ExplainOption {
3071    fn default() -> Self {
3072        ExplainOption {
3073            verbose: false,
3074            analyze: false,
3075            format: ExplainFormat::Indent,
3076        }
3077    }
3078}
3079
3080impl ExplainOption {
3081    /// Builder-style setter for `verbose`
3082    pub fn with_verbose(mut self, verbose: bool) -> Self {
3083        self.verbose = verbose;
3084        self
3085    }
3086
3087    /// Builder-style setter for `analyze`
3088    pub fn with_analyze(mut self, analyze: bool) -> Self {
3089        self.analyze = analyze;
3090        self
3091    }
3092
3093    /// Builder-style setter for `format`
3094    pub fn with_format(mut self, format: ExplainFormat) -> Self {
3095        self.format = format;
3096        self
3097    }
3098}
3099
3100/// Produces a relation with string representations of
3101/// various parts of the plan
3102///
3103/// See [the documentation] for more information
3104///
3105/// [the documentation]: https://datafusion.apache.org/user-guide/sql/explain.html
3106#[derive(Debug, Clone, PartialEq, Eq, Hash)]
3107pub struct Explain {
3108    /// Should extra (detailed, intermediate plans) be included?
3109    pub verbose: bool,
3110    /// Output format for explain, if specified.
3111    /// If none, defaults to `text`
3112    pub explain_format: ExplainFormat,
3113    /// The logical plan that is being EXPLAIN'd
3114    pub plan: Arc<LogicalPlan>,
3115    /// Represent the various stages plans have gone through
3116    pub stringified_plans: Vec<StringifiedPlan>,
3117    /// The output schema of the explain (2 columns of text)
3118    pub schema: DFSchemaRef,
3119    /// Used by the physical planner to check whether it should proceed with planning
3120    pub logical_optimization_succeeded: bool,
3121}
3122
3123// Manual implementation needed because of `schema` field. Comparison excludes this field.
3124impl PartialOrd for Explain {
3125    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
3126        #[derive(PartialEq, PartialOrd)]
3127        struct ComparableExplain<'a> {
3128            /// Should extra (detailed, intermediate plans) be included?
3129            pub verbose: &'a bool,
3130            /// The logical plan that is being EXPLAIN'd
3131            pub plan: &'a Arc<LogicalPlan>,
3132            /// Represent the various stages plans have gone through
3133            pub stringified_plans: &'a Vec<StringifiedPlan>,
3134            /// Used by the physical planner to check whether it should proceed with planning
3135            pub logical_optimization_succeeded: &'a bool,
3136        }
3137        let comparable_self = ComparableExplain {
3138            verbose: &self.verbose,
3139            plan: &self.plan,
3140            stringified_plans: &self.stringified_plans,
3141            logical_optimization_succeeded: &self.logical_optimization_succeeded,
3142        };
3143        let comparable_other = ComparableExplain {
3144            verbose: &other.verbose,
3145            plan: &other.plan,
3146            stringified_plans: &other.stringified_plans,
3147            logical_optimization_succeeded: &other.logical_optimization_succeeded,
3148        };
3149        comparable_self
3150            .partial_cmp(&comparable_other)
3151            // TODO (https://github.com/apache/datafusion/issues/17477) avoid recomparing all fields
3152            .filter(|cmp| *cmp != Ordering::Equal || self == other)
3153    }
3154}
3155
3156/// Runs the actual plan, and then prints the physical plan
3157/// with execution metrics.
3158#[derive(Debug, Clone, PartialEq, Eq, Hash)]
3159pub struct Analyze {
3160    /// Should extra detail be included?
3161    pub verbose: bool,
3162    /// The logical plan that is being EXPLAIN ANALYZE'd
3163    pub input: Arc<LogicalPlan>,
3164    /// The output schema of the explain (2 columns of text)
3165    pub schema: DFSchemaRef,
3166}
3167
3168// Manual implementation needed because of `schema` field. Comparison excludes this field.
3169impl PartialOrd for Analyze {
3170    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
3171        match self.verbose.partial_cmp(&other.verbose) {
3172            Some(Ordering::Equal) => self.input.partial_cmp(&other.input),
3173            cmp => cmp,
3174        }
3175        // TODO (https://github.com/apache/datafusion/issues/17477) avoid recomparing all fields
3176        .filter(|cmp| *cmp != Ordering::Equal || self == other)
3177    }
3178}
3179
3180/// Extension operator defined outside of DataFusion
3181// TODO(clippy): This clippy `allow` should be removed if
3182// the manual `PartialEq` is removed in favor of a derive.
3183// (see `PartialEq` the impl for details.)
3184#[allow(clippy::derived_hash_with_manual_eq)]
3185#[derive(Debug, Clone, Eq, Hash)]
3186pub struct Extension {
3187    /// The runtime extension operator
3188    pub node: Arc<dyn UserDefinedLogicalNode>,
3189}
3190
3191// `PartialEq` cannot be derived for types containing `Arc<dyn Trait>`.
3192// This manual implementation should be removed if
3193// https://github.com/rust-lang/rust/issues/39128 is fixed.
3194impl PartialEq for Extension {
3195    fn eq(&self, other: &Self) -> bool {
3196        self.node.eq(&other.node)
3197    }
3198}
3199
3200impl PartialOrd for Extension {
3201    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
3202        self.node.partial_cmp(&other.node)
3203    }
3204}
3205
3206/// Produces the first `n` tuples from its input and discards the rest.
3207#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Hash)]
3208pub struct Limit {
3209    /// Number of rows to skip before fetch
3210    pub skip: Option<Box<Expr>>,
3211    /// Maximum number of rows to fetch,
3212    /// None means fetching all rows
3213    pub fetch: Option<Box<Expr>>,
3214    /// The logical plan
3215    pub input: Arc<LogicalPlan>,
3216}
3217
3218/// Different types of skip expression in a Limit plan.
3219pub enum SkipType {
3220    /// The skip expression is a literal value.
3221    Literal(usize),
3222    /// Any other skip expression; currently only expressions that can be folded into constants are supported.
3223    UnsupportedExpr,
3224}
3225
3226/// Different types of fetch expression in a Limit plan.
3227pub enum FetchType {
3228    /// The fetch expression is a literal value.
3229    /// `Literal(None)` means the fetch expression is not provided.
3230    Literal(Option<usize>),
3231    /// Any other fetch expression; currently only expressions that can be folded into constants are supported.
3232    UnsupportedExpr,
3233}
3234
3235impl Limit {
3236    /// Get the skip type from the limit plan.
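    ///
    /// A minimal sketch (the `EmptyRelation` input and the `lit` helper are
    /// only for illustration; paths assume the usual re-exports):
    ///
    /// ```
    /// # use std::sync::Arc;
    /// # use datafusion_common::DFSchema;
    /// # use datafusion_expr::lit;
    /// # use datafusion_expr::logical_plan::{EmptyRelation, Limit, LogicalPlan, SkipType};
    /// let limit = Limit {
    ///     skip: Some(Box::new(lit(5i64))),
    ///     fetch: None,
    ///     input: Arc::new(LogicalPlan::EmptyRelation(EmptyRelation {
    ///         produce_one_row: false,
    ///         schema: Arc::new(DFSchema::empty()),
    ///     })),
    /// };
    /// assert!(matches!(limit.get_skip_type(), Ok(SkipType::Literal(5))));
    /// ```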
3237    pub fn get_skip_type(&self) -> Result<SkipType> {
3238        match self.skip.as_deref() {
3239            Some(expr) => match *expr {
3240                Expr::Literal(ScalarValue::Int64(s), _) => {
3241                    // `skip = NULL` is equivalent to `skip = 0`
3242                    let s = s.unwrap_or(0);
3243                    if s >= 0 {
3244                        Ok(SkipType::Literal(s as usize))
3245                    } else {
3246                        plan_err!("OFFSET must be >=0, '{}' was provided", s)
3247                    }
3248                }
3249                _ => Ok(SkipType::UnsupportedExpr),
3250            },
3251            // `skip = None` is equivalent to `skip = 0`
3252            None => Ok(SkipType::Literal(0)),
3253        }
3254    }
3255
3256    /// Get the fetch type from the limit plan.
3257    pub fn get_fetch_type(&self) -> Result<FetchType> {
3258        match self.fetch.as_deref() {
3259            Some(expr) => match *expr {
3260                Expr::Literal(ScalarValue::Int64(Some(s)), _) => {
3261                    if s >= 0 {
3262                        Ok(FetchType::Literal(Some(s as usize)))
3263                    } else {
3264                        plan_err!("LIMIT must be >= 0, '{}' was provided", s)
3265                    }
3266                }
3267                Expr::Literal(ScalarValue::Int64(None), _) => {
3268                    Ok(FetchType::Literal(None))
3269                }
3270                _ => Ok(FetchType::UnsupportedExpr),
3271            },
3272            None => Ok(FetchType::Literal(None)),
3273        }
3274    }
3275}
3276
3277/// Removes duplicate rows from the input
3278#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Hash)]
3279pub enum Distinct {
3280    /// Plain `DISTINCT` referencing all selection expressions
3281    All(Arc<LogicalPlan>),
3282    /// The `Postgres` addition, allowing separate control over DISTINCT'd and selected columns
3283    On(DistinctOn),
3284}
3285
3286impl Distinct {
3287    /// Return a reference to the node's input
3288    pub fn input(&self) -> &Arc<LogicalPlan> {
3289        match self {
3290            Distinct::All(input) => input,
3291            Distinct::On(DistinctOn { input, .. }) => input,
3292        }
3293    }
3294}
3295
3296/// Removes duplicate rows from the input
3297#[derive(Debug, Clone, PartialEq, Eq, Hash)]
3298pub struct DistinctOn {
3299    /// The `DISTINCT ON` clause expression list
3300    pub on_expr: Vec<Expr>,
3301    /// The selected projection expression list
3302    pub select_expr: Vec<Expr>,
3303    /// The `ORDER BY` clause, whose initial expressions must match those of the `ON` clause when
3304    /// present. Note that those matching expressions actually wrap the `ON` expressions with
3305    /// additional info pertaining to the sorting procedure (i.e. ASC/DESC, and NULLS FIRST/LAST).
3306    pub sort_expr: Option<Vec<SortExpr>>,
3307    /// The logical plan that is being DISTINCT'd
3308    pub input: Arc<LogicalPlan>,
3309    /// The schema description of the DISTINCT ON output
3310    pub schema: DFSchemaRef,
3311}
3312
3313impl DistinctOn {
3314    /// Create a new `DistinctOn` struct.
3315    pub fn try_new(
3316        on_expr: Vec<Expr>,
3317        select_expr: Vec<Expr>,
3318        sort_expr: Option<Vec<SortExpr>>,
3319        input: Arc<LogicalPlan>,
3320    ) -> Result<Self> {
3321        if on_expr.is_empty() {
3322            return plan_err!("No `ON` expressions provided");
3323        }
3324
3325        let on_expr = normalize_cols(on_expr, input.as_ref())?;
3326        let qualified_fields = exprlist_to_fields(select_expr.as_slice(), &input)?
3327            .into_iter()
3328            .collect();
3329
3330        let dfschema = DFSchema::new_with_metadata(
3331            qualified_fields,
3332            input.schema().metadata().clone(),
3333        )?;
3334
3335        let mut distinct_on = DistinctOn {
3336            on_expr,
3337            select_expr,
3338            sort_expr: None,
3339            input,
3340            schema: Arc::new(dfschema),
3341        };
3342
3343        if let Some(sort_expr) = sort_expr {
3344            distinct_on = distinct_on.with_sort_expr(sort_expr)?;
3345        }
3346
3347        Ok(distinct_on)
3348    }
3349
3350    /// Try to update `self` with new sort expressions.
3351    ///
3352    /// Validates that the `ON` expressions form a prefix of the sort expressions.
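    ///
    /// For example, `DISTINCT ON (a) ... ORDER BY a, b` is accepted, while
    /// `DISTINCT ON (a) ... ORDER BY b` is rejected because the `ON`
    /// expressions do not prefix the `ORDER BY` expressions.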
3353    pub fn with_sort_expr(mut self, sort_expr: Vec<SortExpr>) -> Result<Self> {
3354        let sort_expr = normalize_sorts(sort_expr, self.input.as_ref())?;
3355
3356        // Check that the left-most sort expressions are the same as the `ON` expressions.
3357        let mut matched = true;
3358        for (on, sort) in self.on_expr.iter().zip(sort_expr.iter()) {
3359            if on != &sort.expr {
3360                matched = false;
3361                break;
3362            }
3363        }
3364
3365        if self.on_expr.len() > sort_expr.len() || !matched {
3366            return plan_err!(
3367                "SELECT DISTINCT ON expressions must match initial ORDER BY expressions"
3368            );
3369        }
3370
3371        self.sort_expr = Some(sort_expr);
3372        Ok(self)
3373    }
3374}
3375
3376// Manual implementation needed because of `schema` field. Comparison excludes this field.
3377impl PartialOrd for DistinctOn {
3378    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
3379        #[derive(PartialEq, PartialOrd)]
3380        struct ComparableDistinctOn<'a> {
3381            /// The `DISTINCT ON` clause expression list
3382            pub on_expr: &'a Vec<Expr>,
3383            /// The selected projection expression list
3384            pub select_expr: &'a Vec<Expr>,
3385            /// The `ORDER BY` clause, whose initial expressions must match those of the `ON` clause when
3386            /// present. Note that those matching expressions actually wrap the `ON` expressions with
3387            /// additional info pertaining to the sorting procedure (i.e. ASC/DESC, and NULLS FIRST/LAST).
3388            pub sort_expr: &'a Option<Vec<SortExpr>>,
3389            /// The logical plan that is being DISTINCT'd
3390            pub input: &'a Arc<LogicalPlan>,
3391        }
3392        let comparable_self = ComparableDistinctOn {
3393            on_expr: &self.on_expr,
3394            select_expr: &self.select_expr,
3395            sort_expr: &self.sort_expr,
3396            input: &self.input,
3397        };
3398        let comparable_other = ComparableDistinctOn {
3399            on_expr: &other.on_expr,
3400            select_expr: &other.select_expr,
3401            sort_expr: &other.sort_expr,
3402            input: &other.input,
3403        };
3404        comparable_self
3405            .partial_cmp(&comparable_other)
3406            // TODO (https://github.com/apache/datafusion/issues/17477) avoid recomparing all fields
3407            .filter(|cmp| *cmp != Ordering::Equal || self == other)
3408    }
3409}
3410
3411/// Aggregates its input based on a set of grouping and aggregate
3412/// expressions (e.g. SUM).
3413///
3414/// # Output Schema
3415///
3416/// The output schema is the group expressions followed by the aggregate
3417/// expressions in order.
3418///
3419/// For example, given the input schema `"A", "B", "C"` and the aggregate
3420/// `SUM(A) GROUP BY C+B`, the output schema will be `"C+B", "SUM(A)"` where
3421/// "C+B" and "SUM(A)" are the names of the output columns. Note that "C+B" is a
3422/// single new column.
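///
/// When the grouping is a grouping set (e.g. `GROUPING SETS`, `CUBE` or
/// `ROLLUP`), an internal [`Aggregate::INTERNAL_GROUPING_ID`] column is
/// appended after the (then nullable) group expressions.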
3423#[derive(Debug, Clone, PartialEq, Eq, Hash)]
3424// mark non_exhaustive to encourage use of try_new/new()
3425#[non_exhaustive]
3426pub struct Aggregate {
3427    /// The incoming logical plan
3428    pub input: Arc<LogicalPlan>,
3429    /// Grouping expressions
3430    pub group_expr: Vec<Expr>,
3431    /// Aggregate expressions
3432    pub aggr_expr: Vec<Expr>,
3433    /// The schema description of the aggregate output
3434    pub schema: DFSchemaRef,
3435}
3436
3437impl Aggregate {
3438    /// Create a new aggregate operator.
3439    pub fn try_new(
3440        input: Arc<LogicalPlan>,
3441        group_expr: Vec<Expr>,
3442        aggr_expr: Vec<Expr>,
3443    ) -> Result<Self> {
3444        let group_expr = enumerate_grouping_sets(group_expr)?;
3445
3446        let is_grouping_set = matches!(group_expr.as_slice(), [Expr::GroupingSet(_)]);
3447
3448        let grouping_expr: Vec<&Expr> = grouping_set_to_exprlist(group_expr.as_slice())?;
3449
3450        let mut qualified_fields = exprlist_to_fields(grouping_expr, &input)?;
3451
3452        // Even columns that cannot be null will become nullable when used in a grouping set.
3453        if is_grouping_set {
3454            qualified_fields = qualified_fields
3455                .into_iter()
3456                .map(|(q, f)| (q, f.as_ref().clone().with_nullable(true).into()))
3457                .collect::<Vec<_>>();
3458            qualified_fields.push((
3459                None,
3460                Field::new(
3461                    Self::INTERNAL_GROUPING_ID,
3462                    Self::grouping_id_type(qualified_fields.len()),
3463                    false,
3464                )
3465                .into(),
3466            ));
3467        }
3468
3469        qualified_fields.extend(exprlist_to_fields(aggr_expr.as_slice(), &input)?);
3470
3471        let schema = DFSchema::new_with_metadata(
3472            qualified_fields,
3473            input.schema().metadata().clone(),
3474        )?;
3475
3476        Self::try_new_with_schema(input, group_expr, aggr_expr, Arc::new(schema))
3477    }
3478
3479    /// Create a new aggregate operator using the provided schema to avoid the overhead of
3480    /// building the schema again when the schema is already known.
3481    ///
3482    /// This method should only be called when you are absolutely sure that the schema being
3483    /// provided is correct for the aggregate. If in doubt, call [try_new](Self::try_new) instead.
3484    pub fn try_new_with_schema(
3485        input: Arc<LogicalPlan>,
3486        group_expr: Vec<Expr>,
3487        aggr_expr: Vec<Expr>,
3488        schema: DFSchemaRef,
3489    ) -> Result<Self> {
3490        if group_expr.is_empty() && aggr_expr.is_empty() {
3491            return plan_err!(
3492                "Aggregate requires at least one grouping or aggregate expression. \
3493                An Aggregate without grouping or aggregate expressions is \
3494                logically equivalent to, but less efficient than, VALUES producing \
3495                a single row. Please use VALUES instead."
3496            );
3497        }
3498        let group_expr_count = grouping_set_expr_count(&group_expr)?;
3499        if schema.fields().len() != group_expr_count + aggr_expr.len() {
3500            return plan_err!(
3501                "Aggregate schema has wrong number of fields. Expected {} got {}",
3502                group_expr_count + aggr_expr.len(),
3503                schema.fields().len()
3504            );
3505        }
3506
3507        let aggregate_func_dependencies =
3508            calc_func_dependencies_for_aggregate(&group_expr, &input, &schema)?;
3509        let new_schema = schema.as_ref().clone();
3510        let schema = Arc::new(
3511            new_schema.with_functional_dependencies(aggregate_func_dependencies)?,
3512        );
3513        Ok(Self {
3514            input,
3515            group_expr,
3516            aggr_expr,
3517            schema,
3518        })
3519    }
3520
3521    fn is_grouping_set(&self) -> bool {
3522        matches!(self.group_expr.as_slice(), [Expr::GroupingSet(_)])
3523    }
3524
3525    /// Get the output expressions.
3526    fn output_expressions(&self) -> Result<Vec<&Expr>> {
3527        static INTERNAL_ID_EXPR: LazyLock<Expr> = LazyLock::new(|| {
3528            Expr::Column(Column::from_name(Aggregate::INTERNAL_GROUPING_ID))
3529        });
3530        let mut exprs = grouping_set_to_exprlist(self.group_expr.as_slice())?;
3531        if self.is_grouping_set() {
3532            exprs.push(&INTERNAL_ID_EXPR);
3533        }
3534        exprs.extend(self.aggr_expr.iter());
3535        debug_assert!(exprs.len() == self.schema.fields().len());
3536        Ok(exprs)
3537    }
3538
3539    /// Get the number of group by expressions in the output schema.
3540    /// This is not simply the length of the group by expression list: an expression
3541    /// may be a GroupingSet, etc., in which case we need the inner expression count.
3542    pub fn group_expr_len(&self) -> Result<usize> {
3543        grouping_set_expr_count(&self.group_expr)
3544    }
3545
3546    /// Returns the data type of the grouping id.
3547    /// The grouping ID value is a bitmask where each set bit
3548    /// indicates that the corresponding grouping expression is
3549    /// excluded from the grouping set (and hence null in the output).
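    ///
    /// A minimal illustration of how the bitmask width is chosen:
    ///
    /// ```
    /// # use arrow::datatypes::DataType;
    /// # use datafusion_expr::logical_plan::Aggregate;
    /// // up to 8 grouping expressions fit in a `UInt8` bitmask
    /// assert_eq!(Aggregate::grouping_id_type(8), DataType::UInt8);
    /// assert_eq!(Aggregate::grouping_id_type(9), DataType::UInt16);
    /// ```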
3550    pub fn grouping_id_type(group_exprs: usize) -> DataType {
3551        if group_exprs <= 8 {
3552            DataType::UInt8
3553        } else if group_exprs <= 16 {
3554            DataType::UInt16
3555        } else if group_exprs <= 32 {
3556            DataType::UInt32
3557        } else {
3558            DataType::UInt64
3559        }
3560    }
3561
3562    /// Internal column used when the aggregation is a grouping set.
3563    ///
3564    /// This column contains a bitmask where each bit represents a grouping
3565    /// expression. The least significant bit corresponds to the rightmost
3566    /// grouping expression. A bit value of 0 indicates that the corresponding
3567    /// column is included in the grouping set, while a value of 1 means it is excluded.
3568    ///
3569    /// For example, for the grouping expressions CUBE(a, b), the grouping ID
3570    /// column will have the following values:
3571    ///     0b00: Both `a` and `b` are included
3572    ///     0b01: `b` is excluded
3573    ///     0b10: `a` is excluded
3574    ///     0b11: Both `a` and `b` are excluded
3575    ///
3576    /// This internal column is necessary because excluded columns are replaced
3577    /// with `NULL` values. To handle these cases correctly, we must distinguish
3578    /// between an actual `NULL` value in a column and a column being excluded from the set.
3579    pub const INTERNAL_GROUPING_ID: &'static str = "__grouping_id";
3580}
3581
3582// Manual implementation needed because of `schema` field. Comparison excludes this field.
3583impl PartialOrd for Aggregate {
3584    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
3585        match self.input.partial_cmp(&other.input) {
3586            Some(Ordering::Equal) => {
3587                match self.group_expr.partial_cmp(&other.group_expr) {
3588                    Some(Ordering::Equal) => self.aggr_expr.partial_cmp(&other.aggr_expr),
3589                    cmp => cmp,
3590                }
3591            }
3592            cmp => cmp,
3593        }
3594        // TODO (https://github.com/apache/datafusion/issues/17477) avoid recomparing all fields
3595        .filter(|cmp| *cmp != Ordering::Equal || self == other)
3596    }
3597}
3598
3599/// Checks whether any expression in `group_expr` is an `Expr::GroupingSet`.
3600fn contains_grouping_set(group_expr: &[Expr]) -> bool {
3601    group_expr
3602        .iter()
3603        .any(|expr| matches!(expr, Expr::GroupingSet(_)))
3604}
3605
3606/// Calculates functional dependencies for aggregate expressions.
3607fn calc_func_dependencies_for_aggregate(
3608    // Expressions in the GROUP BY clause:
3609    group_expr: &[Expr],
3610    // Input plan of the aggregate:
3611    input: &LogicalPlan,
3612    // Aggregate schema
3613    aggr_schema: &DFSchema,
3614) -> Result<FunctionalDependencies> {
3615    // We can do a case analysis on how to propagate functional dependencies based on
3616    // whether the GROUP BY in question contains a grouping set expression:
3617    // - If so, the functional dependencies will be empty because we cannot guarantee
3618    //   that GROUP BY expression results will be unique.
3619    // - Otherwise, it may be possible to propagate functional dependencies.
3620    if !contains_grouping_set(group_expr) {
3621        let group_by_expr_names = group_expr
3622            .iter()
3623            .map(|item| item.schema_name().to_string())
3624            .collect::<IndexSet<_>>()
3625            .into_iter()
3626            .collect::<Vec<_>>();
3627        let aggregate_func_dependencies = aggregate_functional_dependencies(
3628            input.schema(),
3629            &group_by_expr_names,
3630            aggr_schema,
3631        );
3632        Ok(aggregate_func_dependencies)
3633    } else {
3634        Ok(FunctionalDependencies::empty())
3635    }
3636}
3637
3638/// This function projects functional dependencies of the `input` plan according
3639/// to projection expressions `exprs`.
3640fn calc_func_dependencies_for_project(
3641    exprs: &[Expr],
3642    input: &LogicalPlan,
3643) -> Result<FunctionalDependencies> {
3644    let input_fields = input.schema().field_names();
3645    // Calculate expression indices (if present) in the input schema.
3646    let proj_indices = exprs
3647        .iter()
3648        .map(|expr| match expr {
3649            #[expect(deprecated)]
3650            Expr::Wildcard { qualifier, options } => {
3651                let wildcard_fields = exprlist_to_fields(
3652                    vec![&Expr::Wildcard {
3653                        qualifier: qualifier.clone(),
3654                        options: options.clone(),
3655                    }],
3656                    input,
3657                )?;
3658                Ok::<_, DataFusionError>(
3659                    wildcard_fields
3660                        .into_iter()
3661                        .filter_map(|(qualifier, f)| {
3662                            let flat_name = qualifier
3663                                .map(|t| format!("{}.{}", t, f.name()))
3664                                .unwrap_or_else(|| f.name().clone());
3665                            input_fields.iter().position(|item| *item == flat_name)
3666                        })
3667                        .collect::<Vec<_>>(),
3668                )
3669            }
3670            Expr::Alias(alias) => {
3671                let name = format!("{}", alias.expr);
3672                Ok(input_fields
3673                    .iter()
3674                    .position(|item| *item == name)
3675                    .map(|i| vec![i])
3676                    .unwrap_or(vec![]))
3677            }
3678            _ => {
3679                let name = format!("{expr}");
3680                Ok(input_fields
3681                    .iter()
3682                    .position(|item| *item == name)
3683                    .map(|i| vec![i])
3684                    .unwrap_or(vec![]))
3685            }
3686        })
3687        .collect::<Result<Vec<_>>>()?
3688        .into_iter()
3689        .flatten()
3690        .collect::<Vec<_>>();
3691
3692    Ok(input
3693        .schema()
3694        .functional_dependencies()
3695        .project_functional_dependencies(&proj_indices, exprs.len()))
3696}
3697
3698/// Sorts its input according to a list of sort expressions.
3699#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Hash)]
3700pub struct Sort {
3701    /// The sort expressions
3702    pub expr: Vec<SortExpr>,
3703    /// The incoming logical plan
3704    pub input: Arc<LogicalPlan>,
3705    /// Optional fetch limit
3706    pub fetch: Option<usize>,
3707}
3708
3709/// Join two logical plans on one or more join columns
3710#[derive(Debug, Clone, PartialEq, Eq, Hash)]
3711pub struct Join {
3712    /// Left input
3713    pub left: Arc<LogicalPlan>,
3714    /// Right input
3715    pub right: Arc<LogicalPlan>,
3716    /// Equijoin clause expressed as pairs of (left, right) join expressions
3717    pub on: Vec<(Expr, Expr)>,
3718    /// Filters applied during join (non-equi conditions)
3719    pub filter: Option<Expr>,
3720    /// Join type
3721    pub join_type: JoinType,
3722    /// Join constraint
3723    pub join_constraint: JoinConstraint,
3724    /// The output schema, containing fields from the left and right inputs
3725    pub schema: DFSchemaRef,
3726    /// Defines the null equality for the join.
3727    pub null_equality: NullEquality,
3728}
3729
3730impl Join {
3731    /// Creates a new Join operator with automatically computed schema.
3732    ///
3733    /// This constructor computes the schema based on the join type and inputs,
3734    /// removing the need to manually specify the schema or call `recompute_schema`.
3735    ///
3736    /// # Arguments
3737    ///
3738    /// * `left` - Left input plan
3739    /// * `right` - Right input plan
3740    /// * `on` - Join condition as a vector of (left_expr, right_expr) pairs
3741    /// * `filter` - Optional filter expression (for non-equijoin conditions)
3742    /// * `join_type` - Type of join (Inner, Left, Right, etc.)
3743    /// * `join_constraint` - Join constraint (On, Using)
3744    /// * `null_equality` - How to handle nulls in join comparisons
3745    ///
3746    /// # Returns
3747    ///
3748    /// A new Join operator with the computed schema
3749    pub fn try_new(
3750        left: Arc<LogicalPlan>,
3751        right: Arc<LogicalPlan>,
3752        on: Vec<(Expr, Expr)>,
3753        filter: Option<Expr>,
3754        join_type: JoinType,
3755        join_constraint: JoinConstraint,
3756        null_equality: NullEquality,
3757    ) -> Result<Self> {
3758        let join_schema = build_join_schema(left.schema(), right.schema(), &join_type)?;
3759
3760        Ok(Join {
3761            left,
3762            right,
3763            on,
3764            filter,
3765            join_type,
3766            join_constraint,
3767            schema: Arc::new(join_schema),
3768            null_equality,
3769        })
3770    }
3771
3772    /// Create a Join whose inputs are wrapped with projections. This method is used
3773    /// only in physical planning to help create the physical join.
3774    pub fn try_new_with_project_input(
3775        original: &LogicalPlan,
3776        left: Arc<LogicalPlan>,
3777        right: Arc<LogicalPlan>,
3778        column_on: (Vec<Column>, Vec<Column>),
3779    ) -> Result<(Self, bool)> {
3780        let original_join = match original {
3781            LogicalPlan::Join(join) => join,
3782            _ => return plan_err!("Could not create join with project input"),
3783        };
3784
3785        let mut left_sch = LogicalPlanBuilder::from(Arc::clone(&left));
3786        let mut right_sch = LogicalPlanBuilder::from(Arc::clone(&right));
3787
3788        let mut requalified = false;
3789
3790        // By definition, the resulting schema of an inner/left/right/full join contains the left side's fields first and then the right side's,
3791        // potentially with duplicate field names. Note this will only qualify fields that have not been qualified before.
3792        if original_join.join_type == JoinType::Inner
3793            || original_join.join_type == JoinType::Left
3794            || original_join.join_type == JoinType::Right
3795            || original_join.join_type == JoinType::Full
3796        {
3797            (left_sch, right_sch, requalified) =
3798                requalify_sides_if_needed(left_sch.clone(), right_sch.clone())?;
3799        }
3800
3801        let on: Vec<(Expr, Expr)> = column_on
3802            .0
3803            .into_iter()
3804            .zip(column_on.1)
3805            .map(|(l, r)| (Expr::Column(l), Expr::Column(r)))
3806            .collect();
3807
3808        let join_schema = build_join_schema(
3809            left_sch.schema(),
3810            right_sch.schema(),
3811            &original_join.join_type,
3812        )?;
3813
3814        Ok((
3815            Join {
3816                left,
3817                right,
3818                on,
3819                filter: original_join.filter.clone(),
3820                join_type: original_join.join_type,
3821                join_constraint: original_join.join_constraint,
3822                schema: Arc::new(join_schema),
3823                null_equality: original_join.null_equality,
3824            },
3825            requalified,
3826        ))
3827    }
3828}
3829
3830// Manual implementation needed because of `schema` field. Comparison excludes this field.
3831impl PartialOrd for Join {
3832    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
3833        #[derive(PartialEq, PartialOrd)]
3834        struct ComparableJoin<'a> {
3835            /// Left input
3836            pub left: &'a Arc<LogicalPlan>,
3837            /// Right input
3838            pub right: &'a Arc<LogicalPlan>,
3839            /// Equijoin clause expressed as pairs of (left, right) join expressions
3840            pub on: &'a Vec<(Expr, Expr)>,
3841            /// Filters applied during join (non-equi conditions)
3842            pub filter: &'a Option<Expr>,
3843            /// Join type
3844            pub join_type: &'a JoinType,
3845            /// Join constraint
3846            pub join_constraint: &'a JoinConstraint,
3847            /// The null handling behavior for equalities
3848            pub null_equality: &'a NullEquality,
3849        }
3850        let comparable_self = ComparableJoin {
3851            left: &self.left,
3852            right: &self.right,
3853            on: &self.on,
3854            filter: &self.filter,
3855            join_type: &self.join_type,
3856            join_constraint: &self.join_constraint,
3857            null_equality: &self.null_equality,
3858        };
3859        let comparable_other = ComparableJoin {
3860            left: &other.left,
3861            right: &other.right,
3862            on: &other.on,
3863            filter: &other.filter,
3864            join_type: &other.join_type,
3865            join_constraint: &other.join_constraint,
3866            null_equality: &other.null_equality,
3867        };
3868        comparable_self
3869            .partial_cmp(&comparable_other)
3870            // TODO (https://github.com/apache/datafusion/issues/17477) avoid recomparing all fields
3871            .filter(|cmp| *cmp != Ordering::Equal || self == other)
3872    }
3873}
3874
3875/// Subquery
3876#[derive(Clone, PartialEq, Eq, PartialOrd, Hash)]
3877pub struct Subquery {
3878    /// The subquery
3879    pub subquery: Arc<LogicalPlan>,
3880    /// The outer references used in the subquery
3881    pub outer_ref_columns: Vec<Expr>,
3882    /// Span information for subquery projection columns
3883    pub spans: Spans,
3884}
3885
3886impl Normalizeable for Subquery {
3887    fn can_normalize(&self) -> bool {
3888        false
3889    }
3890}
3891
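// `Subquery` opts out of CSE normalization (see `can_normalize` above), so
// equality here compares the inner plans directly and the outer reference
// expressions pairwise.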
3892impl NormalizeEq for Subquery {
3893    fn normalize_eq(&self, other: &Self) -> bool {
3894        // TODO: maybe implement NormalizeEq for LogicalPlan?
3895        *self.subquery == *other.subquery
3896            && self.outer_ref_columns.len() == other.outer_ref_columns.len()
3897            && self
3898                .outer_ref_columns
3899                .iter()
3900                .zip(other.outer_ref_columns.iter())
3901                .all(|(a, b)| a.normalize_eq(b))
3902    }
3903}
3904
3905impl Subquery {
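    /// Extracts the [`Subquery`] from a scalar subquery expression, looking
    /// through any wrapping `Cast`.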
3906    pub fn try_from_expr(plan: &Expr) -> Result<&Subquery> {
3907        match plan {
3908            Expr::ScalarSubquery(it) => Ok(it),
3909            Expr::Cast(cast) => Subquery::try_from_expr(cast.expr.as_ref()),
3910            _ => plan_err!("Could not coerce into ScalarSubquery!"),
3911        }
3912    }
3913
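    /// Returns a copy of this subquery with a new inner plan; span
    /// information is reset rather than copied.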
3914    pub fn with_plan(&self, plan: Arc<LogicalPlan>) -> Subquery {
3915        Subquery {
3916            subquery: plan,
3917            outer_ref_columns: self.outer_ref_columns.clone(),
3918            spans: Spans::new(),
3919        }
3920    }
3921}
3922
3923impl Debug for Subquery {
3924    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
3925        write!(f, "<subquery>")
3926    }
3927}
3928
3929/// Logical partitioning schemes supported by [`LogicalPlan::Repartition`]
3930///
3931/// See [`Partitioning`] for more details on partitioning
3932///
3933/// [`Partitioning`]: https://docs.rs/datafusion/latest/datafusion/physical_expr/enum.Partitioning.html#
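///
/// For example, hash-repartitioning on column `a` into 8 partitions could be
/// built as in this sketch (`col` is assumed to be in scope):
///
/// ```text
/// let partitioning = Partitioning::Hash(vec![col("a")], 8);
/// ```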
3934#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Hash)]
3935pub enum Partitioning {
3936    /// Allocate batches using a round-robin algorithm and the specified number of partitions
3937    RoundRobinBatch(usize),
3938    /// Allocate rows based on a hash of one or more expressions and the specified number
3939    /// of partitions.
3940    Hash(Vec<Expr>, usize),
3941    /// The DISTRIBUTE BY clause is used to repartition the data based on the input expressions
3942    DistributeBy(Vec<Expr>),
3943}
3944
3945/// Represents the unnesting operation on a list column: the recursion depth and
3946/// the output column name after unnesting
3947///
3948/// Example: given `ColumnUnnestList { output_column: "output_name", depth: 2 }`
3949///
3950/// ```text
3951///   input             output_name
3952///  ┌─────────┐      ┌─────────┐
3953///  │{{1,2}}  │      │ 1       │
3954///  ├─────────┼─────►├─────────┤
3955///  │{{3}}    │      │ 2       │
3956///  ├─────────┤      ├─────────┤
3957///  │{{4},{5}}│      │ 3       │
3958///  └─────────┘      ├─────────┤
3959///                   │ 4       │
3960///                   ├─────────┤
3961///                   │ 5       │
3962///                   └─────────┘
3963/// ```
3964#[derive(Debug, Clone, PartialEq, Eq, Hash, PartialOrd)]
3965pub struct ColumnUnnestList {
3966    pub output_column: Column,
3967    pub depth: usize,
3968}
3969
3970impl Display for ColumnUnnestList {
3971    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
3972        write!(f, "{}|depth={}", self.output_column, self.depth)
3973    }
3974}
3975
3976/// Unnest a column that contains a nested list type. See
3977/// [`UnnestOptions`] for more details.
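///
/// For example (an illustrative sketch, not from this file), unnesting a
/// `tags: List(Utf8)` column
///
/// ```text
/// SELECT unnest(tags) FROM t
/// ```
///
/// emits one output row per element of `tags` for each input row.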
3978#[derive(Debug, Clone, PartialEq, Eq, Hash)]
3979pub struct Unnest {
3980    /// The incoming logical plan
3981    pub input: Arc<LogicalPlan>,
3982    /// Columns to unnest; may include both List and Struct columns
3983    pub exec_columns: Vec<Column>,
3984    /// Indices (in the input schema) of the
3985    /// list-typed columns to unnest
3986    pub list_type_columns: Vec<(usize, ColumnUnnestList)>,
3987    /// Indices (in the input schema) of the
3988    /// struct-typed columns to unnest
3989    pub struct_type_columns: Vec<usize>,
3990    /// One entry per output column, giving the index of the
3991    /// input column that the output column depends on
3992    pub dependency_indices: Vec<usize>,
3993    /// The output schema, containing the unnested field column(s).
3994    pub schema: DFSchemaRef,
3995    /// Options
3996    pub options: UnnestOptions,
3997}
3998
3999// Manual implementation needed because of the `schema` field, which is excluded from the comparison.
4000impl PartialOrd for Unnest {
4001    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
4002        #[derive(PartialEq, PartialOrd)]
4003        struct ComparableUnnest<'a> {
4004            /// The incoming logical plan
4005            pub input: &'a Arc<LogicalPlan>,
4006            /// Columns to unnest; may include both List and Struct columns
4007            pub exec_columns: &'a Vec<Column>,
4008            /// Indices (in the input schema) of the
4009            /// list-typed columns to unnest
4010            pub list_type_columns: &'a Vec<(usize, ColumnUnnestList)>,
4011            /// Indices (in the input schema) of the
4012            /// struct-typed columns to unnest
4013            pub struct_type_columns: &'a Vec<usize>,
4014            /// One entry per output column, giving the index of the
4015            /// input column that the output column depends on
4016            pub dependency_indices: &'a Vec<usize>,
4017            /// Options
4018            pub options: &'a UnnestOptions,
4019        }
4020        let comparable_self = ComparableUnnest {
4021            input: &self.input,
4022            exec_columns: &self.exec_columns,
4023            list_type_columns: &self.list_type_columns,
4024            struct_type_columns: &self.struct_type_columns,
4025            dependency_indices: &self.dependency_indices,
4026            options: &self.options,
4027        };
4028        let comparable_other = ComparableUnnest {
4029            input: &other.input,
4030            exec_columns: &other.exec_columns,
4031            list_type_columns: &other.list_type_columns,
4032            struct_type_columns: &other.struct_type_columns,
4033            dependency_indices: &other.dependency_indices,
4034            options: &other.options,
4035        };
4036        comparable_self
4037            .partial_cmp(&comparable_other)
4038            // TODO (https://github.com/apache/datafusion/issues/17477) avoid recomparing all fields
4039            .filter(|cmp| *cmp != Ordering::Equal || self == other)
4040    }
4041}
4042
4043impl Unnest {
4044    pub fn try_new(
4045        input: Arc<LogicalPlan>,
4046        exec_columns: Vec<Column>,
4047        options: UnnestOptions,
4048    ) -> Result<Self> {
4049        if exec_columns.is_empty() {
4050            return plan_err!("unnest plan requires at least 1 column to unnest");
4051        }
4052
4053        let mut list_columns: Vec<(usize, ColumnUnnestList)> = vec![];
4054        let mut struct_columns = vec![];
4055        let indices_to_unnest = exec_columns
4056            .iter()
4057            .map(|c| Ok((input.schema().index_of_column(c)?, c)))
4058            .collect::<Result<HashMap<usize, &Column>>>()?;
4059
4060        let input_schema = input.schema();
4061
4062        let mut dependency_indices = vec![];
4063        // Transform the input schema into the new schema.
4064        // Given this comprehensive example:
4065        //
4066        // input schema:
4067        //   1. col1_unnest_placeholder: list[list[int]]
4068        //   2. col1: list[list[int]]
4069        //   3. col2: list[int]
4070        // with unnest(col1, depth=2), unnest(col1, depth=1) and unnest(col2, depth=1),
4071        // the output schema is:
4072        //   1. unnest_col1_depth_2: int
4073        //   2. unnest_col1_depth_1: list[int]
4074        //   3. col1: list[list[int]]
4075        //   4. unnest_col2_depth_1: int
4076        // i.e. the placeholder column is replaced by its unnested variation(s),
4077        // note the plural.
4078        let fields = input_schema
4079            .iter()
4080            .enumerate()
4081            .map(|(index, (original_qualifier, original_field))| {
4082                match indices_to_unnest.get(&index) {
4083                    Some(column_to_unnest) => {
4084                        let recursions_on_column = options
4085                            .recursions
4086                            .iter()
4087                            .filter(|p| &p.input_column == *column_to_unnest)
4088                            .collect::<Vec<_>>();
4089                        let mut transformed_columns = recursions_on_column
4090                            .iter()
4091                            .map(|r| {
4092                                list_columns.push((
4093                                    index,
4094                                    ColumnUnnestList {
4095                                        output_column: r.output_column.clone(),
4096                                        depth: r.depth,
4097                                    },
4098                                ));
4099                                Ok(get_unnested_columns(
4100                                    &r.output_column.name,
4101                                    original_field.data_type(),
4102                                    r.depth,
4103                                )?
4104                                .into_iter()
4105                                .next()
4106                                .unwrap()) // because unnesting a list column always results in exactly one column
4107                            })
4108                            .collect::<Result<Vec<(Column, Arc<Field>)>>>()?;
4109                        if transformed_columns.is_empty() {
4110                            transformed_columns = get_unnested_columns(
4111                                &column_to_unnest.name,
4112                                original_field.data_type(),
4113                                1,
4114                            )?;
4115                            match original_field.data_type() {
4116                                DataType::Struct(_) => {
4117                                    struct_columns.push(index);
4118                                }
4119                                DataType::List(_)
4120                                | DataType::FixedSizeList(_, _)
4121                                | DataType::LargeList(_) => {
4122                                    list_columns.push((
4123                                        index,
4124                                        ColumnUnnestList {
4125                                            output_column: Column::from_name(
4126                                                &column_to_unnest.name,
4127                                            ),
4128                                            depth: 1,
4129                                        },
4130                                    ));
4131                                }
4132                                _ => {}
4133                            };
4134                        }
4135
4136                        // new columns dependent on the same original index
4137                        dependency_indices.extend(std::iter::repeat_n(
4138                            index,
4139                            transformed_columns.len(),
4140                        ));
4141                        Ok(transformed_columns
4142                            .iter()
4143                            .map(|(col, field)| {
4144                                (col.relation.to_owned(), field.to_owned())
4145                            })
4146                            .collect())
4147                    }
4148                    None => {
4149                        dependency_indices.push(index);
4150                        Ok(vec![(
4151                            original_qualifier.cloned(),
4152                            Arc::clone(original_field),
4153                        )])
4154                    }
4155                }
4156            })
4157            .collect::<Result<Vec<_>>>()?
4158            .into_iter()
4159            .flatten()
4160            .collect::<Vec<_>>();
4161
4162        let metadata = input_schema.metadata().clone();
4163        let df_schema = DFSchema::new_with_metadata(fields, metadata)?;
4164        // We can use the existing functional dependencies:
4165        let deps = input_schema.functional_dependencies().clone();
4166        let schema = Arc::new(df_schema.with_functional_dependencies(deps)?);
4167
4168        Ok(Unnest {
4169            input,
4170            exec_columns,
4171            list_type_columns: list_columns,
4172            struct_type_columns: struct_columns,
4173            dependency_indices,
4174            schema,
4175            options,
4176        })
4177    }
4178}
4179
4180// Based on the data type (either a struct or a variant of list),
4181// returns the set of columns produced by unnesting
4182// the input column.
4183// For example, given a column with name "a",
4184// - List(Element) returns ["a"] with data type Element
4185// - Struct(field1, field2) returns ["a.field1","a.field2"]
4186// For list data types, the `depth` argument specifies
4187// the recursion level.
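// For example, "a": List(List(Int32)) with depth 2 returns ["a"] with data
// type Int32, while depth 1 returns ["a"] with data type List(Int32).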
4188fn get_unnested_columns(
4189    col_name: &String,
4190    data_type: &DataType,
4191    depth: usize,
4192) -> Result<Vec<(Column, Arc<Field>)>> {
4193    let mut qualified_columns = Vec::with_capacity(1);
4194
4195    match data_type {
4196        DataType::List(_) | DataType::FixedSizeList(_, _) | DataType::LargeList(_) => {
4197            let data_type = get_unnested_list_datatype_recursive(data_type, depth)?;
4198            let new_field = Arc::new(Field::new(
4199                col_name, data_type,
4200                // Unnesting may produce NULLs even if the list is not null.
4201                // For example: unnest([1], []) -> 1, null
4202                true,
4203            ));
4204            let column = Column::from_name(col_name);
4206            qualified_columns.push((column, new_field));
4207        }
4208        DataType::Struct(fields) => {
4209            qualified_columns.extend(fields.iter().map(|f| {
4210                let new_name = format!("{}.{}", col_name, f.name());
4211                let column = Column::from_name(&new_name);
4212                let new_field = f.as_ref().clone().with_name(new_name);
4214                (column, Arc::new(new_field))
4215            }))
4216        }
4217        _ => {
4218            return internal_err!("trying to unnest on invalid data type {data_type}");
4219        }
4220    };
4221    Ok(qualified_columns)
4222}
4223
4224// Get the data type of a multi-dimensional type after unnesting it
4225// with a given depth
4226fn get_unnested_list_datatype_recursive(
4227    data_type: &DataType,
4228    depth: usize,
4229) -> Result<DataType> {
4230    match data_type {
4231        DataType::List(field)
4232        | DataType::FixedSizeList(field, _)
4233        | DataType::LargeList(field) => {
4234            if depth == 1 {
4235                return Ok(field.data_type().clone());
4236            }
4237            return get_unnested_list_datatype_recursive(field.data_type(), depth - 1);
4238        }
4239        _ => {}
4240    };
4241
4242    internal_err!("trying to unnest on invalid data type {data_type}")
4243}
4244
4245#[cfg(test)]
4246mod tests {
4247    use super::*;
4248    use crate::builder::LogicalTableSource;
4249    use crate::logical_plan::table_scan;
4250    use crate::test::function_stub::{count, count_udaf};
4251    use crate::{
4252        binary_expr, col, exists, in_subquery, lit, placeholder, scalar_subquery,
4253        GroupingSet,
4254    };
4255    use datafusion_common::metadata::ScalarAndMetadata;
4256    use datafusion_common::tree_node::{
4257        TransformedResult, TreeNodeRewriter, TreeNodeVisitor,
4258    };
4259    use datafusion_common::{not_impl_err, Constraint, ScalarValue};
4260    use insta::{assert_debug_snapshot, assert_snapshot};
4261    use std::hash::DefaultHasher;
4262
4263    fn employee_schema() -> Schema {
4264        Schema::new(vec![
4265            Field::new("id", DataType::Int32, false),
4266            Field::new("first_name", DataType::Utf8, false),
4267            Field::new("last_name", DataType::Utf8, false),
4268            Field::new("state", DataType::Utf8, false),
4269            Field::new("salary", DataType::Int32, false),
4270        ])
4271    }
4272
4273    fn display_plan() -> Result<LogicalPlan> {
4274        let plan1 = table_scan(Some("employee_csv"), &employee_schema(), Some(vec![3]))?
4275            .build()?;
4276
4277        table_scan(Some("employee_csv"), &employee_schema(), Some(vec![0, 3]))?
4278            .filter(in_subquery(col("state"), Arc::new(plan1)))?
4279            .project(vec![col("id")])?
4280            .build()
4281    }
4282
4283    #[test]
4284    fn test_display_indent() -> Result<()> {
4285        let plan = display_plan()?;
4286
4287        assert_snapshot!(plan.display_indent(), @r"
4288        Projection: employee_csv.id
4289          Filter: employee_csv.state IN (<subquery>)
4290            Subquery:
4291              TableScan: employee_csv projection=[state]
4292            TableScan: employee_csv projection=[id, state]
4293        ");
4294        Ok(())
4295    }
4296
4297    #[test]
4298    fn test_display_indent_schema() -> Result<()> {
4299        let plan = display_plan()?;
4300
4301        assert_snapshot!(plan.display_indent_schema(), @r"
4302        Projection: employee_csv.id [id:Int32]
4303          Filter: employee_csv.state IN (<subquery>) [id:Int32, state:Utf8]
4304            Subquery: [state:Utf8]
4305              TableScan: employee_csv projection=[state] [state:Utf8]
4306            TableScan: employee_csv projection=[id, state] [id:Int32, state:Utf8]
4307        ");
4308        Ok(())
4309    }
4310
4311    #[test]
4312    fn test_display_subquery_alias() -> Result<()> {
4313        let plan1 = table_scan(Some("employee_csv"), &employee_schema(), Some(vec![3]))?
4314            .build()?;
4315        let plan1 = Arc::new(plan1);
4316
4317        let plan =
4318            table_scan(Some("employee_csv"), &employee_schema(), Some(vec![0, 3]))?
4319                .project(vec![col("id"), exists(plan1).alias("exists")])?
4320                .build();
4321
4322        assert_snapshot!(plan?.display_indent(), @r"
4323        Projection: employee_csv.id, EXISTS (<subquery>) AS exists
4324          Subquery:
4325            TableScan: employee_csv projection=[state]
4326          TableScan: employee_csv projection=[id, state]
4327        ");
4328        Ok(())
4329    }
4330
4331    #[test]
4332    fn test_display_graphviz() -> Result<()> {
4333        let plan = display_plan()?;
4334
4335        // just test for a few key lines in the output rather than the
4336        // whole thing to make test maintenance easier.
4337        assert_snapshot!(plan.display_graphviz(), @r#"
4338        // Begin DataFusion GraphViz Plan,
4339        // display it online here: https://dreampuf.github.io/GraphvizOnline
4340
4341        digraph {
4342          subgraph cluster_1
4343          {
4344            graph[label="LogicalPlan"]
4345            2[shape=box label="Projection: employee_csv.id"]
4346            3[shape=box label="Filter: employee_csv.state IN (<subquery>)"]
4347            2 -> 3 [arrowhead=none, arrowtail=normal, dir=back]
4348            4[shape=box label="Subquery:"]
4349            3 -> 4 [arrowhead=none, arrowtail=normal, dir=back]
4350            5[shape=box label="TableScan: employee_csv projection=[state]"]
4351            4 -> 5 [arrowhead=none, arrowtail=normal, dir=back]
4352            6[shape=box label="TableScan: employee_csv projection=[id, state]"]
4353            3 -> 6 [arrowhead=none, arrowtail=normal, dir=back]
4354          }
4355          subgraph cluster_7
4356          {
4357            graph[label="Detailed LogicalPlan"]
4358            8[shape=box label="Projection: employee_csv.id\nSchema: [id:Int32]"]
4359            9[shape=box label="Filter: employee_csv.state IN (<subquery>)\nSchema: [id:Int32, state:Utf8]"]
4360            8 -> 9 [arrowhead=none, arrowtail=normal, dir=back]
4361            10[shape=box label="Subquery:\nSchema: [state:Utf8]"]
4362            9 -> 10 [arrowhead=none, arrowtail=normal, dir=back]
4363            11[shape=box label="TableScan: employee_csv projection=[state]\nSchema: [state:Utf8]"]
4364            10 -> 11 [arrowhead=none, arrowtail=normal, dir=back]
4365            12[shape=box label="TableScan: employee_csv projection=[id, state]\nSchema: [id:Int32, state:Utf8]"]
4366            9 -> 12 [arrowhead=none, arrowtail=normal, dir=back]
4367          }
4368        }
4369        // End DataFusion GraphViz Plan
4370        "#);
4371        Ok(())
4372    }
4373
4374    #[test]
4375    fn test_display_pg_json() -> Result<()> {
4376        let plan = display_plan()?;
4377
4378        assert_snapshot!(plan.display_pg_json(), @r#"
4379        [
4380          {
4381            "Plan": {
4382              "Expressions": [
4383                "employee_csv.id"
4384              ],
4385              "Node Type": "Projection",
4386              "Output": [
4387                "id"
4388              ],
4389              "Plans": [
4390                {
4391                  "Condition": "employee_csv.state IN (<subquery>)",
4392                  "Node Type": "Filter",
4393                  "Output": [
4394                    "id",
4395                    "state"
4396                  ],
4397                  "Plans": [
4398                    {
4399                      "Node Type": "Subquery",
4400                      "Output": [
4401                        "state"
4402                      ],
4403                      "Plans": [
4404                        {
4405                          "Node Type": "TableScan",
4406                          "Output": [
4407                            "state"
4408                          ],
4409                          "Plans": [],
4410                          "Relation Name": "employee_csv"
4411                        }
4412                      ]
4413                    },
4414                    {
4415                      "Node Type": "TableScan",
4416                      "Output": [
4417                        "id",
4418                        "state"
4419                      ],
4420                      "Plans": [],
4421                      "Relation Name": "employee_csv"
4422                    }
4423                  ]
4424                }
4425              ]
4426            }
4427          }
4428        ]
4429        "#);
4430        Ok(())
4431    }
4432
4433    /// Tests for the Visitor trait and walking logical plan nodes
4434    #[derive(Debug, Default)]
4435    struct OkVisitor {
4436        strings: Vec<String>,
4437    }
4438
4439    impl<'n> TreeNodeVisitor<'n> for OkVisitor {
4440        type Node = LogicalPlan;
4441
4442        fn f_down(&mut self, plan: &'n LogicalPlan) -> Result<TreeNodeRecursion> {
4443            let s = match plan {
4444                LogicalPlan::Projection { .. } => "pre_visit Projection",
4445                LogicalPlan::Filter { .. } => "pre_visit Filter",
4446                LogicalPlan::TableScan { .. } => "pre_visit TableScan",
4447                _ => {
4448                    return not_impl_err!("unknown plan type");
4449                }
4450            };
4451
4452            self.strings.push(s.into());
4453            Ok(TreeNodeRecursion::Continue)
4454        }
4455
4456        fn f_up(&mut self, plan: &'n LogicalPlan) -> Result<TreeNodeRecursion> {
4457            let s = match plan {
4458                LogicalPlan::Projection { .. } => "post_visit Projection",
4459                LogicalPlan::Filter { .. } => "post_visit Filter",
4460                LogicalPlan::TableScan { .. } => "post_visit TableScan",
4461                _ => {
4462                    return not_impl_err!("unknown plan type");
4463                }
4464            };
4465
4466            self.strings.push(s.into());
4467            Ok(TreeNodeRecursion::Continue)
4468        }
4469    }
4470
4471    #[test]
4472    fn visit_order() {
4473        let mut visitor = OkVisitor::default();
4474        let plan = test_plan();
4475        let res = plan.visit_with_subqueries(&mut visitor);
4476        assert!(res.is_ok());
4477
4478        assert_debug_snapshot!(visitor.strings, @r#"
4479        [
4480            "pre_visit Projection",
4481            "pre_visit Filter",
4482            "pre_visit TableScan",
4483            "post_visit TableScan",
4484            "post_visit Filter",
4485            "post_visit Projection",
4486        ]
4487        "#);
4488    }
4489
4490    #[derive(Debug, Default)]
4491    /// Counter that counts down to zero and returns true when it gets there
4492    struct OptionalCounter {
4493        val: Option<usize>,
4494    }
4495
4496    impl OptionalCounter {
4497        fn new(val: usize) -> Self {
4498            Self { val: Some(val) }
4499        }
4500        // Returns true if the counter has already reached zero; otherwise decrements it, if any
4501        fn dec(&mut self) -> bool {
4502            if Some(0) == self.val {
4503                true
4504            } else {
4505                self.val = self.val.take().map(|i| i - 1);
4506                false
4507            }
4508        }
4509    }
4510
4511    #[derive(Debug, Default)]
4512    /// Visitor that stops (returns `TreeNodeRecursion::Stop`) after some number of visits
4513    struct StoppingVisitor {
4514        inner: OkVisitor,
4515        /// When Some(0), returns `TreeNodeRecursion::Stop` from pre_visit
4516        return_false_from_pre_in: OptionalCounter,
4517        /// When Some(0), returns `TreeNodeRecursion::Stop` from post_visit
4518        return_false_from_post_in: OptionalCounter,
4519    }
4520
4521    impl<'n> TreeNodeVisitor<'n> for StoppingVisitor {
4522        type Node = LogicalPlan;
4523
4524        fn f_down(&mut self, plan: &'n LogicalPlan) -> Result<TreeNodeRecursion> {
4525            if self.return_false_from_pre_in.dec() {
4526                return Ok(TreeNodeRecursion::Stop);
4527            }
4528            self.inner.f_down(plan)?;
4529
4530            Ok(TreeNodeRecursion::Continue)
4531        }
4532
4533        fn f_up(&mut self, plan: &'n LogicalPlan) -> Result<TreeNodeRecursion> {
4534            if self.return_false_from_post_in.dec() {
4535                return Ok(TreeNodeRecursion::Stop);
4536            }
4537
4538            self.inner.f_up(plan)
4539        }
4540    }
4541
4542    /// test early stopping in pre-visit
4543    #[test]
4544    fn early_stopping_pre_visit() {
4545        let mut visitor = StoppingVisitor {
4546            return_false_from_pre_in: OptionalCounter::new(2),
4547            ..Default::default()
4548        };
4549        let plan = test_plan();
4550        let res = plan.visit_with_subqueries(&mut visitor);
4551        assert!(res.is_ok());
4552
4553        assert_debug_snapshot!(
4554            visitor.inner.strings,
4555            @r#"
4556        [
4557            "pre_visit Projection",
4558            "pre_visit Filter",
4559        ]
4560        "#
4561        );
4562    }
4563
4564    #[test]
4565    fn early_stopping_post_visit() {
4566        let mut visitor = StoppingVisitor {
4567            return_false_from_post_in: OptionalCounter::new(1),
4568            ..Default::default()
4569        };
4570        let plan = test_plan();
4571        let res = plan.visit_with_subqueries(&mut visitor);
4572        assert!(res.is_ok());
4573
4574        assert_debug_snapshot!(
4575            visitor.inner.strings,
4576            @r#"
4577        [
4578            "pre_visit Projection",
4579            "pre_visit Filter",
4580            "pre_visit TableScan",
4581            "post_visit TableScan",
4582        ]
4583        "#
4584        );
4585    }
4586
4587    #[derive(Debug, Default)]
4588    /// Visitor that returns an error after some number of visits
4589    struct ErrorVisitor {
4590        inner: OkVisitor,
4591        /// When Some(0), returns an error from pre_visit
4592        return_error_from_pre_in: OptionalCounter,
4593        /// When Some(0), returns an error from post_visit
4594        return_error_from_post_in: OptionalCounter,
4595    }
4596
4597    impl<'n> TreeNodeVisitor<'n> for ErrorVisitor {
4598        type Node = LogicalPlan;
4599
4600        fn f_down(&mut self, plan: &'n LogicalPlan) -> Result<TreeNodeRecursion> {
4601            if self.return_error_from_pre_in.dec() {
4602                return not_impl_err!("Error in pre_visit");
4603            }
4604
4605            self.inner.f_down(plan)
4606        }
4607
4608        fn f_up(&mut self, plan: &'n LogicalPlan) -> Result<TreeNodeRecursion> {
4609            if self.return_error_from_post_in.dec() {
4610                return not_impl_err!("Error in post_visit");
4611            }
4612
4613            self.inner.f_up(plan)
4614        }
4615    }
4616
4617    #[test]
4618    fn error_pre_visit() {
4619        let mut visitor = ErrorVisitor {
4620            return_error_from_pre_in: OptionalCounter::new(2),
4621            ..Default::default()
4622        };
4623        let plan = test_plan();
4624        let res = plan.visit_with_subqueries(&mut visitor).unwrap_err();
4625        assert_snapshot!(
4626            res.strip_backtrace(),
4627            @"This feature is not implemented: Error in pre_visit"
4628        );
4629        assert_debug_snapshot!(
4630            visitor.inner.strings,
4631            @r#"
4632        [
4633            "pre_visit Projection",
4634            "pre_visit Filter",
4635        ]
4636        "#
4637        );
4638    }
4639
4640    #[test]
4641    fn error_post_visit() {
4642        let mut visitor = ErrorVisitor {
4643            return_error_from_post_in: OptionalCounter::new(1),
4644            ..Default::default()
4645        };
4646        let plan = test_plan();
4647        let res = plan.visit_with_subqueries(&mut visitor).unwrap_err();
4648        assert_snapshot!(
4649            res.strip_backtrace(),
4650            @"This feature is not implemented: Error in post_visit"
4651        );
4652        assert_debug_snapshot!(
4653            visitor.inner.strings,
4654            @r#"
4655        [
4656            "pre_visit Projection",
4657            "pre_visit Filter",
4658            "pre_visit TableScan",
4659            "post_visit TableScan",
4660        ]
4661        "#
4662        );
4663    }
4664
4665    #[test]
4666    fn test_partial_eq_hash_and_partial_ord() {
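        // Plans that differ only in schema metadata must compare not-equal, hash
        // differently (no collision in this test), and be unordered under partial_cmp.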
4667        let empty_values = Arc::new(LogicalPlan::EmptyRelation(EmptyRelation {
4668            produce_one_row: true,
4669            schema: Arc::new(DFSchema::empty()),
4670        }));
4671
4672        let count_window_function = |schema| {
4673            Window::try_new_with_schema(
4674                vec![Expr::WindowFunction(Box::new(WindowFunction::new(
4675                    WindowFunctionDefinition::AggregateUDF(count_udaf()),
4676                    vec![],
4677                )))],
4678                Arc::clone(&empty_values),
4679                Arc::new(schema),
4680            )
4681            .unwrap()
4682        };
4683
4684        let schema_without_metadata = || {
4685            DFSchema::from_unqualified_fields(
4686                vec![Field::new("count", DataType::Int64, false)].into(),
4687                HashMap::new(),
4688            )
4689            .unwrap()
4690        };
4691
4692        let schema_with_metadata = || {
4693            DFSchema::from_unqualified_fields(
4694                vec![Field::new("count", DataType::Int64, false)].into(),
4695                [("key".to_string(), "value".to_string())].into(),
4696            )
4697            .unwrap()
4698        };
4699
4700        // A Window
4701        let f = count_window_function(schema_without_metadata());
4702
4703        // Same as `f`, but a different instance
4704        let f2 = count_window_function(schema_without_metadata());
4705        assert_eq!(f, f2);
4706        assert_eq!(hash(&f), hash(&f2));
4707        assert_eq!(f.partial_cmp(&f2), Some(Ordering::Equal));
4708
4709        // Same as `f`, except for the schema metadata
4710        let o = count_window_function(schema_with_metadata());
4711        assert_ne!(f, o);
4712        assert_ne!(hash(&f), hash(&o)); // hash can collide for different values but does not collide in this test
4713        assert_eq!(f.partial_cmp(&o), None);
4714    }
4715
4716    fn hash<T: Hash>(value: &T) -> u64 {
4717        let hasher = &mut DefaultHasher::new();
4718        value.hash(hasher);
4719        hasher.finish()
4720    }
4721
4722    #[test]
4723    fn projection_expr_schema_mismatch() -> Result<()> {
4724        let empty_schema = Arc::new(DFSchema::empty());
4725        let p = Projection::try_new_with_schema(
4726            vec![col("a")],
4727            Arc::new(LogicalPlan::EmptyRelation(EmptyRelation {
4728                produce_one_row: false,
4729                schema: Arc::clone(&empty_schema),
4730            })),
4731            empty_schema,
4732        );
4733        assert_snapshot!(p.unwrap_err().strip_backtrace(), @"Error during planning: Projection has mismatch between number of expressions (1) and number of fields in schema (0)");
4734        Ok(())
4735    }
4736
4737    fn test_plan() -> LogicalPlan {
4738        let schema = Schema::new(vec![
4739            Field::new("id", DataType::Int32, false),
4740            Field::new("state", DataType::Utf8, false),
4741        ]);
4742
4743        table_scan(TableReference::none(), &schema, Some(vec![0, 1]))
4744            .unwrap()
4745            .filter(col("state").eq(lit("CO")))
4746            .unwrap()
4747            .project(vec![col("id")])
4748            .unwrap()
4749            .build()
4750            .unwrap()
4751    }
4752
4753    #[test]
4754    fn test_replace_invalid_placeholder() {
4755        // test empty placeholder
4756        let schema = Schema::new(vec![Field::new("id", DataType::Int32, false)]);
4757
4758        let plan = table_scan(TableReference::none(), &schema, None)
4759            .unwrap()
4760            .filter(col("id").eq(placeholder("")))
4761            .unwrap()
4762            .build()
4763            .unwrap();
4764
4765        let param_values = vec![ScalarValue::Int32(Some(42))];
4766        plan.replace_params_with_values(&param_values.clone().into())
4767            .expect_err("unexpectedly succeeded in replacing an invalid placeholder");
4768
4769        // test $0 placeholder
4770        let schema = Schema::new(vec![Field::new("id", DataType::Int32, false)]);
4771
4772        let plan = table_scan(TableReference::none(), &schema, None)
4773            .unwrap()
4774            .filter(col("id").eq(placeholder("$0")))
4775            .unwrap()
4776            .build()
4777            .unwrap();
4778
4779        plan.replace_params_with_values(&param_values.clone().into())
4780            .expect_err("unexpectedly succeeded in replacing an invalid placeholder");
4781
4782        // test $00 placeholder
4783        let schema = Schema::new(vec![Field::new("id", DataType::Int32, false)]);
4784
4785        let plan = table_scan(TableReference::none(), &schema, None)
4786            .unwrap()
4787            .filter(col("id").eq(placeholder("$00")))
4788            .unwrap()
4789            .build()
4790            .unwrap();
4791
4792        plan.replace_params_with_values(&param_values.into())
4793            .expect_err("unexpectedly succeeded in replacing an invalid placeholder");
4794    }
4795
4796    #[test]
4797    fn test_replace_placeholder_mismatched_metadata() {
4798        let schema = Schema::new(vec![Field::new("id", DataType::Int32, false)]);
4799
4800        // Create a prepared statement with explicit fields that do not have metadata
4801        let plan = table_scan(TableReference::none(), &schema, None)
4802            .unwrap()
4803            .filter(col("id").eq(placeholder("$1")))
4804            .unwrap()
4805            .build()
4806            .unwrap();
4807        let prepared_builder = LogicalPlanBuilder::new(plan)
4808            .prepare(
4809                "".to_string(),
4810                vec![Field::new("", DataType::Int32, true).into()],
4811            )
4812            .unwrap();
4813
4814        // Attempt to bind a parameter with metadata
4815        let mut scalar_meta = HashMap::new();
4816        scalar_meta.insert("some_key".to_string(), "some_value".to_string());
4817        let param_values = ParamValues::List(vec![ScalarAndMetadata::new(
4818            ScalarValue::Int32(Some(42)),
4819            Some(scalar_meta.into()),
4820        )]);
4821        prepared_builder
4822            .plan()
4823            .clone()
4824            .with_param_values(param_values)
4825            .expect_err("binding a parameter with mismatched metadata unexpectedly succeeded");
4826    }
4827
4828    #[test]
4829    fn test_nullable_schema_after_grouping_set() {
4830        let schema = Schema::new(vec![
4831            Field::new("foo", DataType::Int32, false),
4832            Field::new("bar", DataType::Int32, false),
4833        ]);
4834
4835        let plan = table_scan(TableReference::none(), &schema, None)
4836            .unwrap()
4837            .aggregate(
4838                vec![Expr::GroupingSet(GroupingSet::GroupingSets(vec![
4839                    vec![col("foo")],
4840                    vec![col("bar")],
4841                ]))],
4842                vec![count(lit(true))],
4843            )
4844            .unwrap()
4845            .build()
4846            .unwrap();
4847
4848        let output_schema = plan.schema();
4849
4850        assert!(output_schema
4851            .field_with_name(None, "foo")
4852            .unwrap()
4853            .is_nullable());
4854        assert!(output_schema
4855            .field_with_name(None, "bar")
4856            .unwrap()
4857            .is_nullable());
4858    }
4859
4860    #[test]
4861    fn test_filter_is_scalar() {
4862        // a filter over a column without a unique constraint is not scalar
4863        let schema =
4864            Arc::new(Schema::new(vec![Field::new("id", DataType::Int32, false)]));
4865
4866        let source = Arc::new(LogicalTableSource::new(schema));
4867        let schema = Arc::new(
4868            DFSchema::try_from_qualified_schema(
4869                TableReference::bare("tab"),
4870                &source.schema(),
4871            )
4872            .unwrap(),
4873        );
4874        let scan = Arc::new(LogicalPlan::TableScan(TableScan {
4875            table_name: TableReference::bare("tab"),
4876            source: Arc::clone(&source) as Arc<dyn TableSource>,
4877            projection: None,
4878            projected_schema: Arc::clone(&schema),
4879            filters: vec![],
4880            fetch: None,
4881        }));
4882        let col = schema.field_names()[0].clone();
4883
4884        let filter = Filter::try_new(
4885            Expr::Column(col.into()).eq(Expr::Literal(ScalarValue::Int32(Some(1)), None)),
4886            scan,
4887        )
4888        .unwrap();
4889        assert!(!filter.is_scalar());
4890        let unique_schema = Arc::new(
4891            schema
4892                .as_ref()
4893                .clone()
4894                .with_functional_dependencies(
4895                    FunctionalDependencies::new_from_constraints(
4896                        Some(&Constraints::new_unverified(vec![Constraint::Unique(
4897                            vec![0],
4898                        )])),
4899                        1,
4900                    ),
4901                )
4902                .unwrap(),
4903        );
4904        let scan = Arc::new(LogicalPlan::TableScan(TableScan {
4905            table_name: TableReference::bare("tab"),
4906            source,
4907            projection: None,
4908            projected_schema: Arc::clone(&unique_schema),
4909            filters: vec![],
4910            fetch: None,
4911        }));
4912        let col = schema.field_names()[0].clone();
4913
4914        let filter =
4915            Filter::try_new(Expr::Column(col.into()).eq(lit(1i32)), scan).unwrap();
4916        assert!(filter.is_scalar());
4917    }
4918
4919    #[test]
4920    fn test_transform_explain() {
4921        let schema = Schema::new(vec![
4922            Field::new("foo", DataType::Int32, false),
4923            Field::new("bar", DataType::Int32, false),
4924        ]);
4925
4926        let plan = table_scan(TableReference::none(), &schema, None)
4927            .unwrap()
4928            .explain(false, false)
4929            .unwrap()
4930            .build()
4931            .unwrap();
4932
4933        let external_filter = col("foo").eq(lit(true));
4934
4935        // After the transformation the plan is no longer the same, so the parent
4936        // plan is rebuilt via LogicalPlan::with_new_inputs -> with_new_exprs
4937        let plan = plan
4938            .transform(|plan| match plan {
4939                LogicalPlan::TableScan(table) => {
4940                    let filter = Filter::try_new(
4941                        external_filter.clone(),
4942                        Arc::new(LogicalPlan::TableScan(table)),
4943                    )
4944                    .unwrap();
4945                    Ok(Transformed::yes(LogicalPlan::Filter(filter)))
4946                }
4947                x => Ok(Transformed::no(x)),
4948            })
4949            .data()
4950            .unwrap();
4951
4952        let actual = format!("{}", plan.display_indent());
4953        assert_snapshot!(actual, @r"
4954        Explain
4955          Filter: foo = Boolean(true)
4956            TableScan: ?table?
4957        ")
4958    }
4959
4960    #[test]
4961    fn test_plan_partial_ord() {
4962        let empty_relation = LogicalPlan::EmptyRelation(EmptyRelation {
4963            produce_one_row: false,
4964            schema: Arc::new(DFSchema::empty()),
4965        });
4966
4967        let describe_table = LogicalPlan::DescribeTable(DescribeTable {
4968            schema: Arc::new(Schema::new(vec![Field::new(
4969                "foo",
4970                DataType::Int32,
4971                false,
4972            )])),
4973            output_schema: DFSchemaRef::new(DFSchema::empty()),
4974        });
4975
4976        let describe_table_clone = LogicalPlan::DescribeTable(DescribeTable {
4977            schema: Arc::new(Schema::new(vec![Field::new(
4978                "foo",
4979                DataType::Int32,
4980                false,
4981            )])),
4982            output_schema: DFSchemaRef::new(DFSchema::empty()),
4983        });
4984
4985        assert_eq!(
4986            empty_relation.partial_cmp(&describe_table),
4987            Some(Ordering::Less)
4988        );
4989        assert_eq!(
4990            describe_table.partial_cmp(&empty_relation),
4991            Some(Ordering::Greater)
4992        );
4993        assert_eq!(describe_table.partial_cmp(&describe_table_clone), None);
4994    }
4995
4996    #[test]
4997    fn test_limit_with_new_children() {
4998        let input = Arc::new(LogicalPlan::Values(Values {
4999            schema: Arc::new(DFSchema::empty()),
5000            values: vec![vec![]],
5001        }));
5002        let cases = [
5003            LogicalPlan::Limit(Limit {
5004                skip: None,
5005                fetch: None,
5006                input: Arc::clone(&input),
5007            }),
5008            LogicalPlan::Limit(Limit {
5009                skip: None,
5010                fetch: Some(Box::new(Expr::Literal(
5011                    ScalarValue::new_ten(&DataType::UInt32).unwrap(),
5012                    None,
5013                ))),
5014                input: Arc::clone(&input),
5015            }),
5016            LogicalPlan::Limit(Limit {
5017                skip: Some(Box::new(Expr::Literal(
5018                    ScalarValue::new_ten(&DataType::UInt32).unwrap(),
5019                    None,
5020                ))),
5021                fetch: None,
5022                input: Arc::clone(&input),
5023            }),
5024            LogicalPlan::Limit(Limit {
5025                skip: Some(Box::new(Expr::Literal(
5026                    ScalarValue::new_one(&DataType::UInt32).unwrap(),
5027                    None,
5028                ))),
5029                fetch: Some(Box::new(Expr::Literal(
5030                    ScalarValue::new_ten(&DataType::UInt32).unwrap(),
5031                    None,
5032                ))),
5033                input,
5034            }),
5035        ];
5036
5037        for limit in cases {
5038            let new_limit = limit
5039                .with_new_exprs(
5040                    limit.expressions(),
5041                    limit.inputs().into_iter().cloned().collect(),
5042                )
5043                .unwrap();
5044            assert_eq!(limit, new_limit);
5045        }
5046    }
5047
5048    #[test]
5049    fn test_with_subqueries_jump() {
5050        // The test plan contains a `Projection` node above a `Filter` node, and the
5051        // `Projection` node contains a subquery plan whose root is a `Filter`, so
5052        // returning `TreeNodeRecursion::Jump` on the `Projection` should prevent
5053        // visiting any of the `Filter`s.
5054        let subquery_schema =
5055            Schema::new(vec![Field::new("sub_id", DataType::Int32, false)]);
5056
5057        let subquery_plan =
5058            table_scan(TableReference::none(), &subquery_schema, Some(vec![0]))
5059                .unwrap()
5060                .filter(col("sub_id").eq(lit(0)))
5061                .unwrap()
5062                .build()
5063                .unwrap();
5064
5065        let schema = Schema::new(vec![Field::new("id", DataType::Int32, false)]);
5066
5067        let plan = table_scan(TableReference::none(), &schema, Some(vec![0]))
5068            .unwrap()
5069            .filter(col("id").eq(lit(0)))
5070            .unwrap()
5071            .project(vec![col("id"), scalar_subquery(Arc::new(subquery_plan))])
5072            .unwrap()
5073            .build()
5074            .unwrap();
5075
5076        let mut filter_found = false;
5077        plan.apply_with_subqueries(|plan| {
5078            match plan {
5079                LogicalPlan::Projection(..) => return Ok(TreeNodeRecursion::Jump),
5080                LogicalPlan::Filter(..) => filter_found = true,
5081                _ => {}
5082            }
5083            Ok(TreeNodeRecursion::Continue)
5084        })
5085        .unwrap();
5086        assert!(!filter_found);
5087
5088        struct ProjectJumpVisitor {
5089            filter_found: bool,
5090        }
5091
5092        impl ProjectJumpVisitor {
5093            fn new() -> Self {
5094                Self {
5095                    filter_found: false,
5096                }
5097            }
5098        }
5099
5100        impl<'n> TreeNodeVisitor<'n> for ProjectJumpVisitor {
5101            type Node = LogicalPlan;
5102
5103            fn f_down(&mut self, node: &'n Self::Node) -> Result<TreeNodeRecursion> {
5104                match node {
5105                    LogicalPlan::Projection(..) => return Ok(TreeNodeRecursion::Jump),
5106                    LogicalPlan::Filter(..) => self.filter_found = true,
5107                    _ => {}
5108                }
5109                Ok(TreeNodeRecursion::Continue)
5110            }
5111        }
5112
5113        let mut visitor = ProjectJumpVisitor::new();
5114        plan.visit_with_subqueries(&mut visitor).unwrap();
5115        assert!(!visitor.filter_found);
5116
5117        let mut filter_found = false;
5118        plan.clone()
5119            .transform_down_with_subqueries(|plan| {
5120                match plan {
5121                    LogicalPlan::Projection(..) => {
5122                        return Ok(Transformed::new(plan, false, TreeNodeRecursion::Jump))
5123                    }
5124                    LogicalPlan::Filter(..) => filter_found = true,
5125                    _ => {}
5126                }
5127                Ok(Transformed::no(plan))
5128            })
5129            .unwrap();
5130        assert!(!filter_found);
5131
5132        let mut filter_found = false;
5133        plan.clone()
5134            .transform_down_up_with_subqueries(
5135                |plan| {
5136                    match plan {
5137                        LogicalPlan::Projection(..) => {
5138                            return Ok(Transformed::new(
5139                                plan,
5140                                false,
5141                                TreeNodeRecursion::Jump,
5142                            ))
5143                        }
5144                        LogicalPlan::Filter(..) => filter_found = true,
5145                        _ => {}
5146                    }
5147                    Ok(Transformed::no(plan))
5148                },
5149                |plan| Ok(Transformed::no(plan)),
5150            )
5151            .unwrap();
5152        assert!(!filter_found);
5153
5154        struct ProjectJumpRewriter {
5155            filter_found: bool,
5156        }
5157
5158        impl ProjectJumpRewriter {
5159            fn new() -> Self {
5160                Self {
5161                    filter_found: false,
5162                }
5163            }
5164        }
5165
5166        impl TreeNodeRewriter for ProjectJumpRewriter {
5167            type Node = LogicalPlan;
5168
5169            fn f_down(&mut self, node: Self::Node) -> Result<Transformed<Self::Node>> {
5170                match node {
5171                    LogicalPlan::Projection(..) => {
5172                        return Ok(Transformed::new(node, false, TreeNodeRecursion::Jump))
5173                    }
5174                    LogicalPlan::Filter(..) => self.filter_found = true,
5175                    _ => {}
5176                }
5177                Ok(Transformed::no(node))
5178            }
5179        }
5180
5181        let mut rewriter = ProjectJumpRewriter::new();
5182        plan.rewrite_with_subqueries(&mut rewriter).unwrap();
5183        assert!(!rewriter.filter_found);
5184    }
5185
5186    #[test]
5187    fn test_with_unresolved_placeholders() {
5188        let field_name = "id";
5189        let placeholder_value = "$1";
5190        let schema = Schema::new(vec![Field::new(field_name, DataType::Int32, false)]);
5191
5192        let plan = table_scan(TableReference::none(), &schema, None)
5193            .unwrap()
5194            .filter(col(field_name).eq(placeholder(placeholder_value)))
5195            .unwrap()
5196            .build()
5197            .unwrap();
5198
5199        // Check that the placeholder parameters have not received a DataType.
5200        let params = plan.get_parameter_fields().unwrap();
5201        assert_eq!(params.len(), 1);
5202
5203        let parameter_type = params.clone().get(placeholder_value).unwrap().clone();
5204        assert_eq!(parameter_type, None);
5205    }
5206
5207    #[test]
5208    fn test_join_with_new_exprs() -> Result<()> {
5209        fn create_test_join(
5210            on: Vec<(Expr, Expr)>,
5211            filter: Option<Expr>,
5212        ) -> Result<LogicalPlan> {
5213            let schema = Schema::new(vec![
5214                Field::new("a", DataType::Int32, false),
5215                Field::new("b", DataType::Int32, false),
5216            ]);
5217
5218            let left_schema = DFSchema::try_from_qualified_schema("t1", &schema)?;
5219            let right_schema = DFSchema::try_from_qualified_schema("t2", &schema)?;
5220
5221            Ok(LogicalPlan::Join(Join {
5222                left: Arc::new(
5223                    table_scan(Some("t1"), left_schema.as_arrow(), None)?.build()?,
5224                ),
5225                right: Arc::new(
5226                    table_scan(Some("t2"), right_schema.as_arrow(), None)?.build()?,
5227                ),
5228                on,
5229                filter,
5230                join_type: JoinType::Inner,
5231                join_constraint: JoinConstraint::On,
5232                schema: Arc::new(left_schema.join(&right_schema)?),
5233                null_equality: NullEquality::NullEqualsNothing,
5234            }))
5235        }
5236
5237        {
5238            let join = create_test_join(vec![(col("t1.a"), (col("t2.a")))], None)?;
5239            let LogicalPlan::Join(join) = join.with_new_exprs(
5240                join.expressions(),
5241                join.inputs().into_iter().cloned().collect(),
5242            )?
5243            else {
5244                unreachable!()
5245            };
5246            assert_eq!(join.on, vec![(col("t1.a"), (col("t2.a")))]);
5247            assert_eq!(join.filter, None);
5248        }
5249
5250        {
5251            let join = create_test_join(vec![], Some(col("t1.a").gt(col("t2.a"))))?;
5252            let LogicalPlan::Join(join) = join.with_new_exprs(
5253                join.expressions(),
5254                join.inputs().into_iter().cloned().collect(),
5255            )?
5256            else {
5257                unreachable!()
5258            };
5259            assert_eq!(join.on, vec![]);
5260            assert_eq!(join.filter, Some(col("t1.a").gt(col("t2.a"))));
5261        }
5262
5263        {
5264            let join = create_test_join(
5265                vec![(col("t1.a"), (col("t2.a")))],
5266                Some(col("t1.b").gt(col("t2.b"))),
5267            )?;
5268            let LogicalPlan::Join(join) = join.with_new_exprs(
5269                join.expressions(),
5270                join.inputs().into_iter().cloned().collect(),
5271            )?
5272            else {
5273                unreachable!()
5274            };
5275            assert_eq!(join.on, vec![(col("t1.a"), (col("t2.a")))]);
5276            assert_eq!(join.filter, Some(col("t1.b").gt(col("t2.b"))));
5277        }
5278
5279        {
5280            let join = create_test_join(
5281                vec![(col("t1.a"), (col("t2.a"))), (col("t1.b"), (col("t2.b")))],
5282                None,
5283            )?;
5284            let LogicalPlan::Join(join) = join.with_new_exprs(
5285                vec![
5286                    binary_expr(col("t1.a"), Operator::Plus, lit(1)),
5287                    binary_expr(col("t2.a"), Operator::Plus, lit(2)),
5288                    col("t1.b"),
5289                    col("t2.b"),
5290                    lit(true),
5291                ],
5292                join.inputs().into_iter().cloned().collect(),
5293            )?
5294            else {
5295                unreachable!()
5296            };
5297            assert_eq!(
5298                join.on,
5299                vec![
5300                    (
5301                        binary_expr(col("t1.a"), Operator::Plus, lit(1)),
5302                        binary_expr(col("t2.a"), Operator::Plus, lit(2))
5303                    ),
5304                    (col("t1.b"), (col("t2.b")))
5305                ]
5306            );
5307            assert_eq!(join.filter, Some(lit(true)));
5308        }
5309
5310        Ok(())
5311    }
5312
    #[test]
    fn test_join_try_new() -> Result<()> {
        let schema = Schema::new(vec![
            Field::new("a", DataType::Int32, false),
            Field::new("b", DataType::Int32, false),
        ]);

        let left_scan = table_scan(Some("t1"), &schema, None)?.build()?;

        let right_scan = table_scan(Some("t2"), &schema, None)?.build()?;

        let join_types = vec![
            JoinType::Inner,
            JoinType::Left,
            JoinType::Right,
            JoinType::Full,
            JoinType::LeftSemi,
            JoinType::LeftAnti,
            JoinType::RightSemi,
            JoinType::RightAnti,
            JoinType::LeftMark,
        ];

        for join_type in join_types {
            let join = Join::try_new(
                Arc::new(left_scan.clone()),
                Arc::new(right_scan.clone()),
                vec![(col("t1.a"), col("t2.a"))],
                Some(col("t1.b").gt(col("t2.b"))),
                join_type,
                JoinConstraint::On,
                NullEquality::NullEqualsNothing,
            )?;

            match join_type {
                JoinType::LeftSemi | JoinType::LeftAnti => {
                    assert_eq!(join.schema.fields().len(), 2);

                    let fields = join.schema.fields();
                    assert_eq!(
                        fields[0].name(),
                        "a",
                        "First field should be 'a' from left table"
                    );
                    assert_eq!(
                        fields[1].name(),
                        "b",
                        "Second field should be 'b' from left table"
                    );
                }
                JoinType::RightSemi | JoinType::RightAnti => {
                    assert_eq!(join.schema.fields().len(), 2);

                    let fields = join.schema.fields();
                    assert_eq!(
                        fields[0].name(),
                        "a",
                        "First field should be 'a' from right table"
                    );
                    assert_eq!(
                        fields[1].name(),
                        "b",
                        "Second field should be 'b' from right table"
                    );
                }
                JoinType::LeftMark => {
                    assert_eq!(join.schema.fields().len(), 3);

                    let fields = join.schema.fields();
                    assert_eq!(
                        fields[0].name(),
                        "a",
                        "First field should be 'a' from left table"
                    );
                    assert_eq!(
                        fields[1].name(),
                        "b",
                        "Second field should be 'b' from left table"
                    );
                    assert_eq!(
                        fields[2].name(),
                        "mark",
                        "Third field should be the mark column"
                    );

                    assert!(!fields[0].is_nullable());
                    assert!(!fields[1].is_nullable());
                    assert!(!fields[2].is_nullable());
                }
                _ => {
                    assert_eq!(join.schema.fields().len(), 4);

                    let fields = join.schema.fields();
                    assert_eq!(
                        fields[0].name(),
                        "a",
                        "First field should be 'a' from left table"
                    );
                    assert_eq!(
                        fields[1].name(),
                        "b",
                        "Second field should be 'b' from left table"
                    );
                    assert_eq!(
                        fields[2].name(),
                        "a",
                        "Third field should be 'a' from right table"
                    );
                    assert_eq!(
                        fields[3].name(),
                        "b",
                        "Fourth field should be 'b' from right table"
                    );

                    if join_type == JoinType::Left {
                        // Left side fields (first two) shouldn't be nullable
                        assert!(!fields[0].is_nullable());
                        assert!(!fields[1].is_nullable());
                        // Right side fields (third and fourth) should be nullable
                        assert!(fields[2].is_nullable());
                        assert!(fields[3].is_nullable());
                    } else if join_type == JoinType::Right {
                        // Left side fields (first two) should be nullable
                        assert!(fields[0].is_nullable());
                        assert!(fields[1].is_nullable());
                        // Right side fields (third and fourth) shouldn't be nullable
                        assert!(!fields[2].is_nullable());
                        assert!(!fields[3].is_nullable());
                    } else if join_type == JoinType::Full {
                        // All fields become nullable in a FULL join
                        assert!(fields[0].is_nullable());
                        assert!(fields[1].is_nullable());
                        assert!(fields[2].is_nullable());
                        assert!(fields[3].is_nullable());
                    }
                }
            }

            assert_eq!(join.on, vec![(col("t1.a"), col("t2.a"))]);
            assert_eq!(join.filter, Some(col("t1.b").gt(col("t2.b"))));
            assert_eq!(join.join_type, join_type);
            assert_eq!(join.join_constraint, JoinConstraint::On);
            assert_eq!(join.null_equality, NullEquality::NullEqualsNothing);
        }

        Ok(())
    }

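    // Verifies `Join::try_new` with a USING constraint when both inputs share
    // column names: the logical schema keeps both copies of each overlapping
    // column, and the filter and null-equality settings round-trip unchanged.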
    #[test]
    fn test_join_try_new_with_using_constraint_and_overlapping_columns() -> Result<()> {
        let left_schema = Schema::new(vec![
            Field::new("id", DataType::Int32, false), // Common column in both tables
            Field::new("name", DataType::Utf8, false), // Unique to left
            Field::new("value", DataType::Int32, false), // Common name, different type and meaning
        ]);

        let right_schema = Schema::new(vec![
            Field::new("id", DataType::Int32, false), // Common column in both tables
            Field::new("category", DataType::Utf8, false), // Unique to right
            Field::new("value", DataType::Float64, true), // Common name, different type and meaning
        ]);

        let left_plan = table_scan(Some("t1"), &left_schema, None)?.build()?;

        let right_plan = table_scan(Some("t2"), &right_schema, None)?.build()?;

        // Test 1: USING constraint with a common column
        {
            // In the logical plan, both copies of the `id` column are preserved.
            // The USING constraint is handled later, during physical execution,
            // where the common column appears only once.
            let join = Join::try_new(
                Arc::new(left_plan.clone()),
                Arc::new(right_plan.clone()),
                vec![(col("t1.id"), col("t2.id"))],
                None,
                JoinType::Inner,
                JoinConstraint::Using,
                NullEquality::NullEqualsNothing,
            )?;

            let fields = join.schema.fields();

            assert_eq!(fields.len(), 6);

            assert_eq!(
                fields[0].name(),
                "id",
                "First field should be 'id' from left table"
            );
            assert_eq!(
                fields[1].name(),
                "name",
                "Second field should be 'name' from left table"
            );
            assert_eq!(
                fields[2].name(),
                "value",
                "Third field should be 'value' from left table"
            );
            assert_eq!(
                fields[3].name(),
                "id",
                "Fourth field should be 'id' from right table"
            );
            assert_eq!(
                fields[4].name(),
                "category",
                "Fifth field should be 'category' from right table"
            );
            assert_eq!(
                fields[5].name(),
                "value",
                "Sixth field should be 'value' from right table"
            );

            assert_eq!(join.join_constraint, JoinConstraint::Using);
        }

        // Test 2: Complex join condition with expressions
        {
            // Complex condition: join on id equality AND where left.value < right.value
            let join = Join::try_new(
                Arc::new(left_plan.clone()),
                Arc::new(right_plan.clone()),
                vec![(col("t1.id"), col("t2.id"))], // Equijoin condition
                Some(col("t1.value").lt(col("t2.value"))), // Non-equi filter condition
                JoinType::Inner,
                JoinConstraint::On,
                NullEquality::NullEqualsNothing,
            )?;

            let fields = join.schema.fields();
            assert_eq!(fields.len(), 6);

            assert_eq!(
                fields[0].name(),
                "id",
                "First field should be 'id' from left table"
            );
            assert_eq!(
                fields[1].name(),
                "name",
                "Second field should be 'name' from left table"
            );
            assert_eq!(
                fields[2].name(),
                "value",
                "Third field should be 'value' from left table"
            );
            assert_eq!(
                fields[3].name(),
                "id",
                "Fourth field should be 'id' from right table"
            );
            assert_eq!(
                fields[4].name(),
                "category",
                "Fifth field should be 'category' from right table"
            );
            assert_eq!(
                fields[5].name(),
                "value",
                "Sixth field should be 'value' from right table"
            );

            assert_eq!(join.filter, Some(col("t1.value").lt(col("t2.value"))));
        }

        // Test 3: Join where nulls compare as equal (NullEquality::NullEqualsNull)
        {
            let join = Join::try_new(
                Arc::new(left_plan.clone()),
                Arc::new(right_plan.clone()),
                vec![(col("t1.id"), col("t2.id"))],
                None,
                JoinType::Inner,
                JoinConstraint::On,
                NullEquality::NullEqualsNull,
            )?;

            assert_eq!(join.null_equality, NullEquality::NullEqualsNull);
        }

        Ok(())
    }

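    // Verifies nullability propagation in the join output schema: columns from
    // the non-preserved side of LEFT/RIGHT joins become nullable, all columns
    // become nullable for FULL joins, and already-nullable columns stay nullable.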
    #[test]
    fn test_join_try_new_schema_validation() -> Result<()> {
        let left_schema = Schema::new(vec![
            Field::new("id", DataType::Int32, false),
            Field::new("name", DataType::Utf8, false),
            Field::new("value", DataType::Float64, true),
        ]);

        let right_schema = Schema::new(vec![
            Field::new("id", DataType::Int32, false),
            Field::new("category", DataType::Utf8, true),
            Field::new("code", DataType::Int16, false),
        ]);

        let left_plan = table_scan(Some("t1"), &left_schema, None)?.build()?;

        let right_plan = table_scan(Some("t2"), &right_schema, None)?.build()?;

        let join_types = vec![
            JoinType::Inner,
            JoinType::Left,
            JoinType::Right,
            JoinType::Full,
        ];

        for join_type in join_types {
            let join = Join::try_new(
                Arc::new(left_plan.clone()),
                Arc::new(right_plan.clone()),
                vec![(col("t1.id"), col("t2.id"))],
                Some(col("t1.value").gt(lit(5.0))),
                join_type,
                JoinConstraint::On,
                NullEquality::NullEqualsNothing,
            )?;

            let fields = join.schema.fields();
            assert_eq!(fields.len(), 6, "Expected 6 fields for {join_type} join");

            for (i, field) in fields.iter().enumerate() {
                let expected_nullable = match (i, &join_type) {
                    // Left table fields (indices 0, 1, 2)
                    (0, JoinType::Right | JoinType::Full) => true, // id becomes nullable in RIGHT/FULL
                    (1, JoinType::Right | JoinType::Full) => true, // name becomes nullable in RIGHT/FULL
                    (2, _) => true, // value is already nullable

                    // Right table fields (indices 3, 4, 5)
                    (3, JoinType::Left | JoinType::Full) => true, // id becomes nullable in LEFT/FULL
                    (4, _) => true, // category is already nullable
                    (5, JoinType::Left | JoinType::Full) => true, // code becomes nullable in LEFT/FULL

                    _ => false,
                };

                assert_eq!(
                    field.is_nullable(),
                    expected_nullable,
                    "Field {} ({}) nullability incorrect for {:?} join",
                    i,
                    field.name(),
                    join_type
                );
            }
        }

        let using_join = Join::try_new(
            Arc::new(left_plan.clone()),
            Arc::new(right_plan.clone()),
            vec![(col("t1.id"), col("t2.id"))],
            None,
            JoinType::Inner,
            JoinConstraint::Using,
            NullEquality::NullEqualsNothing,
        )?;

        assert_eq!(
            using_join.schema.fields().len(),
            6,
            "USING join should have all fields"
        );
        assert_eq!(using_join.join_constraint, JoinConstraint::Using);

        Ok(())
    }
}