Skip to main content

datafusion_expr/logical_plan/
builder.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! This module provides a builder for creating LogicalPlans
19
20use std::borrow::Cow;
21use std::cmp::Ordering;
22use std::collections::{HashMap, HashSet};
23use std::iter::once;
24use std::sync::Arc;
25
26use crate::dml::CopyTo;
27use crate::expr::{Alias, PlannedReplaceSelectItem, Sort as SortExpr};
28use crate::expr_rewriter::{
29    coerce_plan_expr_for_schema, normalize_col,
30    normalize_col_with_schemas_and_ambiguity_check, normalize_cols, normalize_sorts,
31    rewrite_sort_cols_by_aggs,
32};
33use crate::logical_plan::{
34    Aggregate, Analyze, Distinct, DistinctOn, EmptyRelation, Explain, Filter, Join,
35    JoinConstraint, JoinType, Limit, LogicalPlan, Partitioning, PlanType, Prepare,
36    Projection, Repartition, Sort, SubqueryAlias, TableScan, Union, Unnest, Values,
37    Window,
38};
39use crate::select_expr::SelectExpr;
40use crate::utils::{
41    can_hash, columnize_expr, compare_sort_expr, expand_qualified_wildcard,
42    expand_wildcard, expr_to_columns, find_valid_equijoin_key_pair,
43    group_window_expr_by_sort_keys,
44};
45use crate::{
46    DmlStatement, ExplainOption, Expr, ExprSchemable, Operator, RecursiveQuery,
47    Statement, TableProviderFilterPushDown, TableSource, WriteOp, and, binary_expr, lit,
48};
49
50use super::dml::InsertOp;
51use arrow::compute::can_cast_types;
52use arrow::datatypes::{DataType, Field, FieldRef, Fields, Schema, SchemaRef};
53use datafusion_common::display::ToStringifiedPlan;
54use datafusion_common::file_options::file_type::FileType;
55use datafusion_common::metadata::FieldMetadata;
56use datafusion_common::{
57    Column, Constraints, DFSchema, DFSchemaRef, NullEquality, Result, ScalarValue,
58    TableReference, ToDFSchema, UnnestOptions, exec_err,
59    get_target_functional_dependencies, internal_datafusion_err, plan_datafusion_err,
60    plan_err,
61};
62use datafusion_expr_common::type_coercion::binary::type_union_resolution;
63
64use indexmap::IndexSet;
65
66/// Default table name for unnamed table
67pub const UNNAMED_TABLE: &str = "?table?";
68
69/// Options for [`LogicalPlanBuilder`]
70#[derive(Default, Debug, Clone)]
71pub struct LogicalPlanBuilderOptions {
72    /// Flag indicating whether the plan builder should add
73    /// functionally dependent expressions as additional aggregation groupings.
74    add_implicit_group_by_exprs: bool,
75}
76
77impl LogicalPlanBuilderOptions {
78    pub fn new() -> Self {
79        Default::default()
80    }
81
82    /// Should the builder add functionally dependent expressions as additional aggregation groupings.
83    pub fn with_add_implicit_group_by_exprs(mut self, add: bool) -> Self {
84        self.add_implicit_group_by_exprs = add;
85        self
86    }
87}
88
89/// Builder for logical plans
90///
91/// # Example building a simple plan
92/// ```
93/// # use datafusion_expr::{lit, col, LogicalPlanBuilder, logical_plan::table_scan};
94/// # use datafusion_common::Result;
95/// # use arrow::datatypes::{Schema, DataType, Field};
96/// #
97/// # fn main() -> Result<()> {
98/// #
99/// # fn employee_schema() -> Schema {
100/// #    Schema::new(vec![
101/// #           Field::new("id", DataType::Int32, false),
102/// #           Field::new("first_name", DataType::Utf8, false),
103/// #           Field::new("last_name", DataType::Utf8, false),
104/// #           Field::new("state", DataType::Utf8, false),
105/// #           Field::new("salary", DataType::Int32, false),
106/// #       ])
107/// #   }
108/// #
109/// // Create a plan similar to
110/// // SELECT last_name
111/// // FROM employees
112/// // WHERE salary < 1000
113/// let plan = table_scan(Some("employee"), &employee_schema(), None)?
114///  // Keep only rows where salary < 1000
115///  .filter(col("salary").lt(lit(1000)))?
116///  // only show "last_name" in the final results
117///  .project(vec![col("last_name")])?
118///  .build()?;
119///
120/// // Convert from plan back to builder
121/// let builder = LogicalPlanBuilder::from(plan);
122///
123/// # Ok(())
124/// # }
125/// ```
126#[derive(Debug, Clone)]
127pub struct LogicalPlanBuilder {
128    plan: Arc<LogicalPlan>,
129    options: LogicalPlanBuilderOptions,
130}
131
132impl LogicalPlanBuilder {
133    /// Create a builder from an existing plan
134    pub fn new(plan: LogicalPlan) -> Self {
135        Self {
136            plan: Arc::new(plan),
137            options: LogicalPlanBuilderOptions::default(),
138        }
139    }
140
141    /// Create a builder from an existing plan
142    pub fn new_from_arc(plan: Arc<LogicalPlan>) -> Self {
143        Self {
144            plan,
145            options: LogicalPlanBuilderOptions::default(),
146        }
147    }
148
149    pub fn with_options(mut self, options: LogicalPlanBuilderOptions) -> Self {
150        self.options = options;
151        self
152    }
153
154    /// Return the output schema of the plan build so far
155    pub fn schema(&self) -> &DFSchemaRef {
156        self.plan.schema()
157    }
158
159    /// Return the LogicalPlan of the plan build so far
160    pub fn plan(&self) -> &LogicalPlan {
161        &self.plan
162    }
163
164    /// Create an empty relation.
165    ///
166    /// `produce_one_row` set to true means this empty node needs to produce a placeholder row.
167    pub fn empty(produce_one_row: bool) -> Self {
168        Self::new(LogicalPlan::EmptyRelation(EmptyRelation {
169            produce_one_row,
170            schema: DFSchemaRef::new(DFSchema::empty()),
171        }))
172    }
173
174    /// Convert a regular plan into a recursive query.
175    /// `is_distinct` indicates whether the recursive term should be de-duplicated (`UNION`) after each iteration or not (`UNION ALL`).
176    pub fn to_recursive_query(
177        self,
178        name: String,
179        recursive_term: LogicalPlan,
180        is_distinct: bool,
181    ) -> Result<Self> {
182        // Ensure that the static term and the recursive term have the same number of fields
183        let static_fields_len = self.plan.schema().fields().len();
184        let recursive_fields_len = recursive_term.schema().fields().len();
185        if static_fields_len != recursive_fields_len {
186            return plan_err!(
187                "Non-recursive term and recursive term must have the same number of columns ({} != {})",
188                static_fields_len,
189                recursive_fields_len
190            );
191        }
192        // Ensure that the recursive term has the same field types as the static term
193        let coerced_recursive_term =
194            coerce_plan_expr_for_schema(recursive_term, self.plan.schema())?;
195        Ok(Self::from(LogicalPlan::RecursiveQuery(RecursiveQuery {
196            name,
197            static_term: self.plan,
198            recursive_term: Arc::new(coerced_recursive_term),
199            is_distinct,
200        })))
201    }
202
203    /// Create a values list based relation, and the schema is inferred from data, consuming
204    /// `value`. See the [Postgres VALUES](https://www.postgresql.org/docs/current/queries-values.html)
205    /// documentation for more details.
206    ///
207    /// so it's usually better to override the default names with a table alias list.
208    ///
209    /// If the values include params/binders such as $1, $2, $3, etc, then the `param_data_types` should be provided.
210    pub fn values(values: Vec<Vec<Expr>>) -> Result<Self> {
211        if values.is_empty() {
212            return plan_err!("Values list cannot be empty");
213        }
214        let n_cols = values[0].len();
215        if n_cols == 0 {
216            return plan_err!("Values list cannot be zero length");
217        }
218        for (i, row) in values.iter().enumerate() {
219            if row.len() != n_cols {
220                return plan_err!(
221                    "Inconsistent data length across values list: got {} values in row {} but expected {}",
222                    row.len(),
223                    i,
224                    n_cols
225                );
226            }
227        }
228
229        // Infer from data itself
230        Self::infer_data(values)
231    }
232
233    /// Create a values list based relation, and the schema is inferred from data itself or table schema if provided, consuming
234    /// `value`. See the [Postgres VALUES](https://www.postgresql.org/docs/current/queries-values.html)
235    /// documentation for more details.
236    ///
237    /// By default, it assigns the names column1, column2, etc. to the columns of a VALUES table.
238    /// The column names are not specified by the SQL standard and different database systems do it differently,
239    /// so it's usually better to override the default names with a table alias list.
240    ///
241    /// If the values include params/binders such as $1, $2, $3, etc, then the `param_data_types` should be provided.
242    pub fn values_with_schema(
243        values: Vec<Vec<Expr>>,
244        schema: &DFSchemaRef,
245    ) -> Result<Self> {
246        if values.is_empty() {
247            return plan_err!("Values list cannot be empty");
248        }
249        let n_cols = schema.fields().len();
250        if n_cols == 0 {
251            return plan_err!("Values list cannot be zero length");
252        }
253        for (i, row) in values.iter().enumerate() {
254            if row.len() != n_cols {
255                return plan_err!(
256                    "Inconsistent data length across values list: got {} values in row {} but expected {}",
257                    row.len(),
258                    i,
259                    n_cols
260                );
261            }
262        }
263
264        // Check the type of value against the schema
265        Self::infer_values_from_schema(values, schema)
266    }
267
268    fn infer_values_from_schema(
269        values: Vec<Vec<Expr>>,
270        schema: &DFSchema,
271    ) -> Result<Self> {
272        let n_cols = values[0].len();
273        let mut fields = ValuesFields::new();
274        for j in 0..n_cols {
275            let field_type = schema.field(j).data_type();
276            let field_nullable = schema.field(j).is_nullable();
277            for row in values.iter() {
278                let value = &row[j];
279                let data_type = value.get_type(schema)?;
280
281                if !data_type.equals_datatype(field_type)
282                    && !can_cast_types(&data_type, field_type)
283                {
284                    return exec_err!(
285                        "type mismatch and can't cast to got {} and {}",
286                        data_type,
287                        field_type
288                    );
289                }
290            }
291            fields.push(field_type.to_owned(), field_nullable);
292        }
293
294        Self::infer_inner(values, fields, schema)
295    }
296
297    fn infer_data(values: Vec<Vec<Expr>>) -> Result<Self> {
298        let n_cols = values[0].len();
299        let schema = DFSchema::empty();
300        let mut fields = ValuesFields::new();
301
302        for j in 0..n_cols {
303            let mut common_type: Option<DataType> = None;
304            let mut common_metadata: Option<FieldMetadata> = None;
305            for (i, row) in values.iter().enumerate() {
306                let value = &row[j];
307                let metadata = value.metadata(&schema)?;
308                if let Some(ref cm) = common_metadata {
309                    if &metadata != cm {
310                        return plan_err!(
311                            "Inconsistent metadata across values list at row {i} column {j}. Was {:?} but found {:?}",
312                            cm,
313                            metadata
314                        );
315                    }
316                } else {
317                    common_metadata = Some(metadata.clone());
318                }
319                let data_type = value.get_type(&schema)?;
320                if data_type == DataType::Null {
321                    continue;
322                }
323
324                if let Some(prev_type) = common_type {
325                    // get common type of each column values.
326                    let data_types = vec![prev_type.clone(), data_type.clone()];
327                    let Some(new_type) = type_union_resolution(&data_types) else {
328                        return plan_err!(
329                            "Inconsistent data type across values list at row {i} column {j}. Was {prev_type} but found {data_type}"
330                        );
331                    };
332                    common_type = Some(new_type);
333                } else {
334                    common_type = Some(data_type);
335                }
336            }
337            // assuming common_type was not set, and no error, therefore the type should be NULL
338            // since the code loop skips NULL
339            fields.push_with_metadata(
340                common_type.unwrap_or(DataType::Null),
341                true,
342                common_metadata,
343            );
344        }
345
346        Self::infer_inner(values, fields, &schema)
347    }
348
349    fn infer_inner(
350        mut values: Vec<Vec<Expr>>,
351        fields: ValuesFields,
352        schema: &DFSchema,
353    ) -> Result<Self> {
354        let fields = fields.into_fields();
355        // wrap cast if data type is not same as common type.
356        for row in &mut values {
357            for (j, field_type) in fields.iter().map(|f| f.data_type()).enumerate() {
358                if let Expr::Literal(ScalarValue::Null, metadata) = &row[j] {
359                    row[j] = Expr::Literal(
360                        ScalarValue::try_from(field_type)?,
361                        metadata.clone(),
362                    );
363                } else {
364                    row[j] = std::mem::take(&mut row[j]).cast_to(field_type, schema)?;
365                }
366            }
367        }
368
369        let dfschema = DFSchema::from_unqualified_fields(fields, HashMap::new())?;
370        let schema = DFSchemaRef::new(dfschema);
371
372        Ok(Self::new(LogicalPlan::Values(Values { schema, values })))
373    }
374
375    /// Convert a table provider into a builder with a TableScan
376    ///
377    /// Note that if you pass a string as `table_name`, it is treated
378    /// as a SQL identifier, as described on [`TableReference`] and
379    /// thus is normalized
380    ///
381    /// # Example:
382    /// ```
383    /// # use datafusion_expr::{lit, col, LogicalPlanBuilder,
384    /// #  logical_plan::builder::LogicalTableSource, logical_plan::table_scan
385    /// # };
386    /// # use std::sync::Arc;
387    /// # use arrow::datatypes::{Schema, DataType, Field};
388    /// # use datafusion_common::TableReference;
389    /// #
390    /// # let employee_schema = Arc::new(Schema::new(vec![
391    /// #           Field::new("id", DataType::Int32, false),
392    /// # ])) as _;
393    /// # let table_source = Arc::new(LogicalTableSource::new(employee_schema));
394    /// // Scan table_source with the name "mytable" (after normalization)
395    /// # let table = table_source.clone();
396    /// let scan = LogicalPlanBuilder::scan("MyTable", table, None);
397    ///
398    /// // Scan table_source with the name "MyTable" by enclosing in quotes
399    /// # let table = table_source.clone();
400    /// let scan = LogicalPlanBuilder::scan(r#""MyTable""#, table, None);
401    ///
402    /// // Scan table_source with the name "MyTable" by forming the table reference
403    /// # let table = table_source.clone();
404    /// let table_reference = TableReference::bare("MyTable");
405    /// let scan = LogicalPlanBuilder::scan(table_reference, table, None);
406    /// ```
407    pub fn scan(
408        table_name: impl Into<TableReference>,
409        table_source: Arc<dyn TableSource>,
410        projection: Option<Vec<usize>>,
411    ) -> Result<Self> {
412        Self::scan_with_filters(table_name, table_source, projection, vec![])
413    }
414
415    /// Create a [CopyTo] for copying the contents of this builder to the specified file(s)
416    pub fn copy_to(
417        input: LogicalPlan,
418        output_url: String,
419        file_type: Arc<dyn FileType>,
420        options: HashMap<String, String>,
421        partition_by: Vec<String>,
422    ) -> Result<Self> {
423        Ok(Self::new(LogicalPlan::Copy(CopyTo::new(
424            Arc::new(input),
425            output_url,
426            partition_by,
427            file_type,
428            options,
429        ))))
430    }
431
432    /// Create a [`DmlStatement`] for inserting the contents of this builder into the named table.
433    ///
434    /// Note,  use a [`DefaultTableSource`] to insert into a [`TableProvider`]
435    ///
436    /// [`DefaultTableSource`]: https://docs.rs/datafusion/latest/datafusion/datasource/default_table_source/struct.DefaultTableSource.html
437    /// [`TableProvider`]: https://docs.rs/datafusion/latest/datafusion/catalog/trait.TableProvider.html
438    ///
439    /// # Example:
440    /// ```
441    /// # use datafusion_expr::{lit, LogicalPlanBuilder,
442    /// #  logical_plan::builder::LogicalTableSource,
443    /// # };
444    /// # use std::sync::Arc;
445    /// # use arrow::datatypes::{Schema, DataType, Field};
446    /// # use datafusion_expr::dml::InsertOp;
447    /// #
448    /// # fn test() -> datafusion_common::Result<()> {
449    /// # let employee_schema = Arc::new(Schema::new(vec![
450    /// #     Field::new("id", DataType::Int32, false),
451    /// # ])) as _;
452    /// # let table_source = Arc::new(LogicalTableSource::new(employee_schema));
453    /// // VALUES (1), (2)
454    /// let input = LogicalPlanBuilder::values(vec![vec![lit(1)], vec![lit(2)]])?.build()?;
455    /// // INSERT INTO MyTable VALUES (1), (2)
456    /// let insert_plan = LogicalPlanBuilder::insert_into(
457    ///     input,
458    ///     "MyTable",
459    ///     table_source,
460    ///     InsertOp::Append,
461    /// )?;
462    /// # Ok(())
463    /// # }
464    /// ```
465    pub fn insert_into(
466        input: LogicalPlan,
467        table_name: impl Into<TableReference>,
468        target: Arc<dyn TableSource>,
469        insert_op: InsertOp,
470    ) -> Result<Self> {
471        Ok(Self::new(LogicalPlan::Dml(DmlStatement::new(
472            table_name.into(),
473            target,
474            WriteOp::Insert(insert_op),
475            Arc::new(input),
476        ))))
477    }
478
479    /// Convert a table provider into a builder with a TableScan
480    pub fn scan_with_filters(
481        table_name: impl Into<TableReference>,
482        table_source: Arc<dyn TableSource>,
483        projection: Option<Vec<usize>>,
484        filters: Vec<Expr>,
485    ) -> Result<Self> {
486        Self::scan_with_filters_inner(table_name, table_source, projection, filters, None)
487    }
488
489    /// Convert a table provider into a builder with a TableScan with filter and fetch
490    pub fn scan_with_filters_fetch(
491        table_name: impl Into<TableReference>,
492        table_source: Arc<dyn TableSource>,
493        projection: Option<Vec<usize>>,
494        filters: Vec<Expr>,
495        fetch: Option<usize>,
496    ) -> Result<Self> {
497        Self::scan_with_filters_inner(
498            table_name,
499            table_source,
500            projection,
501            filters,
502            fetch,
503        )
504    }
505
506    fn scan_with_filters_inner(
507        table_name: impl Into<TableReference>,
508        table_source: Arc<dyn TableSource>,
509        projection: Option<Vec<usize>>,
510        filters: Vec<Expr>,
511        fetch: Option<usize>,
512    ) -> Result<Self> {
513        let table_scan =
514            TableScan::try_new(table_name, table_source, projection, filters, fetch)?;
515
516        // Inline TableScan
517        if table_scan.filters.is_empty()
518            && let Some(p) = table_scan.source.get_logical_plan()
519        {
520            let sub_plan = p.into_owned();
521
522            if let Some(proj) = table_scan.projection {
523                let projection_exprs = proj
524                    .into_iter()
525                    .map(|i| {
526                        Expr::Column(Column::from(sub_plan.schema().qualified_field(i)))
527                    })
528                    .collect::<Vec<_>>();
529                return Self::new(sub_plan)
530                    .project(projection_exprs)?
531                    .alias(table_scan.table_name);
532            }
533
534            // Ensures that the reference to the inlined table remains the
535            // same, meaning we don't have to change any of the parent nodes
536            // that reference this table.
537            return Self::new(sub_plan).alias(table_scan.table_name);
538        }
539
540        Ok(Self::new(LogicalPlan::TableScan(table_scan)))
541    }
542
543    /// Wrap a plan in a window
544    pub fn window_plan(
545        input: LogicalPlan,
546        window_exprs: impl IntoIterator<Item = Expr>,
547    ) -> Result<LogicalPlan> {
548        let mut plan = input;
549        let mut groups = group_window_expr_by_sort_keys(window_exprs)?;
550        // To align with the behavior of PostgreSQL, we want the sort_keys sorted as same rule as PostgreSQL that first
551        // we compare the sort key themselves and if one window's sort keys are a prefix of another
552        // put the window with more sort keys first. so more deeply sorted plans gets nested further down as children.
553        // The sort_by() implementation here is a stable sort.
554        // Note that by this rule if there's an empty over, it'll be at the top level
555        groups.sort_by(|(key_a, _), (key_b, _)| {
556            for ((first, _), (second, _)) in key_a.iter().zip(key_b.iter()) {
557                let key_ordering = compare_sort_expr(first, second, plan.schema());
558                match key_ordering {
559                    Ordering::Less => {
560                        return Ordering::Less;
561                    }
562                    Ordering::Greater => {
563                        return Ordering::Greater;
564                    }
565                    Ordering::Equal => {}
566                }
567            }
568            key_b.len().cmp(&key_a.len())
569        });
570        for (_, exprs) in groups {
571            let window_exprs = exprs.into_iter().collect::<Vec<_>>();
572            // Partition and sorting is done at physical level, see the EnforceDistribution
573            // and EnforceSorting rules.
574            plan = LogicalPlanBuilder::from(plan)
575                .window(window_exprs)?
576                .build()?;
577        }
578        Ok(plan)
579    }
580
581    /// Apply a projection without alias.
582    pub fn project(
583        self,
584        expr: impl IntoIterator<Item = impl Into<SelectExpr>>,
585    ) -> Result<Self> {
586        project(Arc::unwrap_or_clone(self.plan), expr).map(Self::new)
587    }
588
589    /// Apply a projection without alias with optional validation
590    /// (true to validate, false to not validate)
591    pub fn project_with_validation(
592        self,
593        expr: Vec<(impl Into<SelectExpr>, bool)>,
594    ) -> Result<Self> {
595        project_with_validation(Arc::unwrap_or_clone(self.plan), expr, None)
596            .map(Self::new)
597    }
598
599    /// Apply a projection, aliasing non-Column/non-Alias expressions to
600    /// match the field names from the provided schema.
601    pub fn project_with_validation_and_schema(
602        self,
603        expr: impl IntoIterator<Item = impl Into<SelectExpr>>,
604        schema: &DFSchemaRef,
605    ) -> Result<Self> {
606        project_with_validation(
607            Arc::unwrap_or_clone(self.plan),
608            expr.into_iter().map(|e| (e, true)),
609            Some(schema),
610        )
611        .map(Self::new)
612    }
613
614    /// Select the given column indices
615    pub fn select(self, indices: impl IntoIterator<Item = usize>) -> Result<Self> {
616        let exprs: Vec<_> = indices
617            .into_iter()
618            .map(|x| Expr::Column(Column::from(self.plan.schema().qualified_field(x))))
619            .collect();
620        self.project(exprs)
621    }
622
623    /// Apply a filter
624    pub fn filter(self, expr: impl Into<Expr>) -> Result<Self> {
625        let expr = normalize_col(expr.into(), &self.plan)?;
626        Filter::try_new(expr, self.plan)
627            .map(LogicalPlan::Filter)
628            .map(Self::new)
629    }
630
631    /// Apply a filter which is used for a having clause
632    pub fn having(self, expr: impl Into<Expr>) -> Result<Self> {
633        let expr = normalize_col(expr.into(), &self.plan)?;
634        Filter::try_new(expr, self.plan)
635            .map(LogicalPlan::Filter)
636            .map(Self::from)
637    }
638
639    /// Make a builder for a prepare logical plan from the builder's plan
640    pub fn prepare(self, name: String, fields: Vec<FieldRef>) -> Result<Self> {
641        Ok(Self::new(LogicalPlan::Statement(Statement::Prepare(
642            Prepare {
643                name,
644                fields,
645                input: self.plan,
646            },
647        ))))
648    }
649
650    /// Limit the number of rows returned
651    ///
652    /// `skip` - Number of rows to skip before fetch any row.
653    ///
654    /// `fetch` - Maximum number of rows to fetch, after skipping `skip` rows,
655    ///          if specified.
656    pub fn limit(self, skip: usize, fetch: Option<usize>) -> Result<Self> {
657        let skip_expr = if skip == 0 {
658            None
659        } else {
660            Some(lit(skip as i64))
661        };
662        let fetch_expr = fetch.map(|f| lit(f as i64));
663        self.limit_by_expr(skip_expr, fetch_expr)
664    }
665
666    /// Limit the number of rows returned
667    ///
668    /// Similar to `limit` but uses expressions for `skip` and `fetch`
669    pub fn limit_by_expr(self, skip: Option<Expr>, fetch: Option<Expr>) -> Result<Self> {
670        Ok(Self::new(LogicalPlan::Limit(Limit {
671            skip: skip.map(Box::new),
672            fetch: fetch.map(Box::new),
673            input: self.plan,
674        })))
675    }
676
677    /// Apply an alias
678    pub fn alias(self, alias: impl Into<TableReference>) -> Result<Self> {
679        subquery_alias(Arc::unwrap_or_clone(self.plan), alias).map(Self::new)
680    }
681
682    /// Add missing sort columns to all downstream projection
683    ///
684    /// Thus, if you have a LogicalPlan that selects A and B and have
685    /// not requested a sort by C, this code will add C recursively to
686    /// all input projections.
687    ///
688    /// Adding a new column is not correct if there is a `Distinct`
689    /// node, which produces only distinct values of its
690    /// inputs. Adding a new column to its input will result in
691    /// potentially different results than with the original column.
692    ///
693    /// For example, if the input is like:
694    ///
695    /// Distinct(A, B)
696    ///
697    /// If the input looks like
698    ///
699    /// a | b | c
700    /// --+---+---
701    /// 1 | 2 | 3
702    /// 1 | 2 | 4
703    ///
704    /// Distinct (A, B) --> (1,2)
705    ///
706    /// But Distinct (A, B, C) --> (1, 2, 3), (1, 2, 4)
707    ///  (which will appear as a (1, 2), (1, 2) if a and b are projected
708    ///
709    /// See <https://github.com/apache/datafusion/issues/5065> for more details
710    fn add_missing_columns(
711        curr_plan: LogicalPlan,
712        missing_cols: &IndexSet<Column>,
713        is_distinct: bool,
714    ) -> Result<LogicalPlan> {
715        match curr_plan {
716            LogicalPlan::Projection(Projection {
717                input,
718                mut expr,
719                schema: _,
720            }) if missing_cols.iter().all(|c| input.schema().has_column(c)) => {
721                let mut missing_exprs = missing_cols
722                    .iter()
723                    .map(|c| normalize_col(Expr::Column(c.clone()), &input))
724                    .collect::<Result<Vec<_>>>()?;
725
726                // Do not let duplicate columns to be added, some of the
727                // missing_cols may be already present but without the new
728                // projected alias.
729                missing_exprs.retain(|e| !expr.contains(e));
730                if is_distinct {
731                    Self::ambiguous_distinct_check(&missing_exprs, missing_cols, &expr)?;
732                }
733                expr.extend(missing_exprs);
734                project(Arc::unwrap_or_clone(input), expr)
735            }
736            _ => {
737                let is_distinct =
738                    is_distinct || matches!(curr_plan, LogicalPlan::Distinct(_));
739                let new_inputs = curr_plan
740                    .inputs()
741                    .into_iter()
742                    .map(|input_plan| {
743                        Self::add_missing_columns(
744                            (*input_plan).clone(),
745                            missing_cols,
746                            is_distinct,
747                        )
748                    })
749                    .collect::<Result<Vec<_>>>()?;
750                curr_plan.with_new_exprs(curr_plan.expressions(), new_inputs)
751            }
752        }
753    }
754
755    fn ambiguous_distinct_check(
756        missing_exprs: &[Expr],
757        missing_cols: &IndexSet<Column>,
758        projection_exprs: &[Expr],
759    ) -> Result<()> {
760        if missing_exprs.is_empty() {
761            return Ok(());
762        }
763
764        // if the missing columns are all only aliases for things in
765        // the existing select list, it is ok
766        //
767        // This handles the special case for
768        // SELECT col as <alias> ORDER BY <alias>
769        //
770        // As described in https://github.com/apache/datafusion/issues/5293
771        let all_aliases = missing_exprs.iter().all(|e| {
772            projection_exprs.iter().any(|proj_expr| {
773                if let Expr::Alias(Alias { expr, .. }) = proj_expr {
774                    e == expr.as_ref()
775                } else {
776                    false
777                }
778            })
779        });
780        if all_aliases {
781            return Ok(());
782        }
783
784        let missing_col_names = missing_cols
785            .iter()
786            .map(|col| col.flat_name())
787            .collect::<String>();
788
789        plan_err!(
790            "For SELECT DISTINCT, ORDER BY expressions {missing_col_names} must appear in select list"
791        )
792    }
793
794    /// Apply a sort by provided expressions with default direction
795    pub fn sort_by(
796        self,
797        expr: impl IntoIterator<Item = impl Into<Expr>> + Clone,
798    ) -> Result<Self> {
799        self.sort(
800            expr.into_iter()
801                .map(|e| e.into().sort(true, false))
802                .collect::<Vec<SortExpr>>(),
803        )
804    }
805
806    pub fn sort(
807        self,
808        sorts: impl IntoIterator<Item = impl Into<SortExpr>> + Clone,
809    ) -> Result<Self> {
810        self.sort_with_limit(sorts, None)
811    }
812
813    /// Apply a sort
814    pub fn sort_with_limit(
815        self,
816        sorts: impl IntoIterator<Item = impl Into<SortExpr>> + Clone,
817        fetch: Option<usize>,
818    ) -> Result<Self> {
819        let sorts = rewrite_sort_cols_by_aggs(sorts, &self.plan)?;
820
821        let schema = self.plan.schema();
822
823        // Collect sort columns that are missing in the input plan's schema
824        let mut missing_cols: IndexSet<Column> = IndexSet::new();
825        sorts.iter().try_for_each::<_, Result<()>>(|sort| {
826            let columns = sort.expr.column_refs();
827
828            missing_cols.extend(
829                columns
830                    .into_iter()
831                    .filter(|c| !schema.has_column(c))
832                    .cloned(),
833            );
834
835            Ok(())
836        })?;
837
838        if missing_cols.is_empty() {
839            return Ok(Self::new(LogicalPlan::Sort(Sort {
840                expr: normalize_sorts(sorts, &self.plan)?,
841                input: self.plan,
842                fetch,
843            })));
844        }
845
846        // remove pushed down sort columns
847        let new_expr = schema.columns().into_iter().map(Expr::Column).collect();
848
849        let is_distinct = false;
850        let plan = Self::add_missing_columns(
851            Arc::unwrap_or_clone(self.plan),
852            &missing_cols,
853            is_distinct,
854        )?;
855
856        let sort_plan = LogicalPlan::Sort(Sort {
857            expr: normalize_sorts(sorts, &plan)?,
858            input: Arc::new(plan),
859            fetch,
860        });
861
862        Projection::try_new(new_expr, Arc::new(sort_plan))
863            .map(LogicalPlan::Projection)
864            .map(Self::new)
865    }
866
867    /// Apply a union, preserving duplicate rows
868    pub fn union(self, plan: LogicalPlan) -> Result<Self> {
869        union(Arc::unwrap_or_clone(self.plan), plan).map(Self::new)
870    }
871
872    /// Apply a union by name, preserving duplicate rows
873    pub fn union_by_name(self, plan: LogicalPlan) -> Result<Self> {
874        union_by_name(Arc::unwrap_or_clone(self.plan), plan).map(Self::new)
875    }
876
877    /// Apply a union by name, removing duplicate rows
878    pub fn union_by_name_distinct(self, plan: LogicalPlan) -> Result<Self> {
879        let left_plan: LogicalPlan = Arc::unwrap_or_clone(self.plan);
880        let right_plan: LogicalPlan = plan;
881
882        Ok(Self::new(LogicalPlan::Distinct(Distinct::All(Arc::new(
883            union_by_name(left_plan, right_plan)?,
884        )))))
885    }
886
887    /// Apply a union, removing duplicate rows
888    pub fn union_distinct(self, plan: LogicalPlan) -> Result<Self> {
889        let left_plan: LogicalPlan = Arc::unwrap_or_clone(self.plan);
890        let right_plan: LogicalPlan = plan;
891
892        Ok(Self::new(LogicalPlan::Distinct(Distinct::All(Arc::new(
893            union(left_plan, right_plan)?,
894        )))))
895    }
896
897    /// Apply deduplication: Only distinct (different) values are returned)
898    pub fn distinct(self) -> Result<Self> {
899        Ok(Self::new(LogicalPlan::Distinct(Distinct::All(self.plan))))
900    }
901
902    /// Project first values of the specified expression list according to the provided
903    /// sorting expressions grouped by the `DISTINCT ON` clause expressions.
904    pub fn distinct_on(
905        self,
906        on_expr: Vec<Expr>,
907        select_expr: Vec<Expr>,
908        sort_expr: Option<Vec<SortExpr>>,
909    ) -> Result<Self> {
910        Ok(Self::new(LogicalPlan::Distinct(Distinct::On(
911            DistinctOn::try_new(on_expr, select_expr, sort_expr, self.plan)?,
912        ))))
913    }
914
915    /// Apply a join to `right` using explicitly specified columns and an
916    /// optional filter expression.
917    ///
918    /// See [`join_on`](Self::join_on) for a more concise way to specify the
919    /// join condition. Since DataFusion will automatically identify and
920    /// optimize equality predicates there is no performance difference between
921    /// this function and `join_on`
922    ///
923    /// `left_cols` and `right_cols` are used to form "equijoin" predicates (see
924    /// example below), which are then combined with the optional `filter`
925    /// expression.
926    ///
927    /// Note that in case of outer join, the `filter` is applied to only matched rows.
928    pub fn join(
929        self,
930        right: LogicalPlan,
931        join_type: JoinType,
932        join_keys: (Vec<impl Into<Column>>, Vec<impl Into<Column>>),
933        filter: Option<Expr>,
934    ) -> Result<Self> {
935        self.join_detailed(
936            right,
937            join_type,
938            join_keys,
939            filter,
940            NullEquality::NullEqualsNothing,
941        )
942    }
943
944    /// Apply a join using the specified expressions.
945    ///
946    /// Note that DataFusion automatically optimizes joins, including
947    /// identifying and optimizing equality predicates.
948    ///
949    /// # Example
950    ///
951    /// ```
952    /// # use datafusion_expr::{Expr, col, LogicalPlanBuilder,
953    /// #  logical_plan::builder::LogicalTableSource, logical_plan::JoinType,};
954    /// # use std::sync::Arc;
955    /// # use arrow::datatypes::{Schema, DataType, Field};
956    /// # use datafusion_common::Result;
957    /// # fn main() -> Result<()> {
958    /// let example_schema = Arc::new(Schema::new(vec![
959    ///     Field::new("a", DataType::Int32, false),
960    ///     Field::new("b", DataType::Int32, false),
961    ///     Field::new("c", DataType::Int32, false),
962    /// ]));
963    /// let table_source = Arc::new(LogicalTableSource::new(example_schema));
964    /// let left_table = table_source.clone();
965    /// let right_table = table_source.clone();
966    ///
967    /// let right_plan = LogicalPlanBuilder::scan("right", right_table, None)?.build()?;
968    ///
969    /// // Form the expression `(left.a != right.a)` AND `(left.b != right.b)`
970    /// let exprs = vec![
971    ///     col("left.a").eq(col("right.a")),
972    ///     col("left.b").not_eq(col("right.b")),
973    /// ];
974    ///
975    /// // Perform the equivalent of `left INNER JOIN right ON (a != a2 AND b != b2)`
976    /// // finding all pairs of rows from `left` and `right` where
977    /// // where `a = a2` and `b != b2`.
978    /// let plan = LogicalPlanBuilder::scan("left", left_table, None)?
979    ///     .join_on(right_plan, JoinType::Inner, exprs)?
980    ///     .build()?;
981    /// # Ok(())
982    /// # }
983    /// ```
984    pub fn join_on(
985        self,
986        right: LogicalPlan,
987        join_type: JoinType,
988        on_exprs: impl IntoIterator<Item = Expr>,
989    ) -> Result<Self> {
990        let filter = on_exprs.into_iter().reduce(Expr::and);
991
992        self.join_detailed(
993            right,
994            join_type,
995            (Vec::<Column>::new(), Vec::<Column>::new()),
996            filter,
997            NullEquality::NullEqualsNothing,
998        )
999    }
1000
1001    pub(crate) fn normalize(plan: &LogicalPlan, column: Column) -> Result<Column> {
1002        if column.relation.is_some() {
1003            // column is already normalized
1004            return Ok(column);
1005        }
1006
1007        let schema = plan.schema();
1008        let fallback_schemas = plan.fallback_normalize_schemas();
1009        let using_columns = plan.using_columns()?;
1010        column.normalize_with_schemas_and_ambiguity_check(
1011            &[&[schema], &fallback_schemas],
1012            &using_columns,
1013        )
1014    }
1015
1016    /// Apply a join with on constraint and specified null equality.
1017    ///
1018    /// The behavior is the same as [`join`](Self::join) except that it allows
1019    /// specifying the null equality behavior.
1020    ///
1021    /// The `null_equality` dictates how `null` values are joined.
1022    pub fn join_detailed(
1023        self,
1024        right: LogicalPlan,
1025        join_type: JoinType,
1026        join_keys: (Vec<impl Into<Column>>, Vec<impl Into<Column>>),
1027        filter: Option<Expr>,
1028        null_equality: NullEquality,
1029    ) -> Result<Self> {
1030        self.join_detailed_with_options(
1031            right,
1032            join_type,
1033            join_keys,
1034            filter,
1035            null_equality,
1036            false,
1037        )
1038    }
1039
1040    pub fn join_detailed_with_options(
1041        self,
1042        right: LogicalPlan,
1043        join_type: JoinType,
1044        join_keys: (Vec<impl Into<Column>>, Vec<impl Into<Column>>),
1045        filter: Option<Expr>,
1046        null_equality: NullEquality,
1047        null_aware: bool,
1048    ) -> Result<Self> {
1049        if join_keys.0.len() != join_keys.1.len() {
1050            return plan_err!("left_keys and right_keys were not the same length");
1051        }
1052
1053        let filter = if let Some(expr) = filter {
1054            let filter = normalize_col_with_schemas_and_ambiguity_check(
1055                expr,
1056                &[&[self.schema(), right.schema()]],
1057                &[],
1058            )?;
1059            Some(filter)
1060        } else {
1061            None
1062        };
1063
1064        let (left_keys, right_keys): (Vec<Result<Column>>, Vec<Result<Column>>) =
1065            join_keys
1066                .0
1067                .into_iter()
1068                .zip(join_keys.1)
1069                .map(|(l, r)| {
1070                    let l = l.into();
1071                    let r = r.into();
1072
1073                    match (&l.relation, &r.relation) {
1074                        (Some(lr), Some(rr)) => {
1075                            let l_is_left =
1076                                self.plan.schema().field_with_qualified_name(lr, &l.name);
1077                            let l_is_right =
1078                                right.schema().field_with_qualified_name(lr, &l.name);
1079                            let r_is_left =
1080                                self.plan.schema().field_with_qualified_name(rr, &r.name);
1081                            let r_is_right =
1082                                right.schema().field_with_qualified_name(rr, &r.name);
1083
1084                            match (l_is_left, l_is_right, r_is_left, r_is_right) {
1085                                (_, Ok(_), Ok(_), _) => (Ok(r), Ok(l)),
1086                                (Ok(_), _, _, Ok(_)) => (Ok(l), Ok(r)),
1087                                _ => (
1088                                    Self::normalize(&self.plan, l),
1089                                    Self::normalize(&right, r),
1090                                ),
1091                            }
1092                        }
1093                        (Some(lr), None) => {
1094                            let l_is_left =
1095                                self.plan.schema().field_with_qualified_name(lr, &l.name);
1096                            let l_is_right =
1097                                right.schema().field_with_qualified_name(lr, &l.name);
1098
1099                            match (l_is_left, l_is_right) {
1100                                (Ok(_), _) => (Ok(l), Self::normalize(&right, r)),
1101                                (_, Ok(_)) => (Self::normalize(&self.plan, r), Ok(l)),
1102                                _ => (
1103                                    Self::normalize(&self.plan, l),
1104                                    Self::normalize(&right, r),
1105                                ),
1106                            }
1107                        }
1108                        (None, Some(rr)) => {
1109                            let r_is_left =
1110                                self.plan.schema().field_with_qualified_name(rr, &r.name);
1111                            let r_is_right =
1112                                right.schema().field_with_qualified_name(rr, &r.name);
1113
1114                            match (r_is_left, r_is_right) {
1115                                (Ok(_), _) => (Ok(r), Self::normalize(&right, l)),
1116                                (_, Ok(_)) => (Self::normalize(&self.plan, l), Ok(r)),
1117                                _ => (
1118                                    Self::normalize(&self.plan, l),
1119                                    Self::normalize(&right, r),
1120                                ),
1121                            }
1122                        }
1123                        (None, None) => {
1124                            let mut swap = false;
1125                            let left_key = Self::normalize(&self.plan, l.clone())
1126                                .or_else(|_| {
1127                                    swap = true;
1128                                    Self::normalize(&right, l)
1129                                });
1130                            if swap {
1131                                (Self::normalize(&self.plan, r), left_key)
1132                            } else {
1133                                (left_key, Self::normalize(&right, r))
1134                            }
1135                        }
1136                    }
1137                })
1138                .unzip();
1139
1140        let left_keys = left_keys.into_iter().collect::<Result<Vec<Column>>>()?;
1141        let right_keys = right_keys.into_iter().collect::<Result<Vec<Column>>>()?;
1142
1143        let on: Vec<_> = left_keys
1144            .into_iter()
1145            .zip(right_keys)
1146            .map(|(l, r)| (Expr::Column(l), Expr::Column(r)))
1147            .collect();
1148        let join_schema =
1149            build_join_schema(self.plan.schema(), right.schema(), &join_type)?;
1150
1151        // Inner type without join condition is cross join
1152        if join_type != JoinType::Inner && on.is_empty() && filter.is_none() {
1153            return plan_err!("join condition should not be empty");
1154        }
1155
1156        Ok(Self::new(LogicalPlan::Join(Join {
1157            left: self.plan,
1158            right: Arc::new(right),
1159            on,
1160            filter,
1161            join_type,
1162            join_constraint: JoinConstraint::On,
1163            schema: DFSchemaRef::new(join_schema),
1164            null_equality,
1165            null_aware,
1166        })))
1167    }
1168
1169    /// Apply a join with using constraint, which duplicates all join columns in output schema.
1170    pub fn join_using(
1171        self,
1172        right: LogicalPlan,
1173        join_type: JoinType,
1174        using_keys: Vec<Column>,
1175    ) -> Result<Self> {
1176        let left_keys: Vec<Column> = using_keys
1177            .clone()
1178            .into_iter()
1179            .map(|c| Self::normalize(&self.plan, c))
1180            .collect::<Result<_>>()?;
1181        let right_keys: Vec<Column> = using_keys
1182            .into_iter()
1183            .map(|c| Self::normalize(&right, c))
1184            .collect::<Result<_>>()?;
1185
1186        let on: Vec<(_, _)> = left_keys.into_iter().zip(right_keys).collect();
1187        let mut join_on: Vec<(Expr, Expr)> = vec![];
1188        let mut filters: Option<Expr> = None;
1189        for (l, r) in &on {
1190            if self.plan.schema().has_column(l)
1191                && right.schema().has_column(r)
1192                && can_hash(
1193                    datafusion_common::ExprSchema::field_from_column(
1194                        self.plan.schema(),
1195                        l,
1196                    )?
1197                    .data_type(),
1198                )
1199            {
1200                join_on.push((Expr::Column(l.clone()), Expr::Column(r.clone())));
1201            } else if self.plan.schema().has_column(l)
1202                && right.schema().has_column(r)
1203                && can_hash(
1204                    datafusion_common::ExprSchema::field_from_column(
1205                        self.plan.schema(),
1206                        r,
1207                    )?
1208                    .data_type(),
1209                )
1210            {
1211                join_on.push((Expr::Column(r.clone()), Expr::Column(l.clone())));
1212            } else {
1213                let expr = binary_expr(
1214                    Expr::Column(l.clone()),
1215                    Operator::Eq,
1216                    Expr::Column(r.clone()),
1217                );
1218                match filters {
1219                    None => filters = Some(expr),
1220                    Some(filter_expr) => filters = Some(and(expr, filter_expr)),
1221                }
1222            }
1223        }
1224
1225        if join_on.is_empty() {
1226            let join = Self::from(self.plan).cross_join(right)?;
1227            join.filter(filters.ok_or_else(|| {
1228                internal_datafusion_err!("filters should not be None here")
1229            })?)
1230        } else {
1231            let join = Join::try_new(
1232                self.plan,
1233                Arc::new(right),
1234                join_on,
1235                filters,
1236                join_type,
1237                JoinConstraint::Using,
1238                NullEquality::NullEqualsNothing,
1239                false, // null_aware
1240            )?;
1241
1242            Ok(Self::new(LogicalPlan::Join(join)))
1243        }
1244    }
1245
1246    /// Apply a cross join
1247    pub fn cross_join(self, right: LogicalPlan) -> Result<Self> {
1248        let join = Join::try_new(
1249            self.plan,
1250            Arc::new(right),
1251            vec![],
1252            None,
1253            JoinType::Inner,
1254            JoinConstraint::On,
1255            NullEquality::NullEqualsNothing,
1256            false, // null_aware
1257        )?;
1258
1259        Ok(Self::new(LogicalPlan::Join(join)))
1260    }
1261
1262    /// Repartition
1263    pub fn repartition(self, partitioning_scheme: Partitioning) -> Result<Self> {
1264        Ok(Self::new(LogicalPlan::Repartition(Repartition {
1265            input: self.plan,
1266            partitioning_scheme,
1267        })))
1268    }
1269
1270    /// Apply a window functions to extend the schema
1271    pub fn window(
1272        self,
1273        window_expr: impl IntoIterator<Item = impl Into<Expr>>,
1274    ) -> Result<Self> {
1275        let window_expr = normalize_cols(window_expr, &self.plan)?;
1276        validate_unique_names("Windows", &window_expr)?;
1277        Ok(Self::new(LogicalPlan::Window(Window::try_new(
1278            window_expr,
1279            self.plan,
1280        )?)))
1281    }
1282
1283    /// Apply an aggregate: grouping on the `group_expr` expressions
1284    /// and calculating `aggr_expr` aggregates for each distinct
1285    /// value of the `group_expr`;
1286    pub fn aggregate(
1287        self,
1288        group_expr: impl IntoIterator<Item = impl Into<Expr>>,
1289        aggr_expr: impl IntoIterator<Item = impl Into<Expr>>,
1290    ) -> Result<Self> {
1291        let group_expr = normalize_cols(group_expr, &self.plan)?;
1292        let aggr_expr = normalize_cols(aggr_expr, &self.plan)?;
1293
1294        let group_expr = if self.options.add_implicit_group_by_exprs {
1295            add_group_by_exprs_from_dependencies(group_expr, self.plan.schema())?
1296        } else {
1297            group_expr
1298        };
1299
1300        Aggregate::try_new(self.plan, group_expr, aggr_expr)
1301            .map(LogicalPlan::Aggregate)
1302            .map(Self::new)
1303    }
1304
1305    /// Create an expression to represent the explanation of the plan
1306    ///
1307    /// if `analyze` is true, runs the actual plan and produces
1308    /// information about metrics during run.
1309    ///
1310    /// if `verbose` is true, prints out additional details.
1311    pub fn explain(self, verbose: bool, analyze: bool) -> Result<Self> {
1312        // Keep the format default to Indent
1313        self.explain_option_format(
1314            ExplainOption::default()
1315                .with_verbose(verbose)
1316                .with_analyze(analyze),
1317        )
1318    }
1319
1320    /// Create an expression to represent the explanation of the plan
1321    /// The`explain_option` is used to specify the format and verbosity of the explanation.
1322    /// Details see [`ExplainOption`].
1323    pub fn explain_option_format(self, explain_option: ExplainOption) -> Result<Self> {
1324        let schema = LogicalPlan::explain_schema();
1325        let schema = schema.to_dfschema_ref()?;
1326
1327        if explain_option.analyze {
1328            Ok(Self::new(LogicalPlan::Analyze(Analyze {
1329                verbose: explain_option.verbose,
1330                input: self.plan,
1331                schema,
1332            })))
1333        } else {
1334            let stringified_plans =
1335                vec![self.plan.to_stringified(PlanType::InitialLogicalPlan)];
1336
1337            Ok(Self::new(LogicalPlan::Explain(Explain {
1338                verbose: explain_option.verbose,
1339                plan: self.plan,
1340                explain_format: explain_option.format,
1341                stringified_plans,
1342                schema,
1343                logical_optimization_succeeded: false,
1344            })))
1345        }
1346    }
1347
1348    /// Process intersect set operator
1349    pub fn intersect(
1350        left_plan: LogicalPlan,
1351        right_plan: LogicalPlan,
1352        is_all: bool,
1353    ) -> Result<LogicalPlan> {
1354        LogicalPlanBuilder::intersect_or_except(
1355            left_plan,
1356            right_plan,
1357            JoinType::LeftSemi,
1358            is_all,
1359        )
1360    }
1361
1362    /// Process except set operator
1363    pub fn except(
1364        left_plan: LogicalPlan,
1365        right_plan: LogicalPlan,
1366        is_all: bool,
1367    ) -> Result<LogicalPlan> {
1368        LogicalPlanBuilder::intersect_or_except(
1369            left_plan,
1370            right_plan,
1371            JoinType::LeftAnti,
1372            is_all,
1373        )
1374    }
1375
1376    /// Process intersect or except
1377    fn intersect_or_except(
1378        left_plan: LogicalPlan,
1379        right_plan: LogicalPlan,
1380        join_type: JoinType,
1381        is_all: bool,
1382    ) -> Result<LogicalPlan> {
1383        let left_len = left_plan.schema().fields().len();
1384        let right_len = right_plan.schema().fields().len();
1385
1386        if left_len != right_len {
1387            return plan_err!(
1388                "INTERSECT/EXCEPT query must have the same number of columns. Left is {left_len} and right is {right_len}."
1389            );
1390        }
1391
1392        // Requalify sides if needed to avoid duplicate qualified field names
1393        // (e.g., when both sides reference the same table)
1394        let left_builder = LogicalPlanBuilder::from(left_plan);
1395        let right_builder = LogicalPlanBuilder::from(right_plan);
1396        let (left_builder, right_builder, _requalified) =
1397            requalify_sides_if_needed(left_builder, right_builder)?;
1398        let left_plan = left_builder.build()?;
1399        let right_plan = right_builder.build()?;
1400
1401        let join_keys = left_plan
1402            .schema()
1403            .fields()
1404            .iter()
1405            .zip(right_plan.schema().fields().iter())
1406            .map(|(left_field, right_field)| {
1407                (
1408                    (Column::from_name(left_field.name())),
1409                    (Column::from_name(right_field.name())),
1410                )
1411            })
1412            .unzip();
1413        if is_all {
1414            LogicalPlanBuilder::from(left_plan)
1415                .join_detailed(
1416                    right_plan,
1417                    join_type,
1418                    join_keys,
1419                    None,
1420                    NullEquality::NullEqualsNull,
1421                )?
1422                .build()
1423        } else {
1424            LogicalPlanBuilder::from(left_plan)
1425                .distinct()?
1426                .join_detailed(
1427                    right_plan,
1428                    join_type,
1429                    join_keys,
1430                    None,
1431                    NullEquality::NullEqualsNull,
1432                )?
1433                .build()
1434        }
1435    }
1436
1437    /// Build the plan
1438    pub fn build(self) -> Result<LogicalPlan> {
1439        Ok(Arc::unwrap_or_clone(self.plan))
1440    }
1441
1442    /// Apply a join with both explicit equijoin and non equijoin predicates.
1443    ///
1444    /// Note this is a low level API that requires identifying specific
1445    /// predicate types. Most users should use  [`join_on`](Self::join_on) that
1446    /// automatically identifies predicates appropriately.
1447    ///
1448    /// `equi_exprs` defines equijoin predicates, of the form `l = r)` for each
1449    /// `(l, r)` tuple. `l`, the first element of the tuple, must only refer
1450    /// to columns from the existing input. `r`, the second element of the tuple,
1451    /// must only refer to columns from the right input.
1452    ///
1453    /// `filter` contains any other filter expression to apply during the
1454    /// join. Note that `equi_exprs` predicates are evaluated more efficiently
1455    /// than the filter expressions, so they are preferred.
1456    pub fn join_with_expr_keys(
1457        self,
1458        right: LogicalPlan,
1459        join_type: JoinType,
1460        equi_exprs: (Vec<impl Into<Expr>>, Vec<impl Into<Expr>>),
1461        filter: Option<Expr>,
1462    ) -> Result<Self> {
1463        if equi_exprs.0.len() != equi_exprs.1.len() {
1464            return plan_err!("left_keys and right_keys were not the same length");
1465        }
1466
1467        let join_key_pairs = equi_exprs
1468            .0
1469            .into_iter()
1470            .zip(equi_exprs.1)
1471            .map(|(l, r)| {
1472                let left_key = l.into();
1473                let right_key = r.into();
1474                let mut left_using_columns  = HashSet::new();
1475                expr_to_columns(&left_key, &mut left_using_columns)?;
1476                let normalized_left_key = normalize_col_with_schemas_and_ambiguity_check(
1477                    left_key,
1478                    &[&[self.plan.schema()]],
1479                    &[],
1480                )?;
1481
1482                let mut right_using_columns = HashSet::new();
1483                expr_to_columns(&right_key, &mut right_using_columns)?;
1484                let normalized_right_key = normalize_col_with_schemas_and_ambiguity_check(
1485                    right_key,
1486                    &[&[right.schema()]],
1487                    &[],
1488                )?;
1489
1490                // find valid equijoin
1491                find_valid_equijoin_key_pair(
1492                        &normalized_left_key,
1493                        &normalized_right_key,
1494                        self.plan.schema(),
1495                        right.schema(),
1496                    )?.ok_or_else(||
1497                        plan_datafusion_err!(
1498                            "can't create join plan, join key should belong to one input, error key: ({normalized_left_key},{normalized_right_key})"
1499                        ))
1500            })
1501            .collect::<Result<Vec<_>>>()?;
1502
1503        let join = Join::try_new(
1504            self.plan,
1505            Arc::new(right),
1506            join_key_pairs,
1507            filter,
1508            join_type,
1509            JoinConstraint::On,
1510            NullEquality::NullEqualsNothing,
1511            false, // null_aware
1512        )?;
1513
1514        Ok(Self::new(LogicalPlan::Join(join)))
1515    }
1516
1517    /// Unnest the given column.
1518    pub fn unnest_column(self, column: impl Into<Column>) -> Result<Self> {
1519        unnest(Arc::unwrap_or_clone(self.plan), vec![column.into()]).map(Self::new)
1520    }
1521
1522    /// Unnest the given column given [`UnnestOptions`]
1523    pub fn unnest_column_with_options(
1524        self,
1525        column: impl Into<Column>,
1526        options: UnnestOptions,
1527    ) -> Result<Self> {
1528        unnest_with_options(
1529            Arc::unwrap_or_clone(self.plan),
1530            vec![column.into()],
1531            options,
1532        )
1533        .map(Self::new)
1534    }
1535
1536    /// Unnest the given columns with the given [`UnnestOptions`]
1537    pub fn unnest_columns_with_options(
1538        self,
1539        columns: Vec<Column>,
1540        options: UnnestOptions,
1541    ) -> Result<Self> {
1542        unnest_with_options(Arc::unwrap_or_clone(self.plan), columns, options)
1543            .map(Self::new)
1544    }
1545}
1546
1547impl From<LogicalPlan> for LogicalPlanBuilder {
1548    fn from(plan: LogicalPlan) -> Self {
1549        LogicalPlanBuilder::new(plan)
1550    }
1551}
1552
1553impl From<Arc<LogicalPlan>> for LogicalPlanBuilder {
1554    fn from(plan: Arc<LogicalPlan>) -> Self {
1555        LogicalPlanBuilder::new_from_arc(plan)
1556    }
1557}
1558
1559/// Container used when building fields for a `VALUES` node.
1560#[derive(Default)]
1561struct ValuesFields {
1562    inner: Vec<Field>,
1563}
1564
1565impl ValuesFields {
1566    pub fn new() -> Self {
1567        Self::default()
1568    }
1569
1570    pub fn push(&mut self, data_type: DataType, nullable: bool) {
1571        self.push_with_metadata(data_type, nullable, None);
1572    }
1573
1574    pub fn push_with_metadata(
1575        &mut self,
1576        data_type: DataType,
1577        nullable: bool,
1578        metadata: Option<FieldMetadata>,
1579    ) {
1580        // Naming follows the convention described here:
1581        // https://www.postgresql.org/docs/current/queries-values.html
1582        let name = format!("column{}", self.inner.len() + 1);
1583        let mut field = Field::new(name, data_type, nullable);
1584        if let Some(metadata) = metadata {
1585            field.set_metadata(metadata.to_hashmap());
1586        }
1587        self.inner.push(field);
1588    }
1589
1590    pub fn into_fields(self) -> Fields {
1591        self.inner.into()
1592    }
1593}
1594
1595/// Returns aliases to make field names unique.
1596///
1597/// Returns a vector of optional aliases, one per input field. `None` means keep the original name,
1598/// `Some(alias)` means rename to the alias to ensure uniqueness.
1599///
1600/// Used when creating [`SubqueryAlias`] or similar operations that strip table qualifiers but need
1601/// to maintain unique column names.
1602///
1603/// # Example
1604/// Input fields: `[a, a, b, b, a, a:1]` ([`DFSchema`] valid when duplicate fields have different qualifiers)
1605/// Returns: `[None, Some("a:1"), None, Some("b:1"), Some("a:2"), Some("a:1:1")]`
1606pub fn unique_field_aliases(fields: &Fields) -> Vec<Option<String>> {
1607    // Some field names might already come to this function with the count (number of times it appeared)
1608    // as a suffix e.g. id:1, so there's still a chance of name collisions, for example,
1609    // if these three fields passed to this function: "col:1", "col" and "col", the function
1610    // would rename them to -> col:1, col, col:1 causing a posterior error when building the DFSchema.
1611    // That's why we need the `seen` set, so the fields are always unique.
1612
1613    // Tracks a mapping between a field name and the number of appearances of that field.
1614    let mut name_map = HashMap::<&str, usize>::new();
1615    // Tracks all the fields and aliases that were previously seen.
1616    let mut seen = HashSet::<Cow<String>>::new();
1617
1618    fields
1619        .iter()
1620        .map(|field| {
1621            let original_name = field.name();
1622            let mut name = Cow::Borrowed(original_name);
1623
1624            let count = name_map.entry(original_name).or_insert(0);
1625
1626            // Loop until we find a name that hasn't been used.
1627            while seen.contains(&name) {
1628                *count += 1;
1629                name = Cow::Owned(format!("{original_name}:{count}"));
1630            }
1631
1632            seen.insert(name.clone());
1633
1634            match name {
1635                Cow::Borrowed(_) => None,
1636                Cow::Owned(alias) => Some(alias),
1637            }
1638        })
1639        .collect()
1640}
1641
1642fn mark_field(schema: &DFSchema) -> (Option<TableReference>, Arc<Field>) {
1643    let mut table_references = schema
1644        .iter()
1645        .filter_map(|(qualifier, _)| qualifier)
1646        .collect::<Vec<_>>();
1647    table_references.dedup();
1648    let table_reference = if table_references.len() == 1 {
1649        table_references.pop().cloned()
1650    } else {
1651        None
1652    };
1653
1654    (
1655        table_reference,
1656        Arc::new(Field::new("mark", DataType::Boolean, false)),
1657    )
1658}
1659
1660/// Creates a schema for a join operation.
1661/// The fields from the left side are first
1662pub fn build_join_schema(
1663    left: &DFSchema,
1664    right: &DFSchema,
1665    join_type: &JoinType,
1666) -> Result<DFSchema> {
1667    fn nullify_fields<'a>(
1668        fields: impl Iterator<Item = (Option<&'a TableReference>, &'a Arc<Field>)>,
1669    ) -> Vec<(Option<TableReference>, Arc<Field>)> {
1670        fields
1671            .map(|(q, f)| {
1672                // TODO: find a good way to do that
1673                let field = f.as_ref().clone().with_nullable(true);
1674                (q.cloned(), Arc::new(field))
1675            })
1676            .collect()
1677    }
1678
1679    let right_fields = right.iter();
1680    let left_fields = left.iter();
1681
1682    let qualified_fields: Vec<(Option<TableReference>, Arc<Field>)> = match join_type {
1683        JoinType::Inner => {
1684            // left then right
1685            let left_fields = left_fields
1686                .map(|(q, f)| (q.cloned(), Arc::clone(f)))
1687                .collect::<Vec<_>>();
1688            let right_fields = right_fields
1689                .map(|(q, f)| (q.cloned(), Arc::clone(f)))
1690                .collect::<Vec<_>>();
1691            left_fields.into_iter().chain(right_fields).collect()
1692        }
1693        JoinType::Left => {
1694            // left then right, right set to nullable in case of not matched scenario
1695            let left_fields = left_fields
1696                .map(|(q, f)| (q.cloned(), Arc::clone(f)))
1697                .collect::<Vec<_>>();
1698            left_fields
1699                .into_iter()
1700                .chain(nullify_fields(right_fields))
1701                .collect()
1702        }
1703        JoinType::Right => {
1704            // left then right, left set to nullable in case of not matched scenario
1705            let right_fields = right_fields
1706                .map(|(q, f)| (q.cloned(), Arc::clone(f)))
1707                .collect::<Vec<_>>();
1708            nullify_fields(left_fields)
1709                .into_iter()
1710                .chain(right_fields)
1711                .collect()
1712        }
1713        JoinType::Full => {
1714            // left then right, all set to nullable in case of not matched scenario
1715            nullify_fields(left_fields)
1716                .into_iter()
1717                .chain(nullify_fields(right_fields))
1718                .collect()
1719        }
1720        JoinType::LeftSemi | JoinType::LeftAnti => {
1721            // Only use the left side for the schema
1722            left_fields
1723                .map(|(q, f)| (q.cloned(), Arc::clone(f)))
1724                .collect()
1725        }
1726        JoinType::LeftMark => left_fields
1727            .map(|(q, f)| (q.cloned(), Arc::clone(f)))
1728            .chain(once(mark_field(right)))
1729            .collect(),
1730        JoinType::RightSemi | JoinType::RightAnti => {
1731            // Only use the right side for the schema
1732            right_fields
1733                .map(|(q, f)| (q.cloned(), Arc::clone(f)))
1734                .collect()
1735        }
1736        JoinType::RightMark => right_fields
1737            .map(|(q, f)| (q.cloned(), Arc::clone(f)))
1738            .chain(once(mark_field(left)))
1739            .collect(),
1740    };
1741    let func_dependencies = left.functional_dependencies().join(
1742        right.functional_dependencies(),
1743        join_type,
1744        left.fields().len(),
1745    );
1746
1747    let (schema1, schema2) = match join_type {
1748        JoinType::Right
1749        | JoinType::RightSemi
1750        | JoinType::RightAnti
1751        | JoinType::RightMark => (left, right),
1752        _ => (right, left),
1753    };
1754
1755    let metadata = schema1
1756        .metadata()
1757        .clone()
1758        .into_iter()
1759        .chain(schema2.metadata().clone())
1760        .collect();
1761
1762    let dfschema = DFSchema::new_with_metadata(qualified_fields, metadata)?;
1763    dfschema.with_functional_dependencies(func_dependencies)
1764}
1765
1766/// (Re)qualify the sides of a join if needed, i.e. if the columns from one side would otherwise
1767/// conflict with the columns from the other.
1768/// This is especially useful for queries that come as Substrait, since Substrait doesn't currently allow specifying
1769/// aliases, neither for columns nor for tables.  DataFusion requires columns to be uniquely identifiable, in some
1770/// places (see e.g. DFSchema::check_names).
1771/// The function returns:
1772/// - The requalified or original left logical plan
1773/// - The requalified or original right logical plan
1774/// - If a requalification was needed or not
1775pub fn requalify_sides_if_needed(
1776    left: LogicalPlanBuilder,
1777    right: LogicalPlanBuilder,
1778) -> Result<(LogicalPlanBuilder, LogicalPlanBuilder, bool)> {
1779    let left_cols = left.schema().columns();
1780    let right_cols = right.schema().columns();
1781
1782    // Requalify if merging the schemas would cause an error during join.
1783    // This can happen in several cases:
1784    // 1. Duplicate qualified fields: both sides have same relation.name
1785    // 2. Duplicate unqualified fields: both sides have same unqualified name
1786    // 3. Ambiguous reference: one side qualified, other unqualified, same name
1787    //
1788    // Implementation note: This uses a simple O(n*m) nested loop rather than
1789    // a HashMap-based O(n+m) approach. The nested loop is preferred because:
1790    // - Schemas are typically small (in TPCH benchmark, max is 16 columns),
1791    //   so n*m is negligible
1792    // - Early return on first conflict makes common case very fast
1793    // - Code is simpler and easier to reason about
1794    // - Called only during plan construction, not in execution hot path
1795    for l in &left_cols {
1796        for r in &right_cols {
1797            if l.name != r.name {
1798                continue;
1799            }
1800
1801            // Same name - check if this would cause a conflict
1802            match (&l.relation, &r.relation) {
1803                // Both qualified with same relation - duplicate qualified field
1804                (Some(l_rel), Some(r_rel)) if l_rel == r_rel => {
1805                    return Ok((
1806                        left.alias(TableReference::bare("left"))?,
1807                        right.alias(TableReference::bare("right"))?,
1808                        true,
1809                    ));
1810                }
1811                // Both unqualified - duplicate unqualified field
1812                (None, None) => {
1813                    return Ok((
1814                        left.alias(TableReference::bare("left"))?,
1815                        right.alias(TableReference::bare("right"))?,
1816                        true,
1817                    ));
1818                }
1819                // One qualified, one not - ambiguous reference
1820                (Some(_), None) | (None, Some(_)) => {
1821                    return Ok((
1822                        left.alias(TableReference::bare("left"))?,
1823                        right.alias(TableReference::bare("right"))?,
1824                        true,
1825                    ));
1826                }
1827                // Different qualifiers - OK, no conflict
1828                _ => {}
1829            }
1830        }
1831    }
1832
1833    // No conflicts found
1834    Ok((left, right, false))
1835}
1836/// Add additional "synthetic" group by expressions based on functional
1837/// dependencies.
1838///
1839/// For example, if we are grouping on `[c1]`, and we know from
1840/// functional dependencies that column `c1` determines `c2`, this function
1841/// adds `c2` to the group by list.
1842///
1843/// This allows MySQL style selects like
1844/// `SELECT col FROM t WHERE pk = 5` if col is unique
1845pub fn add_group_by_exprs_from_dependencies(
1846    mut group_expr: Vec<Expr>,
1847    schema: &DFSchemaRef,
1848) -> Result<Vec<Expr>> {
1849    // Names of the fields produced by the GROUP BY exprs for example, `GROUP BY
1850    // c1 + 1` produces an output field named `"c1 + 1"`
1851    let mut group_by_field_names = group_expr
1852        .iter()
1853        .map(|e| e.schema_name().to_string())
1854        .collect::<Vec<_>>();
1855
1856    if let Some(target_indices) =
1857        get_target_functional_dependencies(schema, &group_by_field_names)
1858    {
1859        for idx in target_indices {
1860            let expr = Expr::Column(Column::from(schema.qualified_field(idx)));
1861            let expr_name = expr.schema_name().to_string();
1862            if !group_by_field_names.contains(&expr_name) {
1863                group_by_field_names.push(expr_name);
1864                group_expr.push(expr);
1865            }
1866        }
1867    }
1868    Ok(group_expr)
1869}
1870
1871/// Errors if one or more expressions have equal names.
1872pub fn validate_unique_names<'a>(
1873    node_name: &str,
1874    expressions: impl IntoIterator<Item = &'a Expr>,
1875) -> Result<()> {
1876    let mut unique_names = HashMap::new();
1877
1878    expressions.into_iter().enumerate().try_for_each(|(position, expr)| {
1879        let name = expr.schema_name().to_string();
1880        match unique_names.get(&name) {
1881            None => {
1882                unique_names.insert(name, (position, expr));
1883                Ok(())
1884            },
1885            Some((existing_position, existing_expr)) => {
1886                plan_err!("{node_name} require unique expression names \
1887                             but the expression \"{existing_expr}\" at position {existing_position} and \"{expr}\" \
1888                             at position {position} have the same name. Consider aliasing (\"AS\") one of them."
1889                            )
1890            }
1891        }
1892    })
1893}
1894
1895/// Union two [`LogicalPlan`]s.
1896///
1897/// Constructs the UNION plan, but does not perform type-coercion. Therefore the
1898/// subtree expressions will not be properly typed until the optimizer pass.
1899///
1900/// If a properly typed UNION plan is needed, refer to [`TypeCoercionRewriter::coerce_union`]
1901/// or alternatively, merge the union input schema using [`coerce_union_schema`] and
1902/// apply the expression rewrite with [`coerce_plan_expr_for_schema`].
1903///
1904/// [`TypeCoercionRewriter::coerce_union`]: https://docs.rs/datafusion-optimizer/latest/datafusion_optimizer/analyzer/type_coercion/struct.TypeCoercionRewriter.html#method.coerce_union
1905/// [`coerce_union_schema`]: https://docs.rs/datafusion-optimizer/latest/datafusion_optimizer/analyzer/type_coercion/fn.coerce_union_schema.html
1906pub fn union(left_plan: LogicalPlan, right_plan: LogicalPlan) -> Result<LogicalPlan> {
1907    Ok(LogicalPlan::Union(Union::try_new_with_loose_types(vec![
1908        Arc::new(left_plan),
1909        Arc::new(right_plan),
1910    ])?))
1911}
1912
1913/// Like [`union`], but combine rows from different tables by name, rather than
1914/// by position.
1915pub fn union_by_name(
1916    left_plan: LogicalPlan,
1917    right_plan: LogicalPlan,
1918) -> Result<LogicalPlan> {
1919    Ok(LogicalPlan::Union(Union::try_new_by_name(vec![
1920        Arc::new(left_plan),
1921        Arc::new(right_plan),
1922    ])?))
1923}
1924
1925/// Create Projection
1926/// # Errors
1927/// This function errors under any of the following conditions:
1928/// * Two or more expressions have the same name
1929/// * An invalid expression is used (e.g. a `sort` expression)
1930pub fn project(
1931    plan: LogicalPlan,
1932    expr: impl IntoIterator<Item = impl Into<SelectExpr>>,
1933) -> Result<LogicalPlan> {
1934    project_with_validation(plan, expr.into_iter().map(|e| (e, true)), None)
1935}
1936
1937/// Create Projection. Similar to project except that the expressions
1938/// passed in have a flag to indicate if that expression requires
1939/// validation (normalize & columnize) (true) or not (false)
1940/// # Errors
1941/// This function errors under any of the following conditions:
1942/// * Two or more expressions have the same name
1943/// * An invalid expression is used (e.g. a `sort` expression)
1944fn project_with_validation(
1945    plan: LogicalPlan,
1946    expr: impl IntoIterator<Item = (impl Into<SelectExpr>, bool)>,
1947    schema: Option<&DFSchemaRef>,
1948) -> Result<LogicalPlan> {
1949    let mut projected_expr = vec![];
1950    let mut has_wildcard = false;
1951    for (e, validate) in expr {
1952        let e = e.into();
1953        match e {
1954            SelectExpr::Wildcard(opt) => {
1955                has_wildcard = true;
1956                let expanded = expand_wildcard(plan.schema(), &plan, Some(&opt))?;
1957
1958                // If there is a REPLACE statement, replace that column with the given
1959                // replace expression. Column name remains the same.
1960                let expanded = if let Some(replace) = opt.replace {
1961                    replace_columns(expanded, &replace)?
1962                } else {
1963                    expanded
1964                };
1965
1966                for e in expanded {
1967                    if validate {
1968                        projected_expr
1969                            .push(columnize_expr(normalize_col(e, &plan)?, &plan)?)
1970                    } else {
1971                        projected_expr.push(e)
1972                    }
1973                }
1974            }
1975            SelectExpr::QualifiedWildcard(table_ref, opt) => {
1976                has_wildcard = true;
1977                let expanded =
1978                    expand_qualified_wildcard(&table_ref, plan.schema(), Some(&opt))?;
1979
1980                // If there is a REPLACE statement, replace that column with the given
1981                // replace expression. Column name remains the same.
1982                let expanded = if let Some(replace) = opt.replace {
1983                    replace_columns(expanded, &replace)?
1984                } else {
1985                    expanded
1986                };
1987
1988                for e in expanded {
1989                    if validate {
1990                        projected_expr
1991                            .push(columnize_expr(normalize_col(e, &plan)?, &plan)?)
1992                    } else {
1993                        projected_expr.push(e)
1994                    }
1995                }
1996            }
1997            SelectExpr::Expression(e) => {
1998                if validate {
1999                    projected_expr.push(columnize_expr(normalize_col(e, &plan)?, &plan)?)
2000                } else {
2001                    projected_expr.push(e)
2002                }
2003            }
2004        }
2005    }
2006
2007    if has_wildcard && projected_expr.is_empty() && !plan.schema().fields().is_empty() {
2008        return plan_err!(
2009            "SELECT list is empty after resolving * expressions, \
2010             the wildcard expanded to zero columns"
2011        );
2012    }
2013
2014    // When inside a set expression, alias non-Column/non-Alias expressions
2015    // to match the left side's field names, avoiding duplicate name errors.
2016    if let Some(schema) = &schema {
2017        for (expr, field) in projected_expr.iter_mut().zip(schema.fields()) {
2018            if !matches!(expr, Expr::Column(_) | Expr::Alias(_)) {
2019                *expr = std::mem::take(expr).alias(field.name());
2020            }
2021        }
2022    }
2023
2024    validate_unique_names("Projections", projected_expr.iter())?;
2025
2026    Projection::try_new(projected_expr, Arc::new(plan)).map(LogicalPlan::Projection)
2027}
2028
2029/// If there is a REPLACE statement in the projected expression in the form of
2030/// "REPLACE (some_column_within_an_expr AS some_column)", this function replaces
2031/// that column with the given replace expression. Column name remains the same.
2032/// Multiple REPLACEs are also possible with comma separations.
2033fn replace_columns(
2034    mut exprs: Vec<Expr>,
2035    replace: &PlannedReplaceSelectItem,
2036) -> Result<Vec<Expr>> {
2037    for expr in exprs.iter_mut() {
2038        if let Expr::Column(Column { name, .. }) = expr
2039            && let Some((_, new_expr)) = replace
2040                .items()
2041                .iter()
2042                .zip(replace.expressions().iter())
2043                .find(|(item, _)| item.column_name.value == *name)
2044        {
2045            *expr = new_expr.clone().alias(name.clone())
2046        }
2047    }
2048    Ok(exprs)
2049}
2050
2051/// Create a SubqueryAlias to wrap a LogicalPlan.
2052pub fn subquery_alias(
2053    plan: LogicalPlan,
2054    alias: impl Into<TableReference>,
2055) -> Result<LogicalPlan> {
2056    SubqueryAlias::try_new(Arc::new(plan), alias).map(LogicalPlan::SubqueryAlias)
2057}
2058
2059/// Create a LogicalPlanBuilder representing a scan of a table with the provided name and schema.
2060/// This is mostly used for testing and documentation.
2061pub fn table_scan(
2062    name: Option<impl Into<TableReference>>,
2063    table_schema: &Schema,
2064    projection: Option<Vec<usize>>,
2065) -> Result<LogicalPlanBuilder> {
2066    table_scan_with_filters(name, table_schema, projection, vec![])
2067}
2068
2069/// Create a LogicalPlanBuilder representing a scan of a table with the provided name and schema,
2070/// and inlined filters.
2071/// This is mostly used for testing and documentation.
2072pub fn table_scan_with_filters(
2073    name: Option<impl Into<TableReference>>,
2074    table_schema: &Schema,
2075    projection: Option<Vec<usize>>,
2076    filters: Vec<Expr>,
2077) -> Result<LogicalPlanBuilder> {
2078    let table_source = table_source(table_schema);
2079    let name = name
2080        .map(|n| n.into())
2081        .unwrap_or_else(|| TableReference::bare(UNNAMED_TABLE));
2082    LogicalPlanBuilder::scan_with_filters(name, table_source, projection, filters)
2083}
2084
2085/// Create a LogicalPlanBuilder representing a scan of a table with the provided name and schema,
2086/// filters, and inlined fetch.
2087/// This is mostly used for testing and documentation.
2088pub fn table_scan_with_filter_and_fetch(
2089    name: Option<impl Into<TableReference>>,
2090    table_schema: &Schema,
2091    projection: Option<Vec<usize>>,
2092    filters: Vec<Expr>,
2093    fetch: Option<usize>,
2094) -> Result<LogicalPlanBuilder> {
2095    let table_source = table_source(table_schema);
2096    let name = name
2097        .map(|n| n.into())
2098        .unwrap_or_else(|| TableReference::bare(UNNAMED_TABLE));
2099    LogicalPlanBuilder::scan_with_filters_fetch(
2100        name,
2101        table_source,
2102        projection,
2103        filters,
2104        fetch,
2105    )
2106}
2107
2108pub fn table_source(table_schema: &Schema) -> Arc<dyn TableSource> {
2109    // TODO should we take SchemaRef and avoid cloning?
2110    let table_schema = Arc::new(table_schema.clone());
2111    Arc::new(LogicalTableSource {
2112        table_schema,
2113        constraints: Default::default(),
2114    })
2115}
2116
2117pub fn table_source_with_constraints(
2118    table_schema: &Schema,
2119    constraints: Constraints,
2120) -> Arc<dyn TableSource> {
2121    // TODO should we take SchemaRef and avoid cloning?
2122    let table_schema = Arc::new(table_schema.clone());
2123    Arc::new(LogicalTableSource {
2124        table_schema,
2125        constraints,
2126    })
2127}
2128
2129/// Wrap projection for a plan, if the join keys contains normal expression.
2130pub fn wrap_projection_for_join_if_necessary(
2131    join_keys: &[Expr],
2132    input: LogicalPlan,
2133) -> Result<(LogicalPlan, Vec<Column>, bool)> {
2134    let input_schema = input.schema();
2135    let alias_join_keys: Vec<Expr> = join_keys
2136        .iter()
2137        .map(|key| {
2138            // The display_name() of cast expression will ignore the cast info, and show the inner expression name.
2139            // If we do not add alias, it will throw same field name error in the schema when adding projection.
2140            // For example:
2141            //    input scan : [a, b, c],
2142            //    join keys: [cast(a as int)]
2143            //
2144            //  then a and cast(a as int) will use the same field name - `a` in projection schema.
2145            //  https://github.com/apache/datafusion/issues/4478
2146            if matches!(key, Expr::Cast(_)) || matches!(key, Expr::TryCast(_)) {
2147                let alias = format!("{key}");
2148                key.clone().alias(alias)
2149            } else {
2150                key.clone()
2151            }
2152        })
2153        .collect::<Vec<_>>();
2154
2155    let need_project = join_keys.iter().any(|key| !matches!(key, Expr::Column(_)));
2156    let plan = if need_project {
2157        // Include all columns from the input and extend them with the join keys
2158        let mut projection = input_schema
2159            .columns()
2160            .into_iter()
2161            .map(Expr::Column)
2162            .collect::<Vec<_>>();
2163        #[allow(clippy::allow_attributes, clippy::mutable_key_type)]
2164        // Expr contains Arc with interior mutability but is intentionally used as hash key
2165        let join_key_items = alias_join_keys
2166            .iter()
2167            .flat_map(|expr| expr.try_as_col().is_none().then_some(expr))
2168            .cloned()
2169            .collect::<HashSet<Expr>>();
2170        projection.extend(join_key_items);
2171
2172        LogicalPlanBuilder::from(input)
2173            .project(projection.into_iter().map(SelectExpr::from))?
2174            .build()?
2175    } else {
2176        input
2177    };
2178
2179    let join_on = alias_join_keys
2180        .into_iter()
2181        .map(|key| {
2182            if let Some(col) = key.try_as_col() {
2183                Ok(col.clone())
2184            } else {
2185                let name = key.schema_name().to_string();
2186                Ok(Column::from_name(name))
2187            }
2188        })
2189        .collect::<Result<Vec<_>>>()?;
2190
2191    Ok((plan, join_on, need_project))
2192}
2193
2194/// Basic TableSource implementation intended for use in tests and documentation. It is expected
2195/// that users will provide their own TableSource implementations or use DataFusion's
2196/// DefaultTableSource.
2197pub struct LogicalTableSource {
2198    table_schema: SchemaRef,
2199    constraints: Constraints,
2200}
2201
2202impl LogicalTableSource {
2203    /// Create a new LogicalTableSource
2204    pub fn new(table_schema: SchemaRef) -> Self {
2205        Self {
2206            table_schema,
2207            constraints: Constraints::default(),
2208        }
2209    }
2210
2211    pub fn with_constraints(mut self, constraints: Constraints) -> Self {
2212        self.constraints = constraints;
2213        self
2214    }
2215}
2216
2217impl TableSource for LogicalTableSource {
2218    fn schema(&self) -> SchemaRef {
2219        Arc::clone(&self.table_schema)
2220    }
2221
2222    fn constraints(&self) -> Option<&Constraints> {
2223        Some(&self.constraints)
2224    }
2225
2226    fn supports_filters_pushdown(
2227        &self,
2228        filters: &[&Expr],
2229    ) -> Result<Vec<TableProviderFilterPushDown>> {
2230        Ok(vec![TableProviderFilterPushDown::Exact; filters.len()])
2231    }
2232}
2233
2234/// Create a [`LogicalPlan::Unnest`] plan
2235pub fn unnest(input: LogicalPlan, columns: Vec<Column>) -> Result<LogicalPlan> {
2236    unnest_with_options(input, columns, UnnestOptions::default())
2237}
2238
2239pub fn get_struct_unnested_columns(
2240    col_name: &String,
2241    inner_fields: &Fields,
2242) -> Vec<Column> {
2243    inner_fields
2244        .iter()
2245        .map(|f| Column::from_name(format!("{}.{}", col_name, f.name())))
2246        .collect()
2247}
2248
2249/// Create a [`LogicalPlan::Unnest`] plan with options
2250/// This function receive a list of columns to be unnested
2251/// because multiple unnest can be performed on the same column (e.g unnest with different depth)
2252/// The new schema will contains post-unnest fields replacing the original field
2253///
2254/// For example:
2255/// Input schema as
2256/// ```text
2257/// +---------------------+-------------------+
2258/// | col1                | col2              |
2259/// +---------------------+-------------------+
2260/// | Struct(INT64,INT32) | List(List(Int64)) |
2261/// +---------------------+-------------------+
2262/// ```
2263///
2264///
2265///
2266/// Then unnesting columns with:
2267/// - (col1,Struct)
2268/// - (col2,List(\[depth=1,depth=2\]))
2269///
2270/// will generate a new schema as
2271/// ```text
2272/// +---------+---------+---------------------+---------------------+
2273/// | col1.c0 | col1.c1 | unnest_col2_depth_1 | unnest_col2_depth_2 |
2274/// +---------+---------+---------------------+---------------------+
2275/// | Int64   | Int32   | List(Int64)         |  Int64              |
2276/// +---------+---------+---------------------+---------------------+
2277/// ```
2278pub fn unnest_with_options(
2279    input: LogicalPlan,
2280    columns_to_unnest: Vec<Column>,
2281    options: UnnestOptions,
2282) -> Result<LogicalPlan> {
2283    Ok(LogicalPlan::Unnest(Unnest::try_new(
2284        Arc::new(input),
2285        columns_to_unnest,
2286        options,
2287    )?))
2288}
2289
2290#[cfg(test)]
2291mod tests {
2292    use std::vec;
2293
2294    use super::*;
2295    use crate::lit_with_metadata;
2296    use crate::logical_plan::StringifiedPlan;
2297    use crate::{col, expr, expr_fn::exists, in_subquery, scalar_subquery};
2298
2299    use crate::test::function_stub::sum;
2300    use datafusion_common::{
2301        Constraint, DataFusionError, RecursionUnnestOption, SchemaError,
2302    };
2303    use insta::assert_snapshot;
2304
2305    #[test]
2306    fn plan_builder_simple() -> Result<()> {
2307        let plan =
2308            table_scan(Some("employee_csv"), &employee_schema(), Some(vec![0, 3]))?
2309                .filter(col("state").eq(lit("CO")))?
2310                .project(vec![col("id")])?
2311                .build()?;
2312
2313        assert_snapshot!(plan, @r#"
2314        Projection: employee_csv.id
2315          Filter: employee_csv.state = Utf8("CO")
2316            TableScan: employee_csv projection=[id, state]
2317        "#);
2318
2319        Ok(())
2320    }
2321
2322    #[test]
2323    fn plan_builder_schema() {
2324        let schema = employee_schema();
2325        let projection = None;
2326        let plan =
2327            LogicalPlanBuilder::scan("employee_csv", table_source(&schema), projection)
2328                .unwrap();
2329        assert_snapshot!(plan.schema().as_ref(), @"fields:[employee_csv.id, employee_csv.first_name, employee_csv.last_name, employee_csv.state, employee_csv.salary], metadata:{}");
2330
2331        // Note scan of "EMPLOYEE_CSV" is treated as a SQL identifier
2332        // (and thus normalized to "employee"csv") as well
2333        let projection = None;
2334        let plan =
2335            LogicalPlanBuilder::scan("EMPLOYEE_CSV", table_source(&schema), projection)
2336                .unwrap();
2337        assert_snapshot!(plan.schema().as_ref(), @"fields:[employee_csv.id, employee_csv.first_name, employee_csv.last_name, employee_csv.state, employee_csv.salary], metadata:{}");
2338    }
2339
2340    #[test]
2341    fn plan_builder_empty_name() {
2342        let schema = employee_schema();
2343        let projection = None;
2344        let err =
2345            LogicalPlanBuilder::scan("", table_source(&schema), projection).unwrap_err();
2346        assert_snapshot!(
2347            err.strip_backtrace(),
2348            @"Error during planning: table_name cannot be empty"
2349        );
2350    }
2351
2352    #[test]
2353    fn plan_builder_sort() -> Result<()> {
2354        let plan =
2355            table_scan(Some("employee_csv"), &employee_schema(), Some(vec![3, 4]))?
2356                .sort(vec![
2357                    expr::Sort::new(col("state"), true, true),
2358                    expr::Sort::new(col("salary"), false, false),
2359                ])?
2360                .build()?;
2361
2362        assert_snapshot!(plan, @r"
2363        Sort: employee_csv.state ASC NULLS FIRST, employee_csv.salary DESC NULLS LAST
2364          TableScan: employee_csv projection=[state, salary]
2365        ");
2366
2367        Ok(())
2368    }
2369
2370    #[test]
2371    fn plan_builder_union() -> Result<()> {
2372        let plan =
2373            table_scan(Some("employee_csv"), &employee_schema(), Some(vec![3, 4]))?;
2374
2375        let plan = plan
2376            .clone()
2377            .union(plan.clone().build()?)?
2378            .union(plan.clone().build()?)?
2379            .union(plan.build()?)?
2380            .build()?;
2381
2382        assert_snapshot!(plan, @r"
2383        Union
2384          Union
2385            Union
2386              TableScan: employee_csv projection=[state, salary]
2387              TableScan: employee_csv projection=[state, salary]
2388            TableScan: employee_csv projection=[state, salary]
2389          TableScan: employee_csv projection=[state, salary]
2390        ");
2391
2392        Ok(())
2393    }
2394
2395    #[test]
2396    fn plan_builder_union_distinct() -> Result<()> {
2397        let plan =
2398            table_scan(Some("employee_csv"), &employee_schema(), Some(vec![3, 4]))?;
2399
2400        let plan = plan
2401            .clone()
2402            .union_distinct(plan.clone().build()?)?
2403            .union_distinct(plan.clone().build()?)?
2404            .union_distinct(plan.build()?)?
2405            .build()?;
2406
2407        assert_snapshot!(plan, @r"
2408        Distinct:
2409          Union
2410            Distinct:
2411              Union
2412                Distinct:
2413                  Union
2414                    TableScan: employee_csv projection=[state, salary]
2415                    TableScan: employee_csv projection=[state, salary]
2416                TableScan: employee_csv projection=[state, salary]
2417            TableScan: employee_csv projection=[state, salary]
2418        ");
2419
2420        Ok(())
2421    }
2422
2423    #[test]
2424    fn plan_builder_simple_distinct() -> Result<()> {
2425        let plan =
2426            table_scan(Some("employee_csv"), &employee_schema(), Some(vec![0, 3]))?
2427                .filter(col("state").eq(lit("CO")))?
2428                .project(vec![col("id")])?
2429                .distinct()?
2430                .build()?;
2431
2432        assert_snapshot!(plan, @r#"
2433        Distinct:
2434          Projection: employee_csv.id
2435            Filter: employee_csv.state = Utf8("CO")
2436              TableScan: employee_csv projection=[id, state]
2437        "#);
2438
2439        Ok(())
2440    }
2441
2442    #[test]
2443    fn exists_subquery() -> Result<()> {
2444        let foo = test_table_scan_with_name("foo")?;
2445        let bar = test_table_scan_with_name("bar")?;
2446
2447        let subquery = LogicalPlanBuilder::from(foo)
2448            .project(vec![col("a")])?
2449            .filter(col("a").eq(col("bar.a")))?
2450            .build()?;
2451
2452        let outer_query = LogicalPlanBuilder::from(bar)
2453            .project(vec![col("a")])?
2454            .filter(exists(Arc::new(subquery)))?
2455            .build()?;
2456
2457        assert_snapshot!(outer_query, @r"
2458        Filter: EXISTS (<subquery>)
2459          Subquery:
2460            Filter: foo.a = bar.a
2461              Projection: foo.a
2462                TableScan: foo
2463          Projection: bar.a
2464            TableScan: bar
2465        ");
2466
2467        Ok(())
2468    }
2469
2470    #[test]
2471    fn filter_in_subquery() -> Result<()> {
2472        let foo = test_table_scan_with_name("foo")?;
2473        let bar = test_table_scan_with_name("bar")?;
2474
2475        let subquery = LogicalPlanBuilder::from(foo)
2476            .project(vec![col("a")])?
2477            .filter(col("a").eq(col("bar.a")))?
2478            .build()?;
2479
2480        // SELECT a FROM bar WHERE a IN (SELECT a FROM foo WHERE a = bar.a)
2481        let outer_query = LogicalPlanBuilder::from(bar)
2482            .project(vec![col("a")])?
2483            .filter(in_subquery(col("a"), Arc::new(subquery)))?
2484            .build()?;
2485
2486        assert_snapshot!(outer_query, @r"
2487        Filter: bar.a IN (<subquery>)
2488          Subquery:
2489            Filter: foo.a = bar.a
2490              Projection: foo.a
2491                TableScan: foo
2492          Projection: bar.a
2493            TableScan: bar
2494        ");
2495
2496        Ok(())
2497    }
2498
2499    #[test]
2500    fn select_scalar_subquery() -> Result<()> {
2501        let foo = test_table_scan_with_name("foo")?;
2502        let bar = test_table_scan_with_name("bar")?;
2503
2504        let subquery = LogicalPlanBuilder::from(foo)
2505            .project(vec![col("b")])?
2506            .filter(col("a").eq(col("bar.a")))?
2507            .build()?;
2508
2509        // SELECT (SELECT a FROM foo WHERE a = bar.a) FROM bar
2510        let outer_query = LogicalPlanBuilder::from(bar)
2511            .project(vec![scalar_subquery(Arc::new(subquery))])?
2512            .build()?;
2513
2514        assert_snapshot!(outer_query, @r"
2515        Projection: (<subquery>)
2516          Subquery:
2517            Filter: foo.a = bar.a
2518              Projection: foo.b
2519                TableScan: foo
2520          TableScan: bar
2521        ");
2522
2523        Ok(())
2524    }
2525
2526    #[test]
2527    fn projection_non_unique_names() -> Result<()> {
2528        let plan = table_scan(
2529            Some("employee_csv"),
2530            &employee_schema(),
2531            // project id and first_name by column index
2532            Some(vec![0, 1]),
2533        )?
2534        // two columns with the same name => error
2535        .project(vec![col("id"), col("first_name").alias("id")]);
2536
2537        match plan {
2538            Err(DataFusionError::SchemaError(err, _)) => {
2539                if let SchemaError::AmbiguousReference { field } = *err {
2540                    let Column {
2541                        relation,
2542                        name,
2543                        spans: _,
2544                    } = *field;
2545                    let Some(TableReference::Bare { table }) = relation else {
2546                        return plan_err!(
2547                            "wrong relation: {relation:?}, expected table name"
2548                        );
2549                    };
2550                    assert_eq!(*"employee_csv", *table);
2551                    assert_eq!("id", &name);
2552                    Ok(())
2553                } else {
2554                    plan_err!("Plan should have returned an DataFusionError::SchemaError")
2555                }
2556            }
2557            _ => plan_err!("Plan should have returned an DataFusionError::SchemaError"),
2558        }
2559    }
2560
2561    fn employee_schema() -> Schema {
2562        Schema::new(vec![
2563            Field::new("id", DataType::Int32, false),
2564            Field::new("first_name", DataType::Utf8, false),
2565            Field::new("last_name", DataType::Utf8, false),
2566            Field::new("state", DataType::Utf8, false),
2567            Field::new("salary", DataType::Int32, false),
2568        ])
2569    }
2570
2571    #[test]
2572    fn stringified_plan() {
2573        let stringified_plan =
2574            StringifiedPlan::new(PlanType::InitialLogicalPlan, "...the plan...");
2575        assert!(stringified_plan.should_display(true));
2576        assert!(!stringified_plan.should_display(false)); // not in non verbose mode
2577
2578        let stringified_plan =
2579            StringifiedPlan::new(PlanType::FinalLogicalPlan, "...the plan...");
2580        assert!(stringified_plan.should_display(true));
2581        assert!(stringified_plan.should_display(false)); // display in non verbose mode too
2582
2583        let stringified_plan =
2584            StringifiedPlan::new(PlanType::InitialPhysicalPlan, "...the plan...");
2585        assert!(stringified_plan.should_display(true));
2586        assert!(!stringified_plan.should_display(false)); // not in non verbose mode
2587
2588        let stringified_plan =
2589            StringifiedPlan::new(PlanType::FinalPhysicalPlan, "...the plan...");
2590        assert!(stringified_plan.should_display(true));
2591        assert!(stringified_plan.should_display(false)); // display in non verbose mode
2592
2593        let stringified_plan = StringifiedPlan::new(
2594            PlanType::OptimizedLogicalPlan {
2595                optimizer_name: "random opt pass".into(),
2596            },
2597            "...the plan...",
2598        );
2599        assert!(stringified_plan.should_display(true));
2600        assert!(!stringified_plan.should_display(false));
2601    }
2602
2603    fn test_table_scan_with_name(name: &str) -> Result<LogicalPlan> {
2604        let schema = Schema::new(vec![
2605            Field::new("a", DataType::UInt32, false),
2606            Field::new("b", DataType::UInt32, false),
2607            Field::new("c", DataType::UInt32, false),
2608        ]);
2609        table_scan(Some(name), &schema, None)?.build()
2610    }
2611
2612    #[test]
2613    fn plan_builder_intersect_different_num_columns_error() -> Result<()> {
2614        let plan1 =
2615            table_scan(TableReference::none(), &employee_schema(), Some(vec![3]))?;
2616        let plan2 =
2617            table_scan(TableReference::none(), &employee_schema(), Some(vec![3, 4]))?;
2618
2619        let err_msg1 =
2620            LogicalPlanBuilder::intersect(plan1.build()?, plan2.build()?, true)
2621                .unwrap_err();
2622
2623        assert_snapshot!(err_msg1.strip_backtrace(), @"Error during planning: INTERSECT/EXCEPT query must have the same number of columns. Left is 1 and right is 2.");
2624
2625        Ok(())
2626    }
2627
2628    #[test]
2629    fn plan_builder_unnest() -> Result<()> {
2630        // Cannot unnest on a scalar column
2631        let err = nested_table_scan("test_table")?
2632            .unnest_column("scalar")
2633            .unwrap_err();
2634
2635        let DataFusionError::Internal(desc) = err else {
2636            return plan_err!("Plan should have returned an DataFusionError::Internal");
2637        };
2638
2639        let desc = (*desc
2640            .split(DataFusionError::BACK_TRACE_SEP)
2641            .collect::<Vec<&str>>()
2642            .first()
2643            .unwrap_or(&""))
2644        .to_string();
2645
2646        assert_snapshot!(desc, @"trying to unnest on invalid data type UInt32");
2647
2648        // Unnesting the strings list.
2649        let plan = nested_table_scan("test_table")?
2650            .unnest_column("strings")?
2651            .build()?;
2652
2653        assert_snapshot!(plan, @r"
2654        Unnest: lists[test_table.strings|depth=1] structs[]
2655          TableScan: test_table
2656        ");
2657
2658        // Check unnested field is a scalar
2659        let field = plan.schema().field_with_name(None, "strings").unwrap();
2660        assert_eq!(&DataType::Utf8, field.data_type());
2661
2662        // Unnesting the singular struct column result into 2 new columns for each subfield
2663        let plan = nested_table_scan("test_table")?
2664            .unnest_column("struct_singular")?
2665            .build()?;
2666
2667        assert_snapshot!(plan, @r"
2668        Unnest: lists[] structs[test_table.struct_singular]
2669          TableScan: test_table
2670        ");
2671
2672        for field_name in &["a", "b"] {
2673            // Check unnested struct field is a scalar
2674            let field = plan
2675                .schema()
2676                .field_with_name(None, &format!("struct_singular.{field_name}"))
2677                .unwrap();
2678            assert_eq!(&DataType::UInt32, field.data_type());
2679        }
2680
2681        // Unnesting multiple fields in separate plans
2682        let plan = nested_table_scan("test_table")?
2683            .unnest_column("strings")?
2684            .unnest_column("structs")?
2685            .unnest_column("struct_singular")?
2686            .build()?;
2687
2688        assert_snapshot!(plan, @r"
2689        Unnest: lists[] structs[test_table.struct_singular]
2690          Unnest: lists[test_table.structs|depth=1] structs[]
2691            Unnest: lists[test_table.strings|depth=1] structs[]
2692              TableScan: test_table
2693        ");
2694
2695        // Check unnested struct list field should be a struct.
2696        let field = plan.schema().field_with_name(None, "structs").unwrap();
2697        assert!(matches!(field.data_type(), DataType::Struct(_)));
2698
2699        // Unnesting multiple fields at the same time, using infer syntax
2700        let cols = vec!["strings", "structs", "struct_singular"]
2701            .into_iter()
2702            .map(|c| c.into())
2703            .collect();
2704
2705        let plan = nested_table_scan("test_table")?
2706            .unnest_columns_with_options(cols, UnnestOptions::default())?
2707            .build()?;
2708
2709        assert_snapshot!(plan, @r"
2710        Unnest: lists[test_table.strings|depth=1, test_table.structs|depth=1] structs[test_table.struct_singular]
2711          TableScan: test_table
2712        ");
2713
2714        // Unnesting missing column should fail.
2715        let plan = nested_table_scan("test_table")?.unnest_column("missing");
2716        assert!(plan.is_err());
2717
2718        // Simultaneously unnesting a list (with different depth) and a struct column
2719        let plan = nested_table_scan("test_table")?
2720            .unnest_columns_with_options(
2721                vec!["stringss".into(), "struct_singular".into()],
2722                UnnestOptions::default()
2723                    .with_recursions(RecursionUnnestOption {
2724                        input_column: "stringss".into(),
2725                        output_column: "stringss_depth_1".into(),
2726                        depth: 1,
2727                    })
2728                    .with_recursions(RecursionUnnestOption {
2729                        input_column: "stringss".into(),
2730                        output_column: "stringss_depth_2".into(),
2731                        depth: 2,
2732                    }),
2733            )?
2734            .build()?;
2735
2736        assert_snapshot!(plan, @r"
2737        Unnest: lists[test_table.stringss|depth=1, test_table.stringss|depth=2] structs[test_table.struct_singular]
2738          TableScan: test_table
2739        ");
2740
2741        // Check output columns has correct type
2742        let field = plan
2743            .schema()
2744            .field_with_name(None, "stringss_depth_1")
2745            .unwrap();
2746        assert_eq!(
2747            &DataType::new_list(DataType::Utf8, false),
2748            field.data_type()
2749        );
2750        let field = plan
2751            .schema()
2752            .field_with_name(None, "stringss_depth_2")
2753            .unwrap();
2754        assert_eq!(&DataType::Utf8, field.data_type());
2755        // unnesting struct is still correct
2756        for field_name in &["a", "b"] {
2757            let field = plan
2758                .schema()
2759                .field_with_name(None, &format!("struct_singular.{field_name}"))
2760                .unwrap();
2761            assert_eq!(&DataType::UInt32, field.data_type());
2762        }
2763
2764        Ok(())
2765    }
2766
2767    fn nested_table_scan(table_name: &str) -> Result<LogicalPlanBuilder> {
2768        // Create a schema with a scalar field, a list of strings, a list of structs
2769        // and a singular struct
2770        let struct_field_in_list = Field::new_struct(
2771            "item",
2772            vec![
2773                Field::new("a", DataType::UInt32, false),
2774                Field::new("b", DataType::UInt32, false),
2775            ],
2776            false,
2777        );
2778        let string_field = Field::new_list_field(DataType::Utf8, false);
2779        let strings_field = Field::new_list("item", string_field.clone(), false);
2780        let schema = Schema::new(vec![
2781            Field::new("scalar", DataType::UInt32, false),
2782            Field::new_list("strings", string_field, false),
2783            Field::new_list("structs", struct_field_in_list, false),
2784            Field::new(
2785                "struct_singular",
2786                DataType::Struct(Fields::from(vec![
2787                    Field::new("a", DataType::UInt32, false),
2788                    Field::new("b", DataType::UInt32, false),
2789                ])),
2790                false,
2791            ),
2792            Field::new_list("stringss", strings_field, false),
2793        ]);
2794
2795        table_scan(Some(table_name), &schema, None)
2796    }
2797
2798    #[test]
2799    fn test_union_after_join() -> Result<()> {
2800        let values = vec![vec![lit(1)]];
2801
2802        let left = LogicalPlanBuilder::values(values.clone())?
2803            .alias("left")?
2804            .build()?;
2805        let right = LogicalPlanBuilder::values(values)?
2806            .alias("right")?
2807            .build()?;
2808
2809        let join = LogicalPlanBuilder::from(left).cross_join(right)?.build()?;
2810
2811        let plan = LogicalPlanBuilder::from(join.clone())
2812            .union(join)?
2813            .build()?;
2814
2815        assert_snapshot!(plan, @r"
2816        Union
2817          Cross Join:
2818            SubqueryAlias: left
2819              Values: (Int32(1))
2820            SubqueryAlias: right
2821              Values: (Int32(1))
2822          Cross Join:
2823            SubqueryAlias: left
2824              Values: (Int32(1))
2825            SubqueryAlias: right
2826              Values: (Int32(1))
2827        ");
2828
2829        Ok(())
2830    }
2831
2832    #[test]
2833    fn plan_builder_from_logical_plan() -> Result<()> {
2834        let plan =
2835            table_scan(Some("employee_csv"), &employee_schema(), Some(vec![3, 4]))?
2836                .sort(vec![
2837                    expr::Sort::new(col("state"), true, true),
2838                    expr::Sort::new(col("salary"), false, false),
2839                ])?
2840                .build()?;
2841
2842        let plan_expected = format!("{plan}");
2843        let plan_builder: LogicalPlanBuilder = Arc::new(plan).into();
2844        assert_eq!(plan_expected, format!("{}", plan_builder.plan));
2845
2846        Ok(())
2847    }
2848
2849    #[test]
2850    fn plan_builder_aggregate_without_implicit_group_by_exprs() -> Result<()> {
2851        let constraints =
2852            Constraints::new_unverified(vec![Constraint::PrimaryKey(vec![0])]);
2853        let table_source = table_source_with_constraints(&employee_schema(), constraints);
2854
2855        let plan =
2856            LogicalPlanBuilder::scan("employee_csv", table_source, Some(vec![0, 3, 4]))?
2857                .aggregate(vec![col("id")], vec![sum(col("salary"))])?
2858                .build()?;
2859
2860        assert_snapshot!(plan, @r"
2861        Aggregate: groupBy=[[employee_csv.id]], aggr=[[sum(employee_csv.salary)]]
2862          TableScan: employee_csv projection=[id, state, salary]
2863        ");
2864
2865        Ok(())
2866    }
2867
2868    #[test]
2869    fn plan_builder_aggregate_with_implicit_group_by_exprs() -> Result<()> {
2870        let constraints =
2871            Constraints::new_unverified(vec![Constraint::PrimaryKey(vec![0])]);
2872        let table_source = table_source_with_constraints(&employee_schema(), constraints);
2873
2874        let options =
2875            LogicalPlanBuilderOptions::new().with_add_implicit_group_by_exprs(true);
2876        let plan =
2877            LogicalPlanBuilder::scan("employee_csv", table_source, Some(vec![0, 3, 4]))?
2878                .with_options(options)
2879                .aggregate(vec![col("id")], vec![sum(col("salary"))])?
2880                .build()?;
2881
2882        assert_snapshot!(plan, @r"
2883        Aggregate: groupBy=[[employee_csv.id, employee_csv.state, employee_csv.salary]], aggr=[[sum(employee_csv.salary)]]
2884          TableScan: employee_csv projection=[id, state, salary]
2885        ");
2886
2887        Ok(())
2888    }
2889
2890    #[test]
2891    fn test_join_metadata() -> Result<()> {
2892        let left_schema = DFSchema::new_with_metadata(
2893            vec![(None, Arc::new(Field::new("a", DataType::Int32, false)))],
2894            HashMap::from([("key".to_string(), "left".to_string())]),
2895        )?;
2896        let right_schema = DFSchema::new_with_metadata(
2897            vec![(None, Arc::new(Field::new("b", DataType::Int32, false)))],
2898            HashMap::from([("key".to_string(), "right".to_string())]),
2899        )?;
2900
2901        let join_schema =
2902            build_join_schema(&left_schema, &right_schema, &JoinType::Left)?;
2903        assert_eq!(
2904            join_schema.metadata(),
2905            &HashMap::from([("key".to_string(), "left".to_string())])
2906        );
2907        let join_schema =
2908            build_join_schema(&left_schema, &right_schema, &JoinType::Right)?;
2909        assert_eq!(
2910            join_schema.metadata(),
2911            &HashMap::from([("key".to_string(), "right".to_string())])
2912        );
2913
2914        Ok(())
2915    }
2916
2917    #[test]
2918    fn test_values_metadata() -> Result<()> {
2919        let metadata: HashMap<String, String> =
2920            [("ARROW:extension:metadata".to_string(), "test".to_string())]
2921                .into_iter()
2922                .collect();
2923        let metadata = FieldMetadata::from(metadata);
2924        let values = LogicalPlanBuilder::values(vec![
2925            vec![lit_with_metadata(1, Some(metadata.clone()))],
2926            vec![lit_with_metadata(2, Some(metadata.clone()))],
2927        ])?
2928        .build()?;
2929        assert_eq!(*values.schema().field(0).metadata(), metadata.to_hashmap());
2930
2931        // Do not allow VALUES with different metadata mixed together
2932        let metadata2: HashMap<String, String> =
2933            [("ARROW:extension:metadata".to_string(), "test2".to_string())]
2934                .into_iter()
2935                .collect();
2936        let metadata2 = FieldMetadata::from(metadata2);
2937        assert!(
2938            LogicalPlanBuilder::values(vec![
2939                vec![lit_with_metadata(1, Some(metadata.clone()))],
2940                vec![lit_with_metadata(2, Some(metadata2.clone()))],
2941            ])
2942            .is_err()
2943        );
2944
2945        Ok(())
2946    }
2947
2948    #[test]
2949    fn test_unique_field_aliases() {
2950        let t1_field_1 = Field::new("a", DataType::Int32, false);
2951        let t2_field_1 = Field::new("a", DataType::Int32, false);
2952        let t2_field_3 = Field::new("a", DataType::Int32, false);
2953        let t2_field_4 = Field::new("a:1", DataType::Int32, false);
2954        let t1_field_2 = Field::new("b", DataType::Int32, false);
2955        let t2_field_2 = Field::new("b", DataType::Int32, false);
2956
2957        let fields = vec![
2958            t1_field_1, t2_field_1, t1_field_2, t2_field_2, t2_field_3, t2_field_4,
2959        ];
2960        let fields = Fields::from(fields);
2961
2962        let remove_redundant = unique_field_aliases(&fields);
2963
2964        // Input [a, a, b, b, a, a:1] becomes [None, a:1, None, b:1, a:2, a:1:1]
2965        // First occurrence of each field name keeps original name (None), duplicates get
2966        // incremental suffixes (:1, :2, etc.).
2967        // Crucially in this case the 2nd occurrence of `a` gets rewritten to `a:1` which later
2968        // conflicts with the last column which is _actually_ called `a:1` so we need to rename it
2969        // as well to `a:1:1`.
2970        assert_eq!(
2971            remove_redundant,
2972            vec![
2973                None,
2974                Some("a:1".to_string()),
2975                None,
2976                Some("b:1".to_string()),
2977                Some("a:2".to_string()),
2978                Some("a:1:1".to_string()),
2979            ]
2980        );
2981    }
2982}