datafusion_expr/logical_plan/
plan.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! Logical plan types
19
20use std::cmp::Ordering;
21use std::collections::{HashMap, HashSet};
22use std::fmt::{self, Debug, Display, Formatter};
23use std::hash::{Hash, Hasher};
24use std::str::FromStr;
25use std::sync::{Arc, LazyLock};
26
27use super::dml::CopyTo;
28use super::invariants::{
29    assert_always_invariants_at_current_node, assert_executable_invariants,
30    InvariantLevel,
31};
32use super::DdlStatement;
33use crate::builder::{change_redundant_column, unnest_with_options};
34use crate::expr::{Placeholder, Sort as SortExpr, WindowFunction, WindowFunctionParams};
35use crate::expr_rewriter::{
36    create_col_from_scalar_expr, normalize_cols, normalize_sorts, NamePreserver,
37};
38use crate::logical_plan::display::{GraphvizVisitor, IndentVisitor};
39use crate::logical_plan::extension::UserDefinedLogicalNode;
40use crate::logical_plan::{DmlStatement, Statement};
41use crate::utils::{
42    enumerate_grouping_sets, exprlist_to_fields, find_out_reference_exprs,
43    grouping_set_expr_count, grouping_set_to_exprlist, split_conjunction,
44};
45use crate::{
46    build_join_schema, expr_vec_fmt, requalify_sides_if_needed, BinaryExpr,
47    CreateMemoryTable, CreateView, Execute, Expr, ExprSchemable, LogicalPlanBuilder,
48    Operator, Prepare, TableProviderFilterPushDown, TableSource,
49    WindowFunctionDefinition,
50};
51
52use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
53use datafusion_common::cse::{NormalizeEq, Normalizeable};
54use datafusion_common::tree_node::{
55    Transformed, TreeNode, TreeNodeContainer, TreeNodeRecursion,
56};
57use datafusion_common::{
58    aggregate_functional_dependencies, internal_err, plan_err, Column, Constraints,
59    DFSchema, DFSchemaRef, DataFusionError, Dependency, FunctionalDependence,
60    FunctionalDependencies, NullEquality, ParamValues, Result, ScalarValue, Spans,
61    TableReference, UnnestOptions,
62};
63use indexmap::IndexSet;
64
65// backwards compatibility
66use crate::display::PgJsonVisitor;
67pub use datafusion_common::display::{PlanType, StringifiedPlan, ToStringifiedPlan};
68pub use datafusion_common::{JoinConstraint, JoinType};
69
70/// A `LogicalPlan` is a node in a tree of relational operators (such as
71/// Projection or Filter).
72///
73/// Represents transforming an input relation (table) to an output relation
74/// (table) with a potentially different schema. Plans form a dataflow tree
75/// where data flows from leaves up to the root to produce the query result.
76///
77/// `LogicalPlan`s can be created by the SQL query planner, the DataFrame API,
78/// or programmatically (for example custom query languages).
79///
80/// # See also:
81/// * [`Expr`]: For the expressions that are evaluated by the plan
82/// * [`LogicalPlanBuilder`]: For building `LogicalPlan`s
83/// * [`tree_node`]: To inspect and rewrite `LogicalPlan`s
84///
85/// [`tree_node`]: crate::logical_plan::tree_node
86///
87/// # Examples
88///
89/// ## Creating a LogicalPlan from SQL:
90///
91/// See [`SessionContext::sql`](https://docs.rs/datafusion/latest/datafusion/execution/context/struct.SessionContext.html#method.sql)
92///
93/// ## Creating a LogicalPlan from the DataFrame API:
94///
95/// See [`DataFrame::logical_plan`](https://docs.rs/datafusion/latest/datafusion/dataframe/struct.DataFrame.html#method.logical_plan)
96///
97/// ## Creating a LogicalPlan programmatically:
98///
99/// See [`LogicalPlanBuilder`]
100///
101/// # Visiting and Rewriting `LogicalPlan`s
102///
103/// Using the [`tree_node`] API, you can recursively walk all nodes in a
104/// `LogicalPlan`. For example, to find all column references in a plan:
105///
106/// ```
107/// # use std::collections::HashSet;
108/// # use arrow::datatypes::{DataType, Field, Schema};
109/// # use datafusion_expr::{Expr, col, lit, LogicalPlan, LogicalPlanBuilder, table_scan};
110/// # use datafusion_common::tree_node::{TreeNodeRecursion, TreeNode};
111/// # use datafusion_common::{Column, Result};
112/// # fn employee_schema() -> Schema {
113/// #    Schema::new(vec![
114/// #           Field::new("name", DataType::Utf8, false),
115/// #           Field::new("salary", DataType::Int32, false),
116/// #       ])
117/// #   }
/// // Projection(name)
119/// //   Filter(salary > 1000)
120/// //     TableScan(employee)
121/// # fn main() -> Result<()> {
122/// let plan = table_scan(Some("employee"), &employee_schema(), None)?
123///  .filter(col("salary").gt(lit(1000)))?
124///  .project(vec![col("name")])?
125///  .build()?;
126///
127/// // use apply to walk the plan and collect all expressions
128/// let mut expressions = HashSet::new();
129/// plan.apply(|node| {
130///   // collect all expressions in the plan
131///   node.apply_expressions(|expr| {
132///    expressions.insert(expr.clone());
133///    Ok(TreeNodeRecursion::Continue) // control walk of expressions
134///   })?;
135///   Ok(TreeNodeRecursion::Continue) // control walk of plan nodes
136/// }).unwrap();
137///
138/// // we found the expression in projection and filter
139/// assert_eq!(expressions.len(), 2);
140/// println!("Found expressions: {:?}", expressions);
141/// // found predicate in the Filter: employee.salary > 1000
142/// let salary = Expr::Column(Column::new(Some("employee"), "salary"));
143/// assert!(expressions.contains(&salary.gt(lit(1000))));
144/// // found projection in the Projection: employee.name
145/// let name = Expr::Column(Column::new(Some("employee"), "name"));
146/// assert!(expressions.contains(&name));
147/// # Ok(())
148/// # }
149/// ```
150///
151/// You can also rewrite plans using the [`tree_node`] API. For example, to
152/// replace the filter predicate in a plan:
153///
154/// ```
155/// # use std::collections::HashSet;
156/// # use arrow::datatypes::{DataType, Field, Schema};
157/// # use datafusion_expr::{Expr, col, lit, LogicalPlan, LogicalPlanBuilder, table_scan};
158/// # use datafusion_common::tree_node::{TreeNodeRecursion, TreeNode};
159/// # use datafusion_common::{Column, Result};
160/// # fn employee_schema() -> Schema {
161/// #    Schema::new(vec![
162/// #           Field::new("name", DataType::Utf8, false),
163/// #           Field::new("salary", DataType::Int32, false),
164/// #       ])
165/// #   }
/// // Projection(name)
167/// //   Filter(salary > 1000)
168/// //     TableScan(employee)
169/// # fn main() -> Result<()> {
170/// use datafusion_common::tree_node::Transformed;
171/// let plan = table_scan(Some("employee"), &employee_schema(), None)?
172///  .filter(col("salary").gt(lit(1000)))?
173///  .project(vec![col("name")])?
174///  .build()?;
175///
176/// // use transform to rewrite the plan
177/// let transformed_result = plan.transform(|node| {
178///   // when we see the filter node
179///   if let LogicalPlan::Filter(mut filter) = node {
180///     // replace predicate with salary < 2000
181///     filter.predicate = Expr::Column(Column::new(Some("employee"), "salary")).lt(lit(2000));
182///     let new_plan = LogicalPlan::Filter(filter);
183///     return Ok(Transformed::yes(new_plan)); // communicate the node was changed
184///   }
185///   // return the node unchanged
186///   Ok(Transformed::no(node))
187/// }).unwrap();
188///
189/// // Transformed result contains rewritten plan and information about
190/// // whether the plan was changed
191/// assert!(transformed_result.transformed);
192/// let rewritten_plan = transformed_result.data;
193///
194/// // we found the filter
195/// assert_eq!(rewritten_plan.display_indent().to_string(),
196/// "Projection: employee.name\
197/// \n  Filter: employee.salary < Int32(2000)\
198/// \n    TableScan: employee");
199/// # Ok(())
200/// # }
201/// ```
202///
203#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Hash)]
204pub enum LogicalPlan {
205    /// Evaluates an arbitrary list of expressions (essentially a
206    /// SELECT with an expression list) on its input.
207    Projection(Projection),
208    /// Filters rows from its input that do not match an
209    /// expression (essentially a WHERE clause with a predicate
210    /// expression).
211    ///
212    /// Semantically, `<predicate>` is evaluated for each row of the
213    /// input; If the value of `<predicate>` is true, the input row is
214    /// passed to the output. If the value of `<predicate>` is false
215    /// (or null), the row is discarded.
216    Filter(Filter),
217    /// Windows input based on a set of window spec and window
218    /// function (e.g. SUM or RANK).  This is used to implement SQL
219    /// window functions, and the `OVER` clause.
220    ///
221    /// See [`Window`] for more details
222    Window(Window),
223    /// Aggregates its input based on a set of grouping and aggregate
224    /// expressions (e.g. SUM). This is used to implement SQL aggregates
225    /// and `GROUP BY`.
226    ///
227    /// See [`Aggregate`] for more details
228    Aggregate(Aggregate),
229    /// Sorts its input according to a list of sort expressions. This
230    /// is used to implement SQL `ORDER BY`
231    Sort(Sort),
232    /// Join two logical plans on one or more join columns.
233    /// This is used to implement SQL `JOIN`
234    Join(Join),
235    /// Repartitions the input based on a partitioning scheme. This is
236    /// used to add parallelism and is sometimes referred to as an
237    /// "exchange" operator in other systems
238    Repartition(Repartition),
239    /// Union multiple inputs with the same schema into a single
240    /// output stream. This is used to implement SQL `UNION [ALL]` and
241    /// `INTERSECT [ALL]`.
242    Union(Union),
243    /// Produces rows from a [`TableSource`], used to implement SQL
244    /// `FROM` tables or views.
245    TableScan(TableScan),
246    /// Produces no rows: An empty relation with an empty schema that
247    /// produces 0 or 1 row. This is used to implement SQL `SELECT`
248    /// that has no values in the `FROM` clause.
249    EmptyRelation(EmptyRelation),
250    /// Produces the output of running another query.  This is used to
251    /// implement SQL subqueries
252    Subquery(Subquery),
253    /// Aliased relation provides, or changes, the name of a relation.
254    SubqueryAlias(SubqueryAlias),
255    /// Skip some number of rows, and then fetch some number of rows.
256    Limit(Limit),
257    /// A DataFusion [`Statement`] such as `SET VARIABLE` or `START TRANSACTION`
258    Statement(Statement),
259    /// Values expression. See
260    /// [Postgres VALUES](https://www.postgresql.org/docs/current/queries-values.html)
261    /// documentation for more details. This is used to implement SQL such as
262    /// `VALUES (1, 2), (3, 4)`
263    Values(Values),
264    /// Produces a relation with string representations of
265    /// various parts of the plan. This is used to implement SQL `EXPLAIN`.
266    Explain(Explain),
267    /// Runs the input, and prints annotated physical plan as a string
268    /// with execution metric. This is used to implement SQL
269    /// `EXPLAIN ANALYZE`.
270    Analyze(Analyze),
271    /// Extension operator defined outside of DataFusion. This is used
272    /// to extend DataFusion with custom relational operations that
273    Extension(Extension),
274    /// Remove duplicate rows from the input. This is used to
275    /// implement SQL `SELECT DISTINCT ...`.
276    Distinct(Distinct),
277    /// Data Manipulation Language (DML): Insert / Update / Delete
278    Dml(DmlStatement),
279    /// Data Definition Language (DDL): CREATE / DROP TABLES / VIEWS / SCHEMAS
280    Ddl(DdlStatement),
281    /// `COPY TO` for writing plan results to files
282    Copy(CopyTo),
283    /// Describe the schema of the table. This is used to implement the
284    /// SQL `DESCRIBE` command from MySQL.
285    DescribeTable(DescribeTable),
286    /// Unnest a column that contains a nested list type such as an
287    /// ARRAY. This is used to implement SQL `UNNEST`
288    Unnest(Unnest),
289    /// A variadic query (e.g. "Recursive CTEs")
290    RecursiveQuery(RecursiveQuery),
291}
292
293impl Default for LogicalPlan {
294    fn default() -> Self {
295        LogicalPlan::EmptyRelation(EmptyRelation {
296            produce_one_row: false,
297            schema: Arc::new(DFSchema::empty()),
298        })
299    }
300}
301
302impl<'a> TreeNodeContainer<'a, Self> for LogicalPlan {
303    fn apply_elements<F: FnMut(&'a Self) -> Result<TreeNodeRecursion>>(
304        &'a self,
305        mut f: F,
306    ) -> Result<TreeNodeRecursion> {
307        f(self)
308    }
309
310    fn map_elements<F: FnMut(Self) -> Result<Transformed<Self>>>(
311        self,
312        mut f: F,
313    ) -> Result<Transformed<Self>> {
314        f(self)
315    }
316}
317
318impl LogicalPlan {
319    /// Get a reference to the logical plan's schema
320    pub fn schema(&self) -> &DFSchemaRef {
321        match self {
322            LogicalPlan::EmptyRelation(EmptyRelation { schema, .. }) => schema,
323            LogicalPlan::Values(Values { schema, .. }) => schema,
324            LogicalPlan::TableScan(TableScan {
325                projected_schema, ..
326            }) => projected_schema,
327            LogicalPlan::Projection(Projection { schema, .. }) => schema,
328            LogicalPlan::Filter(Filter { input, .. }) => input.schema(),
329            LogicalPlan::Distinct(Distinct::All(input)) => input.schema(),
330            LogicalPlan::Distinct(Distinct::On(DistinctOn { schema, .. })) => schema,
331            LogicalPlan::Window(Window { schema, .. }) => schema,
332            LogicalPlan::Aggregate(Aggregate { schema, .. }) => schema,
333            LogicalPlan::Sort(Sort { input, .. }) => input.schema(),
334            LogicalPlan::Join(Join { schema, .. }) => schema,
335            LogicalPlan::Repartition(Repartition { input, .. }) => input.schema(),
336            LogicalPlan::Limit(Limit { input, .. }) => input.schema(),
337            LogicalPlan::Statement(statement) => statement.schema(),
338            LogicalPlan::Subquery(Subquery { subquery, .. }) => subquery.schema(),
339            LogicalPlan::SubqueryAlias(SubqueryAlias { schema, .. }) => schema,
340            LogicalPlan::Explain(explain) => &explain.schema,
341            LogicalPlan::Analyze(analyze) => &analyze.schema,
342            LogicalPlan::Extension(extension) => extension.node.schema(),
343            LogicalPlan::Union(Union { schema, .. }) => schema,
344            LogicalPlan::DescribeTable(DescribeTable { output_schema, .. }) => {
345                output_schema
346            }
347            LogicalPlan::Dml(DmlStatement { output_schema, .. }) => output_schema,
348            LogicalPlan::Copy(CopyTo { output_schema, .. }) => output_schema,
349            LogicalPlan::Ddl(ddl) => ddl.schema(),
350            LogicalPlan::Unnest(Unnest { schema, .. }) => schema,
351            LogicalPlan::RecursiveQuery(RecursiveQuery { static_term, .. }) => {
352                // we take the schema of the static term as the schema of the entire recursive query
353                static_term.schema()
354            }
355        }
356    }
357
358    /// Used for normalizing columns, as the fallback schemas to the main schema
359    /// of the plan.
360    pub fn fallback_normalize_schemas(&self) -> Vec<&DFSchema> {
361        match self {
362            LogicalPlan::Window(_)
363            | LogicalPlan::Projection(_)
364            | LogicalPlan::Aggregate(_)
365            | LogicalPlan::Unnest(_)
366            | LogicalPlan::Join(_) => self
367                .inputs()
368                .iter()
369                .map(|input| input.schema().as_ref())
370                .collect(),
371            _ => vec![],
372        }
373    }
374
375    /// Returns the (fixed) output schema for explain plans
376    pub fn explain_schema() -> SchemaRef {
377        SchemaRef::new(Schema::new(vec![
378            Field::new("plan_type", DataType::Utf8, false),
379            Field::new("plan", DataType::Utf8, false),
380        ]))
381    }
382
383    /// Returns the (fixed) output schema for `DESCRIBE` plans
384    pub fn describe_schema() -> Schema {
385        Schema::new(vec![
386            Field::new("column_name", DataType::Utf8, false),
387            Field::new("data_type", DataType::Utf8, false),
388            Field::new("is_nullable", DataType::Utf8, false),
389        ])
390    }
391
392    /// Returns all expressions (non-recursively) evaluated by the current
393    /// logical plan node. This does not include expressions in any children.
394    ///
395    /// Note this method `clone`s all the expressions. When possible, the
396    /// [`tree_node`] API should be used instead of this API.
397    ///
398    /// The returned expressions do not necessarily represent or even
399    /// contributed to the output schema of this node. For example,
400    /// `LogicalPlan::Filter` returns the filter expression even though the
401    /// output of a Filter has the same columns as the input.
402    ///
403    /// The expressions do contain all the columns that are used by this plan,
404    /// so if there are columns not referenced by these expressions then
405    /// DataFusion's optimizer attempts to optimize them away.
406    ///
407    /// [`tree_node`]: crate::logical_plan::tree_node
408    pub fn expressions(self: &LogicalPlan) -> Vec<Expr> {
409        let mut exprs = vec![];
410        self.apply_expressions(|e| {
411            exprs.push(e.clone());
412            Ok(TreeNodeRecursion::Continue)
413        })
414        // closure always returns OK
415        .unwrap();
416        exprs
417    }
418
419    /// Returns all the out reference(correlated) expressions (recursively) in the current
420    /// logical plan nodes and all its descendant nodes.
421    pub fn all_out_ref_exprs(self: &LogicalPlan) -> Vec<Expr> {
422        let mut exprs = vec![];
423        self.apply_expressions(|e| {
424            find_out_reference_exprs(e).into_iter().for_each(|e| {
425                if !exprs.contains(&e) {
426                    exprs.push(e)
427                }
428            });
429            Ok(TreeNodeRecursion::Continue)
430        })
431        // closure always returns OK
432        .unwrap();
433        self.inputs()
434            .into_iter()
435            .flat_map(|child| child.all_out_ref_exprs())
436            .for_each(|e| {
437                if !exprs.contains(&e) {
438                    exprs.push(e)
439                }
440            });
441        exprs
442    }
443
444    /// Returns all inputs / children of this `LogicalPlan` node.
445    ///
446    /// Note does not include inputs to inputs, or subqueries.
447    pub fn inputs(&self) -> Vec<&LogicalPlan> {
448        match self {
449            LogicalPlan::Projection(Projection { input, .. }) => vec![input],
450            LogicalPlan::Filter(Filter { input, .. }) => vec![input],
451            LogicalPlan::Repartition(Repartition { input, .. }) => vec![input],
452            LogicalPlan::Window(Window { input, .. }) => vec![input],
453            LogicalPlan::Aggregate(Aggregate { input, .. }) => vec![input],
454            LogicalPlan::Sort(Sort { input, .. }) => vec![input],
455            LogicalPlan::Join(Join { left, right, .. }) => vec![left, right],
456            LogicalPlan::Limit(Limit { input, .. }) => vec![input],
457            LogicalPlan::Subquery(Subquery { subquery, .. }) => vec![subquery],
458            LogicalPlan::SubqueryAlias(SubqueryAlias { input, .. }) => vec![input],
459            LogicalPlan::Extension(extension) => extension.node.inputs(),
460            LogicalPlan::Union(Union { inputs, .. }) => {
461                inputs.iter().map(|arc| arc.as_ref()).collect()
462            }
463            LogicalPlan::Distinct(
464                Distinct::All(input) | Distinct::On(DistinctOn { input, .. }),
465            ) => vec![input],
466            LogicalPlan::Explain(explain) => vec![&explain.plan],
467            LogicalPlan::Analyze(analyze) => vec![&analyze.input],
468            LogicalPlan::Dml(write) => vec![&write.input],
469            LogicalPlan::Copy(copy) => vec![&copy.input],
470            LogicalPlan::Ddl(ddl) => ddl.inputs(),
471            LogicalPlan::Unnest(Unnest { input, .. }) => vec![input],
472            LogicalPlan::RecursiveQuery(RecursiveQuery {
473                static_term,
474                recursive_term,
475                ..
476            }) => vec![static_term, recursive_term],
477            LogicalPlan::Statement(stmt) => stmt.inputs(),
478            // plans without inputs
479            LogicalPlan::TableScan { .. }
480            | LogicalPlan::EmptyRelation { .. }
481            | LogicalPlan::Values { .. }
482            | LogicalPlan::DescribeTable(_) => vec![],
483        }
484    }
485
486    /// returns all `Using` join columns in a logical plan
487    pub fn using_columns(&self) -> Result<Vec<HashSet<Column>>, DataFusionError> {
488        let mut using_columns: Vec<HashSet<Column>> = vec![];
489
490        self.apply_with_subqueries(|plan| {
491            if let LogicalPlan::Join(Join {
492                join_constraint: JoinConstraint::Using,
493                on,
494                ..
495            }) = plan
496            {
497                // The join keys in using-join must be columns.
498                let columns =
499                    on.iter().try_fold(HashSet::new(), |mut accumu, (l, r)| {
500                        let Some(l) = l.get_as_join_column() else {
501                            return internal_err!(
502                                "Invalid join key. Expected column, found {l:?}"
503                            );
504                        };
505                        let Some(r) = r.get_as_join_column() else {
506                            return internal_err!(
507                                "Invalid join key. Expected column, found {r:?}"
508                            );
509                        };
510                        accumu.insert(l.to_owned());
511                        accumu.insert(r.to_owned());
512                        Result::<_, DataFusionError>::Ok(accumu)
513                    })?;
514                using_columns.push(columns);
515            }
516            Ok(TreeNodeRecursion::Continue)
517        })?;
518
519        Ok(using_columns)
520    }
521
522    /// returns the first output expression of this `LogicalPlan` node.
523    pub fn head_output_expr(&self) -> Result<Option<Expr>> {
524        match self {
525            LogicalPlan::Projection(projection) => {
526                Ok(Some(projection.expr.as_slice()[0].clone()))
527            }
528            LogicalPlan::Aggregate(agg) => {
529                if agg.group_expr.is_empty() {
530                    Ok(Some(agg.aggr_expr.as_slice()[0].clone()))
531                } else {
532                    Ok(Some(agg.group_expr.as_slice()[0].clone()))
533                }
534            }
535            LogicalPlan::Distinct(Distinct::On(DistinctOn { select_expr, .. })) => {
536                Ok(Some(select_expr[0].clone()))
537            }
538            LogicalPlan::Filter(Filter { input, .. })
539            | LogicalPlan::Distinct(Distinct::All(input))
540            | LogicalPlan::Sort(Sort { input, .. })
541            | LogicalPlan::Limit(Limit { input, .. })
542            | LogicalPlan::Repartition(Repartition { input, .. })
543            | LogicalPlan::Window(Window { input, .. }) => input.head_output_expr(),
544            LogicalPlan::Join(Join {
545                left,
546                right,
547                join_type,
548                ..
549            }) => match join_type {
550                JoinType::Inner | JoinType::Left | JoinType::Right | JoinType::Full => {
551                    if left.schema().fields().is_empty() {
552                        right.head_output_expr()
553                    } else {
554                        left.head_output_expr()
555                    }
556                }
557                JoinType::LeftSemi | JoinType::LeftAnti | JoinType::LeftMark => {
558                    left.head_output_expr()
559                }
560                JoinType::RightSemi | JoinType::RightAnti | JoinType::RightMark => {
561                    right.head_output_expr()
562                }
563            },
564            LogicalPlan::RecursiveQuery(RecursiveQuery { static_term, .. }) => {
565                static_term.head_output_expr()
566            }
567            LogicalPlan::Union(union) => Ok(Some(Expr::Column(Column::from(
568                union.schema.qualified_field(0),
569            )))),
570            LogicalPlan::TableScan(table) => Ok(Some(Expr::Column(Column::from(
571                table.projected_schema.qualified_field(0),
572            )))),
573            LogicalPlan::SubqueryAlias(subquery_alias) => {
574                let expr_opt = subquery_alias.input.head_output_expr()?;
575                expr_opt
576                    .map(|expr| {
577                        Ok(Expr::Column(create_col_from_scalar_expr(
578                            &expr,
579                            subquery_alias.alias.to_string(),
580                        )?))
581                    })
582                    .map_or(Ok(None), |v| v.map(Some))
583            }
584            LogicalPlan::Subquery(_) => Ok(None),
585            LogicalPlan::EmptyRelation(_)
586            | LogicalPlan::Statement(_)
587            | LogicalPlan::Values(_)
588            | LogicalPlan::Explain(_)
589            | LogicalPlan::Analyze(_)
590            | LogicalPlan::Extension(_)
591            | LogicalPlan::Dml(_)
592            | LogicalPlan::Copy(_)
593            | LogicalPlan::Ddl(_)
594            | LogicalPlan::DescribeTable(_)
595            | LogicalPlan::Unnest(_) => Ok(None),
596        }
597    }
598
599    /// Recomputes schema and type information for this LogicalPlan if needed.
600    ///
601    /// Some `LogicalPlan`s may need to recompute their schema if the number or
602    /// type of expressions have been changed (for example due to type
603    /// coercion). For example [`LogicalPlan::Projection`]s schema depends on
604    /// its expressions.
605    ///
606    /// Some `LogicalPlan`s schema is unaffected by any changes to their
607    /// expressions. For example [`LogicalPlan::Filter`] schema is always the
608    /// same as its input schema.
609    ///
610    /// This is useful after modifying a plans `Expr`s (or input plans) via
611    /// methods such as [Self::map_children] and [Self::map_expressions]. Unlike
612    /// [Self::with_new_exprs], this method does not require a new set of
613    /// expressions or inputs plans.
614    ///
615    /// # Return value
616    /// Returns an error if there is some issue recomputing the schema.
617    ///
618    /// # Notes
619    ///
620    /// * Does not recursively recompute schema for input (child) plans.
621    pub fn recompute_schema(self) -> Result<Self> {
622        match self {
623            // Since expr may be different than the previous expr, schema of the projection
624            // may change. We need to use try_new method instead of try_new_with_schema method.
625            LogicalPlan::Projection(Projection {
626                expr,
627                input,
628                schema: _,
629            }) => Projection::try_new(expr, input).map(LogicalPlan::Projection),
630            LogicalPlan::Dml(_) => Ok(self),
631            LogicalPlan::Copy(_) => Ok(self),
632            LogicalPlan::Values(Values { schema, values }) => {
633                // todo it isn't clear why the schema is not recomputed here
634                Ok(LogicalPlan::Values(Values { schema, values }))
635            }
636            LogicalPlan::Filter(Filter { predicate, input }) => {
637                Filter::try_new(predicate, input).map(LogicalPlan::Filter)
638            }
639            LogicalPlan::Repartition(_) => Ok(self),
640            LogicalPlan::Window(Window {
641                input,
642                window_expr,
643                schema: _,
644            }) => Window::try_new(window_expr, input).map(LogicalPlan::Window),
645            LogicalPlan::Aggregate(Aggregate {
646                input,
647                group_expr,
648                aggr_expr,
649                schema: _,
650            }) => Aggregate::try_new(input, group_expr, aggr_expr)
651                .map(LogicalPlan::Aggregate),
652            LogicalPlan::Sort(_) => Ok(self),
653            LogicalPlan::Join(Join {
654                left,
655                right,
656                filter,
657                join_type,
658                join_constraint,
659                on,
660                schema: _,
661                null_equality,
662            }) => {
663                let schema =
664                    build_join_schema(left.schema(), right.schema(), &join_type)?;
665
666                let new_on: Vec<_> = on
667                    .into_iter()
668                    .map(|equi_expr| {
669                        // SimplifyExpression rule may add alias to the equi_expr.
670                        (equi_expr.0.unalias(), equi_expr.1.unalias())
671                    })
672                    .collect();
673
674                Ok(LogicalPlan::Join(Join {
675                    left,
676                    right,
677                    join_type,
678                    join_constraint,
679                    on: new_on,
680                    filter,
681                    schema: DFSchemaRef::new(schema),
682                    null_equality,
683                }))
684            }
685            LogicalPlan::Subquery(_) => Ok(self),
686            LogicalPlan::SubqueryAlias(SubqueryAlias {
687                input,
688                alias,
689                schema: _,
690            }) => SubqueryAlias::try_new(input, alias).map(LogicalPlan::SubqueryAlias),
691            LogicalPlan::Limit(_) => Ok(self),
692            LogicalPlan::Ddl(_) => Ok(self),
693            LogicalPlan::Extension(Extension { node }) => {
694                // todo make an API that does not require cloning
695                // This requires a copy of the extension nodes expressions and inputs
696                let expr = node.expressions();
697                let inputs: Vec<_> = node.inputs().into_iter().cloned().collect();
698                Ok(LogicalPlan::Extension(Extension {
699                    node: node.with_exprs_and_inputs(expr, inputs)?,
700                }))
701            }
702            LogicalPlan::Union(Union { inputs, schema }) => {
703                let first_input_schema = inputs[0].schema();
704                if schema.fields().len() == first_input_schema.fields().len() {
705                    // If inputs are not pruned do not change schema
706                    Ok(LogicalPlan::Union(Union { inputs, schema }))
707                } else {
708                    // A note on `Union`s constructed via `try_new_by_name`:
709                    //
710                    // At this point, the schema for each input should have
711                    // the same width. Thus, we do not need to save whether a
712                    // `Union` was created `BY NAME`, and can safely rely on the
713                    // `try_new` initializer to derive the new schema based on
714                    // column positions.
715                    Ok(LogicalPlan::Union(Union::try_new(inputs)?))
716                }
717            }
718            LogicalPlan::Distinct(distinct) => {
719                let distinct = match distinct {
720                    Distinct::All(input) => Distinct::All(input),
721                    Distinct::On(DistinctOn {
722                        on_expr,
723                        select_expr,
724                        sort_expr,
725                        input,
726                        schema: _,
727                    }) => Distinct::On(DistinctOn::try_new(
728                        on_expr,
729                        select_expr,
730                        sort_expr,
731                        input,
732                    )?),
733                };
734                Ok(LogicalPlan::Distinct(distinct))
735            }
736            LogicalPlan::RecursiveQuery(_) => Ok(self),
737            LogicalPlan::Analyze(_) => Ok(self),
738            LogicalPlan::Explain(_) => Ok(self),
739            LogicalPlan::TableScan(_) => Ok(self),
740            LogicalPlan::EmptyRelation(_) => Ok(self),
741            LogicalPlan::Statement(_) => Ok(self),
742            LogicalPlan::DescribeTable(_) => Ok(self),
743            LogicalPlan::Unnest(Unnest {
744                input,
745                exec_columns,
746                options,
747                ..
748            }) => {
749                // Update schema with unnested column type.
750                unnest_with_options(Arc::unwrap_or_clone(input), exec_columns, options)
751            }
752        }
753    }
754
755    /// Returns a new `LogicalPlan` based on `self` with inputs and
756    /// expressions replaced.
757    ///
758    /// Note this method creates an entirely new node, which requires a large
759    /// amount of clone'ing. When possible, the [`tree_node`] API should be used
760    /// instead of this API.
761    ///
762    /// The exprs correspond to the same order of expressions returned
763    /// by [`Self::expressions`]. This function is used by optimizers
764    /// to rewrite plans using the following pattern:
765    ///
766    /// [`tree_node`]: crate::logical_plan::tree_node
767    ///
768    /// ```text
769    /// let new_inputs = optimize_children(..., plan, props);
770    ///
771    /// // get the plans expressions to optimize
772    /// let exprs = plan.expressions();
773    ///
774    /// // potentially rewrite plan expressions
775    /// let rewritten_exprs = rewrite_exprs(exprs);
776    ///
777    /// // create new plan using rewritten_exprs in same position
778    /// let new_plan = plan.new_with_exprs(rewritten_exprs, new_inputs);
779    /// ```
780    pub fn with_new_exprs(
781        &self,
782        mut expr: Vec<Expr>,
783        inputs: Vec<LogicalPlan>,
784    ) -> Result<LogicalPlan> {
785        match self {
786            // Since expr may be different than the previous expr, schema of the projection
787            // may change. We need to use try_new method instead of try_new_with_schema method.
788            LogicalPlan::Projection(Projection { .. }) => {
789                let input = self.only_input(inputs)?;
790                Projection::try_new(expr, Arc::new(input)).map(LogicalPlan::Projection)
791            }
792            LogicalPlan::Dml(DmlStatement {
793                table_name,
794                target,
795                op,
796                ..
797            }) => {
798                self.assert_no_expressions(expr)?;
799                let input = self.only_input(inputs)?;
800                Ok(LogicalPlan::Dml(DmlStatement::new(
801                    table_name.clone(),
802                    Arc::clone(target),
803                    op.clone(),
804                    Arc::new(input),
805                )))
806            }
807            LogicalPlan::Copy(CopyTo {
808                input: _,
809                output_url,
810                file_type,
811                options,
812                partition_by,
813                output_schema: _,
814            }) => {
815                self.assert_no_expressions(expr)?;
816                let input = self.only_input(inputs)?;
817                Ok(LogicalPlan::Copy(CopyTo::new(
818                    Arc::new(input),
819                    output_url.clone(),
820                    partition_by.clone(),
821                    Arc::clone(file_type),
822                    options.clone(),
823                )))
824            }
825            LogicalPlan::Values(Values { schema, .. }) => {
826                self.assert_no_inputs(inputs)?;
827                Ok(LogicalPlan::Values(Values {
828                    schema: Arc::clone(schema),
829                    values: expr
830                        .chunks_exact(schema.fields().len())
831                        .map(|s| s.to_vec())
832                        .collect(),
833                }))
834            }
835            LogicalPlan::Filter { .. } => {
836                let predicate = self.only_expr(expr)?;
837                let input = self.only_input(inputs)?;
838
839                Filter::try_new(predicate, Arc::new(input)).map(LogicalPlan::Filter)
840            }
841            LogicalPlan::Repartition(Repartition {
842                partitioning_scheme,
843                ..
844            }) => match partitioning_scheme {
845                Partitioning::RoundRobinBatch(n) => {
846                    self.assert_no_expressions(expr)?;
847                    let input = self.only_input(inputs)?;
848                    Ok(LogicalPlan::Repartition(Repartition {
849                        partitioning_scheme: Partitioning::RoundRobinBatch(*n),
850                        input: Arc::new(input),
851                    }))
852                }
853                Partitioning::Hash(_, n) => {
854                    let input = self.only_input(inputs)?;
855                    Ok(LogicalPlan::Repartition(Repartition {
856                        partitioning_scheme: Partitioning::Hash(expr, *n),
857                        input: Arc::new(input),
858                    }))
859                }
860                Partitioning::DistributeBy(_) => {
861                    let input = self.only_input(inputs)?;
862                    Ok(LogicalPlan::Repartition(Repartition {
863                        partitioning_scheme: Partitioning::DistributeBy(expr),
864                        input: Arc::new(input),
865                    }))
866                }
867            },
868            LogicalPlan::Window(Window { window_expr, .. }) => {
869                assert_eq!(window_expr.len(), expr.len());
870                let input = self.only_input(inputs)?;
871                Window::try_new(expr, Arc::new(input)).map(LogicalPlan::Window)
872            }
873            LogicalPlan::Aggregate(Aggregate { group_expr, .. }) => {
874                let input = self.only_input(inputs)?;
875                // group exprs are the first expressions
876                let agg_expr = expr.split_off(group_expr.len());
877
878                Aggregate::try_new(Arc::new(input), expr, agg_expr)
879                    .map(LogicalPlan::Aggregate)
880            }
881            LogicalPlan::Sort(Sort {
882                expr: sort_expr,
883                fetch,
884                ..
885            }) => {
886                let input = self.only_input(inputs)?;
887                Ok(LogicalPlan::Sort(Sort {
888                    expr: expr
889                        .into_iter()
890                        .zip(sort_expr.iter())
891                        .map(|(expr, sort)| sort.with_expr(expr))
892                        .collect(),
893                    input: Arc::new(input),
894                    fetch: *fetch,
895                }))
896            }
897            LogicalPlan::Join(Join {
898                join_type,
899                join_constraint,
900                on,
901                null_equality,
902                ..
903            }) => {
904                let (left, right) = self.only_two_inputs(inputs)?;
905                let schema = build_join_schema(left.schema(), right.schema(), join_type)?;
906
907                let equi_expr_count = on.len() * 2;
908                assert!(expr.len() >= equi_expr_count);
909
910                // Assume that the last expr, if any,
911                // is the filter_expr (non equality predicate from ON clause)
912                let filter_expr = if expr.len() > equi_expr_count {
913                    expr.pop()
914                } else {
915                    None
916                };
917
918                // The first part of expr is equi-exprs,
919                // and the struct of each equi-expr is like `left-expr = right-expr`.
920                assert_eq!(expr.len(), equi_expr_count);
921                let mut new_on = Vec::with_capacity(on.len());
922                let mut iter = expr.into_iter();
923                while let Some(left) = iter.next() {
924                    let Some(right) = iter.next() else {
925                        internal_err!("Expected a pair of expressions to construct the join on expression")?
926                    };
927
928                    // SimplifyExpression rule may add alias to the equi_expr.
929                    new_on.push((left.unalias(), right.unalias()));
930                }
931
932                Ok(LogicalPlan::Join(Join {
933                    left: Arc::new(left),
934                    right: Arc::new(right),
935                    join_type: *join_type,
936                    join_constraint: *join_constraint,
937                    on: new_on,
938                    filter: filter_expr,
939                    schema: DFSchemaRef::new(schema),
940                    null_equality: *null_equality,
941                }))
942            }
943            LogicalPlan::Subquery(Subquery {
944                outer_ref_columns,
945                spans,
946                ..
947            }) => {
948                self.assert_no_expressions(expr)?;
949                let input = self.only_input(inputs)?;
950                let subquery = LogicalPlanBuilder::from(input).build()?;
951                Ok(LogicalPlan::Subquery(Subquery {
952                    subquery: Arc::new(subquery),
953                    outer_ref_columns: outer_ref_columns.clone(),
954                    spans: spans.clone(),
955                }))
956            }
957            LogicalPlan::SubqueryAlias(SubqueryAlias { alias, .. }) => {
958                self.assert_no_expressions(expr)?;
959                let input = self.only_input(inputs)?;
960                SubqueryAlias::try_new(Arc::new(input), alias.clone())
961                    .map(LogicalPlan::SubqueryAlias)
962            }
963            LogicalPlan::Limit(Limit { skip, fetch, .. }) => {
964                let old_expr_len = skip.iter().chain(fetch.iter()).count();
965                if old_expr_len != expr.len() {
966                    return internal_err!(
967                        "Invalid number of new Limit expressions: expected {}, got {}",
968                        old_expr_len,
969                        expr.len()
970                    );
971                }
972                // `LogicalPlan::expressions()` returns in [skip, fetch] order, so we can pop from the end.
973                let new_fetch = fetch.as_ref().and_then(|_| expr.pop());
974                let new_skip = skip.as_ref().and_then(|_| expr.pop());
975                let input = self.only_input(inputs)?;
976                Ok(LogicalPlan::Limit(Limit {
977                    skip: new_skip.map(Box::new),
978                    fetch: new_fetch.map(Box::new),
979                    input: Arc::new(input),
980                }))
981            }
982            LogicalPlan::Ddl(DdlStatement::CreateMemoryTable(CreateMemoryTable {
983                name,
984                if_not_exists,
985                or_replace,
986                column_defaults,
987                temporary,
988                ..
989            })) => {
990                self.assert_no_expressions(expr)?;
991                let input = self.only_input(inputs)?;
992                Ok(LogicalPlan::Ddl(DdlStatement::CreateMemoryTable(
993                    CreateMemoryTable {
994                        input: Arc::new(input),
995                        constraints: Constraints::default(),
996                        name: name.clone(),
997                        if_not_exists: *if_not_exists,
998                        or_replace: *or_replace,
999                        column_defaults: column_defaults.clone(),
1000                        temporary: *temporary,
1001                    },
1002                )))
1003            }
1004            LogicalPlan::Ddl(DdlStatement::CreateView(CreateView {
1005                name,
1006                or_replace,
1007                definition,
1008                temporary,
1009                ..
1010            })) => {
1011                self.assert_no_expressions(expr)?;
1012                let input = self.only_input(inputs)?;
1013                Ok(LogicalPlan::Ddl(DdlStatement::CreateView(CreateView {
1014                    input: Arc::new(input),
1015                    name: name.clone(),
1016                    or_replace: *or_replace,
1017                    temporary: *temporary,
1018                    definition: definition.clone(),
1019                })))
1020            }
1021            LogicalPlan::Extension(e) => Ok(LogicalPlan::Extension(Extension {
1022                node: e.node.with_exprs_and_inputs(expr, inputs)?,
1023            })),
1024            LogicalPlan::Union(Union { schema, .. }) => {
1025                self.assert_no_expressions(expr)?;
1026                let input_schema = inputs[0].schema();
1027                // If inputs are not pruned do not change schema.
1028                let schema = if schema.fields().len() == input_schema.fields().len() {
1029                    Arc::clone(schema)
1030                } else {
1031                    Arc::clone(input_schema)
1032                };
1033                Ok(LogicalPlan::Union(Union {
1034                    inputs: inputs.into_iter().map(Arc::new).collect(),
1035                    schema,
1036                }))
1037            }
1038            LogicalPlan::Distinct(distinct) => {
1039                let distinct = match distinct {
1040                    Distinct::All(_) => {
1041                        self.assert_no_expressions(expr)?;
1042                        let input = self.only_input(inputs)?;
1043                        Distinct::All(Arc::new(input))
1044                    }
1045                    Distinct::On(DistinctOn {
1046                        on_expr,
1047                        select_expr,
1048                        ..
1049                    }) => {
1050                        let input = self.only_input(inputs)?;
1051                        let sort_expr = expr.split_off(on_expr.len() + select_expr.len());
1052                        let select_expr = expr.split_off(on_expr.len());
1053                        assert!(sort_expr.is_empty(), "with_new_exprs for Distinct does not support sort expressions");
1054                        Distinct::On(DistinctOn::try_new(
1055                            expr,
1056                            select_expr,
1057                            None, // no sort expressions accepted
1058                            Arc::new(input),
1059                        )?)
1060                    }
1061                };
1062                Ok(LogicalPlan::Distinct(distinct))
1063            }
1064            LogicalPlan::RecursiveQuery(RecursiveQuery {
1065                name, is_distinct, ..
1066            }) => {
1067                self.assert_no_expressions(expr)?;
1068                let (static_term, recursive_term) = self.only_two_inputs(inputs)?;
1069                Ok(LogicalPlan::RecursiveQuery(RecursiveQuery {
1070                    name: name.clone(),
1071                    static_term: Arc::new(static_term),
1072                    recursive_term: Arc::new(recursive_term),
1073                    is_distinct: *is_distinct,
1074                }))
1075            }
1076            LogicalPlan::Analyze(a) => {
1077                self.assert_no_expressions(expr)?;
1078                let input = self.only_input(inputs)?;
1079                Ok(LogicalPlan::Analyze(Analyze {
1080                    verbose: a.verbose,
1081                    schema: Arc::clone(&a.schema),
1082                    input: Arc::new(input),
1083                }))
1084            }
1085            LogicalPlan::Explain(e) => {
1086                self.assert_no_expressions(expr)?;
1087                let input = self.only_input(inputs)?;
1088                Ok(LogicalPlan::Explain(Explain {
1089                    verbose: e.verbose,
1090                    plan: Arc::new(input),
1091                    explain_format: e.explain_format.clone(),
1092                    stringified_plans: e.stringified_plans.clone(),
1093                    schema: Arc::clone(&e.schema),
1094                    logical_optimization_succeeded: e.logical_optimization_succeeded,
1095                }))
1096            }
1097            LogicalPlan::Statement(Statement::Prepare(Prepare {
1098                name,
1099                data_types,
1100                ..
1101            })) => {
1102                self.assert_no_expressions(expr)?;
1103                let input = self.only_input(inputs)?;
1104                Ok(LogicalPlan::Statement(Statement::Prepare(Prepare {
1105                    name: name.clone(),
1106                    data_types: data_types.clone(),
1107                    input: Arc::new(input),
1108                })))
1109            }
1110            LogicalPlan::Statement(Statement::Execute(Execute { name, .. })) => {
1111                self.assert_no_inputs(inputs)?;
1112                Ok(LogicalPlan::Statement(Statement::Execute(Execute {
1113                    name: name.clone(),
1114                    parameters: expr,
1115                })))
1116            }
1117            LogicalPlan::TableScan(ts) => {
1118                self.assert_no_inputs(inputs)?;
1119                Ok(LogicalPlan::TableScan(TableScan {
1120                    filters: expr,
1121                    ..ts.clone()
1122                }))
1123            }
1124            LogicalPlan::EmptyRelation(_)
1125            | LogicalPlan::Ddl(_)
1126            | LogicalPlan::Statement(_)
1127            | LogicalPlan::DescribeTable(_) => {
1128                // All of these plan types have no inputs / exprs so should not be called
1129                self.assert_no_expressions(expr)?;
1130                self.assert_no_inputs(inputs)?;
1131                Ok(self.clone())
1132            }
1133            LogicalPlan::Unnest(Unnest {
1134                exec_columns: columns,
1135                options,
1136                ..
1137            }) => {
1138                self.assert_no_expressions(expr)?;
1139                let input = self.only_input(inputs)?;
1140                // Update schema with unnested column type.
1141                let new_plan =
1142                    unnest_with_options(input, columns.clone(), options.clone())?;
1143                Ok(new_plan)
1144            }
1145        }
1146    }
1147
1148    /// checks that the plan conforms to the listed invariant level, returning an Error if not
1149    pub fn check_invariants(&self, check: InvariantLevel) -> Result<()> {
1150        match check {
1151            InvariantLevel::Always => assert_always_invariants_at_current_node(self),
1152            InvariantLevel::Executable => assert_executable_invariants(self),
1153        }
1154    }
1155
1156    /// Helper for [Self::with_new_exprs] to use when no expressions are expected.
1157    #[inline]
1158    #[allow(clippy::needless_pass_by_value)] // expr is moved intentionally to ensure it's not used again
1159    fn assert_no_expressions(&self, expr: Vec<Expr>) -> Result<()> {
1160        if !expr.is_empty() {
1161            return internal_err!("{self:?} should have no exprs, got {:?}", expr);
1162        }
1163        Ok(())
1164    }
1165
1166    /// Helper for [Self::with_new_exprs] to use when no inputs are expected.
1167    #[inline]
1168    #[allow(clippy::needless_pass_by_value)] // inputs is moved intentionally to ensure it's not used again
1169    fn assert_no_inputs(&self, inputs: Vec<LogicalPlan>) -> Result<()> {
1170        if !inputs.is_empty() {
1171            return internal_err!("{self:?} should have no inputs, got: {:?}", inputs);
1172        }
1173        Ok(())
1174    }
1175
1176    /// Helper for [Self::with_new_exprs] to use when exactly one expression is expected.
1177    #[inline]
1178    fn only_expr(&self, mut expr: Vec<Expr>) -> Result<Expr> {
1179        if expr.len() != 1 {
1180            return internal_err!(
1181                "{self:?} should have exactly one expr, got {:?}",
1182                expr
1183            );
1184        }
1185        Ok(expr.remove(0))
1186    }
1187
1188    /// Helper for [Self::with_new_exprs] to use when exactly one input is expected.
1189    #[inline]
1190    fn only_input(&self, mut inputs: Vec<LogicalPlan>) -> Result<LogicalPlan> {
1191        if inputs.len() != 1 {
1192            return internal_err!(
1193                "{self:?} should have exactly one input, got {:?}",
1194                inputs
1195            );
1196        }
1197        Ok(inputs.remove(0))
1198    }
1199
1200    /// Helper for [Self::with_new_exprs] to use when exactly two inputs are expected.
1201    #[inline]
1202    fn only_two_inputs(
1203        &self,
1204        mut inputs: Vec<LogicalPlan>,
1205    ) -> Result<(LogicalPlan, LogicalPlan)> {
1206        if inputs.len() != 2 {
1207            return internal_err!(
1208                "{self:?} should have exactly two inputs, got {:?}",
1209                inputs
1210            );
1211        }
1212        let right = inputs.remove(1);
1213        let left = inputs.remove(0);
1214        Ok((left, right))
1215    }
1216
1217    /// Replaces placeholder param values (like `$1`, `$2`) in [`LogicalPlan`]
1218    /// with the specified `param_values`.
1219    ///
1220    /// [`Prepare`] statements are converted to
1221    /// their inner logical plan for execution.
1222    ///
1223    /// # Example
1224    /// ```
1225    /// # use arrow::datatypes::{Field, Schema, DataType};
1226    /// use datafusion_common::ScalarValue;
1227    /// # use datafusion_expr::{lit, col, LogicalPlanBuilder, logical_plan::table_scan, placeholder};
1228    /// # let schema = Schema::new(vec![
1229    /// #     Field::new("id", DataType::Int32, false),
1230    /// # ]);
1231    /// // Build SELECT * FROM t1 WHERE id = $1
1232    /// let plan = table_scan(Some("t1"), &schema, None).unwrap()
1233    ///     .filter(col("id").eq(placeholder("$1"))).unwrap()
1234    ///     .build().unwrap();
1235    ///
1236    /// assert_eq!(
1237    ///   "Filter: t1.id = $1\
1238    ///   \n  TableScan: t1",
1239    ///   plan.display_indent().to_string()
1240    /// );
1241    ///
1242    /// // Fill in the parameter $1 with a literal 3
1243    /// let plan = plan.with_param_values(vec![
1244    ///   ScalarValue::from(3i32) // value at index 0 --> $1
1245    /// ]).unwrap();
1246    ///
1247    /// assert_eq!(
1248    ///    "Filter: t1.id = Int32(3)\
1249    ///    \n  TableScan: t1",
1250    ///    plan.display_indent().to_string()
1251    ///  );
1252    ///
1253    /// // Note you can also used named parameters
1254    /// // Build SELECT * FROM t1 WHERE id = $my_param
1255    /// let plan = table_scan(Some("t1"), &schema, None).unwrap()
1256    ///     .filter(col("id").eq(placeholder("$my_param"))).unwrap()
1257    ///     .build().unwrap()
1258    ///     // Fill in the parameter $my_param with a literal 3
1259    ///     .with_param_values(vec![
1260    ///       ("my_param", ScalarValue::from(3i32)),
1261    ///     ]).unwrap();
1262    ///
1263    /// assert_eq!(
1264    ///    "Filter: t1.id = Int32(3)\
1265    ///    \n  TableScan: t1",
1266    ///    plan.display_indent().to_string()
1267    ///  );
1268    ///
1269    /// ```
1270    pub fn with_param_values(
1271        self,
1272        param_values: impl Into<ParamValues>,
1273    ) -> Result<LogicalPlan> {
1274        let param_values = param_values.into();
1275        let plan_with_values = self.replace_params_with_values(&param_values)?;
1276
1277        // unwrap Prepare
1278        Ok(
1279            if let LogicalPlan::Statement(Statement::Prepare(prepare_lp)) =
1280                plan_with_values
1281            {
1282                param_values.verify(&prepare_lp.data_types)?;
1283                // try and take ownership of the input if is not shared, clone otherwise
1284                Arc::unwrap_or_clone(prepare_lp.input)
1285            } else {
1286                plan_with_values
1287            },
1288        )
1289    }
1290
1291    /// Returns the maximum number of rows that this plan can output, if known.
1292    ///
1293    /// If `None`, the plan can return any number of rows.
1294    /// If `Some(n)` then the plan can return at most `n` rows but may return fewer.
1295    pub fn max_rows(self: &LogicalPlan) -> Option<usize> {
1296        match self {
1297            LogicalPlan::Projection(Projection { input, .. }) => input.max_rows(),
1298            LogicalPlan::Filter(filter) => {
1299                if filter.is_scalar() {
1300                    Some(1)
1301                } else {
1302                    filter.input.max_rows()
1303                }
1304            }
1305            LogicalPlan::Window(Window { input, .. }) => input.max_rows(),
1306            LogicalPlan::Aggregate(Aggregate {
1307                input, group_expr, ..
1308            }) => {
1309                // Empty group_expr will return Some(1)
1310                if group_expr
1311                    .iter()
1312                    .all(|expr| matches!(expr, Expr::Literal(_, _)))
1313                {
1314                    Some(1)
1315                } else {
1316                    input.max_rows()
1317                }
1318            }
1319            LogicalPlan::Sort(Sort { input, fetch, .. }) => {
1320                match (fetch, input.max_rows()) {
1321                    (Some(fetch_limit), Some(input_max)) => {
1322                        Some(input_max.min(*fetch_limit))
1323                    }
1324                    (Some(fetch_limit), None) => Some(*fetch_limit),
1325                    (None, Some(input_max)) => Some(input_max),
1326                    (None, None) => None,
1327                }
1328            }
1329            LogicalPlan::Join(Join {
1330                left,
1331                right,
1332                join_type,
1333                ..
1334            }) => match join_type {
1335                JoinType::Inner => Some(left.max_rows()? * right.max_rows()?),
1336                JoinType::Left | JoinType::Right | JoinType::Full => {
1337                    match (left.max_rows()?, right.max_rows()?, join_type) {
1338                        (0, 0, _) => Some(0),
1339                        (max_rows, 0, JoinType::Left | JoinType::Full) => Some(max_rows),
1340                        (0, max_rows, JoinType::Right | JoinType::Full) => Some(max_rows),
1341                        (left_max, right_max, _) => Some(left_max * right_max),
1342                    }
1343                }
1344                JoinType::LeftSemi | JoinType::LeftAnti | JoinType::LeftMark => {
1345                    left.max_rows()
1346                }
1347                JoinType::RightSemi | JoinType::RightAnti | JoinType::RightMark => {
1348                    right.max_rows()
1349                }
1350            },
1351            LogicalPlan::Repartition(Repartition { input, .. }) => input.max_rows(),
1352            LogicalPlan::Union(Union { inputs, .. }) => {
1353                inputs.iter().try_fold(0usize, |mut acc, plan| {
1354                    acc += plan.max_rows()?;
1355                    Some(acc)
1356                })
1357            }
1358            LogicalPlan::TableScan(TableScan { fetch, .. }) => *fetch,
1359            LogicalPlan::EmptyRelation(_) => Some(0),
1360            LogicalPlan::RecursiveQuery(_) => None,
1361            LogicalPlan::Subquery(_) => None,
1362            LogicalPlan::SubqueryAlias(SubqueryAlias { input, .. }) => input.max_rows(),
1363            LogicalPlan::Limit(limit) => match limit.get_fetch_type() {
1364                Ok(FetchType::Literal(s)) => s,
1365                _ => None,
1366            },
1367            LogicalPlan::Distinct(
1368                Distinct::All(input) | Distinct::On(DistinctOn { input, .. }),
1369            ) => input.max_rows(),
1370            LogicalPlan::Values(v) => Some(v.values.len()),
1371            LogicalPlan::Unnest(_) => None,
1372            LogicalPlan::Ddl(_)
1373            | LogicalPlan::Explain(_)
1374            | LogicalPlan::Analyze(_)
1375            | LogicalPlan::Dml(_)
1376            | LogicalPlan::Copy(_)
1377            | LogicalPlan::DescribeTable(_)
1378            | LogicalPlan::Statement(_)
1379            | LogicalPlan::Extension(_) => None,
1380        }
1381    }
1382
1383    /// If this node's expressions contains any references to an outer subquery
1384    pub fn contains_outer_reference(&self) -> bool {
1385        let mut contains = false;
1386        self.apply_expressions(|expr| {
1387            Ok(if expr.contains_outer() {
1388                contains = true;
1389                TreeNodeRecursion::Stop
1390            } else {
1391                TreeNodeRecursion::Continue
1392            })
1393        })
1394        .unwrap();
1395        contains
1396    }
1397
1398    /// Get the output expressions and their corresponding columns.
1399    ///
1400    /// The parent node may reference the output columns of the plan by expressions, such as
1401    /// projection over aggregate or window functions. This method helps to convert the
1402    /// referenced expressions into columns.
1403    ///
1404    /// See also: [`crate::utils::columnize_expr`]
1405    pub fn columnized_output_exprs(&self) -> Result<Vec<(&Expr, Column)>> {
1406        match self {
1407            LogicalPlan::Aggregate(aggregate) => Ok(aggregate
1408                .output_expressions()?
1409                .into_iter()
1410                .zip(self.schema().columns())
1411                .collect()),
1412            LogicalPlan::Window(Window {
1413                window_expr,
1414                input,
1415                schema,
1416            }) => {
1417                // The input could be another Window, so the result should also include the input's. For Example:
1418                // `EXPLAIN SELECT RANK() OVER (PARTITION BY a ORDER BY b), SUM(b) OVER (PARTITION BY a) FROM t`
1419                // Its plan is:
1420                // Projection: RANK() PARTITION BY [t.a] ORDER BY [t.b ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW, SUM(t.b) PARTITION BY [t.a] ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING
1421                //   WindowAggr: windowExpr=[[SUM(CAST(t.b AS Int64)) PARTITION BY [t.a] ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING]]
1422                //     WindowAggr: windowExpr=[[RANK() PARTITION BY [t.a] ORDER BY [t.b ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW]]/
1423                //       TableScan: t projection=[a, b]
1424                let mut output_exprs = input.columnized_output_exprs()?;
1425                let input_len = input.schema().fields().len();
1426                output_exprs.extend(
1427                    window_expr
1428                        .iter()
1429                        .zip(schema.columns().into_iter().skip(input_len)),
1430                );
1431                Ok(output_exprs)
1432            }
1433            _ => Ok(vec![]),
1434        }
1435    }
1436}
1437
1438impl LogicalPlan {
1439    /// Return a `LogicalPlan` with all placeholders (e.g $1 $2,
1440    /// ...) replaced with corresponding values provided in
1441    /// `params_values`
1442    ///
1443    /// See [`Self::with_param_values`] for examples and usage with an owned
1444    /// `ParamValues`
1445    pub fn replace_params_with_values(
1446        self,
1447        param_values: &ParamValues,
1448    ) -> Result<LogicalPlan> {
1449        self.transform_up_with_subqueries(|plan| {
1450            let schema = Arc::clone(plan.schema());
1451            let name_preserver = NamePreserver::new(&plan);
1452            plan.map_expressions(|e| {
1453                let (e, has_placeholder) = e.infer_placeholder_types(&schema)?;
1454                if !has_placeholder {
1455                    // Performance optimization:
1456                    // avoid NamePreserver copy and second pass over expression
1457                    // if no placeholders.
1458                    Ok(Transformed::no(e))
1459                } else {
1460                    let original_name = name_preserver.save(&e);
1461                    let transformed_expr = e.transform_up(|e| {
1462                        if let Expr::Placeholder(Placeholder { id, .. }) = e {
1463                            let value = param_values.get_placeholders_with_values(&id)?;
1464                            Ok(Transformed::yes(Expr::Literal(value, None)))
1465                        } else {
1466                            Ok(Transformed::no(e))
1467                        }
1468                    })?;
1469                    // Preserve name to avoid breaking column references to this expression
1470                    Ok(transformed_expr.update_data(|expr| original_name.restore(expr)))
1471                }
1472            })
1473        })
1474        .map(|res| res.data)
1475    }
1476
1477    /// Walk the logical plan, find any `Placeholder` tokens, and return a set of their names.
1478    pub fn get_parameter_names(&self) -> Result<HashSet<String>> {
1479        let mut param_names = HashSet::new();
1480        self.apply_with_subqueries(|plan| {
1481            plan.apply_expressions(|expr| {
1482                expr.apply(|expr| {
1483                    if let Expr::Placeholder(Placeholder { id, .. }) = expr {
1484                        param_names.insert(id.clone());
1485                    }
1486                    Ok(TreeNodeRecursion::Continue)
1487                })
1488            })
1489        })
1490        .map(|_| param_names)
1491    }
1492
1493    /// Walk the logical plan, find any `Placeholder` tokens, and return a map of their IDs and DataTypes
1494    pub fn get_parameter_types(
1495        &self,
1496    ) -> Result<HashMap<String, Option<DataType>>, DataFusionError> {
1497        let mut param_types: HashMap<String, Option<DataType>> = HashMap::new();
1498
1499        self.apply_with_subqueries(|plan| {
1500            plan.apply_expressions(|expr| {
1501                expr.apply(|expr| {
1502                    if let Expr::Placeholder(Placeholder { id, data_type }) = expr {
1503                        let prev = param_types.get(id);
1504                        match (prev, data_type) {
1505                            (Some(Some(prev)), Some(dt)) => {
1506                                if prev != dt {
1507                                    plan_err!("Conflicting types for {id}")?;
1508                                }
1509                            }
1510                            (_, Some(dt)) => {
1511                                param_types.insert(id.clone(), Some(dt.clone()));
1512                            }
1513                            _ => {
1514                                param_types.insert(id.clone(), None);
1515                            }
1516                        }
1517                    }
1518                    Ok(TreeNodeRecursion::Continue)
1519                })
1520            })
1521        })
1522        .map(|_| param_types)
1523    }
1524
1525    // ------------
1526    // Various implementations for printing out LogicalPlans
1527    // ------------
1528
1529    /// Return a `format`able structure that produces a single line
1530    /// per node.
1531    ///
1532    /// # Example
1533    ///
1534    /// ```text
1535    /// Projection: employee.id
1536    ///    Filter: employee.state Eq Utf8(\"CO\")\
1537    ///       CsvScan: employee projection=Some([0, 3])
1538    /// ```
1539    ///
1540    /// ```
1541    /// use arrow::datatypes::{Field, Schema, DataType};
1542    /// use datafusion_expr::{lit, col, LogicalPlanBuilder, logical_plan::table_scan};
1543    /// let schema = Schema::new(vec![
1544    ///     Field::new("id", DataType::Int32, false),
1545    /// ]);
1546    /// let plan = table_scan(Some("t1"), &schema, None).unwrap()
1547    ///     .filter(col("id").eq(lit(5))).unwrap()
1548    ///     .build().unwrap();
1549    ///
1550    /// // Format using display_indent
1551    /// let display_string = format!("{}", plan.display_indent());
1552    ///
1553    /// assert_eq!("Filter: t1.id = Int32(5)\n  TableScan: t1",
1554    ///             display_string);
1555    /// ```
1556    pub fn display_indent(&self) -> impl Display + '_ {
1557        // Boilerplate structure to wrap LogicalPlan with something
1558        // that that can be formatted
1559        struct Wrapper<'a>(&'a LogicalPlan);
1560        impl Display for Wrapper<'_> {
1561            fn fmt(&self, f: &mut Formatter) -> fmt::Result {
1562                let with_schema = false;
1563                let mut visitor = IndentVisitor::new(f, with_schema);
1564                match self.0.visit_with_subqueries(&mut visitor) {
1565                    Ok(_) => Ok(()),
1566                    Err(_) => Err(fmt::Error),
1567                }
1568            }
1569        }
1570        Wrapper(self)
1571    }
1572
1573    /// Return a `format`able structure that produces a single line
1574    /// per node that includes the output schema. For example:
1575    ///
1576    /// ```text
1577    /// Projection: employee.id [id:Int32]\
1578    ///    Filter: employee.state = Utf8(\"CO\") [id:Int32, state:Utf8]\
1579    ///      TableScan: employee projection=[0, 3] [id:Int32, state:Utf8]";
1580    /// ```
1581    ///
1582    /// ```
1583    /// use arrow::datatypes::{Field, Schema, DataType};
1584    /// use datafusion_expr::{lit, col, LogicalPlanBuilder, logical_plan::table_scan};
1585    /// let schema = Schema::new(vec![
1586    ///     Field::new("id", DataType::Int32, false),
1587    /// ]);
1588    /// let plan = table_scan(Some("t1"), &schema, None).unwrap()
1589    ///     .filter(col("id").eq(lit(5))).unwrap()
1590    ///     .build().unwrap();
1591    ///
1592    /// // Format using display_indent_schema
1593    /// let display_string = format!("{}", plan.display_indent_schema());
1594    ///
1595    /// assert_eq!("Filter: t1.id = Int32(5) [id:Int32]\
1596    ///             \n  TableScan: t1 [id:Int32]",
1597    ///             display_string);
1598    /// ```
1599    pub fn display_indent_schema(&self) -> impl Display + '_ {
1600        // Boilerplate structure to wrap LogicalPlan with something
1601        // that that can be formatted
1602        struct Wrapper<'a>(&'a LogicalPlan);
1603        impl Display for Wrapper<'_> {
1604            fn fmt(&self, f: &mut Formatter) -> fmt::Result {
1605                let with_schema = true;
1606                let mut visitor = IndentVisitor::new(f, with_schema);
1607                match self.0.visit_with_subqueries(&mut visitor) {
1608                    Ok(_) => Ok(()),
1609                    Err(_) => Err(fmt::Error),
1610                }
1611            }
1612        }
1613        Wrapper(self)
1614    }
1615
1616    /// Return a displayable structure that produces plan in postgresql JSON format.
1617    ///
1618    /// Users can use this format to visualize the plan in existing plan visualization tools, for example [dalibo](https://explain.dalibo.com/)
1619    pub fn display_pg_json(&self) -> impl Display + '_ {
1620        // Boilerplate structure to wrap LogicalPlan with something
1621        // that that can be formatted
1622        struct Wrapper<'a>(&'a LogicalPlan);
1623        impl Display for Wrapper<'_> {
1624            fn fmt(&self, f: &mut Formatter) -> fmt::Result {
1625                let mut visitor = PgJsonVisitor::new(f);
1626                visitor.with_schema(true);
1627                match self.0.visit_with_subqueries(&mut visitor) {
1628                    Ok(_) => Ok(()),
1629                    Err(_) => Err(fmt::Error),
1630                }
1631            }
1632        }
1633        Wrapper(self)
1634    }
1635
1636    /// Return a `format`able structure that produces lines meant for
1637    /// graphical display using the `DOT` language. This format can be
1638    /// visualized using software from
1639    /// [`graphviz`](https://graphviz.org/)
1640    ///
1641    /// This currently produces two graphs -- one with the basic
1642    /// structure, and one with additional details such as schema.
1643    ///
1644    /// ```
1645    /// use arrow::datatypes::{Field, Schema, DataType};
1646    /// use datafusion_expr::{lit, col, LogicalPlanBuilder, logical_plan::table_scan};
1647    /// let schema = Schema::new(vec![
1648    ///     Field::new("id", DataType::Int32, false),
1649    /// ]);
1650    /// let plan = table_scan(Some("t1"), &schema, None).unwrap()
1651    ///     .filter(col("id").eq(lit(5))).unwrap()
1652    ///     .build().unwrap();
1653    ///
1654    /// // Format using display_graphviz
1655    /// let graphviz_string = format!("{}", plan.display_graphviz());
1656    /// ```
1657    ///
1658    /// If graphviz string is saved to a file such as `/tmp/example.dot`, the following
1659    /// commands can be used to render it as a pdf:
1660    ///
1661    /// ```bash
1662    ///   dot -Tpdf < /tmp/example.dot  > /tmp/example.pdf
1663    /// ```
1664    ///
1665    pub fn display_graphviz(&self) -> impl Display + '_ {
1666        // Boilerplate structure to wrap LogicalPlan with something
1667        // that that can be formatted
1668        struct Wrapper<'a>(&'a LogicalPlan);
1669        impl Display for Wrapper<'_> {
1670            fn fmt(&self, f: &mut Formatter) -> fmt::Result {
1671                let mut visitor = GraphvizVisitor::new(f);
1672
1673                visitor.start_graph()?;
1674
1675                visitor.pre_visit_plan("LogicalPlan")?;
1676                self.0
1677                    .visit_with_subqueries(&mut visitor)
1678                    .map_err(|_| fmt::Error)?;
1679                visitor.post_visit_plan()?;
1680
1681                visitor.set_with_schema(true);
1682                visitor.pre_visit_plan("Detailed LogicalPlan")?;
1683                self.0
1684                    .visit_with_subqueries(&mut visitor)
1685                    .map_err(|_| fmt::Error)?;
1686                visitor.post_visit_plan()?;
1687
1688                visitor.end_graph()?;
1689                Ok(())
1690            }
1691        }
1692        Wrapper(self)
1693    }
1694
1695    /// Return a `format`able structure with the a human readable
1696    /// description of this LogicalPlan node per node, not including
1697    /// children. For example:
1698    ///
1699    /// ```text
1700    /// Projection: id
1701    /// ```
1702    /// ```
1703    /// use arrow::datatypes::{Field, Schema, DataType};
1704    /// use datafusion_expr::{lit, col, LogicalPlanBuilder, logical_plan::table_scan};
1705    /// let schema = Schema::new(vec![
1706    ///     Field::new("id", DataType::Int32, false),
1707    /// ]);
1708    /// let plan = table_scan(Some("t1"), &schema, None).unwrap()
1709    ///     .build().unwrap();
1710    ///
1711    /// // Format using display
1712    /// let display_string = format!("{}", plan.display());
1713    ///
1714    /// assert_eq!("TableScan: t1", display_string);
1715    /// ```
1716    pub fn display(&self) -> impl Display + '_ {
1717        // Boilerplate structure to wrap LogicalPlan with something
1718        // that that can be formatted
1719        struct Wrapper<'a>(&'a LogicalPlan);
1720        impl Display for Wrapper<'_> {
1721            fn fmt(&self, f: &mut Formatter) -> fmt::Result {
1722                match self.0 {
1723                    LogicalPlan::EmptyRelation(_) => write!(f, "EmptyRelation"),
1724                    LogicalPlan::RecursiveQuery(RecursiveQuery {
1725                        is_distinct, ..
1726                    }) => {
1727                        write!(f, "RecursiveQuery: is_distinct={is_distinct}")
1728                    }
1729                    LogicalPlan::Values(Values { ref values, .. }) => {
1730                        let str_values: Vec<_> = values
1731                            .iter()
1732                            // limit to only 5 values to avoid horrible display
1733                            .take(5)
1734                            .map(|row| {
1735                                let item = row
1736                                    .iter()
1737                                    .map(|expr| expr.to_string())
1738                                    .collect::<Vec<_>>()
1739                                    .join(", ");
1740                                format!("({item})")
1741                            })
1742                            .collect();
1743
1744                        let eclipse = if values.len() > 5 { "..." } else { "" };
1745                        write!(f, "Values: {}{}", str_values.join(", "), eclipse)
1746                    }
1747
1748                    LogicalPlan::TableScan(TableScan {
1749                        ref source,
1750                        ref table_name,
1751                        ref projection,
1752                        ref filters,
1753                        ref fetch,
1754                        ..
1755                    }) => {
1756                        let projected_fields = match projection {
1757                            Some(indices) => {
1758                                let schema = source.schema();
1759                                let names: Vec<&str> = indices
1760                                    .iter()
1761                                    .map(|i| schema.field(*i).name().as_str())
1762                                    .collect();
1763                                format!(" projection=[{}]", names.join(", "))
1764                            }
1765                            _ => "".to_string(),
1766                        };
1767
1768                        write!(f, "TableScan: {table_name}{projected_fields}")?;
1769
1770                        if !filters.is_empty() {
1771                            let mut full_filter = vec![];
1772                            let mut partial_filter = vec![];
1773                            let mut unsupported_filters = vec![];
1774                            let filters: Vec<&Expr> = filters.iter().collect();
1775
1776                            if let Ok(results) =
1777                                source.supports_filters_pushdown(&filters)
1778                            {
1779                                filters.iter().zip(results.iter()).for_each(
1780                                    |(x, res)| match res {
1781                                        TableProviderFilterPushDown::Exact => {
1782                                            full_filter.push(x)
1783                                        }
1784                                        TableProviderFilterPushDown::Inexact => {
1785                                            partial_filter.push(x)
1786                                        }
1787                                        TableProviderFilterPushDown::Unsupported => {
1788                                            unsupported_filters.push(x)
1789                                        }
1790                                    },
1791                                );
1792                            }
1793
1794                            if !full_filter.is_empty() {
1795                                write!(
1796                                    f,
1797                                    ", full_filters=[{}]",
1798                                    expr_vec_fmt!(full_filter)
1799                                )?;
1800                            };
1801                            if !partial_filter.is_empty() {
1802                                write!(
1803                                    f,
1804                                    ", partial_filters=[{}]",
1805                                    expr_vec_fmt!(partial_filter)
1806                                )?;
1807                            }
1808                            if !unsupported_filters.is_empty() {
1809                                write!(
1810                                    f,
1811                                    ", unsupported_filters=[{}]",
1812                                    expr_vec_fmt!(unsupported_filters)
1813                                )?;
1814                            }
1815                        }
1816
1817                        if let Some(n) = fetch {
1818                            write!(f, ", fetch={n}")?;
1819                        }
1820
1821                        Ok(())
1822                    }
1823                    LogicalPlan::Projection(Projection { ref expr, .. }) => {
1824                        write!(f, "Projection:")?;
1825                        for (i, expr_item) in expr.iter().enumerate() {
1826                            if i > 0 {
1827                                write!(f, ",")?;
1828                            }
1829                            write!(f, " {expr_item}")?;
1830                        }
1831                        Ok(())
1832                    }
1833                    LogicalPlan::Dml(DmlStatement { table_name, op, .. }) => {
1834                        write!(f, "Dml: op=[{op}] table=[{table_name}]")
1835                    }
1836                    LogicalPlan::Copy(CopyTo {
1837                        input: _,
1838                        output_url,
1839                        file_type,
1840                        options,
1841                        ..
1842                    }) => {
1843                        let op_str = options
1844                            .iter()
1845                            .map(|(k, v)| format!("{k} {v}"))
1846                            .collect::<Vec<String>>()
1847                            .join(", ");
1848
1849                        write!(f, "CopyTo: format={} output_url={output_url} options: ({op_str})", file_type.get_ext())
1850                    }
1851                    LogicalPlan::Ddl(ddl) => {
1852                        write!(f, "{}", ddl.display())
1853                    }
1854                    LogicalPlan::Filter(Filter {
1855                        predicate: ref expr,
1856                        ..
1857                    }) => write!(f, "Filter: {expr}"),
1858                    LogicalPlan::Window(Window {
1859                        ref window_expr, ..
1860                    }) => {
1861                        write!(
1862                            f,
1863                            "WindowAggr: windowExpr=[[{}]]",
1864                            expr_vec_fmt!(window_expr)
1865                        )
1866                    }
1867                    LogicalPlan::Aggregate(Aggregate {
1868                        ref group_expr,
1869                        ref aggr_expr,
1870                        ..
1871                    }) => write!(
1872                        f,
1873                        "Aggregate: groupBy=[[{}]], aggr=[[{}]]",
1874                        expr_vec_fmt!(group_expr),
1875                        expr_vec_fmt!(aggr_expr)
1876                    ),
1877                    LogicalPlan::Sort(Sort { expr, fetch, .. }) => {
1878                        write!(f, "Sort: ")?;
1879                        for (i, expr_item) in expr.iter().enumerate() {
1880                            if i > 0 {
1881                                write!(f, ", ")?;
1882                            }
1883                            write!(f, "{expr_item}")?;
1884                        }
1885                        if let Some(a) = fetch {
1886                            write!(f, ", fetch={a}")?;
1887                        }
1888
1889                        Ok(())
1890                    }
1891                    LogicalPlan::Join(Join {
1892                        on: ref keys,
1893                        filter,
1894                        join_constraint,
1895                        join_type,
1896                        ..
1897                    }) => {
1898                        let join_expr: Vec<String> =
1899                            keys.iter().map(|(l, r)| format!("{l} = {r}")).collect();
1900                        let filter_expr = filter
1901                            .as_ref()
1902                            .map(|expr| format!(" Filter: {expr}"))
1903                            .unwrap_or_else(|| "".to_string());
1904                        let join_type = if filter.is_none() && keys.is_empty() && matches!(join_type, JoinType::Inner) {
1905                            "Cross".to_string()
1906                        } else {
1907                            join_type.to_string()
1908                        };
1909                        match join_constraint {
1910                            JoinConstraint::On => {
1911                                write!(
1912                                    f,
1913                                    "{} Join: {}{}",
1914                                    join_type,
1915                                    join_expr.join(", "),
1916                                    filter_expr
1917                                )
1918                            }
1919                            JoinConstraint::Using => {
1920                                write!(
1921                                    f,
1922                                    "{} Join: Using {}{}",
1923                                    join_type,
1924                                    join_expr.join(", "),
1925                                    filter_expr,
1926                                )
1927                            }
1928                        }
1929                    }
1930                    LogicalPlan::Repartition(Repartition {
1931                        partitioning_scheme,
1932                        ..
1933                    }) => match partitioning_scheme {
1934                        Partitioning::RoundRobinBatch(n) => {
1935                            write!(f, "Repartition: RoundRobinBatch partition_count={n}")
1936                        }
1937                        Partitioning::Hash(expr, n) => {
1938                            let hash_expr: Vec<String> =
1939                                expr.iter().map(|e| format!("{e}")).collect();
1940                            write!(
1941                                f,
1942                                "Repartition: Hash({}) partition_count={}",
1943                                hash_expr.join(", "),
1944                                n
1945                            )
1946                        }
1947                        Partitioning::DistributeBy(expr) => {
1948                            let dist_by_expr: Vec<String> =
1949                                expr.iter().map(|e| format!("{e}")).collect();
1950                            write!(
1951                                f,
1952                                "Repartition: DistributeBy({})",
1953                                dist_by_expr.join(", "),
1954                            )
1955                        }
1956                    },
1957                    LogicalPlan::Limit(limit) => {
1958                        // Attempt to display `skip` and `fetch` as literals if possible, otherwise as expressions.
1959                        let skip_str = match limit.get_skip_type() {
1960                            Ok(SkipType::Literal(n)) => n.to_string(),
1961                            _ => limit.skip.as_ref().map_or_else(|| "None".to_string(), |x| x.to_string()),
1962                        };
1963                        let fetch_str = match limit.get_fetch_type() {
1964                            Ok(FetchType::Literal(Some(n))) => n.to_string(),
1965                            Ok(FetchType::Literal(None)) => "None".to_string(),
1966                            _ => limit.fetch.as_ref().map_or_else(|| "None".to_string(), |x| x.to_string())
1967                        };
1968                        write!(
1969                            f,
1970                            "Limit: skip={skip_str}, fetch={fetch_str}",
1971                        )
1972                    }
1973                    LogicalPlan::Subquery(Subquery { .. }) => {
1974                        write!(f, "Subquery:")
1975                    }
1976                    LogicalPlan::SubqueryAlias(SubqueryAlias { ref alias, .. }) => {
1977                        write!(f, "SubqueryAlias: {alias}")
1978                    }
1979                    LogicalPlan::Statement(statement) => {
1980                        write!(f, "{}", statement.display())
1981                    }
1982                    LogicalPlan::Distinct(distinct) => match distinct {
1983                        Distinct::All(_) => write!(f, "Distinct:"),
1984                        Distinct::On(DistinctOn {
1985                            on_expr,
1986                            select_expr,
1987                            sort_expr,
1988                            ..
1989                        }) => write!(
1990                            f,
1991                            "DistinctOn: on_expr=[[{}]], select_expr=[[{}]], sort_expr=[[{}]]",
1992                            expr_vec_fmt!(on_expr),
1993                            expr_vec_fmt!(select_expr),
1994                            if let Some(sort_expr) = sort_expr { expr_vec_fmt!(sort_expr) } else { "".to_string() },
1995                        ),
1996                    },
1997                    LogicalPlan::Explain { .. } => write!(f, "Explain"),
1998                    LogicalPlan::Analyze { .. } => write!(f, "Analyze"),
1999                    LogicalPlan::Union(_) => write!(f, "Union"),
2000                    LogicalPlan::Extension(e) => e.node.fmt_for_explain(f),
2001                    LogicalPlan::DescribeTable(DescribeTable { .. }) => {
2002                        write!(f, "DescribeTable")
2003                    }
2004                    LogicalPlan::Unnest(Unnest {
2005                        input: plan,
2006                        list_type_columns: list_col_indices,
2007                        struct_type_columns: struct_col_indices, .. }) => {
2008                        let input_columns = plan.schema().columns();
2009                        let list_type_columns = list_col_indices
2010                            .iter()
2011                            .map(|(i,unnest_info)|
2012                                format!("{}|depth={}", &input_columns[*i].to_string(),
2013                                unnest_info.depth))
2014                            .collect::<Vec<String>>();
2015                        let struct_type_columns = struct_col_indices
2016                            .iter()
2017                            .map(|i| &input_columns[*i])
2018                            .collect::<Vec<&Column>>();
2019                        // get items from input_columns indexed by list_col_indices
2020                        write!(f, "Unnest: lists[{}] structs[{}]",
2021                        expr_vec_fmt!(list_type_columns),
2022                        expr_vec_fmt!(struct_type_columns))
2023                    }
2024                }
2025            }
2026        }
2027        Wrapper(self)
2028    }
2029}
2030
2031impl Display for LogicalPlan {
2032    fn fmt(&self, f: &mut Formatter) -> fmt::Result {
2033        self.display_indent().fmt(f)
2034    }
2035}
2036
2037impl ToStringifiedPlan for LogicalPlan {
2038    fn to_stringified(&self, plan_type: PlanType) -> StringifiedPlan {
2039        StringifiedPlan::new(plan_type, self.display_indent().to_string())
2040    }
2041}
2042
/// An empty relation: a plan node that carries only a flag and an output schema.
///
/// Depending on `produce_one_row` it stands for either no rows or a single
/// placeholder row.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct EmptyRelation {
    /// Whether to produce a placeholder row
    pub produce_one_row: bool,
    /// The schema description of the output
    pub schema: DFSchemaRef,
}
2051
2052// Manual implementation needed because of `schema` field. Comparison excludes this field.
2053impl PartialOrd for EmptyRelation {
2054    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
2055        self.produce_one_row.partial_cmp(&other.produce_one_row)
2056    }
2057}
2058
/// A variadic query operation, Recursive CTE.
///
/// The node holds a static term, a recursive term, and a flag telling whether
/// results are deduplicated.
///
/// # Recursive Query Evaluation
///
/// From the [Postgres Docs]:
///
/// 1. Evaluate the non-recursive term. For `UNION` (but not `UNION ALL`),
///    discard duplicate rows. Include all remaining rows in the result of the
///    recursive query, and also place them in a temporary working table.
///
/// 2. So long as the working table is not empty, repeat these steps:
///
///    * Evaluate the recursive term, substituting the current contents of the
///      working table for the recursive self-reference. For `UNION` (but not
///      `UNION ALL`), discard duplicate rows and rows that duplicate any
///      previous result row. Include all remaining rows in the result of the
///      recursive query, and also place them in a temporary intermediate table.
///
///    * Replace the contents of the working table with the contents of the
///      intermediate table, then empty the intermediate table.
///
/// [Postgres Docs]: https://www.postgresql.org/docs/current/queries-with.html#QUERIES-WITH-RECURSIVE
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Hash)]
pub struct RecursiveQuery {
    /// Name of the query
    pub name: String,
    /// The static term (initial contents of the working table)
    pub static_term: Arc<LogicalPlan>,
    /// The recursive term (evaluated on the contents of the working table until
    /// it returns an empty set)
    pub recursive_term: Arc<LogicalPlan>,
    /// Should the output of the recursive term be deduplicated (`UNION`) or
    /// not (`UNION ALL`).
    pub is_distinct: bool,
}
2094
/// Values expression: a literal list of rows, each row being a list of
/// expressions. See
/// [Postgres VALUES](https://www.postgresql.org/docs/current/queries-values.html)
/// documentation for more details.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct Values {
    /// The table schema
    pub schema: DFSchemaRef,
    /// The rows; the outer `Vec` holds rows and each inner `Vec` holds the
    /// expressions of one row
    pub values: Vec<Vec<Expr>>,
}
2105
2106// Manual implementation needed because of `schema` field. Comparison excludes this field.
2107impl PartialOrd for Values {
2108    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
2109        self.values.partial_cmp(&other.values)
2110    }
2111}
2112
/// Evaluates an arbitrary list of expressions (essentially a
/// SELECT with an expression list) on its input.
///
/// Equality and hashing are derived over all fields; the hand-written
/// `PartialOrd` implementation for this type ignores `schema`.
#[derive(Clone, PartialEq, Eq, Hash, Debug)]
// mark non_exhaustive to encourage use of try_new/new()
#[non_exhaustive]
pub struct Projection {
    /// The list of expressions to evaluate against `input`
    pub expr: Vec<Expr>,
    /// The incoming logical plan
    pub input: Arc<LogicalPlan>,
    /// The schema description of the output
    pub schema: DFSchemaRef,
}
2126
2127// Manual implementation needed because of `schema` field. Comparison excludes this field.
2128impl PartialOrd for Projection {
2129    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
2130        match self.expr.partial_cmp(&other.expr) {
2131            Some(Ordering::Equal) => self.input.partial_cmp(&other.input),
2132            cmp => cmp,
2133        }
2134    }
2135}
2136
2137impl Projection {
2138    /// Create a new Projection
2139    pub fn try_new(expr: Vec<Expr>, input: Arc<LogicalPlan>) -> Result<Self> {
2140        let projection_schema = projection_schema(&input, &expr)?;
2141        Self::try_new_with_schema(expr, input, projection_schema)
2142    }
2143
2144    /// Create a new Projection using the specified output schema
2145    pub fn try_new_with_schema(
2146        expr: Vec<Expr>,
2147        input: Arc<LogicalPlan>,
2148        schema: DFSchemaRef,
2149    ) -> Result<Self> {
2150        #[expect(deprecated)]
2151        if !expr.iter().any(|e| matches!(e, Expr::Wildcard { .. }))
2152            && expr.len() != schema.fields().len()
2153        {
2154            return plan_err!("Projection has mismatch between number of expressions ({}) and number of fields in schema ({})", expr.len(), schema.fields().len());
2155        }
2156        Ok(Self {
2157            expr,
2158            input,
2159            schema,
2160        })
2161    }
2162
2163    /// Create a new Projection using the specified output schema
2164    pub fn new_from_schema(input: Arc<LogicalPlan>, schema: DFSchemaRef) -> Self {
2165        let expr: Vec<Expr> = schema.columns().into_iter().map(Expr::Column).collect();
2166        Self {
2167            expr,
2168            input,
2169            schema,
2170        }
2171    }
2172}
2173
2174/// Computes the schema of the result produced by applying a projection to the input logical plan.
2175///
2176/// # Arguments
2177///
2178/// * `input`: A reference to the input `LogicalPlan` for which the projection schema
2179///   will be computed.
2180/// * `exprs`: A slice of `Expr` expressions representing the projection operation to apply.
2181///
2182/// # Returns
2183///
2184/// A `Result` containing an `Arc<DFSchema>` representing the schema of the result
2185/// produced by the projection operation. If the schema computation is successful,
2186/// the `Result` will contain the schema; otherwise, it will contain an error.
2187pub fn projection_schema(input: &LogicalPlan, exprs: &[Expr]) -> Result<Arc<DFSchema>> {
2188    let metadata = input.schema().metadata().clone();
2189
2190    let schema =
2191        DFSchema::new_with_metadata(exprlist_to_fields(exprs, input)?, metadata)?
2192            .with_functional_dependencies(calc_func_dependencies_for_project(
2193                exprs, input,
2194            )?)?;
2195
2196    Ok(Arc::new(schema))
2197}
2198
/// Aliased subquery
///
/// Equality and hashing are derived over all fields; the hand-written
/// `PartialOrd` implementation for this type ignores `schema`.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
// mark non_exhaustive to encourage use of try_new/new()
#[non_exhaustive]
pub struct SubqueryAlias {
    /// The incoming logical plan
    pub input: Arc<LogicalPlan>,
    /// The alias for the input relation
    pub alias: TableReference,
    /// The schema with qualified field names
    pub schema: DFSchemaRef,
}
2211
2212impl SubqueryAlias {
2213    pub fn try_new(
2214        plan: Arc<LogicalPlan>,
2215        alias: impl Into<TableReference>,
2216    ) -> Result<Self> {
2217        let alias = alias.into();
2218        let fields = change_redundant_column(plan.schema().fields());
2219        let meta_data = plan.schema().as_ref().metadata().clone();
2220        let schema: Schema =
2221            DFSchema::from_unqualified_fields(fields.into(), meta_data)?.into();
2222        // Since schema is the same, other than qualifier, we can use existing
2223        // functional dependencies:
2224        let func_dependencies = plan.schema().functional_dependencies().clone();
2225        let schema = DFSchemaRef::new(
2226            DFSchema::try_from_qualified_schema(alias.clone(), &schema)?
2227                .with_functional_dependencies(func_dependencies)?,
2228        );
2229        Ok(SubqueryAlias {
2230            input: plan,
2231            alias,
2232            schema,
2233        })
2234    }
2235}
2236
2237// Manual implementation needed because of `schema` field. Comparison excludes this field.
2238impl PartialOrd for SubqueryAlias {
2239    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
2240        match self.input.partial_cmp(&other.input) {
2241            Some(Ordering::Equal) => self.alias.partial_cmp(&other.alias),
2242            cmp => cmp,
2243        }
2244    }
2245}
2246
/// Filters rows from its input that do not match an
/// expression (essentially a WHERE clause with a predicate
/// expression).
///
/// Semantically, `<predicate>` is evaluated for each row of the input;
/// If the value of `<predicate>` is true, the input row is passed to
/// the output. If the value of `<predicate>` is false, the row is
/// discarded.
///
/// A `Filter` should not be constructed directly; use `try_new()` instead.
/// The fields are `pub` only to support pattern matching.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Hash)]
#[non_exhaustive]
pub struct Filter {
    /// The predicate expression, which must have Boolean type.
    pub predicate: Expr,
    /// The incoming logical plan
    pub input: Arc<LogicalPlan>,
}
2266
2267impl Filter {
2268    /// Create a new filter operator.
2269    ///
2270    /// Notes: as Aliases have no effect on the output of a filter operator,
2271    /// they are removed from the predicate expression.
2272    pub fn try_new(predicate: Expr, input: Arc<LogicalPlan>) -> Result<Self> {
2273        Self::try_new_internal(predicate, input)
2274    }
2275
2276    /// Create a new filter operator for a having clause.
2277    /// This is similar to a filter, but its having flag is set to true.
2278    #[deprecated(since = "48.0.0", note = "Use `try_new` instead")]
2279    pub fn try_new_with_having(predicate: Expr, input: Arc<LogicalPlan>) -> Result<Self> {
2280        Self::try_new_internal(predicate, input)
2281    }
2282
2283    fn is_allowed_filter_type(data_type: &DataType) -> bool {
2284        match data_type {
2285            // Interpret NULL as a missing boolean value.
2286            DataType::Boolean | DataType::Null => true,
2287            DataType::Dictionary(_, value_type) => {
2288                Filter::is_allowed_filter_type(value_type.as_ref())
2289            }
2290            _ => false,
2291        }
2292    }
2293
2294    fn try_new_internal(predicate: Expr, input: Arc<LogicalPlan>) -> Result<Self> {
2295        // Filter predicates must return a boolean value so we try and validate that here.
2296        // Note that it is not always possible to resolve the predicate expression during plan
2297        // construction (such as with correlated subqueries) so we make a best effort here and
2298        // ignore errors resolving the expression against the schema.
2299        if let Ok(predicate_type) = predicate.get_type(input.schema()) {
2300            if !Filter::is_allowed_filter_type(&predicate_type) {
2301                return plan_err!(
2302                    "Cannot create filter with non-boolean predicate '{predicate}' returning {predicate_type}"
2303                );
2304            }
2305        }
2306
2307        Ok(Self {
2308            predicate: predicate.unalias_nested().data,
2309            input,
2310        })
2311    }
2312
2313    /// Is this filter guaranteed to return 0 or 1 row in a given instantiation?
2314    ///
2315    /// This function will return `true` if its predicate contains a conjunction of
2316    /// `col(a) = <expr>`, where its schema has a unique filter that is covered
2317    /// by this conjunction.
2318    ///
2319    /// For example, for the table:
2320    /// ```sql
2321    /// CREATE TABLE t (a INTEGER PRIMARY KEY, b INTEGER);
2322    /// ```
2323    /// `Filter(a = 2).is_scalar() == true`
2324    /// , whereas
2325    /// `Filter(b = 2).is_scalar() == false`
2326    /// and
2327    /// `Filter(a = 2 OR b = 2).is_scalar() == false`
2328    fn is_scalar(&self) -> bool {
2329        let schema = self.input.schema();
2330
2331        let functional_dependencies = self.input.schema().functional_dependencies();
2332        let unique_keys = functional_dependencies.iter().filter(|dep| {
2333            let nullable = dep.nullable
2334                && dep
2335                    .source_indices
2336                    .iter()
2337                    .any(|&source| schema.field(source).is_nullable());
2338            !nullable
2339                && dep.mode == Dependency::Single
2340                && dep.target_indices.len() == schema.fields().len()
2341        });
2342
2343        let exprs = split_conjunction(&self.predicate);
2344        let eq_pred_cols: HashSet<_> = exprs
2345            .iter()
2346            .filter_map(|expr| {
2347                let Expr::BinaryExpr(BinaryExpr {
2348                    left,
2349                    op: Operator::Eq,
2350                    right,
2351                }) = expr
2352                else {
2353                    return None;
2354                };
2355                // This is a no-op filter expression
2356                if left == right {
2357                    return None;
2358                }
2359
2360                match (left.as_ref(), right.as_ref()) {
2361                    (Expr::Column(_), Expr::Column(_)) => None,
2362                    (Expr::Column(c), _) | (_, Expr::Column(c)) => {
2363                        Some(schema.index_of_column(c).unwrap())
2364                    }
2365                    _ => None,
2366                }
2367            })
2368            .collect();
2369
2370        // If we have a functional dependence that is a subset of our predicate,
2371        // this filter is scalar
2372        for key in unique_keys {
2373            if key.source_indices.iter().all(|c| eq_pred_cols.contains(c)) {
2374                return true;
2375            }
2376        }
2377        false
2378    }
2379}
2380
/// Window its input based on a set of window spec and window function (e.g. SUM or RANK)
///
/// # Output Schema
///
/// The output schema is the input schema followed by the window function
/// expressions, in order.
///
/// For example, given the input schema `"A", "B", "C"` and the window function
/// `SUM(A) OVER (PARTITION BY B+1 ORDER BY C)`, the output schema will be `"A",
/// "B", "C", "SUM(A) OVER ..."` where `"SUM(A) OVER ..."` is the name of the
/// output column.
///
/// Note that the `PARTITION BY` expression "B+1" is not produced in the output
/// schema.
///
/// Equality and hashing are derived over all fields; the hand-written
/// `PartialOrd` implementation for this type ignores `schema`.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct Window {
    /// The incoming logical plan
    pub input: Arc<LogicalPlan>,
    /// The window function expressions
    pub window_expr: Vec<Expr>,
    /// The schema description of the window output
    pub schema: DFSchemaRef,
}
2404
2405impl Window {
2406    /// Create a new window operator.
2407    pub fn try_new(window_expr: Vec<Expr>, input: Arc<LogicalPlan>) -> Result<Self> {
2408        let fields: Vec<(Option<TableReference>, Arc<Field>)> = input
2409            .schema()
2410            .iter()
2411            .map(|(q, f)| (q.cloned(), Arc::clone(f)))
2412            .collect();
2413        let input_len = fields.len();
2414        let mut window_fields = fields;
2415        let expr_fields = exprlist_to_fields(window_expr.as_slice(), &input)?;
2416        window_fields.extend_from_slice(expr_fields.as_slice());
2417        let metadata = input.schema().metadata().clone();
2418
2419        // Update functional dependencies for window:
2420        let mut window_func_dependencies =
2421            input.schema().functional_dependencies().clone();
2422        window_func_dependencies.extend_target_indices(window_fields.len());
2423
2424        // Since we know that ROW_NUMBER outputs will be unique (i.e. it consists
2425        // of consecutive numbers per partition), we can represent this fact with
2426        // functional dependencies.
2427        let mut new_dependencies = window_expr
2428            .iter()
2429            .enumerate()
2430            .filter_map(|(idx, expr)| {
2431                let Expr::WindowFunction(window_fun) = expr else {
2432                    return None;
2433                };
2434                let WindowFunction {
2435                    fun: WindowFunctionDefinition::WindowUDF(udwf),
2436                    params: WindowFunctionParams { partition_by, .. },
2437                } = window_fun.as_ref()
2438                else {
2439                    return None;
2440                };
2441                // When there is no PARTITION BY, row number will be unique
2442                // across the entire table.
2443                if udwf.name() == "row_number" && partition_by.is_empty() {
2444                    Some(idx + input_len)
2445                } else {
2446                    None
2447                }
2448            })
2449            .map(|idx| {
2450                FunctionalDependence::new(vec![idx], vec![], false)
2451                    .with_mode(Dependency::Single)
2452            })
2453            .collect::<Vec<_>>();
2454
2455        if !new_dependencies.is_empty() {
2456            for dependence in new_dependencies.iter_mut() {
2457                dependence.target_indices = (0..window_fields.len()).collect();
2458            }
2459            // Add the dependency introduced because of ROW_NUMBER window function to the functional dependency
2460            let new_deps = FunctionalDependencies::new(new_dependencies);
2461            window_func_dependencies.extend(new_deps);
2462        }
2463
2464        Self::try_new_with_schema(
2465            window_expr,
2466            input,
2467            Arc::new(
2468                DFSchema::new_with_metadata(window_fields, metadata)?
2469                    .with_functional_dependencies(window_func_dependencies)?,
2470            ),
2471        )
2472    }
2473
2474    pub fn try_new_with_schema(
2475        window_expr: Vec<Expr>,
2476        input: Arc<LogicalPlan>,
2477        schema: DFSchemaRef,
2478    ) -> Result<Self> {
2479        if window_expr.len() != schema.fields().len() - input.schema().fields().len() {
2480            return plan_err!(
2481                "Window has mismatch between number of expressions ({}) and number of fields in schema ({})",
2482                window_expr.len(),
2483                schema.fields().len() - input.schema().fields().len()
2484            );
2485        }
2486
2487        Ok(Window {
2488            input,
2489            window_expr,
2490            schema,
2491        })
2492    }
2493}
2494
2495// Manual implementation needed because of `schema` field. Comparison excludes this field.
2496impl PartialOrd for Window {
2497    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
2498        match self.input.partial_cmp(&other.input) {
2499            Some(Ordering::Equal) => self.window_expr.partial_cmp(&other.window_expr),
2500            cmp => cmp,
2501        }
2502    }
2503}
2504
/// Produces rows from a table provider by reference or from the context
///
/// The hand-written `Debug`, `PartialEq` and `Hash` implementations for this
/// type leave out `source`; the hand-written `PartialOrd` implementation
/// additionally leaves out `projected_schema`.
#[derive(Clone)]
pub struct TableScan {
    /// The name of the table
    pub table_name: TableReference,
    /// The source of the table
    pub source: Arc<dyn TableSource>,
    /// Optional column indices to use as a projection
    pub projection: Option<Vec<usize>>,
    /// The schema description of the output
    pub projected_schema: DFSchemaRef,
    /// Optional expressions to be used as filters by the table provider
    pub filters: Vec<Expr>,
    /// Optional number of rows to read
    pub fetch: Option<usize>,
}
2521
2522impl Debug for TableScan {
2523    fn fmt(&self, f: &mut Formatter) -> fmt::Result {
2524        f.debug_struct("TableScan")
2525            .field("table_name", &self.table_name)
2526            .field("source", &"...")
2527            .field("projection", &self.projection)
2528            .field("projected_schema", &self.projected_schema)
2529            .field("filters", &self.filters)
2530            .field("fetch", &self.fetch)
2531            .finish_non_exhaustive()
2532    }
2533}
2534
2535impl PartialEq for TableScan {
2536    fn eq(&self, other: &Self) -> bool {
2537        self.table_name == other.table_name
2538            && self.projection == other.projection
2539            && self.projected_schema == other.projected_schema
2540            && self.filters == other.filters
2541            && self.fetch == other.fetch
2542    }
2543}
2544
2545impl Eq for TableScan {}
2546
2547// Manual implementation needed because of `source` and `projected_schema` fields.
2548// Comparison excludes these field.
2549impl PartialOrd for TableScan {
2550    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
2551        #[derive(PartialEq, PartialOrd)]
2552        struct ComparableTableScan<'a> {
2553            /// The name of the table
2554            pub table_name: &'a TableReference,
2555            /// Optional column indices to use as a projection
2556            pub projection: &'a Option<Vec<usize>>,
2557            /// Optional expressions to be used as filters by the table provider
2558            pub filters: &'a Vec<Expr>,
2559            /// Optional number of rows to read
2560            pub fetch: &'a Option<usize>,
2561        }
2562        let comparable_self = ComparableTableScan {
2563            table_name: &self.table_name,
2564            projection: &self.projection,
2565            filters: &self.filters,
2566            fetch: &self.fetch,
2567        };
2568        let comparable_other = ComparableTableScan {
2569            table_name: &other.table_name,
2570            projection: &other.projection,
2571            filters: &other.filters,
2572            fetch: &other.fetch,
2573        };
2574        comparable_self.partial_cmp(&comparable_other)
2575    }
2576}
2577
2578impl Hash for TableScan {
2579    fn hash<H: Hasher>(&self, state: &mut H) {
2580        self.table_name.hash(state);
2581        self.projection.hash(state);
2582        self.projected_schema.hash(state);
2583        self.filters.hash(state);
2584        self.fetch.hash(state);
2585    }
2586}
2587
2588impl TableScan {
2589    /// Initialize TableScan with appropriate schema from the given
2590    /// arguments.
2591    pub fn try_new(
2592        table_name: impl Into<TableReference>,
2593        table_source: Arc<dyn TableSource>,
2594        projection: Option<Vec<usize>>,
2595        filters: Vec<Expr>,
2596        fetch: Option<usize>,
2597    ) -> Result<Self> {
2598        let table_name = table_name.into();
2599
2600        if table_name.table().is_empty() {
2601            return plan_err!("table_name cannot be empty");
2602        }
2603        let schema = table_source.schema();
2604        let func_dependencies = FunctionalDependencies::new_from_constraints(
2605            table_source.constraints(),
2606            schema.fields.len(),
2607        );
2608        let projected_schema = projection
2609            .as_ref()
2610            .map(|p| {
2611                let projected_func_dependencies =
2612                    func_dependencies.project_functional_dependencies(p, p.len());
2613
2614                let df_schema = DFSchema::new_with_metadata(
2615                    p.iter()
2616                        .map(|i| {
2617                            (Some(table_name.clone()), Arc::new(schema.field(*i).clone()))
2618                        })
2619                        .collect(),
2620                    schema.metadata.clone(),
2621                )?;
2622                df_schema.with_functional_dependencies(projected_func_dependencies)
2623            })
2624            .unwrap_or_else(|| {
2625                let df_schema =
2626                    DFSchema::try_from_qualified_schema(table_name.clone(), &schema)?;
2627                df_schema.with_functional_dependencies(func_dependencies)
2628            })?;
2629        let projected_schema = Arc::new(projected_schema);
2630
2631        Ok(Self {
2632            table_name,
2633            source: table_source,
2634            projection,
2635            projected_schema,
2636            filters,
2637            fetch,
2638        })
2639    }
2640}
2641
/// Repartition the plan based on a partitioning scheme.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Hash)]
pub struct Repartition {
    /// The incoming logical plan
    pub input: Arc<LogicalPlan>,
    /// The partitioning scheme
    pub partitioning_scheme: Partitioning,
}
2650
/// Union multiple inputs
///
/// Equality and hashing are derived over all fields; the hand-written
/// `PartialOrd` implementation for this type orders by `inputs` only.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct Union {
    /// Inputs to merge
    pub inputs: Vec<Arc<LogicalPlan>>,
    /// Union schema. Should be the same for all inputs.
    pub schema: DFSchemaRef,
}
2659
2660impl Union {
2661    /// Constructs new Union instance deriving schema from inputs.
2662    fn try_new(inputs: Vec<Arc<LogicalPlan>>) -> Result<Self> {
2663        let schema = Self::derive_schema_from_inputs(&inputs, false, false)?;
2664        Ok(Union { inputs, schema })
2665    }
2666
2667    /// Constructs new Union instance deriving schema from inputs.
2668    /// Inputs do not have to have matching types and produced schema will
2669    /// take type from the first input.
2670    // TODO (https://github.com/apache/datafusion/issues/14380): Avoid creating uncoerced union at all.
2671    pub fn try_new_with_loose_types(inputs: Vec<Arc<LogicalPlan>>) -> Result<Self> {
2672        let schema = Self::derive_schema_from_inputs(&inputs, true, false)?;
2673        Ok(Union { inputs, schema })
2674    }
2675
2676    /// Constructs a new Union instance that combines rows from different tables by name,
2677    /// instead of by position. This means that the specified inputs need not have schemas
2678    /// that are all the same width.
2679    pub fn try_new_by_name(inputs: Vec<Arc<LogicalPlan>>) -> Result<Self> {
2680        let schema = Self::derive_schema_from_inputs(&inputs, true, true)?;
2681        let inputs = Self::rewrite_inputs_from_schema(&schema, inputs)?;
2682
2683        Ok(Union { inputs, schema })
2684    }
2685
2686    /// When constructing a `UNION BY NAME`, we need to wrap inputs
2687    /// in an additional `Projection` to account for absence of columns
2688    /// in input schemas or differing projection orders.
2689    fn rewrite_inputs_from_schema(
2690        schema: &Arc<DFSchema>,
2691        inputs: Vec<Arc<LogicalPlan>>,
2692    ) -> Result<Vec<Arc<LogicalPlan>>> {
2693        let schema_width = schema.iter().count();
2694        let mut wrapped_inputs = Vec::with_capacity(inputs.len());
2695        for input in inputs {
2696            // Any columns that exist within the derived schema but do not exist
2697            // within an input's schema should be replaced with `NULL` aliased
2698            // to the appropriate column in the derived schema.
2699            let mut expr = Vec::with_capacity(schema_width);
2700            for column in schema.columns() {
2701                if input
2702                    .schema()
2703                    .has_column_with_unqualified_name(column.name())
2704                {
2705                    expr.push(Expr::Column(column));
2706                } else {
2707                    expr.push(
2708                        Expr::Literal(ScalarValue::Null, None).alias(column.name()),
2709                    );
2710                }
2711            }
2712            wrapped_inputs.push(Arc::new(LogicalPlan::Projection(
2713                Projection::try_new_with_schema(expr, input, Arc::clone(schema))?,
2714            )));
2715        }
2716
2717        Ok(wrapped_inputs)
2718    }
2719
2720    /// Constructs new Union instance deriving schema from inputs.
2721    ///
2722    /// If `loose_types` is true, inputs do not need to have matching types and
2723    /// the produced schema will use the type from the first input.
2724    /// TODO (<https://github.com/apache/datafusion/issues/14380>): This is not necessarily reasonable behavior.
2725    ///
2726    /// If `by_name` is `true`, input schemas need not be the same width. That is,
2727    /// the constructed schema follows `UNION BY NAME` semantics.
2728    fn derive_schema_from_inputs(
2729        inputs: &[Arc<LogicalPlan>],
2730        loose_types: bool,
2731        by_name: bool,
2732    ) -> Result<DFSchemaRef> {
2733        if inputs.len() < 2 {
2734            return plan_err!("UNION requires at least two inputs");
2735        }
2736
2737        if by_name {
2738            Self::derive_schema_from_inputs_by_name(inputs, loose_types)
2739        } else {
2740            Self::derive_schema_from_inputs_by_position(inputs, loose_types)
2741        }
2742    }
2743
2744    fn derive_schema_from_inputs_by_name(
2745        inputs: &[Arc<LogicalPlan>],
2746        loose_types: bool,
2747    ) -> Result<DFSchemaRef> {
2748        type FieldData<'a> =
2749            (&'a DataType, bool, Vec<&'a HashMap<String, String>>, usize);
2750        let mut cols: Vec<(&str, FieldData)> = Vec::new();
2751        for input in inputs.iter() {
2752            for field in input.schema().fields() {
2753                if let Some((_, (data_type, is_nullable, metadata, occurrences))) =
2754                    cols.iter_mut().find(|(name, _)| name == field.name())
2755                {
2756                    if !loose_types && *data_type != field.data_type() {
2757                        return plan_err!(
2758                            "Found different types for field {}",
2759                            field.name()
2760                        );
2761                    }
2762
2763                    metadata.push(field.metadata());
2764                    // If the field is nullable in any one of the inputs,
2765                    // then the field in the final schema is also nullable.
2766                    *is_nullable |= field.is_nullable();
2767                    *occurrences += 1;
2768                } else {
2769                    cols.push((
2770                        field.name(),
2771                        (
2772                            field.data_type(),
2773                            field.is_nullable(),
2774                            vec![field.metadata()],
2775                            1,
2776                        ),
2777                    ));
2778                }
2779            }
2780        }
2781
2782        let union_fields = cols
2783            .into_iter()
2784            .map(
2785                |(name, (data_type, is_nullable, unmerged_metadata, occurrences))| {
2786                    // If the final number of occurrences of the field is less
2787                    // than the number of inputs (i.e. the field is missing from
2788                    // one or more inputs), then it must be treated as nullable.
2789                    let final_is_nullable = if occurrences == inputs.len() {
2790                        is_nullable
2791                    } else {
2792                        true
2793                    };
2794
2795                    let mut field =
2796                        Field::new(name, data_type.clone(), final_is_nullable);
2797                    field.set_metadata(intersect_maps(unmerged_metadata));
2798
2799                    (None, Arc::new(field))
2800                },
2801            )
2802            .collect::<Vec<(Option<TableReference>, _)>>();
2803
2804        let union_schema_metadata =
2805            intersect_maps(inputs.iter().map(|input| input.schema().metadata()));
2806
2807        // Functional Dependencies are not preserved after UNION operation
2808        let schema = DFSchema::new_with_metadata(union_fields, union_schema_metadata)?;
2809        let schema = Arc::new(schema);
2810
2811        Ok(schema)
2812    }
2813
2814    fn derive_schema_from_inputs_by_position(
2815        inputs: &[Arc<LogicalPlan>],
2816        loose_types: bool,
2817    ) -> Result<DFSchemaRef> {
2818        let first_schema = inputs[0].schema();
2819        let fields_count = first_schema.fields().len();
2820        for input in inputs.iter().skip(1) {
2821            if fields_count != input.schema().fields().len() {
2822                return plan_err!(
2823                    "UNION queries have different number of columns: \
2824                    left has {} columns whereas right has {} columns",
2825                    fields_count,
2826                    input.schema().fields().len()
2827                );
2828            }
2829        }
2830
2831        let mut name_counts: HashMap<String, usize> = HashMap::new();
2832        let union_fields = (0..fields_count)
2833            .map(|i| {
2834                let fields = inputs
2835                    .iter()
2836                    .map(|input| input.schema().field(i))
2837                    .collect::<Vec<_>>();
2838                let first_field = fields[0];
2839                let base_name = first_field.name().to_string();
2840
2841                let data_type = if loose_types {
2842                    // TODO apply type coercion here, or document why it's better to defer
2843                    // temporarily use the data type from the left input and later rely on the analyzer to
2844                    // coerce the two schemas into a common one.
2845                    first_field.data_type()
2846                } else {
2847                    fields.iter().skip(1).try_fold(
2848                        first_field.data_type(),
2849                        |acc, field| {
2850                            if acc != field.data_type() {
2851                                return plan_err!(
2852                                    "UNION field {i} have different type in inputs: \
2853                                    left has {} whereas right has {}",
2854                                    first_field.data_type(),
2855                                    field.data_type()
2856                                );
2857                            }
2858                            Ok(acc)
2859                        },
2860                    )?
2861                };
2862                let nullable = fields.iter().any(|field| field.is_nullable());
2863
2864                // Generate unique field name
2865                let name = if let Some(count) = name_counts.get_mut(&base_name) {
2866                    *count += 1;
2867                    format!("{base_name}_{count}")
2868                } else {
2869                    name_counts.insert(base_name.clone(), 0);
2870                    base_name
2871                };
2872
2873                let mut field = Field::new(&name, data_type.clone(), nullable);
2874                let field_metadata =
2875                    intersect_maps(fields.iter().map(|field| field.metadata()));
2876                field.set_metadata(field_metadata);
2877                Ok((None, Arc::new(field)))
2878            })
2879            .collect::<Result<_>>()?;
2880        let union_schema_metadata =
2881            intersect_maps(inputs.iter().map(|input| input.schema().metadata()));
2882
2883        // Functional Dependencies are not preserved after UNION operation
2884        let schema = DFSchema::new_with_metadata(union_fields, union_schema_metadata)?;
2885        let schema = Arc::new(schema);
2886
2887        Ok(schema)
2888    }
2889}
2890
/// Computes the intersection of several string maps.
///
/// The result contains exactly those `(key, value)` pairs that are present,
/// with an equal value, in every map yielded by `inputs`. When `inputs` yields
/// nothing, an empty map is returned.
fn intersect_maps<'a>(
    inputs: impl IntoIterator<Item = &'a HashMap<String, String>>,
) -> HashMap<String, String> {
    let mut maps = inputs.into_iter();
    let Some(first) = maps.next() else {
        return HashMap::new();
    };

    // Start from a copy of the first map and prune it against each later one.
    maps.fold(first.clone(), |mut common, other| {
        // The explicit reborrow (`&*value`) is a workaround for
        // https://github.com/rkyv/rkyv/issues/434. When this crate is used in a
        // workspace that enables the `rkyv-64` feature in the `chrono` crate,
        // comparing without it triggers a Rust compilation error:
        // error[E0277]: can't compare `Option<&std::string::String>` with `Option<&mut std::string::String>`.
        common.retain(|key, value| other.get(key) == Some(&*value));
        common
    })
}
2905
2906// Manual implementation needed because of `schema` field. Comparison excludes this field.
2907impl PartialOrd for Union {
2908    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
2909        self.inputs.partial_cmp(&other.inputs)
2910    }
2911}
2912
/// Describes the schema of a table
///
/// # Example output:
///
/// ```sql
/// > describe traces;
/// +--------------------+-----------------------------+-------------+
/// | column_name        | data_type                   | is_nullable |
/// +--------------------+-----------------------------+-------------+
/// | attributes         | Utf8                        | YES         |
/// | duration_nano      | Int64                       | YES         |
/// | end_time_unix_nano | Int64                       | YES         |
/// | service.name       | Dictionary(Int32, Utf8)     | YES         |
/// | span.kind          | Utf8                        | YES         |
/// | span.name          | Utf8                        | YES         |
/// | span_id            | Dictionary(Int32, Utf8)     | YES         |
/// | time               | Timestamp(Nanosecond, None) | NO          |
/// | trace_id           | Dictionary(Int32, Utf8)     | YES         |
/// | otel.status_code   | Utf8                        | YES         |
/// | parent_span_id     | Utf8                        | YES         |
/// +--------------------+-----------------------------+-------------+
/// ```
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct DescribeTable {
    /// Schema of the table being described
    pub schema: Arc<Schema>,
    /// Schema of the `DESCRIBE` output relation
    pub output_schema: DFSchemaRef,
}
2942
2943// Manual implementation of `PartialOrd`, returning none since there are no comparable types in
2944// `DescribeTable`. This allows `LogicalPlan` to derive `PartialOrd`.
2945impl PartialOrd for DescribeTable {
2946    fn partial_cmp(&self, _other: &Self) -> Option<Ordering> {
2947        // There is no relevant comparison for schemas
2948        None
2949    }
2950}
2951
/// Output formats for `EXPLAIN` plans
///
/// Each variant can be parsed from its lowercase name through the
/// [`FromStr`] implementation (`indent`, `tree`, `pgjson`, `graphviz`).
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum ExplainFormat {
    /// Indent mode
    ///
    /// Example:
    /// ```text
    /// > explain format indent select x from values (1) t(x);
    /// +---------------+-----------------------------------------------------+
    /// | plan_type     | plan                                                |
    /// +---------------+-----------------------------------------------------+
    /// | logical_plan  | SubqueryAlias: t                                    |
    /// |               |   Projection: column1 AS x                          |
    /// |               |     Values: (Int64(1))                              |
    /// | physical_plan | ProjectionExec: expr=[column1@0 as x]               |
    /// |               |   DataSourceExec: partitions=1, partition_sizes=[1] |
    /// |               |                                                     |
    /// +---------------+-----------------------------------------------------+
    /// ```
    Indent,
    /// Tree mode
    ///
    /// Example:
    /// ```text
    /// > explain format tree select x from values (1) t(x);
    /// +---------------+-------------------------------+
    /// | plan_type     | plan                          |
    /// +---------------+-------------------------------+
    /// | physical_plan | ┌───────────────────────────┐ |
    /// |               | │       ProjectionExec      │ |
    /// |               | │    --------------------   │ |
    /// |               | │        x: column1@0       │ |
    /// |               | └─────────────┬─────────────┘ |
    /// |               | ┌─────────────┴─────────────┐ |
    /// |               | │       DataSourceExec      │ |
    /// |               | │    --------------------   │ |
    /// |               | │         bytes: 128        │ |
    /// |               | │       format: memory      │ |
    /// |               | │          rows: 1          │ |
    /// |               | └───────────────────────────┘ |
    /// |               |                               |
    /// +---------------+-------------------------------+
    /// ```
    Tree,
    /// Postgres JSON mode
    ///
    /// A displayable structure that produces the plan in PostgreSQL JSON format.
    ///
    /// Users can use this format to visualize the plan in existing plan
    /// visualization tools, for example [dalibo](https://explain.dalibo.com/)
    ///
    /// Example:
    /// ```text
    /// > explain format pgjson select x from values (1) t(x);
    /// +--------------+--------------------------------------+
    /// | plan_type    | plan                                 |
    /// +--------------+--------------------------------------+
    /// | logical_plan | [                                    |
    /// |              |   {                                  |
    /// |              |     "Plan": {                        |
    /// |              |       "Alias": "t",                  |
    /// |              |       "Node Type": "Subquery",       |
    /// |              |       "Output": [                    |
    /// |              |         "x"                          |
    /// |              |       ],                             |
    /// |              |       "Plans": [                     |
    /// |              |         {                            |
    /// |              |           "Expressions": [           |
    /// |              |             "column1 AS x"           |
    /// |              |           ],                         |
    /// |              |           "Node Type": "Projection", |
    /// |              |           "Output": [                |
    /// |              |             "x"                      |
    /// |              |           ],                         |
    /// |              |           "Plans": [                 |
    /// |              |             {                        |
    /// |              |               "Node Type": "Values", |
    /// |              |               "Output": [            |
    /// |              |                 "column1"            |
    /// |              |               ],                     |
    /// |              |               "Plans": [],           |
    /// |              |               "Values": "(Int64(1))" |
    /// |              |             }                        |
    /// |              |           ]                          |
    /// |              |         }                            |
    /// |              |       ]                              |
    /// |              |     }                                |
    /// |              |   }                                  |
    /// |              | ]                                    |
    /// +--------------+--------------------------------------+
    /// ```
    PostgresJSON,
    /// Graphviz mode
    ///
    /// Example:
    /// ```text
    /// > explain format graphviz select x from values (1) t(x);
    /// +--------------+------------------------------------------------------------------------+
    /// | plan_type    | plan                                                                   |
    /// +--------------+------------------------------------------------------------------------+
    /// | logical_plan |                                                                        |
    /// |              | // Begin DataFusion GraphViz Plan,                                     |
    /// |              | // display it online here: https://dreampuf.github.io/GraphvizOnline   |
    /// |              |                                                                        |
    /// |              | digraph {                                                              |
    /// |              |   subgraph cluster_1                                                   |
    /// |              |   {                                                                    |
    /// |              |     graph[label="LogicalPlan"]                                         |
    /// |              |     2[shape=box label="SubqueryAlias: t"]                              |
    /// |              |     3[shape=box label="Projection: column1 AS x"]                      |
    /// |              |     2 -> 3 [arrowhead=none, arrowtail=normal, dir=back]                |
    /// |              |     4[shape=box label="Values: (Int64(1))"]                            |
    /// |              |     3 -> 4 [arrowhead=none, arrowtail=normal, dir=back]                |
    /// |              |   }                                                                    |
    /// |              |   subgraph cluster_5                                                   |
    /// |              |   {                                                                    |
    /// |              |     graph[label="Detailed LogicalPlan"]                                |
    /// |              |     6[shape=box label="SubqueryAlias: t\nSchema: [x:Int64;N]"]         |
    /// |              |     7[shape=box label="Projection: column1 AS x\nSchema: [x:Int64;N]"] |
    /// |              |     6 -> 7 [arrowhead=none, arrowtail=normal, dir=back]                |
    /// |              |     8[shape=box label="Values: (Int64(1))\nSchema: [column1:Int64;N]"] |
    /// |              |     7 -> 8 [arrowhead=none, arrowtail=normal, dir=back]                |
    /// |              |   }                                                                    |
    /// |              | }                                                                      |
    /// |              | // End DataFusion GraphViz Plan                                        |
    /// |              |                                                                        |
    /// +--------------+------------------------------------------------------------------------+
    /// ```
    Graphviz,
}
3082
3083/// Implement  parsing strings to `ExplainFormat`
3084impl FromStr for ExplainFormat {
3085    type Err = DataFusionError;
3086
3087    fn from_str(format: &str) -> std::result::Result<Self, Self::Err> {
3088        match format.to_lowercase().as_str() {
3089            "indent" => Ok(ExplainFormat::Indent),
3090            "tree" => Ok(ExplainFormat::Tree),
3091            "pgjson" => Ok(ExplainFormat::PostgresJSON),
3092            "graphviz" => Ok(ExplainFormat::Graphviz),
3093            _ => {
3094                plan_err!("Invalid explain format. Expected 'indent', 'tree', 'pgjson' or 'graphviz'. Got '{format}'")
3095            }
3096        }
3097    }
3098}
3099
/// Options for `EXPLAIN`
///
/// The [`Default`] value is non-verbose, non-analyzing, with
/// [`ExplainFormat::Indent`] output.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct ExplainOption {
    /// Include detailed debug info
    pub verbose: bool,
    /// Actually execute the plan and report metrics
    pub analyze: bool,
    /// Output syntax/format
    pub format: ExplainFormat,
}
3110
3111impl Default for ExplainOption {
3112    fn default() -> Self {
3113        ExplainOption {
3114            verbose: false,
3115            analyze: false,
3116            format: ExplainFormat::Indent,
3117        }
3118    }
3119}
3120
3121impl ExplainOption {
3122    /// Builder‐style setter for `verbose`
3123    pub fn with_verbose(mut self, verbose: bool) -> Self {
3124        self.verbose = verbose;
3125        self
3126    }
3127
3128    /// Builder‐style setter for `analyze`
3129    pub fn with_analyze(mut self, analyze: bool) -> Self {
3130        self.analyze = analyze;
3131        self
3132    }
3133
3134    /// Builder‐style setter for `format`
3135    pub fn with_format(mut self, format: ExplainFormat) -> Self {
3136        self.format = format;
3137        self
3138    }
3139}
3140
/// Produces a relation with string representations of
/// various parts of the plan
///
/// See [the documentation] for more information
///
/// [the documentation]: https://datafusion.apache.org/user-guide/sql/explain.html
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct Explain {
    /// Should extra (detailed, intermediate plans) be included?
    pub verbose: bool,
    /// Output format for the explain output.
    // NOTE(review): this comment previously said "if specified / if none,
    // defaults to `text`", but the field is not an `Option` — confirm where
    // the default is chosen.
    pub explain_format: ExplainFormat,
    /// The logical plan that is being EXPLAIN'd
    pub plan: Arc<LogicalPlan>,
    /// Represent the various stages plans have gone through
    pub stringified_plans: Vec<StringifiedPlan>,
    /// The output schema of the explain (2 columns of text)
    pub schema: DFSchemaRef,
    /// Used by physical planner to check if should proceed with planning
    pub logical_optimization_succeeded: bool,
}
3163
3164// Manual implementation needed because of `schema` field. Comparison excludes this field.
3165impl PartialOrd for Explain {
3166    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
3167        #[derive(PartialEq, PartialOrd)]
3168        struct ComparableExplain<'a> {
3169            /// Should extra (detailed, intermediate plans) be included?
3170            pub verbose: &'a bool,
3171            /// The logical plan that is being EXPLAIN'd
3172            pub plan: &'a Arc<LogicalPlan>,
3173            /// Represent the various stages plans have gone through
3174            pub stringified_plans: &'a Vec<StringifiedPlan>,
3175            /// Used by physical planner to check if should proceed with planning
3176            pub logical_optimization_succeeded: &'a bool,
3177        }
3178        let comparable_self = ComparableExplain {
3179            verbose: &self.verbose,
3180            plan: &self.plan,
3181            stringified_plans: &self.stringified_plans,
3182            logical_optimization_succeeded: &self.logical_optimization_succeeded,
3183        };
3184        let comparable_other = ComparableExplain {
3185            verbose: &other.verbose,
3186            plan: &other.plan,
3187            stringified_plans: &other.stringified_plans,
3188            logical_optimization_succeeded: &other.logical_optimization_succeeded,
3189        };
3190        comparable_self.partial_cmp(&comparable_other)
3191    }
3192}
3193
/// Runs the actual plan, and then prints the physical plan
/// with execution metrics.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct Analyze {
    /// Should extra detail be included?
    pub verbose: bool,
    /// The logical plan that is being EXPLAIN ANALYZE'd
    pub input: Arc<LogicalPlan>,
    /// The output schema of the explain (2 columns of text)
    pub schema: DFSchemaRef,
}
3205
3206// Manual implementation needed because of `schema` field. Comparison excludes this field.
3207impl PartialOrd for Analyze {
3208    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
3209        match self.verbose.partial_cmp(&other.verbose) {
3210            Some(Ordering::Equal) => self.input.partial_cmp(&other.input),
3211            cmp => cmp,
3212        }
3213    }
3214}
3215
/// Extension operator defined outside of DataFusion
///
/// Wraps a shared [`UserDefinedLogicalNode`] trait object. Equality and
/// ordering are implemented manually and forward to the wrapped node.
// TODO(clippy): This clippy `allow` should be removed if
// the manual `PartialEq` is removed in favor of a derive.
// (see the `PartialEq` impl for details.)
#[allow(clippy::derived_hash_with_manual_eq)]
#[derive(Debug, Clone, Eq, Hash)]
pub struct Extension {
    /// The runtime extension operator
    pub node: Arc<dyn UserDefinedLogicalNode>,
}
3226
// `PartialEq` cannot be derived for types containing `Arc<dyn Trait>`.
// This manual implementation should be removed if
// https://github.com/rust-lang/rust/issues/39128 is fixed.
impl PartialEq for Extension {
    /// Two extensions are equal when their wrapped nodes compare equal.
    fn eq(&self, other: &Self) -> bool {
        // Forward to the equality defined for the wrapped node.
        self.node.eq(&other.node)
    }
}
3235
// Ordering of extensions is delegated entirely to the wrapped node.
impl PartialOrd for Extension {
    /// Orders two extensions by comparing their wrapped nodes.
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        self.node.partial_cmp(&other.node)
    }
}
3241
/// Produces the first `n` tuples from its input and discards the rest.
///
/// Both `skip` and `fetch` are stored as expressions; use
/// [`Limit::get_skip_type`] and [`Limit::get_fetch_type`] to extract literal
/// values from them.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Hash)]
pub struct Limit {
    /// Number of rows to skip before fetch
    pub skip: Option<Box<Expr>>,
    /// Maximum number of rows to fetch,
    /// None means fetching all rows
    pub fetch: Option<Box<Expr>>,
    /// The logical plan
    pub input: Arc<LogicalPlan>,
}
3253
/// Different types of skip expression in Limit plan.
///
/// Returned by [`Limit::get_skip_type`].
pub enum SkipType {
    /// The skip expression is a literal value.
    /// A missing or `NULL` skip is reported as `Literal(0)`.
    Literal(usize),
    /// The skip expression is not an `Int64` literal.
    /// Currently only supports expressions that can be folded into constants.
    UnsupportedExpr,
}
3261
/// Different types of fetch expression in Limit plan.
///
/// Returned by [`Limit::get_fetch_type`].
pub enum FetchType {
    /// The fetch expression is a literal value.
    /// `Literal(None)` means the fetch expression is not provided
    /// (or is a `NULL` literal).
    Literal(Option<usize>),
    /// The fetch expression is not an `Int64` literal.
    /// Currently only supports expressions that can be folded into constants.
    UnsupportedExpr,
}
3270
3271impl Limit {
3272    /// Get the skip type from the limit plan.
3273    pub fn get_skip_type(&self) -> Result<SkipType> {
3274        match self.skip.as_deref() {
3275            Some(expr) => match *expr {
3276                Expr::Literal(ScalarValue::Int64(s), _) => {
3277                    // `skip = NULL` is equivalent to `skip = 0`
3278                    let s = s.unwrap_or(0);
3279                    if s >= 0 {
3280                        Ok(SkipType::Literal(s as usize))
3281                    } else {
3282                        plan_err!("OFFSET must be >=0, '{}' was provided", s)
3283                    }
3284                }
3285                _ => Ok(SkipType::UnsupportedExpr),
3286            },
3287            // `skip = None` is equivalent to `skip = 0`
3288            None => Ok(SkipType::Literal(0)),
3289        }
3290    }
3291
3292    /// Get the fetch type from the limit plan.
3293    pub fn get_fetch_type(&self) -> Result<FetchType> {
3294        match self.fetch.as_deref() {
3295            Some(expr) => match *expr {
3296                Expr::Literal(ScalarValue::Int64(Some(s)), _) => {
3297                    if s >= 0 {
3298                        Ok(FetchType::Literal(Some(s as usize)))
3299                    } else {
3300                        plan_err!("LIMIT must be >= 0, '{}' was provided", s)
3301                    }
3302                }
3303                Expr::Literal(ScalarValue::Int64(None), _) => {
3304                    Ok(FetchType::Literal(None))
3305                }
3306                _ => Ok(FetchType::UnsupportedExpr),
3307            },
3308            None => Ok(FetchType::Literal(None)),
3309        }
3310    }
3311}
3312
/// Removes duplicate rows from the input
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Hash)]
pub enum Distinct {
    /// Plain `DISTINCT` referencing all selection expressions
    All(Arc<LogicalPlan>),
    /// The `Postgres` addition (`DISTINCT ON`), allowing separate control over
    /// DISTINCT'd and selected columns
    On(DistinctOn),
}
3321
3322impl Distinct {
3323    /// return a reference to the nodes input
3324    pub fn input(&self) -> &Arc<LogicalPlan> {
3325        match self {
3326            Distinct::All(input) => input,
3327            Distinct::On(DistinctOn { input, .. }) => input,
3328        }
3329    }
3330}
3331
/// Removes duplicate rows from the input, using the `DISTINCT ON` form
/// (the payload of [`Distinct::On`]).
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct DistinctOn {
    /// The `DISTINCT ON` clause expression list
    pub on_expr: Vec<Expr>,
    /// The selected projection expression list
    pub select_expr: Vec<Expr>,
    /// The `ORDER BY` clause, whose initial expressions must match those of the `ON` clause when
    /// present. Note that those matching expressions actually wrap the `ON` expressions with
    /// additional info pertaining to the sorting procedure (i.e. ASC/DESC, and NULLS FIRST/LAST).
    pub sort_expr: Option<Vec<SortExpr>>,
    /// The logical plan that is being DISTINCT'd
    pub input: Arc<LogicalPlan>,
    /// The schema description of the DISTINCT ON output
    pub schema: DFSchemaRef,
}
3348
3349impl DistinctOn {
3350    /// Create a new `DistinctOn` struct.
3351    pub fn try_new(
3352        on_expr: Vec<Expr>,
3353        select_expr: Vec<Expr>,
3354        sort_expr: Option<Vec<SortExpr>>,
3355        input: Arc<LogicalPlan>,
3356    ) -> Result<Self> {
3357        if on_expr.is_empty() {
3358            return plan_err!("No `ON` expressions provided");
3359        }
3360
3361        let on_expr = normalize_cols(on_expr, input.as_ref())?;
3362        let qualified_fields = exprlist_to_fields(select_expr.as_slice(), &input)?
3363            .into_iter()
3364            .collect();
3365
3366        let dfschema = DFSchema::new_with_metadata(
3367            qualified_fields,
3368            input.schema().metadata().clone(),
3369        )?;
3370
3371        let mut distinct_on = DistinctOn {
3372            on_expr,
3373            select_expr,
3374            sort_expr: None,
3375            input,
3376            schema: Arc::new(dfschema),
3377        };
3378
3379        if let Some(sort_expr) = sort_expr {
3380            distinct_on = distinct_on.with_sort_expr(sort_expr)?;
3381        }
3382
3383        Ok(distinct_on)
3384    }
3385
3386    /// Try to update `self` with a new sort expressions.
3387    ///
3388    /// Validates that the sort expressions are a super-set of the `ON` expressions.
3389    pub fn with_sort_expr(mut self, sort_expr: Vec<SortExpr>) -> Result<Self> {
3390        let sort_expr = normalize_sorts(sort_expr, self.input.as_ref())?;
3391
3392        // Check that the left-most sort expressions are the same as the `ON` expressions.
3393        let mut matched = true;
3394        for (on, sort) in self.on_expr.iter().zip(sort_expr.iter()) {
3395            if on != &sort.expr {
3396                matched = false;
3397                break;
3398            }
3399        }
3400
3401        if self.on_expr.len() > sort_expr.len() || !matched {
3402            return plan_err!(
3403                "SELECT DISTINCT ON expressions must match initial ORDER BY expressions"
3404            );
3405        }
3406
3407        self.sort_expr = Some(sort_expr);
3408        Ok(self)
3409    }
3410}
3411
3412// Manual implementation needed because of `schema` field. Comparison excludes this field.
3413impl PartialOrd for DistinctOn {
3414    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
3415        #[derive(PartialEq, PartialOrd)]
3416        struct ComparableDistinctOn<'a> {
3417            /// The `DISTINCT ON` clause expression list
3418            pub on_expr: &'a Vec<Expr>,
3419            /// The selected projection expression list
3420            pub select_expr: &'a Vec<Expr>,
3421            /// The `ORDER BY` clause, whose initial expressions must match those of the `ON` clause when
3422            /// present. Note that those matching expressions actually wrap the `ON` expressions with
3423            /// additional info pertaining to the sorting procedure (i.e. ASC/DESC, and NULLS FIRST/LAST).
3424            pub sort_expr: &'a Option<Vec<SortExpr>>,
3425            /// The logical plan that is being DISTINCT'd
3426            pub input: &'a Arc<LogicalPlan>,
3427        }
3428        let comparable_self = ComparableDistinctOn {
3429            on_expr: &self.on_expr,
3430            select_expr: &self.select_expr,
3431            sort_expr: &self.sort_expr,
3432            input: &self.input,
3433        };
3434        let comparable_other = ComparableDistinctOn {
3435            on_expr: &other.on_expr,
3436            select_expr: &other.select_expr,
3437            sort_expr: &other.sort_expr,
3438            input: &other.input,
3439        };
3440        comparable_self.partial_cmp(&comparable_other)
3441    }
3442}
3443
/// Aggregates its input based on a set of grouping and aggregate
/// expressions (e.g. SUM).
///
/// # Output Schema
///
/// The output schema is the group expressions followed by the aggregate
/// expressions in order. When the grouping is a grouping set, the internal
/// [`Aggregate::INTERNAL_GROUPING_ID`] column sits between the two.
///
/// For example, given the input schema `"A", "B", "C"` and the aggregate
/// `SUM(A) GROUP BY C+B`, the output schema will be `"C+B", "SUM(A)"` where
/// "C+B" and "SUM(A)" are the names of the output columns. Note that "C+B" is a
/// single new column
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
// mark non_exhaustive to encourage use of try_new/new()
#[non_exhaustive]
pub struct Aggregate {
    /// The incoming logical plan
    pub input: Arc<LogicalPlan>,
    /// Grouping expressions
    pub group_expr: Vec<Expr>,
    /// Aggregate expressions
    pub aggr_expr: Vec<Expr>,
    /// The schema description of the aggregate output
    pub schema: DFSchemaRef,
}
3469
3470impl Aggregate {
3471    /// Create a new aggregate operator.
3472    pub fn try_new(
3473        input: Arc<LogicalPlan>,
3474        group_expr: Vec<Expr>,
3475        aggr_expr: Vec<Expr>,
3476    ) -> Result<Self> {
3477        let group_expr = enumerate_grouping_sets(group_expr)?;
3478
3479        let is_grouping_set = matches!(group_expr.as_slice(), [Expr::GroupingSet(_)]);
3480
3481        let grouping_expr: Vec<&Expr> = grouping_set_to_exprlist(group_expr.as_slice())?;
3482
3483        let mut qualified_fields = exprlist_to_fields(grouping_expr, &input)?;
3484
3485        // Even columns that cannot be null will become nullable when used in a grouping set.
3486        if is_grouping_set {
3487            qualified_fields = qualified_fields
3488                .into_iter()
3489                .map(|(q, f)| (q, f.as_ref().clone().with_nullable(true).into()))
3490                .collect::<Vec<_>>();
3491            qualified_fields.push((
3492                None,
3493                Field::new(
3494                    Self::INTERNAL_GROUPING_ID,
3495                    Self::grouping_id_type(qualified_fields.len()),
3496                    false,
3497                )
3498                .into(),
3499            ));
3500        }
3501
3502        qualified_fields.extend(exprlist_to_fields(aggr_expr.as_slice(), &input)?);
3503
3504        let schema = DFSchema::new_with_metadata(
3505            qualified_fields,
3506            input.schema().metadata().clone(),
3507        )?;
3508
3509        Self::try_new_with_schema(input, group_expr, aggr_expr, Arc::new(schema))
3510    }
3511
3512    /// Create a new aggregate operator using the provided schema to avoid the overhead of
3513    /// building the schema again when the schema is already known.
3514    ///
3515    /// This method should only be called when you are absolutely sure that the schema being
3516    /// provided is correct for the aggregate. If in doubt, call [try_new](Self::try_new) instead.
3517    pub fn try_new_with_schema(
3518        input: Arc<LogicalPlan>,
3519        group_expr: Vec<Expr>,
3520        aggr_expr: Vec<Expr>,
3521        schema: DFSchemaRef,
3522    ) -> Result<Self> {
3523        if group_expr.is_empty() && aggr_expr.is_empty() {
3524            return plan_err!(
3525                "Aggregate requires at least one grouping or aggregate expression"
3526            );
3527        }
3528        let group_expr_count = grouping_set_expr_count(&group_expr)?;
3529        if schema.fields().len() != group_expr_count + aggr_expr.len() {
3530            return plan_err!(
3531                "Aggregate schema has wrong number of fields. Expected {} got {}",
3532                group_expr_count + aggr_expr.len(),
3533                schema.fields().len()
3534            );
3535        }
3536
3537        let aggregate_func_dependencies =
3538            calc_func_dependencies_for_aggregate(&group_expr, &input, &schema)?;
3539        let new_schema = schema.as_ref().clone();
3540        let schema = Arc::new(
3541            new_schema.with_functional_dependencies(aggregate_func_dependencies)?,
3542        );
3543        Ok(Self {
3544            input,
3545            group_expr,
3546            aggr_expr,
3547            schema,
3548        })
3549    }
3550
3551    fn is_grouping_set(&self) -> bool {
3552        matches!(self.group_expr.as_slice(), [Expr::GroupingSet(_)])
3553    }
3554
3555    /// Get the output expressions.
3556    fn output_expressions(&self) -> Result<Vec<&Expr>> {
3557        static INTERNAL_ID_EXPR: LazyLock<Expr> = LazyLock::new(|| {
3558            Expr::Column(Column::from_name(Aggregate::INTERNAL_GROUPING_ID))
3559        });
3560        let mut exprs = grouping_set_to_exprlist(self.group_expr.as_slice())?;
3561        if self.is_grouping_set() {
3562            exprs.push(&INTERNAL_ID_EXPR);
3563        }
3564        exprs.extend(self.aggr_expr.iter());
3565        debug_assert!(exprs.len() == self.schema.fields().len());
3566        Ok(exprs)
3567    }
3568
3569    /// Get the length of the group by expression in the output schema
3570    /// This is not simply group by expression length. Expression may be
3571    /// GroupingSet, etc. In these case we need to get inner expression lengths.
3572    pub fn group_expr_len(&self) -> Result<usize> {
3573        grouping_set_expr_count(&self.group_expr)
3574    }
3575
3576    /// Returns the data type of the grouping id.
3577    /// The grouping ID value is a bitmask where each set bit
3578    /// indicates that the corresponding grouping expression is
3579    /// null
3580    pub fn grouping_id_type(group_exprs: usize) -> DataType {
3581        if group_exprs <= 8 {
3582            DataType::UInt8
3583        } else if group_exprs <= 16 {
3584            DataType::UInt16
3585        } else if group_exprs <= 32 {
3586            DataType::UInt32
3587        } else {
3588            DataType::UInt64
3589        }
3590    }
3591
3592    /// Internal column used when the aggregation is a grouping set.
3593    ///
3594    /// This column contains a bitmask where each bit represents a grouping
3595    /// expression. The least significant bit corresponds to the rightmost
3596    /// grouping expression. A bit value of 0 indicates that the corresponding
3597    /// column is included in the grouping set, while a value of 1 means it is excluded.
3598    ///
3599    /// For example, for the grouping expressions CUBE(a, b), the grouping ID
3600    /// column will have the following values:
3601    ///     0b00: Both `a` and `b` are included
3602    ///     0b01: `b` is excluded
3603    ///     0b10: `a` is excluded
3604    ///     0b11: Both `a` and `b` are excluded
3605    ///
3606    /// This internal column is necessary because excluded columns are replaced
3607    /// with `NULL` values. To handle these cases correctly, we must distinguish
3608    /// between an actual `NULL` value in a column and a column being excluded from the set.
3609    pub const INTERNAL_GROUPING_ID: &'static str = "__grouping_id";
3610}
3611
3612// Manual implementation needed because of `schema` field. Comparison excludes this field.
3613impl PartialOrd for Aggregate {
3614    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
3615        match self.input.partial_cmp(&other.input) {
3616            Some(Ordering::Equal) => {
3617                match self.group_expr.partial_cmp(&other.group_expr) {
3618                    Some(Ordering::Equal) => self.aggr_expr.partial_cmp(&other.aggr_expr),
3619                    cmp => cmp,
3620                }
3621            }
3622            cmp => cmp,
3623        }
3624    }
3625}
3626
3627/// Checks whether any expression in `group_expr` contains `Expr::GroupingSet`.
3628fn contains_grouping_set(group_expr: &[Expr]) -> bool {
3629    group_expr
3630        .iter()
3631        .any(|expr| matches!(expr, Expr::GroupingSet(_)))
3632}
3633
3634/// Calculates functional dependencies for aggregate expressions.
3635fn calc_func_dependencies_for_aggregate(
3636    // Expressions in the GROUP BY clause:
3637    group_expr: &[Expr],
3638    // Input plan of the aggregate:
3639    input: &LogicalPlan,
3640    // Aggregate schema
3641    aggr_schema: &DFSchema,
3642) -> Result<FunctionalDependencies> {
3643    // We can do a case analysis on how to propagate functional dependencies based on
3644    // whether the GROUP BY in question contains a grouping set expression:
3645    // - If so, the functional dependencies will be empty because we cannot guarantee
3646    //   that GROUP BY expression results will be unique.
3647    // - Otherwise, it may be possible to propagate functional dependencies.
3648    if !contains_grouping_set(group_expr) {
3649        let group_by_expr_names = group_expr
3650            .iter()
3651            .map(|item| item.schema_name().to_string())
3652            .collect::<IndexSet<_>>()
3653            .into_iter()
3654            .collect::<Vec<_>>();
3655        let aggregate_func_dependencies = aggregate_functional_dependencies(
3656            input.schema(),
3657            &group_by_expr_names,
3658            aggr_schema,
3659        );
3660        Ok(aggregate_func_dependencies)
3661    } else {
3662        Ok(FunctionalDependencies::empty())
3663    }
3664}
3665
3666/// This function projects functional dependencies of the `input` plan according
3667/// to projection expressions `exprs`.
3668fn calc_func_dependencies_for_project(
3669    exprs: &[Expr],
3670    input: &LogicalPlan,
3671) -> Result<FunctionalDependencies> {
3672    let input_fields = input.schema().field_names();
3673    // Calculate expression indices (if present) in the input schema.
3674    let proj_indices = exprs
3675        .iter()
3676        .map(|expr| match expr {
3677            #[expect(deprecated)]
3678            Expr::Wildcard { qualifier, options } => {
3679                let wildcard_fields = exprlist_to_fields(
3680                    vec![&Expr::Wildcard {
3681                        qualifier: qualifier.clone(),
3682                        options: options.clone(),
3683                    }],
3684                    input,
3685                )?;
3686                Ok::<_, DataFusionError>(
3687                    wildcard_fields
3688                        .into_iter()
3689                        .filter_map(|(qualifier, f)| {
3690                            let flat_name = qualifier
3691                                .map(|t| format!("{}.{}", t, f.name()))
3692                                .unwrap_or_else(|| f.name().clone());
3693                            input_fields.iter().position(|item| *item == flat_name)
3694                        })
3695                        .collect::<Vec<_>>(),
3696                )
3697            }
3698            Expr::Alias(alias) => {
3699                let name = format!("{}", alias.expr);
3700                Ok(input_fields
3701                    .iter()
3702                    .position(|item| *item == name)
3703                    .map(|i| vec![i])
3704                    .unwrap_or(vec![]))
3705            }
3706            _ => {
3707                let name = format!("{expr}");
3708                Ok(input_fields
3709                    .iter()
3710                    .position(|item| *item == name)
3711                    .map(|i| vec![i])
3712                    .unwrap_or(vec![]))
3713            }
3714        })
3715        .collect::<Result<Vec<_>>>()?
3716        .into_iter()
3717        .flatten()
3718        .collect::<Vec<_>>();
3719
3720    Ok(input
3721        .schema()
3722        .functional_dependencies()
3723        .project_functional_dependencies(&proj_indices, exprs.len()))
3724}
3725
3726/// Sorts its input according to a list of sort expressions.
3727#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Hash)]
3728pub struct Sort {
3729    /// The sort expressions
3730    pub expr: Vec<SortExpr>,
3731    /// The incoming logical plan
3732    pub input: Arc<LogicalPlan>,
3733    /// Optional fetch limit
3734    pub fetch: Option<usize>,
3735}
3736
3737/// Join two logical plans on one or more join columns
3738#[derive(Debug, Clone, PartialEq, Eq, Hash)]
3739pub struct Join {
3740    /// Left input
3741    pub left: Arc<LogicalPlan>,
3742    /// Right input
3743    pub right: Arc<LogicalPlan>,
3744    /// Equijoin clause expressed as pairs of (left, right) join expressions
3745    pub on: Vec<(Expr, Expr)>,
3746    /// Filters applied during join (non-equi conditions)
3747    pub filter: Option<Expr>,
3748    /// Join type
3749    pub join_type: JoinType,
3750    /// Join constraint
3751    pub join_constraint: JoinConstraint,
3752    /// The output schema, containing fields from the left and right inputs
3753    pub schema: DFSchemaRef,
3754    /// Defines the null equality for the join.
3755    pub null_equality: NullEquality,
3756}
3757
3758impl Join {
3759    /// Creates a new Join operator with automatically computed schema.
3760    ///
3761    /// This constructor computes the schema based on the join type and inputs,
3762    /// removing the need to manually specify the schema or call `recompute_schema`.
3763    ///
3764    /// # Arguments
3765    ///
3766    /// * `left` - Left input plan
3767    /// * `right` - Right input plan
3768    /// * `on` - Join condition as a vector of (left_expr, right_expr) pairs
3769    /// * `filter` - Optional filter expression (for non-equijoin conditions)
3770    /// * `join_type` - Type of join (Inner, Left, Right, etc.)
3771    /// * `join_constraint` - Join constraint (On, Using)
3772    /// * `null_equality` - How to handle nulls in join comparisons
3773    ///
3774    /// # Returns
3775    ///
3776    /// A new Join operator with the computed schema
3777    pub fn try_new(
3778        left: Arc<LogicalPlan>,
3779        right: Arc<LogicalPlan>,
3780        on: Vec<(Expr, Expr)>,
3781        filter: Option<Expr>,
3782        join_type: JoinType,
3783        join_constraint: JoinConstraint,
3784        null_equality: NullEquality,
3785    ) -> Result<Self> {
3786        let join_schema = build_join_schema(left.schema(), right.schema(), &join_type)?;
3787
3788        Ok(Join {
3789            left,
3790            right,
3791            on,
3792            filter,
3793            join_type,
3794            join_constraint,
3795            schema: Arc::new(join_schema),
3796            null_equality,
3797        })
3798    }
3799
3800    /// Create Join with input which wrapped with projection, this method is used in physcial planning only to help
3801    /// create the physical join.
3802    pub fn try_new_with_project_input(
3803        original: &LogicalPlan,
3804        left: Arc<LogicalPlan>,
3805        right: Arc<LogicalPlan>,
3806        column_on: (Vec<Column>, Vec<Column>),
3807    ) -> Result<(Self, bool)> {
3808        let original_join = match original {
3809            LogicalPlan::Join(join) => join,
3810            _ => return plan_err!("Could not create join with project input"),
3811        };
3812
3813        let mut left_sch = LogicalPlanBuilder::from(Arc::clone(&left));
3814        let mut right_sch = LogicalPlanBuilder::from(Arc::clone(&right));
3815
3816        let mut requalified = false;
3817
3818        // By definition, the resulting schema of an inner/left/right & full join will have first the left side fields and then the right,
3819        // potentially having duplicate field names. Note this will only qualify fields if they have not been qualified before.
3820        if original_join.join_type == JoinType::Inner
3821            || original_join.join_type == JoinType::Left
3822            || original_join.join_type == JoinType::Right
3823            || original_join.join_type == JoinType::Full
3824        {
3825            (left_sch, right_sch, requalified) =
3826                requalify_sides_if_needed(left_sch.clone(), right_sch.clone())?;
3827        }
3828
3829        let on: Vec<(Expr, Expr)> = column_on
3830            .0
3831            .into_iter()
3832            .zip(column_on.1)
3833            .map(|(l, r)| (Expr::Column(l), Expr::Column(r)))
3834            .collect();
3835
3836        let join_schema = build_join_schema(
3837            left_sch.schema(),
3838            right_sch.schema(),
3839            &original_join.join_type,
3840        )?;
3841
3842        Ok((
3843            Join {
3844                left,
3845                right,
3846                on,
3847                filter: original_join.filter.clone(),
3848                join_type: original_join.join_type,
3849                join_constraint: original_join.join_constraint,
3850                schema: Arc::new(join_schema),
3851                null_equality: original_join.null_equality,
3852            },
3853            requalified,
3854        ))
3855    }
3856}
3857
3858// Manual implementation needed because of `schema` field. Comparison excludes this field.
3859impl PartialOrd for Join {
3860    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
3861        #[derive(PartialEq, PartialOrd)]
3862        struct ComparableJoin<'a> {
3863            /// Left input
3864            pub left: &'a Arc<LogicalPlan>,
3865            /// Right input
3866            pub right: &'a Arc<LogicalPlan>,
3867            /// Equijoin clause expressed as pairs of (left, right) join expressions
3868            pub on: &'a Vec<(Expr, Expr)>,
3869            /// Filters applied during join (non-equi conditions)
3870            pub filter: &'a Option<Expr>,
3871            /// Join type
3872            pub join_type: &'a JoinType,
3873            /// Join constraint
3874            pub join_constraint: &'a JoinConstraint,
3875            /// The null handling behavior for equalities
3876            pub null_equality: &'a NullEquality,
3877        }
3878        let comparable_self = ComparableJoin {
3879            left: &self.left,
3880            right: &self.right,
3881            on: &self.on,
3882            filter: &self.filter,
3883            join_type: &self.join_type,
3884            join_constraint: &self.join_constraint,
3885            null_equality: &self.null_equality,
3886        };
3887        let comparable_other = ComparableJoin {
3888            left: &other.left,
3889            right: &other.right,
3890            on: &other.on,
3891            filter: &other.filter,
3892            join_type: &other.join_type,
3893            join_constraint: &other.join_constraint,
3894            null_equality: &other.null_equality,
3895        };
3896        comparable_self.partial_cmp(&comparable_other)
3897    }
3898}
3899
3900/// Subquery
3901#[derive(Clone, PartialEq, Eq, PartialOrd, Hash)]
3902pub struct Subquery {
3903    /// The subquery
3904    pub subquery: Arc<LogicalPlan>,
3905    /// The outer references used in the subquery
3906    pub outer_ref_columns: Vec<Expr>,
3907    /// Span information for subquery projection columns
3908    pub spans: Spans,
3909}
3910
3911impl Normalizeable for Subquery {
3912    fn can_normalize(&self) -> bool {
3913        false
3914    }
3915}
3916
3917impl NormalizeEq for Subquery {
3918    fn normalize_eq(&self, other: &Self) -> bool {
3919        // TODO: may be implement NormalizeEq for LogicalPlan?
3920        *self.subquery == *other.subquery
3921            && self.outer_ref_columns.len() == other.outer_ref_columns.len()
3922            && self
3923                .outer_ref_columns
3924                .iter()
3925                .zip(other.outer_ref_columns.iter())
3926                .all(|(a, b)| a.normalize_eq(b))
3927    }
3928}
3929
3930impl Subquery {
3931    pub fn try_from_expr(plan: &Expr) -> Result<&Subquery> {
3932        match plan {
3933            Expr::ScalarSubquery(it) => Ok(it),
3934            Expr::Cast(cast) => Subquery::try_from_expr(cast.expr.as_ref()),
3935            _ => plan_err!("Could not coerce into ScalarSubquery!"),
3936        }
3937    }
3938
3939    pub fn with_plan(&self, plan: Arc<LogicalPlan>) -> Subquery {
3940        Subquery {
3941            subquery: plan,
3942            outer_ref_columns: self.outer_ref_columns.clone(),
3943            spans: Spans::new(),
3944        }
3945    }
3946}
3947
3948impl Debug for Subquery {
3949    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
3950        write!(f, "<subquery>")
3951    }
3952}
3953
3954/// Logical partitioning schemes supported by [`LogicalPlan::Repartition`]
3955///
3956/// See [`Partitioning`] for more details on partitioning
3957///
3958/// [`Partitioning`]: https://docs.rs/datafusion/latest/datafusion/physical_expr/enum.Partitioning.html#
3959#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Hash)]
3960pub enum Partitioning {
3961    /// Allocate batches using a round-robin algorithm and the specified number of partitions
3962    RoundRobinBatch(usize),
3963    /// Allocate rows based on a hash of one of more expressions and the specified number
3964    /// of partitions.
3965    Hash(Vec<Expr>, usize),
3966    /// The DISTRIBUTE BY clause is used to repartition the data based on the input expressions
3967    DistributeBy(Vec<Expr>),
3968}
3969
3970/// Represent the unnesting operation on a list column, such as the recursion depth and
3971/// the output column name after unnesting
3972///
3973/// Example: given `ColumnUnnestList { output_column: "output_name", depth: 2 }`
3974///
3975/// ```text
3976///   input             output_name
3977///  ┌─────────┐      ┌─────────┐
3978///  │{{1,2}}  │      │ 1       │
3979///  ├─────────┼─────►├─────────┤
3980///  │{{3}}    │      │ 2       │
3981///  ├─────────┤      ├─────────┤
3982///  │{{4},{5}}│      │ 3       │
3983///  └─────────┘      ├─────────┤
3984///                   │ 4       │
3985///                   ├─────────┤
3986///                   │ 5       │
3987///                   └─────────┘
3988/// ```
3989#[derive(Debug, Clone, PartialEq, Eq, Hash, PartialOrd)]
3990pub struct ColumnUnnestList {
3991    pub output_column: Column,
3992    pub depth: usize,
3993}
3994
3995impl Display for ColumnUnnestList {
3996    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
3997        write!(f, "{}|depth={}", self.output_column, self.depth)
3998    }
3999}
4000
4001/// Unnest a column that contains a nested list type. See
4002/// [`UnnestOptions`] for more details.
4003#[derive(Debug, Clone, PartialEq, Eq, Hash)]
4004pub struct Unnest {
4005    /// The incoming logical plan
4006    pub input: Arc<LogicalPlan>,
4007    /// Columns to run unnest on, can be a list of (List/Struct) columns
4008    pub exec_columns: Vec<Column>,
4009    /// refer to the indices(in the input schema) of columns
4010    /// that have type list to run unnest on
4011    pub list_type_columns: Vec<(usize, ColumnUnnestList)>,
4012    /// refer to the indices (in the input schema) of columns
4013    /// that have type struct to run unnest on
4014    pub struct_type_columns: Vec<usize>,
4015    /// Having items aligned with the output columns
4016    /// representing which column in the input schema each output column depends on
4017    pub dependency_indices: Vec<usize>,
4018    /// The output schema, containing the unnested field column.
4019    pub schema: DFSchemaRef,
4020    /// Options
4021    pub options: UnnestOptions,
4022}
4023
4024// Manual implementation needed because of `schema` field. Comparison excludes this field.
4025impl PartialOrd for Unnest {
4026    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
4027        #[derive(PartialEq, PartialOrd)]
4028        struct ComparableUnnest<'a> {
4029            /// The incoming logical plan
4030            pub input: &'a Arc<LogicalPlan>,
4031            /// Columns to run unnest on, can be a list of (List/Struct) columns
4032            pub exec_columns: &'a Vec<Column>,
4033            /// refer to the indices(in the input schema) of columns
4034            /// that have type list to run unnest on
4035            pub list_type_columns: &'a Vec<(usize, ColumnUnnestList)>,
4036            /// refer to the indices (in the input schema) of columns
4037            /// that have type struct to run unnest on
4038            pub struct_type_columns: &'a Vec<usize>,
4039            /// Having items aligned with the output columns
4040            /// representing which column in the input schema each output column depends on
4041            pub dependency_indices: &'a Vec<usize>,
4042            /// Options
4043            pub options: &'a UnnestOptions,
4044        }
4045        let comparable_self = ComparableUnnest {
4046            input: &self.input,
4047            exec_columns: &self.exec_columns,
4048            list_type_columns: &self.list_type_columns,
4049            struct_type_columns: &self.struct_type_columns,
4050            dependency_indices: &self.dependency_indices,
4051            options: &self.options,
4052        };
4053        let comparable_other = ComparableUnnest {
4054            input: &other.input,
4055            exec_columns: &other.exec_columns,
4056            list_type_columns: &other.list_type_columns,
4057            struct_type_columns: &other.struct_type_columns,
4058            dependency_indices: &other.dependency_indices,
4059            options: &other.options,
4060        };
4061        comparable_self.partial_cmp(&comparable_other)
4062    }
4063}
4064
4065impl Unnest {
4066    pub fn try_new(
4067        input: Arc<LogicalPlan>,
4068        exec_columns: Vec<Column>,
4069        options: UnnestOptions,
4070    ) -> Result<Self> {
4071        if exec_columns.is_empty() {
4072            return plan_err!("unnest plan requires at least 1 column to unnest");
4073        }
4074
4075        let mut list_columns: Vec<(usize, ColumnUnnestList)> = vec![];
4076        let mut struct_columns = vec![];
4077        let indices_to_unnest = exec_columns
4078            .iter()
4079            .map(|c| Ok((input.schema().index_of_column(c)?, c)))
4080            .collect::<Result<HashMap<usize, &Column>>>()?;
4081
4082        let input_schema = input.schema();
4083
4084        let mut dependency_indices = vec![];
4085        // Transform input schema into new schema
4086        // Given this comprehensive example
4087        //
4088        // input schema:
4089        // 1.col1_unnest_placeholder: list[list[int]],
4090        // 2.col1: list[list[int]]
4091        // 3.col2: list[int]
4092        // with unnest on unnest(col1,depth=2), unnest(col1,depth=1) and unnest(col2,depth=1)
4093        // output schema:
4094        // 1.unnest_col1_depth_2: int
4095        // 2.unnest_col1_depth_1: list[int]
4096        // 3.col1: list[list[int]]
4097        // 4.unnest_col2_depth_1: int
4098        // Meaning the placeholder column will be replaced by its unnested variation(s), note
4099        // the plural.
4100        let fields = input_schema
4101            .iter()
4102            .enumerate()
4103            .map(|(index, (original_qualifier, original_field))| {
4104                match indices_to_unnest.get(&index) {
4105                    Some(column_to_unnest) => {
4106                        let recursions_on_column = options
4107                            .recursions
4108                            .iter()
4109                            .filter(|p| -> bool { &p.input_column == *column_to_unnest })
4110                            .collect::<Vec<_>>();
4111                        let mut transformed_columns = recursions_on_column
4112                            .iter()
4113                            .map(|r| {
4114                                list_columns.push((
4115                                    index,
4116                                    ColumnUnnestList {
4117                                        output_column: r.output_column.clone(),
4118                                        depth: r.depth,
4119                                    },
4120                                ));
4121                                Ok(get_unnested_columns(
4122                                    &r.output_column.name,
4123                                    original_field.data_type(),
4124                                    r.depth,
4125                                )?
4126                                .into_iter()
4127                                .next()
4128                                .unwrap()) // because unnesting a list column always result into one result
4129                            })
4130                            .collect::<Result<Vec<(Column, Arc<Field>)>>>()?;
4131                        if transformed_columns.is_empty() {
4132                            transformed_columns = get_unnested_columns(
4133                                &column_to_unnest.name,
4134                                original_field.data_type(),
4135                                1,
4136                            )?;
4137                            match original_field.data_type() {
4138                                DataType::Struct(_) => {
4139                                    struct_columns.push(index);
4140                                }
4141                                DataType::List(_)
4142                                | DataType::FixedSizeList(_, _)
4143                                | DataType::LargeList(_) => {
4144                                    list_columns.push((
4145                                        index,
4146                                        ColumnUnnestList {
4147                                            output_column: Column::from_name(
4148                                                &column_to_unnest.name,
4149                                            ),
4150                                            depth: 1,
4151                                        },
4152                                    ));
4153                                }
4154                                _ => {}
4155                            };
4156                        }
4157
4158                        // new columns dependent on the same original index
4159                        dependency_indices.extend(std::iter::repeat_n(
4160                            index,
4161                            transformed_columns.len(),
4162                        ));
4163                        Ok(transformed_columns
4164                            .iter()
4165                            .map(|(col, field)| {
4166                                (col.relation.to_owned(), field.to_owned())
4167                            })
4168                            .collect())
4169                    }
4170                    None => {
4171                        dependency_indices.push(index);
4172                        Ok(vec![(
4173                            original_qualifier.cloned(),
4174                            Arc::clone(original_field),
4175                        )])
4176                    }
4177                }
4178            })
4179            .collect::<Result<Vec<_>>>()?
4180            .into_iter()
4181            .flatten()
4182            .collect::<Vec<_>>();
4183
4184        let metadata = input_schema.metadata().clone();
4185        let df_schema = DFSchema::new_with_metadata(fields, metadata)?;
4186        // We can use the existing functional dependencies:
4187        let deps = input_schema.functional_dependencies().clone();
4188        let schema = Arc::new(df_schema.with_functional_dependencies(deps)?);
4189
4190        Ok(Unnest {
4191            input,
4192            exec_columns,
4193            list_type_columns: list_columns,
4194            struct_type_columns: struct_columns,
4195            dependency_indices,
4196            schema,
4197            options,
4198        })
4199    }
4200}
4201
4202// Based on data type, either struct or a variant of list
4203// return a set of columns as the result of unnesting
4204// the input columns.
4205// For example, given a column with name "a",
4206// - List(Element) returns ["a"] with data type Element
4207// - Struct(field1, field2) returns ["a.field1","a.field2"]
4208// For list data type, an argument depth is used to specify
4209// the recursion level
4210fn get_unnested_columns(
4211    col_name: &String,
4212    data_type: &DataType,
4213    depth: usize,
4214) -> Result<Vec<(Column, Arc<Field>)>> {
4215    let mut qualified_columns = Vec::with_capacity(1);
4216
4217    match data_type {
4218        DataType::List(_) | DataType::FixedSizeList(_, _) | DataType::LargeList(_) => {
4219            let data_type = get_unnested_list_datatype_recursive(data_type, depth)?;
4220            let new_field = Arc::new(Field::new(
4221                col_name, data_type,
4222                // Unnesting may produce NULLs even if the list is not null.
4223                // For example: unnest([1], []) -> 1, null
4224                true,
4225            ));
4226            let column = Column::from_name(col_name);
4227            // let column = Column::from((None, &new_field));
4228            qualified_columns.push((column, new_field));
4229        }
4230        DataType::Struct(fields) => {
4231            qualified_columns.extend(fields.iter().map(|f| {
4232                let new_name = format!("{}.{}", col_name, f.name());
4233                let column = Column::from_name(&new_name);
4234                let new_field = f.as_ref().clone().with_name(new_name);
4235                // let column = Column::from((None, &f));
4236                (column, Arc::new(new_field))
4237            }))
4238        }
4239        _ => {
4240            return internal_err!(
4241                "trying to unnest on invalid data type {:?}",
4242                data_type
4243            );
4244        }
4245    };
4246    Ok(qualified_columns)
4247}
4248
4249// Get the data type of a multi-dimensional type after unnesting it
4250// with a given depth
4251fn get_unnested_list_datatype_recursive(
4252    data_type: &DataType,
4253    depth: usize,
4254) -> Result<DataType> {
4255    match data_type {
4256        DataType::List(field)
4257        | DataType::FixedSizeList(field, _)
4258        | DataType::LargeList(field) => {
4259            if depth == 1 {
4260                return Ok(field.data_type().clone());
4261            }
4262            return get_unnested_list_datatype_recursive(field.data_type(), depth - 1);
4263        }
4264        _ => {}
4265    };
4266
4267    internal_err!("trying to unnest on invalid data type {:?}", data_type)
4268}
4269
4270#[cfg(test)]
4271mod tests {
4272
4273    use super::*;
4274    use crate::builder::LogicalTableSource;
4275    use crate::logical_plan::table_scan;
4276    use crate::{
4277        binary_expr, col, exists, in_subquery, lit, placeholder, scalar_subquery,
4278        GroupingSet,
4279    };
4280
4281    use datafusion_common::tree_node::{
4282        TransformedResult, TreeNodeRewriter, TreeNodeVisitor,
4283    };
4284    use datafusion_common::{not_impl_err, Constraint, ScalarValue};
4285    use insta::{assert_debug_snapshot, assert_snapshot};
4286
4287    use crate::test::function_stub::count;
4288
4289    fn employee_schema() -> Schema {
4290        Schema::new(vec![
4291            Field::new("id", DataType::Int32, false),
4292            Field::new("first_name", DataType::Utf8, false),
4293            Field::new("last_name", DataType::Utf8, false),
4294            Field::new("state", DataType::Utf8, false),
4295            Field::new("salary", DataType::Int32, false),
4296        ])
4297    }
4298
4299    fn display_plan() -> Result<LogicalPlan> {
4300        let plan1 = table_scan(Some("employee_csv"), &employee_schema(), Some(vec![3]))?
4301            .build()?;
4302
4303        table_scan(Some("employee_csv"), &employee_schema(), Some(vec![0, 3]))?
4304            .filter(in_subquery(col("state"), Arc::new(plan1)))?
4305            .project(vec![col("id")])?
4306            .build()
4307    }
4308
4309    #[test]
4310    fn test_display_indent() -> Result<()> {
4311        let plan = display_plan()?;
4312
4313        assert_snapshot!(plan.display_indent(), @r"
4314        Projection: employee_csv.id
4315          Filter: employee_csv.state IN (<subquery>)
4316            Subquery:
4317              TableScan: employee_csv projection=[state]
4318            TableScan: employee_csv projection=[id, state]
4319        ");
4320        Ok(())
4321    }
4322
4323    #[test]
4324    fn test_display_indent_schema() -> Result<()> {
4325        let plan = display_plan()?;
4326
4327        assert_snapshot!(plan.display_indent_schema(), @r"
4328        Projection: employee_csv.id [id:Int32]
4329          Filter: employee_csv.state IN (<subquery>) [id:Int32, state:Utf8]
4330            Subquery: [state:Utf8]
4331              TableScan: employee_csv projection=[state] [state:Utf8]
4332            TableScan: employee_csv projection=[id, state] [id:Int32, state:Utf8]
4333        ");
4334        Ok(())
4335    }
4336
4337    #[test]
4338    fn test_display_subquery_alias() -> Result<()> {
4339        let plan1 = table_scan(Some("employee_csv"), &employee_schema(), Some(vec![3]))?
4340            .build()?;
4341        let plan1 = Arc::new(plan1);
4342
4343        let plan =
4344            table_scan(Some("employee_csv"), &employee_schema(), Some(vec![0, 3]))?
4345                .project(vec![col("id"), exists(plan1).alias("exists")])?
4346                .build();
4347
4348        assert_snapshot!(plan?.display_indent(), @r"
4349        Projection: employee_csv.id, EXISTS (<subquery>) AS exists
4350          Subquery:
4351            TableScan: employee_csv projection=[state]
4352          TableScan: employee_csv projection=[id, state]
4353        ");
4354        Ok(())
4355    }
4356
4357    #[test]
4358    fn test_display_graphviz() -> Result<()> {
4359        let plan = display_plan()?;
4360
4361        // just test for a few key lines in the output rather than the
4362        // whole thing to make test maintenance easier.
4363        assert_snapshot!(plan.display_graphviz(), @r#"
4364        // Begin DataFusion GraphViz Plan,
4365        // display it online here: https://dreampuf.github.io/GraphvizOnline
4366
4367        digraph {
4368          subgraph cluster_1
4369          {
4370            graph[label="LogicalPlan"]
4371            2[shape=box label="Projection: employee_csv.id"]
4372            3[shape=box label="Filter: employee_csv.state IN (<subquery>)"]
4373            2 -> 3 [arrowhead=none, arrowtail=normal, dir=back]
4374            4[shape=box label="Subquery:"]
4375            3 -> 4 [arrowhead=none, arrowtail=normal, dir=back]
4376            5[shape=box label="TableScan: employee_csv projection=[state]"]
4377            4 -> 5 [arrowhead=none, arrowtail=normal, dir=back]
4378            6[shape=box label="TableScan: employee_csv projection=[id, state]"]
4379            3 -> 6 [arrowhead=none, arrowtail=normal, dir=back]
4380          }
4381          subgraph cluster_7
4382          {
4383            graph[label="Detailed LogicalPlan"]
4384            8[shape=box label="Projection: employee_csv.id\nSchema: [id:Int32]"]
4385            9[shape=box label="Filter: employee_csv.state IN (<subquery>)\nSchema: [id:Int32, state:Utf8]"]
4386            8 -> 9 [arrowhead=none, arrowtail=normal, dir=back]
4387            10[shape=box label="Subquery:\nSchema: [state:Utf8]"]
4388            9 -> 10 [arrowhead=none, arrowtail=normal, dir=back]
4389            11[shape=box label="TableScan: employee_csv projection=[state]\nSchema: [state:Utf8]"]
4390            10 -> 11 [arrowhead=none, arrowtail=normal, dir=back]
4391            12[shape=box label="TableScan: employee_csv projection=[id, state]\nSchema: [id:Int32, state:Utf8]"]
4392            9 -> 12 [arrowhead=none, arrowtail=normal, dir=back]
4393          }
4394        }
4395        // End DataFusion GraphViz Plan
4396        "#);
4397        Ok(())
4398    }
4399
4400    #[test]
4401    fn test_display_pg_json() -> Result<()> {
4402        let plan = display_plan()?;
4403
4404        assert_snapshot!(plan.display_pg_json(), @r#"
4405        [
4406          {
4407            "Plan": {
4408              "Expressions": [
4409                "employee_csv.id"
4410              ],
4411              "Node Type": "Projection",
4412              "Output": [
4413                "id"
4414              ],
4415              "Plans": [
4416                {
4417                  "Condition": "employee_csv.state IN (<subquery>)",
4418                  "Node Type": "Filter",
4419                  "Output": [
4420                    "id",
4421                    "state"
4422                  ],
4423                  "Plans": [
4424                    {
4425                      "Node Type": "Subquery",
4426                      "Output": [
4427                        "state"
4428                      ],
4429                      "Plans": [
4430                        {
4431                          "Node Type": "TableScan",
4432                          "Output": [
4433                            "state"
4434                          ],
4435                          "Plans": [],
4436                          "Relation Name": "employee_csv"
4437                        }
4438                      ]
4439                    },
4440                    {
4441                      "Node Type": "TableScan",
4442                      "Output": [
4443                        "id",
4444                        "state"
4445                      ],
4446                      "Plans": [],
4447                      "Relation Name": "employee_csv"
4448                    }
4449                  ]
4450                }
4451              ]
4452            }
4453          }
4454        ]
4455        "#);
4456        Ok(())
4457    }
4458
4459    /// Tests for the Visitor trait and walking logical plan nodes
4460    #[derive(Debug, Default)]
4461    struct OkVisitor {
4462        strings: Vec<String>,
4463    }
4464
4465    impl<'n> TreeNodeVisitor<'n> for OkVisitor {
4466        type Node = LogicalPlan;
4467
4468        fn f_down(&mut self, plan: &'n LogicalPlan) -> Result<TreeNodeRecursion> {
4469            let s = match plan {
4470                LogicalPlan::Projection { .. } => "pre_visit Projection",
4471                LogicalPlan::Filter { .. } => "pre_visit Filter",
4472                LogicalPlan::TableScan { .. } => "pre_visit TableScan",
4473                _ => {
4474                    return not_impl_err!("unknown plan type");
4475                }
4476            };
4477
4478            self.strings.push(s.into());
4479            Ok(TreeNodeRecursion::Continue)
4480        }
4481
4482        fn f_up(&mut self, plan: &'n LogicalPlan) -> Result<TreeNodeRecursion> {
4483            let s = match plan {
4484                LogicalPlan::Projection { .. } => "post_visit Projection",
4485                LogicalPlan::Filter { .. } => "post_visit Filter",
4486                LogicalPlan::TableScan { .. } => "post_visit TableScan",
4487                _ => {
4488                    return not_impl_err!("unknown plan type");
4489                }
4490            };
4491
4492            self.strings.push(s.into());
4493            Ok(TreeNodeRecursion::Continue)
4494        }
4495    }
4496
4497    #[test]
4498    fn visit_order() {
4499        let mut visitor = OkVisitor::default();
4500        let plan = test_plan();
4501        let res = plan.visit_with_subqueries(&mut visitor);
4502        assert!(res.is_ok());
4503
4504        assert_debug_snapshot!(visitor.strings, @r#"
4505        [
4506            "pre_visit Projection",
4507            "pre_visit Filter",
4508            "pre_visit TableScan",
4509            "post_visit TableScan",
4510            "post_visit Filter",
4511            "post_visit Projection",
4512        ]
4513        "#);
4514    }
4515
4516    #[derive(Debug, Default)]
4517    /// Counter than counts to zero and returns true when it gets there
4518    struct OptionalCounter {
4519        val: Option<usize>,
4520    }
4521
4522    impl OptionalCounter {
4523        fn new(val: usize) -> Self {
4524            Self { val: Some(val) }
4525        }
4526        // Decrements the counter by 1, if any, returning true if it hits zero
4527        fn dec(&mut self) -> bool {
4528            if Some(0) == self.val {
4529                true
4530            } else {
4531                self.val = self.val.take().map(|i| i - 1);
4532                false
4533            }
4534        }
4535    }
4536
4537    #[derive(Debug, Default)]
4538    /// Visitor that returns false after some number of visits
4539    struct StoppingVisitor {
4540        inner: OkVisitor,
4541        /// When Some(0) returns false from pre_visit
4542        return_false_from_pre_in: OptionalCounter,
4543        /// When Some(0) returns false from post_visit
4544        return_false_from_post_in: OptionalCounter,
4545    }
4546
4547    impl<'n> TreeNodeVisitor<'n> for StoppingVisitor {
4548        type Node = LogicalPlan;
4549
4550        fn f_down(&mut self, plan: &'n LogicalPlan) -> Result<TreeNodeRecursion> {
4551            if self.return_false_from_pre_in.dec() {
4552                return Ok(TreeNodeRecursion::Stop);
4553            }
4554            self.inner.f_down(plan)?;
4555
4556            Ok(TreeNodeRecursion::Continue)
4557        }
4558
4559        fn f_up(&mut self, plan: &'n LogicalPlan) -> Result<TreeNodeRecursion> {
4560            if self.return_false_from_post_in.dec() {
4561                return Ok(TreeNodeRecursion::Stop);
4562            }
4563
4564            self.inner.f_up(plan)
4565        }
4566    }
4567
4568    /// test early stopping in pre-visit
4569    #[test]
4570    fn early_stopping_pre_visit() {
4571        let mut visitor = StoppingVisitor {
4572            return_false_from_pre_in: OptionalCounter::new(2),
4573            ..Default::default()
4574        };
4575        let plan = test_plan();
4576        let res = plan.visit_with_subqueries(&mut visitor);
4577        assert!(res.is_ok());
4578
4579        assert_debug_snapshot!(
4580            visitor.inner.strings,
4581            @r#"
4582        [
4583            "pre_visit Projection",
4584            "pre_visit Filter",
4585        ]
4586        "#
4587        );
4588    }
4589
4590    #[test]
4591    fn early_stopping_post_visit() {
4592        let mut visitor = StoppingVisitor {
4593            return_false_from_post_in: OptionalCounter::new(1),
4594            ..Default::default()
4595        };
4596        let plan = test_plan();
4597        let res = plan.visit_with_subqueries(&mut visitor);
4598        assert!(res.is_ok());
4599
4600        assert_debug_snapshot!(
4601            visitor.inner.strings,
4602            @r#"
4603        [
4604            "pre_visit Projection",
4605            "pre_visit Filter",
4606            "pre_visit TableScan",
4607            "post_visit TableScan",
4608        ]
4609        "#
4610        );
4611    }
4612
4613    #[derive(Debug, Default)]
4614    /// Visitor that returns an error after some number of visits
4615    struct ErrorVisitor {
4616        inner: OkVisitor,
4617        /// When Some(0) returns false from pre_visit
4618        return_error_from_pre_in: OptionalCounter,
4619        /// When Some(0) returns false from post_visit
4620        return_error_from_post_in: OptionalCounter,
4621    }
4622
4623    impl<'n> TreeNodeVisitor<'n> for ErrorVisitor {
4624        type Node = LogicalPlan;
4625
4626        fn f_down(&mut self, plan: &'n LogicalPlan) -> Result<TreeNodeRecursion> {
4627            if self.return_error_from_pre_in.dec() {
4628                return not_impl_err!("Error in pre_visit");
4629            }
4630
4631            self.inner.f_down(plan)
4632        }
4633
4634        fn f_up(&mut self, plan: &'n LogicalPlan) -> Result<TreeNodeRecursion> {
4635            if self.return_error_from_post_in.dec() {
4636                return not_impl_err!("Error in post_visit");
4637            }
4638
4639            self.inner.f_up(plan)
4640        }
4641    }
4642
4643    #[test]
4644    fn error_pre_visit() {
4645        let mut visitor = ErrorVisitor {
4646            return_error_from_pre_in: OptionalCounter::new(2),
4647            ..Default::default()
4648        };
4649        let plan = test_plan();
4650        let res = plan.visit_with_subqueries(&mut visitor).unwrap_err();
4651        assert_snapshot!(
4652            res.strip_backtrace(),
4653            @"This feature is not implemented: Error in pre_visit"
4654        );
4655        assert_debug_snapshot!(
4656            visitor.inner.strings,
4657            @r#"
4658        [
4659            "pre_visit Projection",
4660            "pre_visit Filter",
4661        ]
4662        "#
4663        );
4664    }
4665
4666    #[test]
4667    fn error_post_visit() {
4668        let mut visitor = ErrorVisitor {
4669            return_error_from_post_in: OptionalCounter::new(1),
4670            ..Default::default()
4671        };
4672        let plan = test_plan();
4673        let res = plan.visit_with_subqueries(&mut visitor).unwrap_err();
4674        assert_snapshot!(
4675            res.strip_backtrace(),
4676            @"This feature is not implemented: Error in post_visit"
4677        );
4678        assert_debug_snapshot!(
4679            visitor.inner.strings,
4680            @r#"
4681        [
4682            "pre_visit Projection",
4683            "pre_visit Filter",
4684            "pre_visit TableScan",
4685            "post_visit TableScan",
4686        ]
4687        "#
4688        );
4689    }
4690
4691    #[test]
4692    fn projection_expr_schema_mismatch() -> Result<()> {
4693        let empty_schema = Arc::new(DFSchema::empty());
4694        let p = Projection::try_new_with_schema(
4695            vec![col("a")],
4696            Arc::new(LogicalPlan::EmptyRelation(EmptyRelation {
4697                produce_one_row: false,
4698                schema: Arc::clone(&empty_schema),
4699            })),
4700            empty_schema,
4701        );
4702        assert_snapshot!(p.unwrap_err().strip_backtrace(), @"Error during planning: Projection has mismatch between number of expressions (1) and number of fields in schema (0)");
4703        Ok(())
4704    }
4705
4706    fn test_plan() -> LogicalPlan {
4707        let schema = Schema::new(vec![
4708            Field::new("id", DataType::Int32, false),
4709            Field::new("state", DataType::Utf8, false),
4710        ]);
4711
4712        table_scan(TableReference::none(), &schema, Some(vec![0, 1]))
4713            .unwrap()
4714            .filter(col("state").eq(lit("CO")))
4715            .unwrap()
4716            .project(vec![col("id")])
4717            .unwrap()
4718            .build()
4719            .unwrap()
4720    }
4721
4722    #[test]
4723    fn test_replace_invalid_placeholder() {
4724        // test empty placeholder
4725        let schema = Schema::new(vec![Field::new("id", DataType::Int32, false)]);
4726
4727        let plan = table_scan(TableReference::none(), &schema, None)
4728            .unwrap()
4729            .filter(col("id").eq(placeholder("")))
4730            .unwrap()
4731            .build()
4732            .unwrap();
4733
4734        let param_values = vec![ScalarValue::Int32(Some(42))];
4735        plan.replace_params_with_values(&param_values.clone().into())
4736            .expect_err("unexpectedly succeeded to replace an invalid placeholder");
4737
4738        // test $0 placeholder
4739        let schema = Schema::new(vec![Field::new("id", DataType::Int32, false)]);
4740
4741        let plan = table_scan(TableReference::none(), &schema, None)
4742            .unwrap()
4743            .filter(col("id").eq(placeholder("$0")))
4744            .unwrap()
4745            .build()
4746            .unwrap();
4747
4748        plan.replace_params_with_values(&param_values.clone().into())
4749            .expect_err("unexpectedly succeeded to replace an invalid placeholder");
4750
4751        // test $00 placeholder
4752        let schema = Schema::new(vec![Field::new("id", DataType::Int32, false)]);
4753
4754        let plan = table_scan(TableReference::none(), &schema, None)
4755            .unwrap()
4756            .filter(col("id").eq(placeholder("$00")))
4757            .unwrap()
4758            .build()
4759            .unwrap();
4760
4761        plan.replace_params_with_values(&param_values.into())
4762            .expect_err("unexpectedly succeeded to replace an invalid placeholder");
4763    }
4764
4765    #[test]
4766    fn test_nullable_schema_after_grouping_set() {
4767        let schema = Schema::new(vec![
4768            Field::new("foo", DataType::Int32, false),
4769            Field::new("bar", DataType::Int32, false),
4770        ]);
4771
4772        let plan = table_scan(TableReference::none(), &schema, None)
4773            .unwrap()
4774            .aggregate(
4775                vec![Expr::GroupingSet(GroupingSet::GroupingSets(vec![
4776                    vec![col("foo")],
4777                    vec![col("bar")],
4778                ]))],
4779                vec![count(lit(true))],
4780            )
4781            .unwrap()
4782            .build()
4783            .unwrap();
4784
4785        let output_schema = plan.schema();
4786
4787        assert!(output_schema
4788            .field_with_name(None, "foo")
4789            .unwrap()
4790            .is_nullable(),);
4791        assert!(output_schema
4792            .field_with_name(None, "bar")
4793            .unwrap()
4794            .is_nullable());
4795    }
4796
4797    #[test]
4798    fn test_filter_is_scalar() {
4799        // test empty placeholder
4800        let schema =
4801            Arc::new(Schema::new(vec![Field::new("id", DataType::Int32, false)]));
4802
4803        let source = Arc::new(LogicalTableSource::new(schema));
4804        let schema = Arc::new(
4805            DFSchema::try_from_qualified_schema(
4806                TableReference::bare("tab"),
4807                &source.schema(),
4808            )
4809            .unwrap(),
4810        );
4811        let scan = Arc::new(LogicalPlan::TableScan(TableScan {
4812            table_name: TableReference::bare("tab"),
4813            source: Arc::clone(&source) as Arc<dyn TableSource>,
4814            projection: None,
4815            projected_schema: Arc::clone(&schema),
4816            filters: vec![],
4817            fetch: None,
4818        }));
4819        let col = schema.field_names()[0].clone();
4820
4821        let filter = Filter::try_new(
4822            Expr::Column(col.into()).eq(Expr::Literal(ScalarValue::Int32(Some(1)), None)),
4823            scan,
4824        )
4825        .unwrap();
4826        assert!(!filter.is_scalar());
4827        let unique_schema = Arc::new(
4828            schema
4829                .as_ref()
4830                .clone()
4831                .with_functional_dependencies(
4832                    FunctionalDependencies::new_from_constraints(
4833                        Some(&Constraints::new_unverified(vec![Constraint::Unique(
4834                            vec![0],
4835                        )])),
4836                        1,
4837                    ),
4838                )
4839                .unwrap(),
4840        );
4841        let scan = Arc::new(LogicalPlan::TableScan(TableScan {
4842            table_name: TableReference::bare("tab"),
4843            source,
4844            projection: None,
4845            projected_schema: Arc::clone(&unique_schema),
4846            filters: vec![],
4847            fetch: None,
4848        }));
4849        let col = schema.field_names()[0].clone();
4850
4851        let filter =
4852            Filter::try_new(Expr::Column(col.into()).eq(lit(1i32)), scan).unwrap();
4853        assert!(filter.is_scalar());
4854    }
4855
4856    #[test]
4857    fn test_transform_explain() {
4858        let schema = Schema::new(vec![
4859            Field::new("foo", DataType::Int32, false),
4860            Field::new("bar", DataType::Int32, false),
4861        ]);
4862
4863        let plan = table_scan(TableReference::none(), &schema, None)
4864            .unwrap()
4865            .explain(false, false)
4866            .unwrap()
4867            .build()
4868            .unwrap();
4869
4870        let external_filter = col("foo").eq(lit(true));
4871
4872        // after transformation, because plan is not the same anymore,
4873        // the parent plan is built again with call to LogicalPlan::with_new_inputs -> with_new_exprs
4874        let plan = plan
4875            .transform(|plan| match plan {
4876                LogicalPlan::TableScan(table) => {
4877                    let filter = Filter::try_new(
4878                        external_filter.clone(),
4879                        Arc::new(LogicalPlan::TableScan(table)),
4880                    )
4881                    .unwrap();
4882                    Ok(Transformed::yes(LogicalPlan::Filter(filter)))
4883                }
4884                x => Ok(Transformed::no(x)),
4885            })
4886            .data()
4887            .unwrap();
4888
4889        let actual = format!("{}", plan.display_indent());
4890        assert_snapshot!(actual, @r"
4891        Explain
4892          Filter: foo = Boolean(true)
4893            TableScan: ?table?
4894        ")
4895    }
4896
4897    #[test]
4898    fn test_plan_partial_ord() {
4899        let empty_relation = LogicalPlan::EmptyRelation(EmptyRelation {
4900            produce_one_row: false,
4901            schema: Arc::new(DFSchema::empty()),
4902        });
4903
4904        let describe_table = LogicalPlan::DescribeTable(DescribeTable {
4905            schema: Arc::new(Schema::new(vec![Field::new(
4906                "foo",
4907                DataType::Int32,
4908                false,
4909            )])),
4910            output_schema: DFSchemaRef::new(DFSchema::empty()),
4911        });
4912
4913        let describe_table_clone = LogicalPlan::DescribeTable(DescribeTable {
4914            schema: Arc::new(Schema::new(vec![Field::new(
4915                "foo",
4916                DataType::Int32,
4917                false,
4918            )])),
4919            output_schema: DFSchemaRef::new(DFSchema::empty()),
4920        });
4921
4922        assert_eq!(
4923            empty_relation.partial_cmp(&describe_table),
4924            Some(Ordering::Less)
4925        );
4926        assert_eq!(
4927            describe_table.partial_cmp(&empty_relation),
4928            Some(Ordering::Greater)
4929        );
4930        assert_eq!(describe_table.partial_cmp(&describe_table_clone), None);
4931    }
4932
4933    #[test]
4934    fn test_limit_with_new_children() {
4935        let input = Arc::new(LogicalPlan::Values(Values {
4936            schema: Arc::new(DFSchema::empty()),
4937            values: vec![vec![]],
4938        }));
4939        let cases = [
4940            LogicalPlan::Limit(Limit {
4941                skip: None,
4942                fetch: None,
4943                input: Arc::clone(&input),
4944            }),
4945            LogicalPlan::Limit(Limit {
4946                skip: None,
4947                fetch: Some(Box::new(Expr::Literal(
4948                    ScalarValue::new_ten(&DataType::UInt32).unwrap(),
4949                    None,
4950                ))),
4951                input: Arc::clone(&input),
4952            }),
4953            LogicalPlan::Limit(Limit {
4954                skip: Some(Box::new(Expr::Literal(
4955                    ScalarValue::new_ten(&DataType::UInt32).unwrap(),
4956                    None,
4957                ))),
4958                fetch: None,
4959                input: Arc::clone(&input),
4960            }),
4961            LogicalPlan::Limit(Limit {
4962                skip: Some(Box::new(Expr::Literal(
4963                    ScalarValue::new_one(&DataType::UInt32).unwrap(),
4964                    None,
4965                ))),
4966                fetch: Some(Box::new(Expr::Literal(
4967                    ScalarValue::new_ten(&DataType::UInt32).unwrap(),
4968                    None,
4969                ))),
4970                input,
4971            }),
4972        ];
4973
4974        for limit in cases {
4975            let new_limit = limit
4976                .with_new_exprs(
4977                    limit.expressions(),
4978                    limit.inputs().into_iter().cloned().collect(),
4979                )
4980                .unwrap();
4981            assert_eq!(limit, new_limit);
4982        }
4983    }
4984
4985    #[test]
4986    fn test_with_subqueries_jump() {
4987        // The test plan contains a `Project` node above a `Filter` node, and the
4988        // `Project` node contains a subquery plan with a `Filter` root node, so returning
4989        // `TreeNodeRecursion::Jump` on `Project` should cause not visiting any of the
4990        // `Filter`s.
4991        let subquery_schema =
4992            Schema::new(vec![Field::new("sub_id", DataType::Int32, false)]);
4993
4994        let subquery_plan =
4995            table_scan(TableReference::none(), &subquery_schema, Some(vec![0]))
4996                .unwrap()
4997                .filter(col("sub_id").eq(lit(0)))
4998                .unwrap()
4999                .build()
5000                .unwrap();
5001
5002        let schema = Schema::new(vec![Field::new("id", DataType::Int32, false)]);
5003
5004        let plan = table_scan(TableReference::none(), &schema, Some(vec![0]))
5005            .unwrap()
5006            .filter(col("id").eq(lit(0)))
5007            .unwrap()
5008            .project(vec![col("id"), scalar_subquery(Arc::new(subquery_plan))])
5009            .unwrap()
5010            .build()
5011            .unwrap();
5012
5013        let mut filter_found = false;
5014        plan.apply_with_subqueries(|plan| {
5015            match plan {
5016                LogicalPlan::Projection(..) => return Ok(TreeNodeRecursion::Jump),
5017                LogicalPlan::Filter(..) => filter_found = true,
5018                _ => {}
5019            }
5020            Ok(TreeNodeRecursion::Continue)
5021        })
5022        .unwrap();
5023        assert!(!filter_found);
5024
5025        struct ProjectJumpVisitor {
5026            filter_found: bool,
5027        }
5028
5029        impl ProjectJumpVisitor {
5030            fn new() -> Self {
5031                Self {
5032                    filter_found: false,
5033                }
5034            }
5035        }
5036
5037        impl<'n> TreeNodeVisitor<'n> for ProjectJumpVisitor {
5038            type Node = LogicalPlan;
5039
5040            fn f_down(&mut self, node: &'n Self::Node) -> Result<TreeNodeRecursion> {
5041                match node {
5042                    LogicalPlan::Projection(..) => return Ok(TreeNodeRecursion::Jump),
5043                    LogicalPlan::Filter(..) => self.filter_found = true,
5044                    _ => {}
5045                }
5046                Ok(TreeNodeRecursion::Continue)
5047            }
5048        }
5049
5050        let mut visitor = ProjectJumpVisitor::new();
5051        plan.visit_with_subqueries(&mut visitor).unwrap();
5052        assert!(!visitor.filter_found);
5053
5054        let mut filter_found = false;
5055        plan.clone()
5056            .transform_down_with_subqueries(|plan| {
5057                match plan {
5058                    LogicalPlan::Projection(..) => {
5059                        return Ok(Transformed::new(plan, false, TreeNodeRecursion::Jump))
5060                    }
5061                    LogicalPlan::Filter(..) => filter_found = true,
5062                    _ => {}
5063                }
5064                Ok(Transformed::no(plan))
5065            })
5066            .unwrap();
5067        assert!(!filter_found);
5068
5069        let mut filter_found = false;
5070        plan.clone()
5071            .transform_down_up_with_subqueries(
5072                |plan| {
5073                    match plan {
5074                        LogicalPlan::Projection(..) => {
5075                            return Ok(Transformed::new(
5076                                plan,
5077                                false,
5078                                TreeNodeRecursion::Jump,
5079                            ))
5080                        }
5081                        LogicalPlan::Filter(..) => filter_found = true,
5082                        _ => {}
5083                    }
5084                    Ok(Transformed::no(plan))
5085                },
5086                |plan| Ok(Transformed::no(plan)),
5087            )
5088            .unwrap();
5089        assert!(!filter_found);
5090
5091        struct ProjectJumpRewriter {
5092            filter_found: bool,
5093        }
5094
5095        impl ProjectJumpRewriter {
5096            fn new() -> Self {
5097                Self {
5098                    filter_found: false,
5099                }
5100            }
5101        }
5102
5103        impl TreeNodeRewriter for ProjectJumpRewriter {
5104            type Node = LogicalPlan;
5105
5106            fn f_down(&mut self, node: Self::Node) -> Result<Transformed<Self::Node>> {
5107                match node {
5108                    LogicalPlan::Projection(..) => {
5109                        return Ok(Transformed::new(node, false, TreeNodeRecursion::Jump))
5110                    }
5111                    LogicalPlan::Filter(..) => self.filter_found = true,
5112                    _ => {}
5113                }
5114                Ok(Transformed::no(node))
5115            }
5116        }
5117
5118        let mut rewriter = ProjectJumpRewriter::new();
5119        plan.rewrite_with_subqueries(&mut rewriter).unwrap();
5120        assert!(!rewriter.filter_found);
5121    }
5122
5123    #[test]
5124    fn test_with_unresolved_placeholders() {
5125        let field_name = "id";
5126        let placeholder_value = "$1";
5127        let schema = Schema::new(vec![Field::new(field_name, DataType::Int32, false)]);
5128
5129        let plan = table_scan(TableReference::none(), &schema, None)
5130            .unwrap()
5131            .filter(col(field_name).eq(placeholder(placeholder_value)))
5132            .unwrap()
5133            .build()
5134            .unwrap();
5135
5136        // Check that the placeholder parameters have not received a DataType.
5137        let params = plan.get_parameter_types().unwrap();
5138        assert_eq!(params.len(), 1);
5139
5140        let parameter_type = params.clone().get(placeholder_value).unwrap().clone();
5141        assert_eq!(parameter_type, None);
5142    }
5143
5144    #[test]
5145    fn test_join_with_new_exprs() -> Result<()> {
5146        fn create_test_join(
5147            on: Vec<(Expr, Expr)>,
5148            filter: Option<Expr>,
5149        ) -> Result<LogicalPlan> {
5150            let schema = Schema::new(vec![
5151                Field::new("a", DataType::Int32, false),
5152                Field::new("b", DataType::Int32, false),
5153            ]);
5154
5155            let left_schema = DFSchema::try_from_qualified_schema("t1", &schema)?;
5156            let right_schema = DFSchema::try_from_qualified_schema("t2", &schema)?;
5157
5158            Ok(LogicalPlan::Join(Join {
5159                left: Arc::new(
5160                    table_scan(Some("t1"), left_schema.as_arrow(), None)?.build()?,
5161                ),
5162                right: Arc::new(
5163                    table_scan(Some("t2"), right_schema.as_arrow(), None)?.build()?,
5164                ),
5165                on,
5166                filter,
5167                join_type: JoinType::Inner,
5168                join_constraint: JoinConstraint::On,
5169                schema: Arc::new(left_schema.join(&right_schema)?),
5170                null_equality: NullEquality::NullEqualsNothing,
5171            }))
5172        }
5173
5174        {
5175            let join = create_test_join(vec![(col("t1.a"), (col("t2.a")))], None)?;
5176            let LogicalPlan::Join(join) = join.with_new_exprs(
5177                join.expressions(),
5178                join.inputs().into_iter().cloned().collect(),
5179            )?
5180            else {
5181                unreachable!()
5182            };
5183            assert_eq!(join.on, vec![(col("t1.a"), (col("t2.a")))]);
5184            assert_eq!(join.filter, None);
5185        }
5186
5187        {
5188            let join = create_test_join(vec![], Some(col("t1.a").gt(col("t2.a"))))?;
5189            let LogicalPlan::Join(join) = join.with_new_exprs(
5190                join.expressions(),
5191                join.inputs().into_iter().cloned().collect(),
5192            )?
5193            else {
5194                unreachable!()
5195            };
5196            assert_eq!(join.on, vec![]);
5197            assert_eq!(join.filter, Some(col("t1.a").gt(col("t2.a"))));
5198        }
5199
5200        {
5201            let join = create_test_join(
5202                vec![(col("t1.a"), (col("t2.a")))],
5203                Some(col("t1.b").gt(col("t2.b"))),
5204            )?;
5205            let LogicalPlan::Join(join) = join.with_new_exprs(
5206                join.expressions(),
5207                join.inputs().into_iter().cloned().collect(),
5208            )?
5209            else {
5210                unreachable!()
5211            };
5212            assert_eq!(join.on, vec![(col("t1.a"), (col("t2.a")))]);
5213            assert_eq!(join.filter, Some(col("t1.b").gt(col("t2.b"))));
5214        }
5215
5216        {
5217            let join = create_test_join(
5218                vec![(col("t1.a"), (col("t2.a"))), (col("t1.b"), (col("t2.b")))],
5219                None,
5220            )?;
5221            let LogicalPlan::Join(join) = join.with_new_exprs(
5222                vec![
5223                    binary_expr(col("t1.a"), Operator::Plus, lit(1)),
5224                    binary_expr(col("t2.a"), Operator::Plus, lit(2)),
5225                    col("t1.b"),
5226                    col("t2.b"),
5227                    lit(true),
5228                ],
5229                join.inputs().into_iter().cloned().collect(),
5230            )?
5231            else {
5232                unreachable!()
5233            };
5234            assert_eq!(
5235                join.on,
5236                vec![
5237                    (
5238                        binary_expr(col("t1.a"), Operator::Plus, lit(1)),
5239                        binary_expr(col("t2.a"), Operator::Plus, lit(2))
5240                    ),
5241                    (col("t1.b"), (col("t2.b")))
5242                ]
5243            );
5244            assert_eq!(join.filter, Some(lit(true)));
5245        }
5246
5247        Ok(())
5248    }
5249
5250    #[test]
5251    fn test_join_try_new() -> Result<()> {
5252        let schema = Schema::new(vec![
5253            Field::new("a", DataType::Int32, false),
5254            Field::new("b", DataType::Int32, false),
5255        ]);
5256
5257        let left_scan = table_scan(Some("t1"), &schema, None)?.build()?;
5258
5259        let right_scan = table_scan(Some("t2"), &schema, None)?.build()?;
5260
5261        let join_types = vec![
5262            JoinType::Inner,
5263            JoinType::Left,
5264            JoinType::Right,
5265            JoinType::Full,
5266            JoinType::LeftSemi,
5267            JoinType::LeftAnti,
5268            JoinType::RightSemi,
5269            JoinType::RightAnti,
5270            JoinType::LeftMark,
5271        ];
5272
5273        for join_type in join_types {
5274            let join = Join::try_new(
5275                Arc::new(left_scan.clone()),
5276                Arc::new(right_scan.clone()),
5277                vec![(col("t1.a"), col("t2.a"))],
5278                Some(col("t1.b").gt(col("t2.b"))),
5279                join_type,
5280                JoinConstraint::On,
5281                NullEquality::NullEqualsNothing,
5282            )?;
5283
5284            match join_type {
5285                JoinType::LeftSemi | JoinType::LeftAnti => {
5286                    assert_eq!(join.schema.fields().len(), 2);
5287
5288                    let fields = join.schema.fields();
5289                    assert_eq!(
5290                        fields[0].name(),
5291                        "a",
5292                        "First field should be 'a' from left table"
5293                    );
5294                    assert_eq!(
5295                        fields[1].name(),
5296                        "b",
5297                        "Second field should be 'b' from left table"
5298                    );
5299                }
5300                JoinType::RightSemi | JoinType::RightAnti => {
5301                    assert_eq!(join.schema.fields().len(), 2);
5302
5303                    let fields = join.schema.fields();
5304                    assert_eq!(
5305                        fields[0].name(),
5306                        "a",
5307                        "First field should be 'a' from right table"
5308                    );
5309                    assert_eq!(
5310                        fields[1].name(),
5311                        "b",
5312                        "Second field should be 'b' from right table"
5313                    );
5314                }
5315                JoinType::LeftMark => {
5316                    assert_eq!(join.schema.fields().len(), 3);
5317
5318                    let fields = join.schema.fields();
5319                    assert_eq!(
5320                        fields[0].name(),
5321                        "a",
5322                        "First field should be 'a' from left table"
5323                    );
5324                    assert_eq!(
5325                        fields[1].name(),
5326                        "b",
5327                        "Second field should be 'b' from left table"
5328                    );
5329                    assert_eq!(
5330                        fields[2].name(),
5331                        "mark",
5332                        "Third field should be the mark column"
5333                    );
5334
5335                    assert!(!fields[0].is_nullable());
5336                    assert!(!fields[1].is_nullable());
5337                    assert!(!fields[2].is_nullable());
5338                }
5339                _ => {
5340                    assert_eq!(join.schema.fields().len(), 4);
5341
5342                    let fields = join.schema.fields();
5343                    assert_eq!(
5344                        fields[0].name(),
5345                        "a",
5346                        "First field should be 'a' from left table"
5347                    );
5348                    assert_eq!(
5349                        fields[1].name(),
5350                        "b",
5351                        "Second field should be 'b' from left table"
5352                    );
5353                    assert_eq!(
5354                        fields[2].name(),
5355                        "a",
5356                        "Third field should be 'a' from right table"
5357                    );
5358                    assert_eq!(
5359                        fields[3].name(),
5360                        "b",
5361                        "Fourth field should be 'b' from right table"
5362                    );
5363
5364                    if join_type == JoinType::Left {
5365                        // Left side fields (first two) shouldn't be nullable
5366                        assert!(!fields[0].is_nullable());
5367                        assert!(!fields[1].is_nullable());
5368                        // Right side fields (third and fourth) should be nullable
5369                        assert!(fields[2].is_nullable());
5370                        assert!(fields[3].is_nullable());
5371                    } else if join_type == JoinType::Right {
5372                        // Left side fields (first two) should be nullable
5373                        assert!(fields[0].is_nullable());
5374                        assert!(fields[1].is_nullable());
5375                        // Right side fields (third and fourth) shouldn't be nullable
5376                        assert!(!fields[2].is_nullable());
5377                        assert!(!fields[3].is_nullable());
5378                    } else if join_type == JoinType::Full {
5379                        assert!(fields[0].is_nullable());
5380                        assert!(fields[1].is_nullable());
5381                        assert!(fields[2].is_nullable());
5382                        assert!(fields[3].is_nullable());
5383                    }
5384                }
5385            }
5386
5387            assert_eq!(join.on, vec![(col("t1.a"), col("t2.a"))]);
5388            assert_eq!(join.filter, Some(col("t1.b").gt(col("t2.b"))));
5389            assert_eq!(join.join_type, join_type);
5390            assert_eq!(join.join_constraint, JoinConstraint::On);
5391            assert_eq!(join.null_equality, NullEquality::NullEqualsNothing);
5392        }
5393
5394        Ok(())
5395    }
5396
5397    #[test]
5398    fn test_join_try_new_with_using_constraint_and_overlapping_columns() -> Result<()> {
5399        let left_schema = Schema::new(vec![
5400            Field::new("id", DataType::Int32, false), // Common column in both tables
5401            Field::new("name", DataType::Utf8, false), // Unique to left
5402            Field::new("value", DataType::Int32, false), // Common column, different meaning
5403        ]);
5404
5405        let right_schema = Schema::new(vec![
5406            Field::new("id", DataType::Int32, false), // Common column in both tables
5407            Field::new("category", DataType::Utf8, false), // Unique to right
5408            Field::new("value", DataType::Float64, true), // Common column, different meaning
5409        ]);
5410
5411        let left_plan = table_scan(Some("t1"), &left_schema, None)?.build()?;
5412
5413        let right_plan = table_scan(Some("t2"), &right_schema, None)?.build()?;
5414
5415        // Test 1: USING constraint with a common column
5416        {
5417            // In the logical plan, both copies of the `id` column are preserved
5418            // The USING constraint is handled later during physical execution, where the common column appears once
5419            let join = Join::try_new(
5420                Arc::new(left_plan.clone()),
5421                Arc::new(right_plan.clone()),
5422                vec![(col("t1.id"), col("t2.id"))],
5423                None,
5424                JoinType::Inner,
5425                JoinConstraint::Using,
5426                NullEquality::NullEqualsNothing,
5427            )?;
5428
5429            let fields = join.schema.fields();
5430
5431            assert_eq!(fields.len(), 6);
5432
5433            assert_eq!(
5434                fields[0].name(),
5435                "id",
5436                "First field should be 'id' from left table"
5437            );
5438            assert_eq!(
5439                fields[1].name(),
5440                "name",
5441                "Second field should be 'name' from left table"
5442            );
5443            assert_eq!(
5444                fields[2].name(),
5445                "value",
5446                "Third field should be 'value' from left table"
5447            );
5448            assert_eq!(
5449                fields[3].name(),
5450                "id",
5451                "Fourth field should be 'id' from right table"
5452            );
5453            assert_eq!(
5454                fields[4].name(),
5455                "category",
5456                "Fifth field should be 'category' from right table"
5457            );
5458            assert_eq!(
5459                fields[5].name(),
5460                "value",
5461                "Sixth field should be 'value' from right table"
5462            );
5463
5464            assert_eq!(join.join_constraint, JoinConstraint::Using);
5465        }
5466
5467        // Test 2: Complex join condition with expressions
5468        {
5469            // Complex condition: join on id equality AND where left.value < right.value
5470            let join = Join::try_new(
5471                Arc::new(left_plan.clone()),
5472                Arc::new(right_plan.clone()),
5473                vec![(col("t1.id"), col("t2.id"))], // Equijoin condition
5474                Some(col("t1.value").lt(col("t2.value"))), // Non-equi filter condition
5475                JoinType::Inner,
5476                JoinConstraint::On,
5477                NullEquality::NullEqualsNothing,
5478            )?;
5479
5480            let fields = join.schema.fields();
5481            assert_eq!(fields.len(), 6);
5482
5483            assert_eq!(
5484                fields[0].name(),
5485                "id",
5486                "First field should be 'id' from left table"
5487            );
5488            assert_eq!(
5489                fields[1].name(),
5490                "name",
5491                "Second field should be 'name' from left table"
5492            );
5493            assert_eq!(
5494                fields[2].name(),
5495                "value",
5496                "Third field should be 'value' from left table"
5497            );
5498            assert_eq!(
5499                fields[3].name(),
5500                "id",
5501                "Fourth field should be 'id' from right table"
5502            );
5503            assert_eq!(
5504                fields[4].name(),
5505                "category",
5506                "Fifth field should be 'category' from right table"
5507            );
5508            assert_eq!(
5509                fields[5].name(),
5510                "value",
5511                "Sixth field should be 'value' from right table"
5512            );
5513
5514            assert_eq!(join.filter, Some(col("t1.value").lt(col("t2.value"))));
5515        }
5516
5517        // Test 3: Join with null equality behavior set to true
5518        {
5519            let join = Join::try_new(
5520                Arc::new(left_plan.clone()),
5521                Arc::new(right_plan.clone()),
5522                vec![(col("t1.id"), col("t2.id"))],
5523                None,
5524                JoinType::Inner,
5525                JoinConstraint::On,
5526                NullEquality::NullEqualsNull,
5527            )?;
5528
5529            assert_eq!(join.null_equality, NullEquality::NullEqualsNull);
5530        }
5531
5532        Ok(())
5533    }
5534
5535    #[test]
5536    fn test_join_try_new_schema_validation() -> Result<()> {
5537        let left_schema = Schema::new(vec![
5538            Field::new("id", DataType::Int32, false),
5539            Field::new("name", DataType::Utf8, false),
5540            Field::new("value", DataType::Float64, true),
5541        ]);
5542
5543        let right_schema = Schema::new(vec![
5544            Field::new("id", DataType::Int32, false),
5545            Field::new("category", DataType::Utf8, true),
5546            Field::new("code", DataType::Int16, false),
5547        ]);
5548
5549        let left_plan = table_scan(Some("t1"), &left_schema, None)?.build()?;
5550
5551        let right_plan = table_scan(Some("t2"), &right_schema, None)?.build()?;
5552
5553        let join_types = vec![
5554            JoinType::Inner,
5555            JoinType::Left,
5556            JoinType::Right,
5557            JoinType::Full,
5558        ];
5559
5560        for join_type in join_types {
5561            let join = Join::try_new(
5562                Arc::new(left_plan.clone()),
5563                Arc::new(right_plan.clone()),
5564                vec![(col("t1.id"), col("t2.id"))],
5565                Some(col("t1.value").gt(lit(5.0))),
5566                join_type,
5567                JoinConstraint::On,
5568                NullEquality::NullEqualsNothing,
5569            )?;
5570
5571            let fields = join.schema.fields();
5572            assert_eq!(fields.len(), 6, "Expected 6 fields for {join_type:?} join");
5573
5574            for (i, field) in fields.iter().enumerate() {
5575                let expected_nullable = match (i, &join_type) {
5576                    // Left table fields (indices 0, 1, 2)
5577                    (0, JoinType::Right | JoinType::Full) => true, // id becomes nullable in RIGHT/FULL
5578                    (1, JoinType::Right | JoinType::Full) => true, // name becomes nullable in RIGHT/FULL
5579                    (2, _) => true, // value is already nullable
5580
5581                    // Right table fields (indices 3, 4, 5)
5582                    (3, JoinType::Left | JoinType::Full) => true, // id becomes nullable in LEFT/FULL
5583                    (4, _) => true, // category is already nullable
5584                    (5, JoinType::Left | JoinType::Full) => true, // code becomes nullable in LEFT/FULL
5585
5586                    _ => false,
5587                };
5588
5589                assert_eq!(
5590                    field.is_nullable(),
5591                    expected_nullable,
5592                    "Field {} ({}) nullability incorrect for {:?} join",
5593                    i,
5594                    field.name(),
5595                    join_type
5596                );
5597            }
5598        }
5599
5600        let using_join = Join::try_new(
5601            Arc::new(left_plan.clone()),
5602            Arc::new(right_plan.clone()),
5603            vec![(col("t1.id"), col("t2.id"))],
5604            None,
5605            JoinType::Inner,
5606            JoinConstraint::Using,
5607            NullEquality::NullEqualsNothing,
5608        )?;
5609
5610        assert_eq!(
5611            using_join.schema.fields().len(),
5612            6,
5613            "USING join should have all fields"
5614        );
5615        assert_eq!(using_join.join_constraint, JoinConstraint::Using);
5616
5617        Ok(())
5618    }
5619}