datafusion_expr/logical_plan/
builder.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! This module provides a builder for creating LogicalPlans
19
20use std::any::Any;
21use std::borrow::Cow;
22use std::cmp::Ordering;
23use std::collections::{HashMap, HashSet};
24use std::iter::once;
25use std::sync::Arc;
26
27use crate::dml::CopyTo;
28use crate::expr::{Alias, PlannedReplaceSelectItem, Sort as SortExpr};
29use crate::expr_rewriter::{
30    coerce_plan_expr_for_schema, normalize_col,
31    normalize_col_with_schemas_and_ambiguity_check, normalize_cols, normalize_sorts,
32    rewrite_sort_cols_by_aggs,
33};
34use crate::logical_plan::{
35    Aggregate, Analyze, Distinct, DistinctOn, EmptyRelation, Explain, Filter, Join,
36    JoinConstraint, JoinType, Limit, LogicalPlan, Partitioning, PlanType, Prepare,
37    Projection, Repartition, Sort, SubqueryAlias, TableScan, Union, Unnest, Values,
38    Window,
39};
40use crate::select_expr::SelectExpr;
41use crate::utils::{
42    can_hash, columnize_expr, compare_sort_expr, expand_qualified_wildcard,
43    expand_wildcard, expr_to_columns, find_valid_equijoin_key_pair,
44    group_window_expr_by_sort_keys,
45};
46use crate::{
47    DmlStatement, ExplainOption, Expr, ExprSchemable, Operator, RecursiveQuery,
48    Statement, TableProviderFilterPushDown, TableSource, WriteOp, and, binary_expr, lit,
49};
50
51use super::dml::InsertOp;
52use arrow::compute::can_cast_types;
53use arrow::datatypes::{DataType, Field, FieldRef, Fields, Schema, SchemaRef};
54use datafusion_common::display::ToStringifiedPlan;
55use datafusion_common::file_options::file_type::FileType;
56use datafusion_common::metadata::FieldMetadata;
57use datafusion_common::{
58    Column, Constraints, DFSchema, DFSchemaRef, NullEquality, Result, ScalarValue,
59    TableReference, ToDFSchema, UnnestOptions, exec_err,
60    get_target_functional_dependencies, internal_datafusion_err, plan_datafusion_err,
61    plan_err,
62};
63use datafusion_expr_common::type_coercion::binary::type_union_resolution;
64
65use indexmap::IndexSet;
66
/// Default name given to a table that has no name of its own.
pub const UNNAMED_TABLE: &str = "?table?";
69
/// Configuration for [`LogicalPlanBuilder`]
#[derive(Default, Debug, Clone)]
pub struct LogicalPlanBuilderOptions {
    /// When `true`, the plan builder adds functionally dependent
    /// expressions as additional aggregation groupings.
    add_implicit_group_by_exprs: bool,
}

impl LogicalPlanBuilderOptions {
    /// Create options with every setting at its default value.
    pub fn new() -> Self {
        Self::default()
    }

    /// Choose whether the builder adds functionally dependent expressions
    /// as additional aggregation groupings, returning the updated options.
    pub fn with_add_implicit_group_by_exprs(mut self, add: bool) -> Self {
        self.add_implicit_group_by_exprs = add;
        self
    }
}
89
90/// Builder for logical plans
91///
92/// # Example building a simple plan
93/// ```
94/// # use datafusion_expr::{lit, col, LogicalPlanBuilder, logical_plan::table_scan};
95/// # use datafusion_common::Result;
96/// # use arrow::datatypes::{Schema, DataType, Field};
97/// #
98/// # fn main() -> Result<()> {
99/// #
100/// # fn employee_schema() -> Schema {
101/// #    Schema::new(vec![
102/// #           Field::new("id", DataType::Int32, false),
103/// #           Field::new("first_name", DataType::Utf8, false),
104/// #           Field::new("last_name", DataType::Utf8, false),
105/// #           Field::new("state", DataType::Utf8, false),
106/// #           Field::new("salary", DataType::Int32, false),
107/// #       ])
108/// #   }
109/// #
110/// // Create a plan similar to
111/// // SELECT last_name
112/// // FROM employees
113/// // WHERE salary < 1000
114/// let plan = table_scan(Some("employee"), &employee_schema(), None)?
115///  // Keep only rows where salary < 1000
116///  .filter(col("salary").lt(lit(1000)))?
117///  // only show "last_name" in the final results
118///  .project(vec![col("last_name")])?
119///  .build()?;
120///
121/// // Convert from plan back to builder
122/// let builder = LogicalPlanBuilder::from(plan);
123///
124/// # Ok(())
125/// # }
126/// ```
#[derive(Debug, Clone)]
pub struct LogicalPlanBuilder {
    /// The logical plan assembled so far, held behind an `Arc`.
    plan: Arc<LogicalPlan>,
    /// Options consulted by the builder while constructing plans.
    options: LogicalPlanBuilderOptions,
}
132
133impl LogicalPlanBuilder {
134    /// Create a builder from an existing plan
135    pub fn new(plan: LogicalPlan) -> Self {
136        Self {
137            plan: Arc::new(plan),
138            options: LogicalPlanBuilderOptions::default(),
139        }
140    }
141
142    /// Create a builder from an existing plan
143    pub fn new_from_arc(plan: Arc<LogicalPlan>) -> Self {
144        Self {
145            plan,
146            options: LogicalPlanBuilderOptions::default(),
147        }
148    }
149
150    pub fn with_options(mut self, options: LogicalPlanBuilderOptions) -> Self {
151        self.options = options;
152        self
153    }
154
155    /// Return the output schema of the plan build so far
156    pub fn schema(&self) -> &DFSchemaRef {
157        self.plan.schema()
158    }
159
160    /// Return the LogicalPlan of the plan build so far
161    pub fn plan(&self) -> &LogicalPlan {
162        &self.plan
163    }
164
165    /// Create an empty relation.
166    ///
167    /// `produce_one_row` set to true means this empty node needs to produce a placeholder row.
168    pub fn empty(produce_one_row: bool) -> Self {
169        Self::new(LogicalPlan::EmptyRelation(EmptyRelation {
170            produce_one_row,
171            schema: DFSchemaRef::new(DFSchema::empty()),
172        }))
173    }
174
175    /// Convert a regular plan into a recursive query.
176    /// `is_distinct` indicates whether the recursive term should be de-duplicated (`UNION`) after each iteration or not (`UNION ALL`).
177    pub fn to_recursive_query(
178        self,
179        name: String,
180        recursive_term: LogicalPlan,
181        is_distinct: bool,
182    ) -> Result<Self> {
183        // Ensure that the static term and the recursive term have the same number of fields
184        let static_fields_len = self.plan.schema().fields().len();
185        let recursive_fields_len = recursive_term.schema().fields().len();
186        if static_fields_len != recursive_fields_len {
187            return plan_err!(
188                "Non-recursive term and recursive term must have the same number of columns ({} != {})",
189                static_fields_len,
190                recursive_fields_len
191            );
192        }
193        // Ensure that the recursive term has the same field types as the static term
194        let coerced_recursive_term =
195            coerce_plan_expr_for_schema(recursive_term, self.plan.schema())?;
196        Ok(Self::from(LogicalPlan::RecursiveQuery(RecursiveQuery {
197            name,
198            static_term: self.plan,
199            recursive_term: Arc::new(coerced_recursive_term),
200            is_distinct,
201        })))
202    }
203
204    /// Create a values list based relation, and the schema is inferred from data, consuming
205    /// `value`. See the [Postgres VALUES](https://www.postgresql.org/docs/current/queries-values.html)
206    /// documentation for more details.
207    ///
208    /// so it's usually better to override the default names with a table alias list.
209    ///
210    /// If the values include params/binders such as $1, $2, $3, etc, then the `param_data_types` should be provided.
211    pub fn values(values: Vec<Vec<Expr>>) -> Result<Self> {
212        if values.is_empty() {
213            return plan_err!("Values list cannot be empty");
214        }
215        let n_cols = values[0].len();
216        if n_cols == 0 {
217            return plan_err!("Values list cannot be zero length");
218        }
219        for (i, row) in values.iter().enumerate() {
220            if row.len() != n_cols {
221                return plan_err!(
222                    "Inconsistent data length across values list: got {} values in row {} but expected {}",
223                    row.len(),
224                    i,
225                    n_cols
226                );
227            }
228        }
229
230        // Infer from data itself
231        Self::infer_data(values)
232    }
233
234    /// Create a values list based relation, and the schema is inferred from data itself or table schema if provided, consuming
235    /// `value`. See the [Postgres VALUES](https://www.postgresql.org/docs/current/queries-values.html)
236    /// documentation for more details.
237    ///
238    /// By default, it assigns the names column1, column2, etc. to the columns of a VALUES table.
239    /// The column names are not specified by the SQL standard and different database systems do it differently,
240    /// so it's usually better to override the default names with a table alias list.
241    ///
242    /// If the values include params/binders such as $1, $2, $3, etc, then the `param_data_types` should be provided.
243    pub fn values_with_schema(
244        values: Vec<Vec<Expr>>,
245        schema: &DFSchemaRef,
246    ) -> Result<Self> {
247        if values.is_empty() {
248            return plan_err!("Values list cannot be empty");
249        }
250        let n_cols = schema.fields().len();
251        if n_cols == 0 {
252            return plan_err!("Values list cannot be zero length");
253        }
254        for (i, row) in values.iter().enumerate() {
255            if row.len() != n_cols {
256                return plan_err!(
257                    "Inconsistent data length across values list: got {} values in row {} but expected {}",
258                    row.len(),
259                    i,
260                    n_cols
261                );
262            }
263        }
264
265        // Check the type of value against the schema
266        Self::infer_values_from_schema(values, schema)
267    }
268
269    fn infer_values_from_schema(
270        values: Vec<Vec<Expr>>,
271        schema: &DFSchema,
272    ) -> Result<Self> {
273        let n_cols = values[0].len();
274        let mut fields = ValuesFields::new();
275        for j in 0..n_cols {
276            let field_type = schema.field(j).data_type();
277            let field_nullable = schema.field(j).is_nullable();
278            for row in values.iter() {
279                let value = &row[j];
280                let data_type = value.get_type(schema)?;
281
282                if !data_type.equals_datatype(field_type)
283                    && !can_cast_types(&data_type, field_type)
284                {
285                    return exec_err!(
286                        "type mismatch and can't cast to got {} and {}",
287                        data_type,
288                        field_type
289                    );
290                }
291            }
292            fields.push(field_type.to_owned(), field_nullable);
293        }
294
295        Self::infer_inner(values, fields, schema)
296    }
297
298    fn infer_data(values: Vec<Vec<Expr>>) -> Result<Self> {
299        let n_cols = values[0].len();
300        let schema = DFSchema::empty();
301        let mut fields = ValuesFields::new();
302
303        for j in 0..n_cols {
304            let mut common_type: Option<DataType> = None;
305            let mut common_metadata: Option<FieldMetadata> = None;
306            for (i, row) in values.iter().enumerate() {
307                let value = &row[j];
308                let metadata = value.metadata(&schema)?;
309                if let Some(ref cm) = common_metadata {
310                    if &metadata != cm {
311                        return plan_err!(
312                            "Inconsistent metadata across values list at row {i} column {j}. Was {:?} but found {:?}",
313                            cm,
314                            metadata
315                        );
316                    }
317                } else {
318                    common_metadata = Some(metadata.clone());
319                }
320                let data_type = value.get_type(&schema)?;
321                if data_type == DataType::Null {
322                    continue;
323                }
324
325                if let Some(prev_type) = common_type {
326                    // get common type of each column values.
327                    let data_types = vec![prev_type.clone(), data_type.clone()];
328                    let Some(new_type) = type_union_resolution(&data_types) else {
329                        return plan_err!(
330                            "Inconsistent data type across values list at row {i} column {j}. Was {prev_type} but found {data_type}"
331                        );
332                    };
333                    common_type = Some(new_type);
334                } else {
335                    common_type = Some(data_type);
336                }
337            }
338            // assuming common_type was not set, and no error, therefore the type should be NULL
339            // since the code loop skips NULL
340            fields.push_with_metadata(
341                common_type.unwrap_or(DataType::Null),
342                true,
343                common_metadata,
344            );
345        }
346
347        Self::infer_inner(values, fields, &schema)
348    }
349
350    fn infer_inner(
351        mut values: Vec<Vec<Expr>>,
352        fields: ValuesFields,
353        schema: &DFSchema,
354    ) -> Result<Self> {
355        let fields = fields.into_fields();
356        // wrap cast if data type is not same as common type.
357        for row in &mut values {
358            for (j, field_type) in fields.iter().map(|f| f.data_type()).enumerate() {
359                if let Expr::Literal(ScalarValue::Null, metadata) = &row[j] {
360                    row[j] = Expr::Literal(
361                        ScalarValue::try_from(field_type)?,
362                        metadata.clone(),
363                    );
364                } else {
365                    row[j] = std::mem::take(&mut row[j]).cast_to(field_type, schema)?;
366                }
367            }
368        }
369
370        let dfschema = DFSchema::from_unqualified_fields(fields, HashMap::new())?;
371        let schema = DFSchemaRef::new(dfschema);
372
373        Ok(Self::new(LogicalPlan::Values(Values { schema, values })))
374    }
375
376    /// Convert a table provider into a builder with a TableScan
377    ///
378    /// Note that if you pass a string as `table_name`, it is treated
379    /// as a SQL identifier, as described on [`TableReference`] and
380    /// thus is normalized
381    ///
382    /// # Example:
383    /// ```
384    /// # use datafusion_expr::{lit, col, LogicalPlanBuilder,
385    /// #  logical_plan::builder::LogicalTableSource, logical_plan::table_scan
386    /// # };
387    /// # use std::sync::Arc;
388    /// # use arrow::datatypes::{Schema, DataType, Field};
389    /// # use datafusion_common::TableReference;
390    /// #
391    /// # let employee_schema = Arc::new(Schema::new(vec![
392    /// #           Field::new("id", DataType::Int32, false),
393    /// # ])) as _;
394    /// # let table_source = Arc::new(LogicalTableSource::new(employee_schema));
395    /// // Scan table_source with the name "mytable" (after normalization)
396    /// # let table = table_source.clone();
397    /// let scan = LogicalPlanBuilder::scan("MyTable", table, None);
398    ///
399    /// // Scan table_source with the name "MyTable" by enclosing in quotes
400    /// # let table = table_source.clone();
401    /// let scan = LogicalPlanBuilder::scan(r#""MyTable""#, table, None);
402    ///
403    /// // Scan table_source with the name "MyTable" by forming the table reference
404    /// # let table = table_source.clone();
405    /// let table_reference = TableReference::bare("MyTable");
406    /// let scan = LogicalPlanBuilder::scan(table_reference, table, None);
407    /// ```
408    pub fn scan(
409        table_name: impl Into<TableReference>,
410        table_source: Arc<dyn TableSource>,
411        projection: Option<Vec<usize>>,
412    ) -> Result<Self> {
413        Self::scan_with_filters(table_name, table_source, projection, vec![])
414    }
415
416    /// Create a [CopyTo] for copying the contents of this builder to the specified file(s)
417    pub fn copy_to(
418        input: LogicalPlan,
419        output_url: String,
420        file_type: Arc<dyn FileType>,
421        options: HashMap<String, String>,
422        partition_by: Vec<String>,
423    ) -> Result<Self> {
424        Ok(Self::new(LogicalPlan::Copy(CopyTo::new(
425            Arc::new(input),
426            output_url,
427            partition_by,
428            file_type,
429            options,
430        ))))
431    }
432
433    /// Create a [`DmlStatement`] for inserting the contents of this builder into the named table.
434    ///
435    /// Note,  use a [`DefaultTableSource`] to insert into a [`TableProvider`]
436    ///
437    /// [`DefaultTableSource`]: https://docs.rs/datafusion/latest/datafusion/datasource/default_table_source/struct.DefaultTableSource.html
438    /// [`TableProvider`]: https://docs.rs/datafusion/latest/datafusion/catalog/trait.TableProvider.html
439    ///
440    /// # Example:
441    /// ```
442    /// # use datafusion_expr::{lit, LogicalPlanBuilder,
443    /// #  logical_plan::builder::LogicalTableSource,
444    /// # };
445    /// # use std::sync::Arc;
446    /// # use arrow::datatypes::{Schema, DataType, Field};
447    /// # use datafusion_expr::dml::InsertOp;
448    /// #
449    /// # fn test() -> datafusion_common::Result<()> {
450    /// # let employee_schema = Arc::new(Schema::new(vec![
451    /// #     Field::new("id", DataType::Int32, false),
452    /// # ])) as _;
453    /// # let table_source = Arc::new(LogicalTableSource::new(employee_schema));
454    /// // VALUES (1), (2)
455    /// let input = LogicalPlanBuilder::values(vec![vec![lit(1)], vec![lit(2)]])?.build()?;
456    /// // INSERT INTO MyTable VALUES (1), (2)
457    /// let insert_plan = LogicalPlanBuilder::insert_into(
458    ///     input,
459    ///     "MyTable",
460    ///     table_source,
461    ///     InsertOp::Append,
462    /// )?;
463    /// # Ok(())
464    /// # }
465    /// ```
466    pub fn insert_into(
467        input: LogicalPlan,
468        table_name: impl Into<TableReference>,
469        target: Arc<dyn TableSource>,
470        insert_op: InsertOp,
471    ) -> Result<Self> {
472        Ok(Self::new(LogicalPlan::Dml(DmlStatement::new(
473            table_name.into(),
474            target,
475            WriteOp::Insert(insert_op),
476            Arc::new(input),
477        ))))
478    }
479
480    /// Convert a table provider into a builder with a TableScan
481    pub fn scan_with_filters(
482        table_name: impl Into<TableReference>,
483        table_source: Arc<dyn TableSource>,
484        projection: Option<Vec<usize>>,
485        filters: Vec<Expr>,
486    ) -> Result<Self> {
487        Self::scan_with_filters_inner(table_name, table_source, projection, filters, None)
488    }
489
490    /// Convert a table provider into a builder with a TableScan with filter and fetch
491    pub fn scan_with_filters_fetch(
492        table_name: impl Into<TableReference>,
493        table_source: Arc<dyn TableSource>,
494        projection: Option<Vec<usize>>,
495        filters: Vec<Expr>,
496        fetch: Option<usize>,
497    ) -> Result<Self> {
498        Self::scan_with_filters_inner(
499            table_name,
500            table_source,
501            projection,
502            filters,
503            fetch,
504        )
505    }
506
507    fn scan_with_filters_inner(
508        table_name: impl Into<TableReference>,
509        table_source: Arc<dyn TableSource>,
510        projection: Option<Vec<usize>>,
511        filters: Vec<Expr>,
512        fetch: Option<usize>,
513    ) -> Result<Self> {
514        let table_scan =
515            TableScan::try_new(table_name, table_source, projection, filters, fetch)?;
516
517        // Inline TableScan
518        if table_scan.filters.is_empty()
519            && let Some(p) = table_scan.source.get_logical_plan()
520        {
521            let sub_plan = p.into_owned();
522
523            if let Some(proj) = table_scan.projection {
524                let projection_exprs = proj
525                    .into_iter()
526                    .map(|i| {
527                        Expr::Column(Column::from(sub_plan.schema().qualified_field(i)))
528                    })
529                    .collect::<Vec<_>>();
530                return Self::new(sub_plan)
531                    .project(projection_exprs)?
532                    .alias(table_scan.table_name);
533            }
534
535            // Ensures that the reference to the inlined table remains the
536            // same, meaning we don't have to change any of the parent nodes
537            // that reference this table.
538            return Self::new(sub_plan).alias(table_scan.table_name);
539        }
540
541        Ok(Self::new(LogicalPlan::TableScan(table_scan)))
542    }
543
544    /// Wrap a plan in a window
545    pub fn window_plan(
546        input: LogicalPlan,
547        window_exprs: impl IntoIterator<Item = Expr>,
548    ) -> Result<LogicalPlan> {
549        let mut plan = input;
550        let mut groups = group_window_expr_by_sort_keys(window_exprs)?;
551        // To align with the behavior of PostgreSQL, we want the sort_keys sorted as same rule as PostgreSQL that first
552        // we compare the sort key themselves and if one window's sort keys are a prefix of another
553        // put the window with more sort keys first. so more deeply sorted plans gets nested further down as children.
554        // The sort_by() implementation here is a stable sort.
555        // Note that by this rule if there's an empty over, it'll be at the top level
556        groups.sort_by(|(key_a, _), (key_b, _)| {
557            for ((first, _), (second, _)) in key_a.iter().zip(key_b.iter()) {
558                let key_ordering = compare_sort_expr(first, second, plan.schema());
559                match key_ordering {
560                    Ordering::Less => {
561                        return Ordering::Less;
562                    }
563                    Ordering::Greater => {
564                        return Ordering::Greater;
565                    }
566                    Ordering::Equal => {}
567                }
568            }
569            key_b.len().cmp(&key_a.len())
570        });
571        for (_, exprs) in groups {
572            let window_exprs = exprs.into_iter().collect::<Vec<_>>();
573            // Partition and sorting is done at physical level, see the EnforceDistribution
574            // and EnforceSorting rules.
575            plan = LogicalPlanBuilder::from(plan)
576                .window(window_exprs)?
577                .build()?;
578        }
579        Ok(plan)
580    }
581
582    /// Apply a projection without alias.
583    pub fn project(
584        self,
585        expr: impl IntoIterator<Item = impl Into<SelectExpr>>,
586    ) -> Result<Self> {
587        project(Arc::unwrap_or_clone(self.plan), expr).map(Self::new)
588    }
589
590    /// Apply a projection without alias with optional validation
591    /// (true to validate, false to not validate)
592    pub fn project_with_validation(
593        self,
594        expr: Vec<(impl Into<SelectExpr>, bool)>,
595    ) -> Result<Self> {
596        project_with_validation(Arc::unwrap_or_clone(self.plan), expr).map(Self::new)
597    }
598
599    /// Select the given column indices
600    pub fn select(self, indices: impl IntoIterator<Item = usize>) -> Result<Self> {
601        let exprs: Vec<_> = indices
602            .into_iter()
603            .map(|x| Expr::Column(Column::from(self.plan.schema().qualified_field(x))))
604            .collect();
605        self.project(exprs)
606    }
607
608    /// Apply a filter
609    pub fn filter(self, expr: impl Into<Expr>) -> Result<Self> {
610        let expr = normalize_col(expr.into(), &self.plan)?;
611        Filter::try_new(expr, self.plan)
612            .map(LogicalPlan::Filter)
613            .map(Self::new)
614    }
615
616    /// Apply a filter which is used for a having clause
617    pub fn having(self, expr: impl Into<Expr>) -> Result<Self> {
618        let expr = normalize_col(expr.into(), &self.plan)?;
619        Filter::try_new(expr, self.plan)
620            .map(LogicalPlan::Filter)
621            .map(Self::from)
622    }
623
624    /// Make a builder for a prepare logical plan from the builder's plan
625    pub fn prepare(self, name: String, fields: Vec<FieldRef>) -> Result<Self> {
626        Ok(Self::new(LogicalPlan::Statement(Statement::Prepare(
627            Prepare {
628                name,
629                fields,
630                input: self.plan,
631            },
632        ))))
633    }
634
635    /// Limit the number of rows returned
636    ///
637    /// `skip` - Number of rows to skip before fetch any row.
638    ///
639    /// `fetch` - Maximum number of rows to fetch, after skipping `skip` rows,
640    ///          if specified.
641    pub fn limit(self, skip: usize, fetch: Option<usize>) -> Result<Self> {
642        let skip_expr = if skip == 0 {
643            None
644        } else {
645            Some(lit(skip as i64))
646        };
647        let fetch_expr = fetch.map(|f| lit(f as i64));
648        self.limit_by_expr(skip_expr, fetch_expr)
649    }
650
651    /// Limit the number of rows returned
652    ///
653    /// Similar to `limit` but uses expressions for `skip` and `fetch`
654    pub fn limit_by_expr(self, skip: Option<Expr>, fetch: Option<Expr>) -> Result<Self> {
655        Ok(Self::new(LogicalPlan::Limit(Limit {
656            skip: skip.map(Box::new),
657            fetch: fetch.map(Box::new),
658            input: self.plan,
659        })))
660    }
661
662    /// Apply an alias
663    pub fn alias(self, alias: impl Into<TableReference>) -> Result<Self> {
664        subquery_alias(Arc::unwrap_or_clone(self.plan), alias).map(Self::new)
665    }
666
    /// Add missing sort columns to all downstream projections
    ///
    /// Thus, if you have a LogicalPlan that selects A and B and have
    /// requested a sort by C (which is not selected), this code will add
    /// C recursively to all input projections.
    ///
    /// Adding a new column is not correct if there is a `Distinct`
    /// node, which produces only distinct values of its
    /// inputs. Adding a new column to its input will result in
    /// potentially different results than with the original column.
    ///
    /// For example, if the plan is:
    ///
    /// Distinct(A, B)
    ///
    /// and the input looks like
    ///
    /// a | b | c
    /// --+---+---
    /// 1 | 2 | 3
    /// 1 | 2 | 4
    ///
    /// Distinct (A, B) --> (1,2)
    ///
    /// But Distinct (A, B, C) --> (1, 2, 3), (1, 2, 4)
    ///  (which will appear as (1, 2), (1, 2) if only a and b are projected)
    ///
    /// See <https://github.com/apache/datafusion/issues/5065> for more details
    ///
    /// `is_distinct` is true once a `Distinct` node has been seen on the
    /// way down; in that case adding columns is validated with
    /// `ambiguous_distinct_check`.
    fn add_missing_columns(
        curr_plan: LogicalPlan,
        missing_cols: &IndexSet<Column>,
        is_distinct: bool,
    ) -> Result<LogicalPlan> {
        match curr_plan {
            // A projection whose input can supply every missing column:
            // extend this projection and stop recursing.
            LogicalPlan::Projection(Projection {
                input,
                mut expr,
                schema: _,
            }) if missing_cols.iter().all(|c| input.schema().has_column(c)) => {
                let mut missing_exprs = missing_cols
                    .iter()
                    .map(|c| normalize_col(Expr::Column(c.clone()), &input))
                    .collect::<Result<Vec<_>>>()?;

                // Do not add duplicate columns: some of the missing_cols
                // may already be present, just without the new projected
                // alias.
                missing_exprs.retain(|e| !expr.contains(e));
                if is_distinct {
                    Self::ambiguous_distinct_check(&missing_exprs, missing_cols, &expr)?;
                }
                expr.extend(missing_exprs);
                project(Arc::unwrap_or_clone(input), expr)
            }
            // Any other node (including a projection that fails the guard
            // above): recurse into every input and rebuild this node.
            _ => {
                // Remember that a Distinct was crossed so deeper projections are checked
                let is_distinct =
                    is_distinct || matches!(curr_plan, LogicalPlan::Distinct(_));
                let new_inputs = curr_plan
                    .inputs()
                    .into_iter()
                    .map(|input_plan| {
                        Self::add_missing_columns(
                            (*input_plan).clone(),
                            missing_cols,
                            is_distinct,
                        )
                    })
                    .collect::<Result<Vec<_>>>()?;
                curr_plan.with_new_exprs(curr_plan.expressions(), new_inputs)
            }
        }
    }
739
740    fn ambiguous_distinct_check(
741        missing_exprs: &[Expr],
742        missing_cols: &IndexSet<Column>,
743        projection_exprs: &[Expr],
744    ) -> Result<()> {
745        if missing_exprs.is_empty() {
746            return Ok(());
747        }
748
749        // if the missing columns are all only aliases for things in
750        // the existing select list, it is ok
751        //
752        // This handles the special case for
753        // SELECT col as <alias> ORDER BY <alias>
754        //
755        // As described in https://github.com/apache/datafusion/issues/5293
756        let all_aliases = missing_exprs.iter().all(|e| {
757            projection_exprs.iter().any(|proj_expr| {
758                if let Expr::Alias(Alias { expr, .. }) = proj_expr {
759                    e == expr.as_ref()
760                } else {
761                    false
762                }
763            })
764        });
765        if all_aliases {
766            return Ok(());
767        }
768
769        let missing_col_names = missing_cols
770            .iter()
771            .map(|col| col.flat_name())
772            .collect::<String>();
773
774        plan_err!(
775            "For SELECT DISTINCT, ORDER BY expressions {missing_col_names} must appear in select list"
776        )
777    }
778
779    /// Apply a sort by provided expressions with default direction
780    pub fn sort_by(
781        self,
782        expr: impl IntoIterator<Item = impl Into<Expr>> + Clone,
783    ) -> Result<Self> {
784        self.sort(
785            expr.into_iter()
786                .map(|e| e.into().sort(true, false))
787                .collect::<Vec<SortExpr>>(),
788        )
789    }
790
791    pub fn sort(
792        self,
793        sorts: impl IntoIterator<Item = impl Into<SortExpr>> + Clone,
794    ) -> Result<Self> {
795        self.sort_with_limit(sorts, None)
796    }
797
798    /// Apply a sort
799    pub fn sort_with_limit(
800        self,
801        sorts: impl IntoIterator<Item = impl Into<SortExpr>> + Clone,
802        fetch: Option<usize>,
803    ) -> Result<Self> {
804        let sorts = rewrite_sort_cols_by_aggs(sorts, &self.plan)?;
805
806        let schema = self.plan.schema();
807
808        // Collect sort columns that are missing in the input plan's schema
809        let mut missing_cols: IndexSet<Column> = IndexSet::new();
810        sorts.iter().try_for_each::<_, Result<()>>(|sort| {
811            let columns = sort.expr.column_refs();
812
813            missing_cols.extend(
814                columns
815                    .into_iter()
816                    .filter(|c| !schema.has_column(c))
817                    .cloned(),
818            );
819
820            Ok(())
821        })?;
822
823        if missing_cols.is_empty() {
824            return Ok(Self::new(LogicalPlan::Sort(Sort {
825                expr: normalize_sorts(sorts, &self.plan)?,
826                input: self.plan,
827                fetch,
828            })));
829        }
830
831        // remove pushed down sort columns
832        let new_expr = schema.columns().into_iter().map(Expr::Column).collect();
833
834        let is_distinct = false;
835        let plan = Self::add_missing_columns(
836            Arc::unwrap_or_clone(self.plan),
837            &missing_cols,
838            is_distinct,
839        )?;
840
841        let sort_plan = LogicalPlan::Sort(Sort {
842            expr: normalize_sorts(sorts, &plan)?,
843            input: Arc::new(plan),
844            fetch,
845        });
846
847        Projection::try_new(new_expr, Arc::new(sort_plan))
848            .map(LogicalPlan::Projection)
849            .map(Self::new)
850    }
851
852    /// Apply a union, preserving duplicate rows
853    pub fn union(self, plan: LogicalPlan) -> Result<Self> {
854        union(Arc::unwrap_or_clone(self.plan), plan).map(Self::new)
855    }
856
857    /// Apply a union by name, preserving duplicate rows
858    pub fn union_by_name(self, plan: LogicalPlan) -> Result<Self> {
859        union_by_name(Arc::unwrap_or_clone(self.plan), plan).map(Self::new)
860    }
861
862    /// Apply a union by name, removing duplicate rows
863    pub fn union_by_name_distinct(self, plan: LogicalPlan) -> Result<Self> {
864        let left_plan: LogicalPlan = Arc::unwrap_or_clone(self.plan);
865        let right_plan: LogicalPlan = plan;
866
867        Ok(Self::new(LogicalPlan::Distinct(Distinct::All(Arc::new(
868            union_by_name(left_plan, right_plan)?,
869        )))))
870    }
871
872    /// Apply a union, removing duplicate rows
873    pub fn union_distinct(self, plan: LogicalPlan) -> Result<Self> {
874        let left_plan: LogicalPlan = Arc::unwrap_or_clone(self.plan);
875        let right_plan: LogicalPlan = plan;
876
877        Ok(Self::new(LogicalPlan::Distinct(Distinct::All(Arc::new(
878            union(left_plan, right_plan)?,
879        )))))
880    }
881
882    /// Apply deduplication: Only distinct (different) values are returned)
883    pub fn distinct(self) -> Result<Self> {
884        Ok(Self::new(LogicalPlan::Distinct(Distinct::All(self.plan))))
885    }
886
887    /// Project first values of the specified expression list according to the provided
888    /// sorting expressions grouped by the `DISTINCT ON` clause expressions.
889    pub fn distinct_on(
890        self,
891        on_expr: Vec<Expr>,
892        select_expr: Vec<Expr>,
893        sort_expr: Option<Vec<SortExpr>>,
894    ) -> Result<Self> {
895        Ok(Self::new(LogicalPlan::Distinct(Distinct::On(
896            DistinctOn::try_new(on_expr, select_expr, sort_expr, self.plan)?,
897        ))))
898    }
899
900    /// Apply a join to `right` using explicitly specified columns and an
901    /// optional filter expression.
902    ///
903    /// See [`join_on`](Self::join_on) for a more concise way to specify the
904    /// join condition. Since DataFusion will automatically identify and
905    /// optimize equality predicates there is no performance difference between
906    /// this function and `join_on`
907    ///
908    /// `left_cols` and `right_cols` are used to form "equijoin" predicates (see
909    /// example below), which are then combined with the optional `filter`
910    /// expression.
911    ///
912    /// Note that in case of outer join, the `filter` is applied to only matched rows.
913    pub fn join(
914        self,
915        right: LogicalPlan,
916        join_type: JoinType,
917        join_keys: (Vec<impl Into<Column>>, Vec<impl Into<Column>>),
918        filter: Option<Expr>,
919    ) -> Result<Self> {
920        self.join_detailed(
921            right,
922            join_type,
923            join_keys,
924            filter,
925            NullEquality::NullEqualsNothing,
926        )
927    }
928
929    /// Apply a join using the specified expressions.
930    ///
931    /// Note that DataFusion automatically optimizes joins, including
932    /// identifying and optimizing equality predicates.
933    ///
934    /// # Example
935    ///
936    /// ```
937    /// # use datafusion_expr::{Expr, col, LogicalPlanBuilder,
938    /// #  logical_plan::builder::LogicalTableSource, logical_plan::JoinType,};
939    /// # use std::sync::Arc;
940    /// # use arrow::datatypes::{Schema, DataType, Field};
941    /// # use datafusion_common::Result;
942    /// # fn main() -> Result<()> {
943    /// let example_schema = Arc::new(Schema::new(vec![
944    ///     Field::new("a", DataType::Int32, false),
945    ///     Field::new("b", DataType::Int32, false),
946    ///     Field::new("c", DataType::Int32, false),
947    /// ]));
948    /// let table_source = Arc::new(LogicalTableSource::new(example_schema));
949    /// let left_table = table_source.clone();
950    /// let right_table = table_source.clone();
951    ///
952    /// let right_plan = LogicalPlanBuilder::scan("right", right_table, None)?.build()?;
953    ///
954    /// // Form the expression `(left.a != right.a)` AND `(left.b != right.b)`
955    /// let exprs = vec![
956    ///     col("left.a").eq(col("right.a")),
957    ///     col("left.b").not_eq(col("right.b")),
958    /// ];
959    ///
960    /// // Perform the equivalent of `left INNER JOIN right ON (a != a2 AND b != b2)`
961    /// // finding all pairs of rows from `left` and `right` where
962    /// // where `a = a2` and `b != b2`.
963    /// let plan = LogicalPlanBuilder::scan("left", left_table, None)?
964    ///     .join_on(right_plan, JoinType::Inner, exprs)?
965    ///     .build()?;
966    /// # Ok(())
967    /// # }
968    /// ```
969    pub fn join_on(
970        self,
971        right: LogicalPlan,
972        join_type: JoinType,
973        on_exprs: impl IntoIterator<Item = Expr>,
974    ) -> Result<Self> {
975        let filter = on_exprs.into_iter().reduce(Expr::and);
976
977        self.join_detailed(
978            right,
979            join_type,
980            (Vec::<Column>::new(), Vec::<Column>::new()),
981            filter,
982            NullEquality::NullEqualsNothing,
983        )
984    }
985
986    pub(crate) fn normalize(plan: &LogicalPlan, column: Column) -> Result<Column> {
987        if column.relation.is_some() {
988            // column is already normalized
989            return Ok(column);
990        }
991
992        let schema = plan.schema();
993        let fallback_schemas = plan.fallback_normalize_schemas();
994        let using_columns = plan.using_columns()?;
995        column.normalize_with_schemas_and_ambiguity_check(
996            &[&[schema], &fallback_schemas],
997            &using_columns,
998        )
999    }
1000
1001    /// Apply a join with on constraint and specified null equality.
1002    ///
1003    /// The behavior is the same as [`join`](Self::join) except that it allows
1004    /// specifying the null equality behavior.
1005    ///
1006    /// The `null_equality` dictates how `null` values are joined.
1007    pub fn join_detailed(
1008        self,
1009        right: LogicalPlan,
1010        join_type: JoinType,
1011        join_keys: (Vec<impl Into<Column>>, Vec<impl Into<Column>>),
1012        filter: Option<Expr>,
1013        null_equality: NullEquality,
1014    ) -> Result<Self> {
1015        if join_keys.0.len() != join_keys.1.len() {
1016            return plan_err!("left_keys and right_keys were not the same length");
1017        }
1018
1019        let filter = if let Some(expr) = filter {
1020            let filter = normalize_col_with_schemas_and_ambiguity_check(
1021                expr,
1022                &[&[self.schema(), right.schema()]],
1023                &[],
1024            )?;
1025            Some(filter)
1026        } else {
1027            None
1028        };
1029
1030        let (left_keys, right_keys): (Vec<Result<Column>>, Vec<Result<Column>>) =
1031            join_keys
1032                .0
1033                .into_iter()
1034                .zip(join_keys.1)
1035                .map(|(l, r)| {
1036                    let l = l.into();
1037                    let r = r.into();
1038
1039                    match (&l.relation, &r.relation) {
1040                        (Some(lr), Some(rr)) => {
1041                            let l_is_left =
1042                                self.plan.schema().field_with_qualified_name(lr, &l.name);
1043                            let l_is_right =
1044                                right.schema().field_with_qualified_name(lr, &l.name);
1045                            let r_is_left =
1046                                self.plan.schema().field_with_qualified_name(rr, &r.name);
1047                            let r_is_right =
1048                                right.schema().field_with_qualified_name(rr, &r.name);
1049
1050                            match (l_is_left, l_is_right, r_is_left, r_is_right) {
1051                                (_, Ok(_), Ok(_), _) => (Ok(r), Ok(l)),
1052                                (Ok(_), _, _, Ok(_)) => (Ok(l), Ok(r)),
1053                                _ => (
1054                                    Self::normalize(&self.plan, l),
1055                                    Self::normalize(&right, r),
1056                                ),
1057                            }
1058                        }
1059                        (Some(lr), None) => {
1060                            let l_is_left =
1061                                self.plan.schema().field_with_qualified_name(lr, &l.name);
1062                            let l_is_right =
1063                                right.schema().field_with_qualified_name(lr, &l.name);
1064
1065                            match (l_is_left, l_is_right) {
1066                                (Ok(_), _) => (Ok(l), Self::normalize(&right, r)),
1067                                (_, Ok(_)) => (Self::normalize(&self.plan, r), Ok(l)),
1068                                _ => (
1069                                    Self::normalize(&self.plan, l),
1070                                    Self::normalize(&right, r),
1071                                ),
1072                            }
1073                        }
1074                        (None, Some(rr)) => {
1075                            let r_is_left =
1076                                self.plan.schema().field_with_qualified_name(rr, &r.name);
1077                            let r_is_right =
1078                                right.schema().field_with_qualified_name(rr, &r.name);
1079
1080                            match (r_is_left, r_is_right) {
1081                                (Ok(_), _) => (Ok(r), Self::normalize(&right, l)),
1082                                (_, Ok(_)) => (Self::normalize(&self.plan, l), Ok(r)),
1083                                _ => (
1084                                    Self::normalize(&self.plan, l),
1085                                    Self::normalize(&right, r),
1086                                ),
1087                            }
1088                        }
1089                        (None, None) => {
1090                            let mut swap = false;
1091                            let left_key = Self::normalize(&self.plan, l.clone())
1092                                .or_else(|_| {
1093                                    swap = true;
1094                                    Self::normalize(&right, l)
1095                                });
1096                            if swap {
1097                                (Self::normalize(&self.plan, r), left_key)
1098                            } else {
1099                                (left_key, Self::normalize(&right, r))
1100                            }
1101                        }
1102                    }
1103                })
1104                .unzip();
1105
1106        let left_keys = left_keys.into_iter().collect::<Result<Vec<Column>>>()?;
1107        let right_keys = right_keys.into_iter().collect::<Result<Vec<Column>>>()?;
1108
1109        let on: Vec<_> = left_keys
1110            .into_iter()
1111            .zip(right_keys)
1112            .map(|(l, r)| (Expr::Column(l), Expr::Column(r)))
1113            .collect();
1114        let join_schema =
1115            build_join_schema(self.plan.schema(), right.schema(), &join_type)?;
1116
1117        // Inner type without join condition is cross join
1118        if join_type != JoinType::Inner && on.is_empty() && filter.is_none() {
1119            return plan_err!("join condition should not be empty");
1120        }
1121
1122        Ok(Self::new(LogicalPlan::Join(Join {
1123            left: self.plan,
1124            right: Arc::new(right),
1125            on,
1126            filter,
1127            join_type,
1128            join_constraint: JoinConstraint::On,
1129            schema: DFSchemaRef::new(join_schema),
1130            null_equality,
1131        })))
1132    }
1133
1134    /// Apply a join with using constraint, which duplicates all join columns in output schema.
1135    pub fn join_using(
1136        self,
1137        right: LogicalPlan,
1138        join_type: JoinType,
1139        using_keys: Vec<Column>,
1140    ) -> Result<Self> {
1141        let left_keys: Vec<Column> = using_keys
1142            .clone()
1143            .into_iter()
1144            .map(|c| Self::normalize(&self.plan, c))
1145            .collect::<Result<_>>()?;
1146        let right_keys: Vec<Column> = using_keys
1147            .into_iter()
1148            .map(|c| Self::normalize(&right, c))
1149            .collect::<Result<_>>()?;
1150
1151        let on: Vec<(_, _)> = left_keys.into_iter().zip(right_keys).collect();
1152        let mut join_on: Vec<(Expr, Expr)> = vec![];
1153        let mut filters: Option<Expr> = None;
1154        for (l, r) in &on {
1155            if self.plan.schema().has_column(l)
1156                && right.schema().has_column(r)
1157                && can_hash(
1158                    datafusion_common::ExprSchema::field_from_column(
1159                        self.plan.schema(),
1160                        l,
1161                    )?
1162                    .data_type(),
1163                )
1164            {
1165                join_on.push((Expr::Column(l.clone()), Expr::Column(r.clone())));
1166            } else if self.plan.schema().has_column(l)
1167                && right.schema().has_column(r)
1168                && can_hash(
1169                    datafusion_common::ExprSchema::field_from_column(
1170                        self.plan.schema(),
1171                        r,
1172                    )?
1173                    .data_type(),
1174                )
1175            {
1176                join_on.push((Expr::Column(r.clone()), Expr::Column(l.clone())));
1177            } else {
1178                let expr = binary_expr(
1179                    Expr::Column(l.clone()),
1180                    Operator::Eq,
1181                    Expr::Column(r.clone()),
1182                );
1183                match filters {
1184                    None => filters = Some(expr),
1185                    Some(filter_expr) => filters = Some(and(expr, filter_expr)),
1186                }
1187            }
1188        }
1189
1190        if join_on.is_empty() {
1191            let join = Self::from(self.plan).cross_join(right)?;
1192            join.filter(filters.ok_or_else(|| {
1193                internal_datafusion_err!("filters should not be None here")
1194            })?)
1195        } else {
1196            let join = Join::try_new(
1197                self.plan,
1198                Arc::new(right),
1199                join_on,
1200                filters,
1201                join_type,
1202                JoinConstraint::Using,
1203                NullEquality::NullEqualsNothing,
1204            )?;
1205
1206            Ok(Self::new(LogicalPlan::Join(join)))
1207        }
1208    }
1209
1210    /// Apply a cross join
1211    pub fn cross_join(self, right: LogicalPlan) -> Result<Self> {
1212        let join = Join::try_new(
1213            self.plan,
1214            Arc::new(right),
1215            vec![],
1216            None,
1217            JoinType::Inner,
1218            JoinConstraint::On,
1219            NullEquality::NullEqualsNothing,
1220        )?;
1221
1222        Ok(Self::new(LogicalPlan::Join(join)))
1223    }
1224
1225    /// Repartition
1226    pub fn repartition(self, partitioning_scheme: Partitioning) -> Result<Self> {
1227        Ok(Self::new(LogicalPlan::Repartition(Repartition {
1228            input: self.plan,
1229            partitioning_scheme,
1230        })))
1231    }
1232
1233    /// Apply a window functions to extend the schema
1234    pub fn window(
1235        self,
1236        window_expr: impl IntoIterator<Item = impl Into<Expr>>,
1237    ) -> Result<Self> {
1238        let window_expr = normalize_cols(window_expr, &self.plan)?;
1239        validate_unique_names("Windows", &window_expr)?;
1240        Ok(Self::new(LogicalPlan::Window(Window::try_new(
1241            window_expr,
1242            self.plan,
1243        )?)))
1244    }
1245
1246    /// Apply an aggregate: grouping on the `group_expr` expressions
1247    /// and calculating `aggr_expr` aggregates for each distinct
1248    /// value of the `group_expr`;
1249    pub fn aggregate(
1250        self,
1251        group_expr: impl IntoIterator<Item = impl Into<Expr>>,
1252        aggr_expr: impl IntoIterator<Item = impl Into<Expr>>,
1253    ) -> Result<Self> {
1254        let group_expr = normalize_cols(group_expr, &self.plan)?;
1255        let aggr_expr = normalize_cols(aggr_expr, &self.plan)?;
1256
1257        let group_expr = if self.options.add_implicit_group_by_exprs {
1258            add_group_by_exprs_from_dependencies(group_expr, self.plan.schema())?
1259        } else {
1260            group_expr
1261        };
1262
1263        Aggregate::try_new(self.plan, group_expr, aggr_expr)
1264            .map(LogicalPlan::Aggregate)
1265            .map(Self::new)
1266    }
1267
1268    /// Create an expression to represent the explanation of the plan
1269    ///
1270    /// if `analyze` is true, runs the actual plan and produces
1271    /// information about metrics during run.
1272    ///
1273    /// if `verbose` is true, prints out additional details.
1274    pub fn explain(self, verbose: bool, analyze: bool) -> Result<Self> {
1275        // Keep the format default to Indent
1276        self.explain_option_format(
1277            ExplainOption::default()
1278                .with_verbose(verbose)
1279                .with_analyze(analyze),
1280        )
1281    }
1282
1283    /// Create an expression to represent the explanation of the plan
1284    /// The`explain_option` is used to specify the format and verbosity of the explanation.
1285    /// Details see [`ExplainOption`].
1286    pub fn explain_option_format(self, explain_option: ExplainOption) -> Result<Self> {
1287        let schema = LogicalPlan::explain_schema();
1288        let schema = schema.to_dfschema_ref()?;
1289
1290        if explain_option.analyze {
1291            Ok(Self::new(LogicalPlan::Analyze(Analyze {
1292                verbose: explain_option.verbose,
1293                input: self.plan,
1294                schema,
1295            })))
1296        } else {
1297            let stringified_plans =
1298                vec![self.plan.to_stringified(PlanType::InitialLogicalPlan)];
1299
1300            Ok(Self::new(LogicalPlan::Explain(Explain {
1301                verbose: explain_option.verbose,
1302                plan: self.plan,
1303                explain_format: explain_option.format,
1304                stringified_plans,
1305                schema,
1306                logical_optimization_succeeded: false,
1307            })))
1308        }
1309    }
1310
1311    /// Process intersect set operator
1312    pub fn intersect(
1313        left_plan: LogicalPlan,
1314        right_plan: LogicalPlan,
1315        is_all: bool,
1316    ) -> Result<LogicalPlan> {
1317        LogicalPlanBuilder::intersect_or_except(
1318            left_plan,
1319            right_plan,
1320            JoinType::LeftSemi,
1321            is_all,
1322        )
1323    }
1324
1325    /// Process except set operator
1326    pub fn except(
1327        left_plan: LogicalPlan,
1328        right_plan: LogicalPlan,
1329        is_all: bool,
1330    ) -> Result<LogicalPlan> {
1331        LogicalPlanBuilder::intersect_or_except(
1332            left_plan,
1333            right_plan,
1334            JoinType::LeftAnti,
1335            is_all,
1336        )
1337    }
1338
1339    /// Process intersect or except
1340    fn intersect_or_except(
1341        left_plan: LogicalPlan,
1342        right_plan: LogicalPlan,
1343        join_type: JoinType,
1344        is_all: bool,
1345    ) -> Result<LogicalPlan> {
1346        let left_len = left_plan.schema().fields().len();
1347        let right_len = right_plan.schema().fields().len();
1348
1349        if left_len != right_len {
1350            return plan_err!(
1351                "INTERSECT/EXCEPT query must have the same number of columns. Left is {left_len} and right is {right_len}."
1352            );
1353        }
1354
1355        // Requalify sides if needed to avoid duplicate qualified field names
1356        // (e.g., when both sides reference the same table)
1357        let left_builder = LogicalPlanBuilder::from(left_plan);
1358        let right_builder = LogicalPlanBuilder::from(right_plan);
1359        let (left_builder, right_builder, _requalified) =
1360            requalify_sides_if_needed(left_builder, right_builder)?;
1361        let left_plan = left_builder.build()?;
1362        let right_plan = right_builder.build()?;
1363
1364        let join_keys = left_plan
1365            .schema()
1366            .fields()
1367            .iter()
1368            .zip(right_plan.schema().fields().iter())
1369            .map(|(left_field, right_field)| {
1370                (
1371                    (Column::from_name(left_field.name())),
1372                    (Column::from_name(right_field.name())),
1373                )
1374            })
1375            .unzip();
1376        if is_all {
1377            LogicalPlanBuilder::from(left_plan)
1378                .join_detailed(
1379                    right_plan,
1380                    join_type,
1381                    join_keys,
1382                    None,
1383                    NullEquality::NullEqualsNull,
1384                )?
1385                .build()
1386        } else {
1387            LogicalPlanBuilder::from(left_plan)
1388                .distinct()?
1389                .join_detailed(
1390                    right_plan,
1391                    join_type,
1392                    join_keys,
1393                    None,
1394                    NullEquality::NullEqualsNull,
1395                )?
1396                .build()
1397        }
1398    }
1399
1400    /// Build the plan
1401    pub fn build(self) -> Result<LogicalPlan> {
1402        Ok(Arc::unwrap_or_clone(self.plan))
1403    }
1404
1405    /// Apply a join with both explicit equijoin and non equijoin predicates.
1406    ///
1407    /// Note this is a low level API that requires identifying specific
1408    /// predicate types. Most users should use  [`join_on`](Self::join_on) that
1409    /// automatically identifies predicates appropriately.
1410    ///
1411    /// `equi_exprs` defines equijoin predicates, of the form `l = r)` for each
1412    /// `(l, r)` tuple. `l`, the first element of the tuple, must only refer
1413    /// to columns from the existing input. `r`, the second element of the tuple,
1414    /// must only refer to columns from the right input.
1415    ///
1416    /// `filter` contains any other filter expression to apply during the
1417    /// join. Note that `equi_exprs` predicates are evaluated more efficiently
1418    /// than the filter expressions, so they are preferred.
1419    pub fn join_with_expr_keys(
1420        self,
1421        right: LogicalPlan,
1422        join_type: JoinType,
1423        equi_exprs: (Vec<impl Into<Expr>>, Vec<impl Into<Expr>>),
1424        filter: Option<Expr>,
1425    ) -> Result<Self> {
1426        if equi_exprs.0.len() != equi_exprs.1.len() {
1427            return plan_err!("left_keys and right_keys were not the same length");
1428        }
1429
1430        let join_key_pairs = equi_exprs
1431            .0
1432            .into_iter()
1433            .zip(equi_exprs.1)
1434            .map(|(l, r)| {
1435                let left_key = l.into();
1436                let right_key = r.into();
1437                let mut left_using_columns  = HashSet::new();
1438                expr_to_columns(&left_key, &mut left_using_columns)?;
1439                let normalized_left_key = normalize_col_with_schemas_and_ambiguity_check(
1440                    left_key,
1441                    &[&[self.plan.schema()]],
1442                    &[],
1443                )?;
1444
1445                let mut right_using_columns = HashSet::new();
1446                expr_to_columns(&right_key, &mut right_using_columns)?;
1447                let normalized_right_key = normalize_col_with_schemas_and_ambiguity_check(
1448                    right_key,
1449                    &[&[right.schema()]],
1450                    &[],
1451                )?;
1452
1453                // find valid equijoin
1454                find_valid_equijoin_key_pair(
1455                        &normalized_left_key,
1456                        &normalized_right_key,
1457                        self.plan.schema(),
1458                        right.schema(),
1459                    )?.ok_or_else(||
1460                        plan_datafusion_err!(
1461                            "can't create join plan, join key should belong to one input, error key: ({normalized_left_key},{normalized_right_key})"
1462                        ))
1463            })
1464            .collect::<Result<Vec<_>>>()?;
1465
1466        let join = Join::try_new(
1467            self.plan,
1468            Arc::new(right),
1469            join_key_pairs,
1470            filter,
1471            join_type,
1472            JoinConstraint::On,
1473            NullEquality::NullEqualsNothing,
1474        )?;
1475
1476        Ok(Self::new(LogicalPlan::Join(join)))
1477    }
1478
1479    /// Unnest the given column.
1480    pub fn unnest_column(self, column: impl Into<Column>) -> Result<Self> {
1481        unnest(Arc::unwrap_or_clone(self.plan), vec![column.into()]).map(Self::new)
1482    }
1483
1484    /// Unnest the given column given [`UnnestOptions`]
1485    pub fn unnest_column_with_options(
1486        self,
1487        column: impl Into<Column>,
1488        options: UnnestOptions,
1489    ) -> Result<Self> {
1490        unnest_with_options(
1491            Arc::unwrap_or_clone(self.plan),
1492            vec![column.into()],
1493            options,
1494        )
1495        .map(Self::new)
1496    }
1497
1498    /// Unnest the given columns with the given [`UnnestOptions`]
1499    pub fn unnest_columns_with_options(
1500        self,
1501        columns: Vec<Column>,
1502        options: UnnestOptions,
1503    ) -> Result<Self> {
1504        unnest_with_options(Arc::unwrap_or_clone(self.plan), columns, options)
1505            .map(Self::new)
1506    }
1507}
1508
1509impl From<LogicalPlan> for LogicalPlanBuilder {
1510    fn from(plan: LogicalPlan) -> Self {
1511        LogicalPlanBuilder::new(plan)
1512    }
1513}
1514
1515impl From<Arc<LogicalPlan>> for LogicalPlanBuilder {
1516    fn from(plan: Arc<LogicalPlan>) -> Self {
1517        LogicalPlanBuilder::new_from_arc(plan)
1518    }
1519}
1520
1521/// Container used when building fields for a `VALUES` node.
1522#[derive(Default)]
1523struct ValuesFields {
1524    inner: Vec<Field>,
1525}
1526
1527impl ValuesFields {
1528    pub fn new() -> Self {
1529        Self::default()
1530    }
1531
1532    pub fn push(&mut self, data_type: DataType, nullable: bool) {
1533        self.push_with_metadata(data_type, nullable, None);
1534    }
1535
1536    pub fn push_with_metadata(
1537        &mut self,
1538        data_type: DataType,
1539        nullable: bool,
1540        metadata: Option<FieldMetadata>,
1541    ) {
1542        // Naming follows the convention described here:
1543        // https://www.postgresql.org/docs/current/queries-values.html
1544        let name = format!("column{}", self.inner.len() + 1);
1545        let mut field = Field::new(name, data_type, nullable);
1546        if let Some(metadata) = metadata {
1547            field.set_metadata(metadata.to_hashmap());
1548        }
1549        self.inner.push(field);
1550    }
1551
1552    pub fn into_fields(self) -> Fields {
1553        self.inner.into()
1554    }
1555}
1556
1557/// Returns aliases to make field names unique.
1558///
1559/// Returns a vector of optional aliases, one per input field. `None` means keep the original name,
1560/// `Some(alias)` means rename to the alias to ensure uniqueness.
1561///
1562/// Used when creating [`SubqueryAlias`] or similar operations that strip table qualifiers but need
1563/// to maintain unique column names.
1564///
1565/// # Example
1566/// Input fields: `[a, a, b, b, a, a:1]` ([`DFSchema`] valid when duplicate fields have different qualifiers)
1567/// Returns: `[None, Some("a:1"), None, Some("b:1"), Some("a:2"), Some("a:1:1")]`
1568pub fn unique_field_aliases(fields: &Fields) -> Vec<Option<String>> {
1569    // Some field names might already come to this function with the count (number of times it appeared)
1570    // as a suffix e.g. id:1, so there's still a chance of name collisions, for example,
1571    // if these three fields passed to this function: "col:1", "col" and "col", the function
1572    // would rename them to -> col:1, col, col:1 causing a posterior error when building the DFSchema.
1573    // That's why we need the `seen` set, so the fields are always unique.
1574
1575    // Tracks a mapping between a field name and the number of appearances of that field.
1576    let mut name_map = HashMap::<&str, usize>::new();
1577    // Tracks all the fields and aliases that were previously seen.
1578    let mut seen = HashSet::<Cow<String>>::new();
1579
1580    fields
1581        .iter()
1582        .map(|field| {
1583            let original_name = field.name();
1584            let mut name = Cow::Borrowed(original_name);
1585
1586            let count = name_map.entry(original_name).or_insert(0);
1587
1588            // Loop until we find a name that hasn't been used.
1589            while seen.contains(&name) {
1590                *count += 1;
1591                name = Cow::Owned(format!("{original_name}:{count}"));
1592            }
1593
1594            seen.insert(name.clone());
1595
1596            match name {
1597                Cow::Borrowed(_) => None,
1598                Cow::Owned(alias) => Some(alias),
1599            }
1600        })
1601        .collect()
1602}
1603
1604fn mark_field(schema: &DFSchema) -> (Option<TableReference>, Arc<Field>) {
1605    let mut table_references = schema
1606        .iter()
1607        .filter_map(|(qualifier, _)| qualifier)
1608        .collect::<Vec<_>>();
1609    table_references.dedup();
1610    let table_reference = if table_references.len() == 1 {
1611        table_references.pop().cloned()
1612    } else {
1613        None
1614    };
1615
1616    (
1617        table_reference,
1618        Arc::new(Field::new("mark", DataType::Boolean, false)),
1619    )
1620}
1621
1622/// Creates a schema for a join operation.
1623/// The fields from the left side are first
1624pub fn build_join_schema(
1625    left: &DFSchema,
1626    right: &DFSchema,
1627    join_type: &JoinType,
1628) -> Result<DFSchema> {
1629    fn nullify_fields<'a>(
1630        fields: impl Iterator<Item = (Option<&'a TableReference>, &'a Arc<Field>)>,
1631    ) -> Vec<(Option<TableReference>, Arc<Field>)> {
1632        fields
1633            .map(|(q, f)| {
1634                // TODO: find a good way to do that
1635                let field = f.as_ref().clone().with_nullable(true);
1636                (q.cloned(), Arc::new(field))
1637            })
1638            .collect()
1639    }
1640
1641    let right_fields = right.iter();
1642    let left_fields = left.iter();
1643
1644    let qualified_fields: Vec<(Option<TableReference>, Arc<Field>)> = match join_type {
1645        JoinType::Inner => {
1646            // left then right
1647            let left_fields = left_fields
1648                .map(|(q, f)| (q.cloned(), Arc::clone(f)))
1649                .collect::<Vec<_>>();
1650            let right_fields = right_fields
1651                .map(|(q, f)| (q.cloned(), Arc::clone(f)))
1652                .collect::<Vec<_>>();
1653            left_fields.into_iter().chain(right_fields).collect()
1654        }
1655        JoinType::Left => {
1656            // left then right, right set to nullable in case of not matched scenario
1657            let left_fields = left_fields
1658                .map(|(q, f)| (q.cloned(), Arc::clone(f)))
1659                .collect::<Vec<_>>();
1660            left_fields
1661                .into_iter()
1662                .chain(nullify_fields(right_fields))
1663                .collect()
1664        }
1665        JoinType::Right => {
1666            // left then right, left set to nullable in case of not matched scenario
1667            let right_fields = right_fields
1668                .map(|(q, f)| (q.cloned(), Arc::clone(f)))
1669                .collect::<Vec<_>>();
1670            nullify_fields(left_fields)
1671                .into_iter()
1672                .chain(right_fields)
1673                .collect()
1674        }
1675        JoinType::Full => {
1676            // left then right, all set to nullable in case of not matched scenario
1677            nullify_fields(left_fields)
1678                .into_iter()
1679                .chain(nullify_fields(right_fields))
1680                .collect()
1681        }
1682        JoinType::LeftSemi | JoinType::LeftAnti => {
1683            // Only use the left side for the schema
1684            left_fields
1685                .map(|(q, f)| (q.cloned(), Arc::clone(f)))
1686                .collect()
1687        }
1688        JoinType::LeftMark => left_fields
1689            .map(|(q, f)| (q.cloned(), Arc::clone(f)))
1690            .chain(once(mark_field(right)))
1691            .collect(),
1692        JoinType::RightSemi | JoinType::RightAnti => {
1693            // Only use the right side for the schema
1694            right_fields
1695                .map(|(q, f)| (q.cloned(), Arc::clone(f)))
1696                .collect()
1697        }
1698        JoinType::RightMark => right_fields
1699            .map(|(q, f)| (q.cloned(), Arc::clone(f)))
1700            .chain(once(mark_field(left)))
1701            .collect(),
1702    };
1703    let func_dependencies = left.functional_dependencies().join(
1704        right.functional_dependencies(),
1705        join_type,
1706        left.fields().len(),
1707    );
1708
1709    let (schema1, schema2) = match join_type {
1710        JoinType::Right
1711        | JoinType::RightSemi
1712        | JoinType::RightAnti
1713        | JoinType::RightMark => (left, right),
1714        _ => (right, left),
1715    };
1716
1717    let metadata = schema1
1718        .metadata()
1719        .clone()
1720        .into_iter()
1721        .chain(schema2.metadata().clone())
1722        .collect();
1723
1724    let dfschema = DFSchema::new_with_metadata(qualified_fields, metadata)?;
1725    dfschema.with_functional_dependencies(func_dependencies)
1726}
1727
1728/// (Re)qualify the sides of a join if needed, i.e. if the columns from one side would otherwise
1729/// conflict with the columns from the other.
1730/// This is especially useful for queries that come as Substrait, since Substrait doesn't currently allow specifying
1731/// aliases, neither for columns nor for tables.  DataFusion requires columns to be uniquely identifiable, in some
1732/// places (see e.g. DFSchema::check_names).
1733/// The function returns:
1734/// - The requalified or original left logical plan
1735/// - The requalified or original right logical plan
1736/// - If a requalification was needed or not
1737pub fn requalify_sides_if_needed(
1738    left: LogicalPlanBuilder,
1739    right: LogicalPlanBuilder,
1740) -> Result<(LogicalPlanBuilder, LogicalPlanBuilder, bool)> {
1741    let left_cols = left.schema().columns();
1742    let right_cols = right.schema().columns();
1743
1744    // Requalify if merging the schemas would cause an error during join.
1745    // This can happen in several cases:
1746    // 1. Duplicate qualified fields: both sides have same relation.name
1747    // 2. Duplicate unqualified fields: both sides have same unqualified name
1748    // 3. Ambiguous reference: one side qualified, other unqualified, same name
1749    //
1750    // Implementation note: This uses a simple O(n*m) nested loop rather than
1751    // a HashMap-based O(n+m) approach. The nested loop is preferred because:
1752    // - Schemas are typically small (in TPCH benchmark, max is 16 columns),
1753    //   so n*m is negligible
1754    // - Early return on first conflict makes common case very fast
1755    // - Code is simpler and easier to reason about
1756    // - Called only during plan construction, not in execution hot path
1757    for l in &left_cols {
1758        for r in &right_cols {
1759            if l.name != r.name {
1760                continue;
1761            }
1762
1763            // Same name - check if this would cause a conflict
1764            match (&l.relation, &r.relation) {
1765                // Both qualified with same relation - duplicate qualified field
1766                (Some(l_rel), Some(r_rel)) if l_rel == r_rel => {
1767                    return Ok((
1768                        left.alias(TableReference::bare("left"))?,
1769                        right.alias(TableReference::bare("right"))?,
1770                        true,
1771                    ));
1772                }
1773                // Both unqualified - duplicate unqualified field
1774                (None, None) => {
1775                    return Ok((
1776                        left.alias(TableReference::bare("left"))?,
1777                        right.alias(TableReference::bare("right"))?,
1778                        true,
1779                    ));
1780                }
1781                // One qualified, one not - ambiguous reference
1782                (Some(_), None) | (None, Some(_)) => {
1783                    return Ok((
1784                        left.alias(TableReference::bare("left"))?,
1785                        right.alias(TableReference::bare("right"))?,
1786                        true,
1787                    ));
1788                }
1789                // Different qualifiers - OK, no conflict
1790                _ => {}
1791            }
1792        }
1793    }
1794
1795    // No conflicts found
1796    Ok((left, right, false))
1797}
1798/// Add additional "synthetic" group by expressions based on functional
1799/// dependencies.
1800///
1801/// For example, if we are grouping on `[c1]`, and we know from
1802/// functional dependencies that column `c1` determines `c2`, this function
1803/// adds `c2` to the group by list.
1804///
1805/// This allows MySQL style selects like
1806/// `SELECT col FROM t WHERE pk = 5` if col is unique
1807pub fn add_group_by_exprs_from_dependencies(
1808    mut group_expr: Vec<Expr>,
1809    schema: &DFSchemaRef,
1810) -> Result<Vec<Expr>> {
1811    // Names of the fields produced by the GROUP BY exprs for example, `GROUP BY
1812    // c1 + 1` produces an output field named `"c1 + 1"`
1813    let mut group_by_field_names = group_expr
1814        .iter()
1815        .map(|e| e.schema_name().to_string())
1816        .collect::<Vec<_>>();
1817
1818    if let Some(target_indices) =
1819        get_target_functional_dependencies(schema, &group_by_field_names)
1820    {
1821        for idx in target_indices {
1822            let expr = Expr::Column(Column::from(schema.qualified_field(idx)));
1823            let expr_name = expr.schema_name().to_string();
1824            if !group_by_field_names.contains(&expr_name) {
1825                group_by_field_names.push(expr_name);
1826                group_expr.push(expr);
1827            }
1828        }
1829    }
1830    Ok(group_expr)
1831}
1832
1833/// Errors if one or more expressions have equal names.
1834pub fn validate_unique_names<'a>(
1835    node_name: &str,
1836    expressions: impl IntoIterator<Item = &'a Expr>,
1837) -> Result<()> {
1838    let mut unique_names = HashMap::new();
1839
1840    expressions.into_iter().enumerate().try_for_each(|(position, expr)| {
1841        let name = expr.schema_name().to_string();
1842        match unique_names.get(&name) {
1843            None => {
1844                unique_names.insert(name, (position, expr));
1845                Ok(())
1846            },
1847            Some((existing_position, existing_expr)) => {
1848                plan_err!("{node_name} require unique expression names \
1849                             but the expression \"{existing_expr}\" at position {existing_position} and \"{expr}\" \
1850                             at position {position} have the same name. Consider aliasing (\"AS\") one of them."
1851                            )
1852            }
1853        }
1854    })
1855}
1856
1857/// Union two [`LogicalPlan`]s.
1858///
1859/// Constructs the UNION plan, but does not perform type-coercion. Therefore the
1860/// subtree expressions will not be properly typed until the optimizer pass.
1861///
1862/// If a properly typed UNION plan is needed, refer to [`TypeCoercionRewriter::coerce_union`]
1863/// or alternatively, merge the union input schema using [`coerce_union_schema`] and
1864/// apply the expression rewrite with [`coerce_plan_expr_for_schema`].
1865///
1866/// [`TypeCoercionRewriter::coerce_union`]: https://docs.rs/datafusion-optimizer/latest/datafusion_optimizer/analyzer/type_coercion/struct.TypeCoercionRewriter.html#method.coerce_union
1867/// [`coerce_union_schema`]: https://docs.rs/datafusion-optimizer/latest/datafusion_optimizer/analyzer/type_coercion/fn.coerce_union_schema.html
1868pub fn union(left_plan: LogicalPlan, right_plan: LogicalPlan) -> Result<LogicalPlan> {
1869    Ok(LogicalPlan::Union(Union::try_new_with_loose_types(vec![
1870        Arc::new(left_plan),
1871        Arc::new(right_plan),
1872    ])?))
1873}
1874
1875/// Like [`union`], but combine rows from different tables by name, rather than
1876/// by position.
1877pub fn union_by_name(
1878    left_plan: LogicalPlan,
1879    right_plan: LogicalPlan,
1880) -> Result<LogicalPlan> {
1881    Ok(LogicalPlan::Union(Union::try_new_by_name(vec![
1882        Arc::new(left_plan),
1883        Arc::new(right_plan),
1884    ])?))
1885}
1886
1887/// Create Projection
1888/// # Errors
1889/// This function errors under any of the following conditions:
1890/// * Two or more expressions have the same name
1891/// * An invalid expression is used (e.g. a `sort` expression)
1892pub fn project(
1893    plan: LogicalPlan,
1894    expr: impl IntoIterator<Item = impl Into<SelectExpr>>,
1895) -> Result<LogicalPlan> {
1896    project_with_validation(plan, expr.into_iter().map(|e| (e, true)))
1897}
1898
1899/// Create Projection. Similar to project except that the expressions
1900/// passed in have a flag to indicate if that expression requires
1901/// validation (normalize & columnize) (true) or not (false)
1902/// # Errors
1903/// This function errors under any of the following conditions:
1904/// * Two or more expressions have the same name
1905/// * An invalid expression is used (e.g. a `sort` expression)
1906fn project_with_validation(
1907    plan: LogicalPlan,
1908    expr: impl IntoIterator<Item = (impl Into<SelectExpr>, bool)>,
1909) -> Result<LogicalPlan> {
1910    let mut projected_expr = vec![];
1911    for (e, validate) in expr {
1912        let e = e.into();
1913        match e {
1914            SelectExpr::Wildcard(opt) => {
1915                let expanded = expand_wildcard(plan.schema(), &plan, Some(&opt))?;
1916
1917                // If there is a REPLACE statement, replace that column with the given
1918                // replace expression. Column name remains the same.
1919                let expanded = if let Some(replace) = opt.replace {
1920                    replace_columns(expanded, &replace)?
1921                } else {
1922                    expanded
1923                };
1924
1925                for e in expanded {
1926                    if validate {
1927                        projected_expr
1928                            .push(columnize_expr(normalize_col(e, &plan)?, &plan)?)
1929                    } else {
1930                        projected_expr.push(e)
1931                    }
1932                }
1933            }
1934            SelectExpr::QualifiedWildcard(table_ref, opt) => {
1935                let expanded =
1936                    expand_qualified_wildcard(&table_ref, plan.schema(), Some(&opt))?;
1937
1938                // If there is a REPLACE statement, replace that column with the given
1939                // replace expression. Column name remains the same.
1940                let expanded = if let Some(replace) = opt.replace {
1941                    replace_columns(expanded, &replace)?
1942                } else {
1943                    expanded
1944                };
1945
1946                for e in expanded {
1947                    if validate {
1948                        projected_expr
1949                            .push(columnize_expr(normalize_col(e, &plan)?, &plan)?)
1950                    } else {
1951                        projected_expr.push(e)
1952                    }
1953                }
1954            }
1955            SelectExpr::Expression(e) => {
1956                if validate {
1957                    projected_expr.push(columnize_expr(normalize_col(e, &plan)?, &plan)?)
1958                } else {
1959                    projected_expr.push(e)
1960                }
1961            }
1962        }
1963    }
1964    validate_unique_names("Projections", projected_expr.iter())?;
1965
1966    Projection::try_new(projected_expr, Arc::new(plan)).map(LogicalPlan::Projection)
1967}
1968
1969/// If there is a REPLACE statement in the projected expression in the form of
1970/// "REPLACE (some_column_within_an_expr AS some_column)", this function replaces
1971/// that column with the given replace expression. Column name remains the same.
1972/// Multiple REPLACEs are also possible with comma separations.
1973fn replace_columns(
1974    mut exprs: Vec<Expr>,
1975    replace: &PlannedReplaceSelectItem,
1976) -> Result<Vec<Expr>> {
1977    for expr in exprs.iter_mut() {
1978        if let Expr::Column(Column { name, .. }) = expr
1979            && let Some((_, new_expr)) = replace
1980                .items()
1981                .iter()
1982                .zip(replace.expressions().iter())
1983                .find(|(item, _)| item.column_name.value == *name)
1984        {
1985            *expr = new_expr.clone().alias(name.clone())
1986        }
1987    }
1988    Ok(exprs)
1989}
1990
1991/// Create a SubqueryAlias to wrap a LogicalPlan.
1992pub fn subquery_alias(
1993    plan: LogicalPlan,
1994    alias: impl Into<TableReference>,
1995) -> Result<LogicalPlan> {
1996    SubqueryAlias::try_new(Arc::new(plan), alias).map(LogicalPlan::SubqueryAlias)
1997}
1998
1999/// Create a LogicalPlanBuilder representing a scan of a table with the provided name and schema.
2000/// This is mostly used for testing and documentation.
2001pub fn table_scan(
2002    name: Option<impl Into<TableReference>>,
2003    table_schema: &Schema,
2004    projection: Option<Vec<usize>>,
2005) -> Result<LogicalPlanBuilder> {
2006    table_scan_with_filters(name, table_schema, projection, vec![])
2007}
2008
2009/// Create a LogicalPlanBuilder representing a scan of a table with the provided name and schema,
2010/// and inlined filters.
2011/// This is mostly used for testing and documentation.
2012pub fn table_scan_with_filters(
2013    name: Option<impl Into<TableReference>>,
2014    table_schema: &Schema,
2015    projection: Option<Vec<usize>>,
2016    filters: Vec<Expr>,
2017) -> Result<LogicalPlanBuilder> {
2018    let table_source = table_source(table_schema);
2019    let name = name
2020        .map(|n| n.into())
2021        .unwrap_or_else(|| TableReference::bare(UNNAMED_TABLE));
2022    LogicalPlanBuilder::scan_with_filters(name, table_source, projection, filters)
2023}
2024
2025/// Create a LogicalPlanBuilder representing a scan of a table with the provided name and schema,
2026/// filters, and inlined fetch.
2027/// This is mostly used for testing and documentation.
2028pub fn table_scan_with_filter_and_fetch(
2029    name: Option<impl Into<TableReference>>,
2030    table_schema: &Schema,
2031    projection: Option<Vec<usize>>,
2032    filters: Vec<Expr>,
2033    fetch: Option<usize>,
2034) -> Result<LogicalPlanBuilder> {
2035    let table_source = table_source(table_schema);
2036    let name = name
2037        .map(|n| n.into())
2038        .unwrap_or_else(|| TableReference::bare(UNNAMED_TABLE));
2039    LogicalPlanBuilder::scan_with_filters_fetch(
2040        name,
2041        table_source,
2042        projection,
2043        filters,
2044        fetch,
2045    )
2046}
2047
2048pub fn table_source(table_schema: &Schema) -> Arc<dyn TableSource> {
2049    // TODO should we take SchemaRef and avoid cloning?
2050    let table_schema = Arc::new(table_schema.clone());
2051    Arc::new(LogicalTableSource {
2052        table_schema,
2053        constraints: Default::default(),
2054    })
2055}
2056
2057pub fn table_source_with_constraints(
2058    table_schema: &Schema,
2059    constraints: Constraints,
2060) -> Arc<dyn TableSource> {
2061    // TODO should we take SchemaRef and avoid cloning?
2062    let table_schema = Arc::new(table_schema.clone());
2063    Arc::new(LogicalTableSource {
2064        table_schema,
2065        constraints,
2066    })
2067}
2068
2069/// Wrap projection for a plan, if the join keys contains normal expression.
2070pub fn wrap_projection_for_join_if_necessary(
2071    join_keys: &[Expr],
2072    input: LogicalPlan,
2073) -> Result<(LogicalPlan, Vec<Column>, bool)> {
2074    let input_schema = input.schema();
2075    let alias_join_keys: Vec<Expr> = join_keys
2076        .iter()
2077        .map(|key| {
2078            // The display_name() of cast expression will ignore the cast info, and show the inner expression name.
2079            // If we do not add alias, it will throw same field name error in the schema when adding projection.
2080            // For example:
2081            //    input scan : [a, b, c],
2082            //    join keys: [cast(a as int)]
2083            //
2084            //  then a and cast(a as int) will use the same field name - `a` in projection schema.
2085            //  https://github.com/apache/datafusion/issues/4478
2086            if matches!(key, Expr::Cast(_)) || matches!(key, Expr::TryCast(_)) {
2087                let alias = format!("{key}");
2088                key.clone().alias(alias)
2089            } else {
2090                key.clone()
2091            }
2092        })
2093        .collect::<Vec<_>>();
2094
2095    let need_project = join_keys.iter().any(|key| !matches!(key, Expr::Column(_)));
2096    let plan = if need_project {
2097        // Include all columns from the input and extend them with the join keys
2098        let mut projection = input_schema
2099            .columns()
2100            .into_iter()
2101            .map(Expr::Column)
2102            .collect::<Vec<_>>();
2103        let join_key_items = alias_join_keys
2104            .iter()
2105            .flat_map(|expr| expr.try_as_col().is_none().then_some(expr))
2106            .cloned()
2107            .collect::<HashSet<Expr>>();
2108        projection.extend(join_key_items);
2109
2110        LogicalPlanBuilder::from(input)
2111            .project(projection.into_iter().map(SelectExpr::from))?
2112            .build()?
2113    } else {
2114        input
2115    };
2116
2117    let join_on = alias_join_keys
2118        .into_iter()
2119        .map(|key| {
2120            if let Some(col) = key.try_as_col() {
2121                Ok(col.clone())
2122            } else {
2123                let name = key.schema_name().to_string();
2124                Ok(Column::from_name(name))
2125            }
2126        })
2127        .collect::<Result<Vec<_>>>()?;
2128
2129    Ok((plan, join_on, need_project))
2130}
2131
2132/// Basic TableSource implementation intended for use in tests and documentation. It is expected
2133/// that users will provide their own TableSource implementations or use DataFusion's
2134/// DefaultTableSource.
2135pub struct LogicalTableSource {
2136    table_schema: SchemaRef,
2137    constraints: Constraints,
2138}
2139
2140impl LogicalTableSource {
2141    /// Create a new LogicalTableSource
2142    pub fn new(table_schema: SchemaRef) -> Self {
2143        Self {
2144            table_schema,
2145            constraints: Constraints::default(),
2146        }
2147    }
2148
2149    pub fn with_constraints(mut self, constraints: Constraints) -> Self {
2150        self.constraints = constraints;
2151        self
2152    }
2153}
2154
2155impl TableSource for LogicalTableSource {
2156    fn as_any(&self) -> &dyn Any {
2157        self
2158    }
2159
2160    fn schema(&self) -> SchemaRef {
2161        Arc::clone(&self.table_schema)
2162    }
2163
2164    fn constraints(&self) -> Option<&Constraints> {
2165        Some(&self.constraints)
2166    }
2167
2168    fn supports_filters_pushdown(
2169        &self,
2170        filters: &[&Expr],
2171    ) -> Result<Vec<TableProviderFilterPushDown>> {
2172        Ok(vec![TableProviderFilterPushDown::Exact; filters.len()])
2173    }
2174}
2175
2176/// Create a [`LogicalPlan::Unnest`] plan
2177pub fn unnest(input: LogicalPlan, columns: Vec<Column>) -> Result<LogicalPlan> {
2178    unnest_with_options(input, columns, UnnestOptions::default())
2179}
2180
2181pub fn get_struct_unnested_columns(
2182    col_name: &String,
2183    inner_fields: &Fields,
2184) -> Vec<Column> {
2185    inner_fields
2186        .iter()
2187        .map(|f| Column::from_name(format!("{}.{}", col_name, f.name())))
2188        .collect()
2189}
2190
2191/// Create a [`LogicalPlan::Unnest`] plan with options
2192/// This function receive a list of columns to be unnested
2193/// because multiple unnest can be performed on the same column (e.g unnest with different depth)
2194/// The new schema will contains post-unnest fields replacing the original field
2195///
2196/// For example:
2197/// Input schema as
2198/// ```text
2199/// +---------------------+-------------------+
2200/// | col1                | col2              |
2201/// +---------------------+-------------------+
2202/// | Struct(INT64,INT32) | List(List(Int64)) |
2203/// +---------------------+-------------------+
2204/// ```
2205///
2206///
2207///
2208/// Then unnesting columns with:
2209/// - (col1,Struct)
2210/// - (col2,List(\[depth=1,depth=2\]))
2211///
2212/// will generate a new schema as
2213/// ```text
2214/// +---------+---------+---------------------+---------------------+
2215/// | col1.c0 | col1.c1 | unnest_col2_depth_1 | unnest_col2_depth_2 |
2216/// +---------+---------+---------------------+---------------------+
2217/// | Int64   | Int32   | List(Int64)         |  Int64              |
2218/// +---------+---------+---------------------+---------------------+
2219/// ```
2220pub fn unnest_with_options(
2221    input: LogicalPlan,
2222    columns_to_unnest: Vec<Column>,
2223    options: UnnestOptions,
2224) -> Result<LogicalPlan> {
2225    Ok(LogicalPlan::Unnest(Unnest::try_new(
2226        Arc::new(input),
2227        columns_to_unnest,
2228        options,
2229    )?))
2230}
2231
2232#[cfg(test)]
2233mod tests {
2234    use std::vec;
2235
2236    use super::*;
2237    use crate::lit_with_metadata;
2238    use crate::logical_plan::StringifiedPlan;
2239    use crate::{col, expr, expr_fn::exists, in_subquery, lit, scalar_subquery};
2240
2241    use crate::test::function_stub::sum;
2242    use datafusion_common::{
2243        Constraint, DataFusionError, RecursionUnnestOption, SchemaError,
2244    };
2245    use insta::assert_snapshot;
2246
2247    #[test]
2248    fn plan_builder_simple() -> Result<()> {
2249        let plan =
2250            table_scan(Some("employee_csv"), &employee_schema(), Some(vec![0, 3]))?
2251                .filter(col("state").eq(lit("CO")))?
2252                .project(vec![col("id")])?
2253                .build()?;
2254
2255        assert_snapshot!(plan, @r#"
2256        Projection: employee_csv.id
2257          Filter: employee_csv.state = Utf8("CO")
2258            TableScan: employee_csv projection=[id, state]
2259        "#);
2260
2261        Ok(())
2262    }
2263
2264    #[test]
2265    fn plan_builder_schema() {
2266        let schema = employee_schema();
2267        let projection = None;
2268        let plan =
2269            LogicalPlanBuilder::scan("employee_csv", table_source(&schema), projection)
2270                .unwrap();
2271        assert_snapshot!(plan.schema().as_ref(), @"fields:[employee_csv.id, employee_csv.first_name, employee_csv.last_name, employee_csv.state, employee_csv.salary], metadata:{}");
2272
2273        // Note scan of "EMPLOYEE_CSV" is treated as a SQL identifier
2274        // (and thus normalized to "employee"csv") as well
2275        let projection = None;
2276        let plan =
2277            LogicalPlanBuilder::scan("EMPLOYEE_CSV", table_source(&schema), projection)
2278                .unwrap();
2279        assert_snapshot!(plan.schema().as_ref(), @"fields:[employee_csv.id, employee_csv.first_name, employee_csv.last_name, employee_csv.state, employee_csv.salary], metadata:{}");
2280    }
2281
2282    #[test]
2283    fn plan_builder_empty_name() {
2284        let schema = employee_schema();
2285        let projection = None;
2286        let err =
2287            LogicalPlanBuilder::scan("", table_source(&schema), projection).unwrap_err();
2288        assert_snapshot!(
2289            err.strip_backtrace(),
2290            @"Error during planning: table_name cannot be empty"
2291        );
2292    }
2293
2294    #[test]
2295    fn plan_builder_sort() -> Result<()> {
2296        let plan =
2297            table_scan(Some("employee_csv"), &employee_schema(), Some(vec![3, 4]))?
2298                .sort(vec![
2299                    expr::Sort::new(col("state"), true, true),
2300                    expr::Sort::new(col("salary"), false, false),
2301                ])?
2302                .build()?;
2303
2304        assert_snapshot!(plan, @r"
2305        Sort: employee_csv.state ASC NULLS FIRST, employee_csv.salary DESC NULLS LAST
2306          TableScan: employee_csv projection=[state, salary]
2307        ");
2308
2309        Ok(())
2310    }
2311
2312    #[test]
2313    fn plan_builder_union() -> Result<()> {
2314        let plan =
2315            table_scan(Some("employee_csv"), &employee_schema(), Some(vec![3, 4]))?;
2316
2317        let plan = plan
2318            .clone()
2319            .union(plan.clone().build()?)?
2320            .union(plan.clone().build()?)?
2321            .union(plan.build()?)?
2322            .build()?;
2323
2324        assert_snapshot!(plan, @r"
2325        Union
2326          Union
2327            Union
2328              TableScan: employee_csv projection=[state, salary]
2329              TableScan: employee_csv projection=[state, salary]
2330            TableScan: employee_csv projection=[state, salary]
2331          TableScan: employee_csv projection=[state, salary]
2332        ");
2333
2334        Ok(())
2335    }
2336
2337    #[test]
2338    fn plan_builder_union_distinct() -> Result<()> {
2339        let plan =
2340            table_scan(Some("employee_csv"), &employee_schema(), Some(vec![3, 4]))?;
2341
2342        let plan = plan
2343            .clone()
2344            .union_distinct(plan.clone().build()?)?
2345            .union_distinct(plan.clone().build()?)?
2346            .union_distinct(plan.build()?)?
2347            .build()?;
2348
2349        assert_snapshot!(plan, @r"
2350        Distinct:
2351          Union
2352            Distinct:
2353              Union
2354                Distinct:
2355                  Union
2356                    TableScan: employee_csv projection=[state, salary]
2357                    TableScan: employee_csv projection=[state, salary]
2358                TableScan: employee_csv projection=[state, salary]
2359            TableScan: employee_csv projection=[state, salary]
2360        ");
2361
2362        Ok(())
2363    }
2364
2365    #[test]
2366    fn plan_builder_simple_distinct() -> Result<()> {
2367        let plan =
2368            table_scan(Some("employee_csv"), &employee_schema(), Some(vec![0, 3]))?
2369                .filter(col("state").eq(lit("CO")))?
2370                .project(vec![col("id")])?
2371                .distinct()?
2372                .build()?;
2373
2374        assert_snapshot!(plan, @r#"
2375        Distinct:
2376          Projection: employee_csv.id
2377            Filter: employee_csv.state = Utf8("CO")
2378              TableScan: employee_csv projection=[id, state]
2379        "#);
2380
2381        Ok(())
2382    }
2383
2384    #[test]
2385    fn exists_subquery() -> Result<()> {
2386        let foo = test_table_scan_with_name("foo")?;
2387        let bar = test_table_scan_with_name("bar")?;
2388
2389        let subquery = LogicalPlanBuilder::from(foo)
2390            .project(vec![col("a")])?
2391            .filter(col("a").eq(col("bar.a")))?
2392            .build()?;
2393
2394        let outer_query = LogicalPlanBuilder::from(bar)
2395            .project(vec![col("a")])?
2396            .filter(exists(Arc::new(subquery)))?
2397            .build()?;
2398
2399        assert_snapshot!(outer_query, @r"
2400        Filter: EXISTS (<subquery>)
2401          Subquery:
2402            Filter: foo.a = bar.a
2403              Projection: foo.a
2404                TableScan: foo
2405          Projection: bar.a
2406            TableScan: bar
2407        ");
2408
2409        Ok(())
2410    }
2411
2412    #[test]
2413    fn filter_in_subquery() -> Result<()> {
2414        let foo = test_table_scan_with_name("foo")?;
2415        let bar = test_table_scan_with_name("bar")?;
2416
2417        let subquery = LogicalPlanBuilder::from(foo)
2418            .project(vec![col("a")])?
2419            .filter(col("a").eq(col("bar.a")))?
2420            .build()?;
2421
2422        // SELECT a FROM bar WHERE a IN (SELECT a FROM foo WHERE a = bar.a)
2423        let outer_query = LogicalPlanBuilder::from(bar)
2424            .project(vec![col("a")])?
2425            .filter(in_subquery(col("a"), Arc::new(subquery)))?
2426            .build()?;
2427
2428        assert_snapshot!(outer_query, @r"
2429        Filter: bar.a IN (<subquery>)
2430          Subquery:
2431            Filter: foo.a = bar.a
2432              Projection: foo.a
2433                TableScan: foo
2434          Projection: bar.a
2435            TableScan: bar
2436        ");
2437
2438        Ok(())
2439    }
2440
2441    #[test]
2442    fn select_scalar_subquery() -> Result<()> {
2443        let foo = test_table_scan_with_name("foo")?;
2444        let bar = test_table_scan_with_name("bar")?;
2445
2446        let subquery = LogicalPlanBuilder::from(foo)
2447            .project(vec![col("b")])?
2448            .filter(col("a").eq(col("bar.a")))?
2449            .build()?;
2450
2451        // SELECT (SELECT a FROM foo WHERE a = bar.a) FROM bar
2452        let outer_query = LogicalPlanBuilder::from(bar)
2453            .project(vec![scalar_subquery(Arc::new(subquery))])?
2454            .build()?;
2455
2456        assert_snapshot!(outer_query, @r"
2457        Projection: (<subquery>)
2458          Subquery:
2459            Filter: foo.a = bar.a
2460              Projection: foo.b
2461                TableScan: foo
2462          TableScan: bar
2463        ");
2464
2465        Ok(())
2466    }
2467
2468    #[test]
2469    fn projection_non_unique_names() -> Result<()> {
2470        let plan = table_scan(
2471            Some("employee_csv"),
2472            &employee_schema(),
2473            // project id and first_name by column index
2474            Some(vec![0, 1]),
2475        )?
2476        // two columns with the same name => error
2477        .project(vec![col("id"), col("first_name").alias("id")]);
2478
2479        match plan {
2480            Err(DataFusionError::SchemaError(err, _)) => {
2481                if let SchemaError::AmbiguousReference { field } = *err {
2482                    let Column {
2483                        relation,
2484                        name,
2485                        spans: _,
2486                    } = *field;
2487                    let Some(TableReference::Bare { table }) = relation else {
2488                        return plan_err!(
2489                            "wrong relation: {relation:?}, expected table name"
2490                        );
2491                    };
2492                    assert_eq!(*"employee_csv", *table);
2493                    assert_eq!("id", &name);
2494                    Ok(())
2495                } else {
2496                    plan_err!("Plan should have returned an DataFusionError::SchemaError")
2497                }
2498            }
2499            _ => plan_err!("Plan should have returned an DataFusionError::SchemaError"),
2500        }
2501    }
2502
2503    fn employee_schema() -> Schema {
2504        Schema::new(vec![
2505            Field::new("id", DataType::Int32, false),
2506            Field::new("first_name", DataType::Utf8, false),
2507            Field::new("last_name", DataType::Utf8, false),
2508            Field::new("state", DataType::Utf8, false),
2509            Field::new("salary", DataType::Int32, false),
2510        ])
2511    }
2512
2513    #[test]
2514    fn stringified_plan() {
2515        let stringified_plan =
2516            StringifiedPlan::new(PlanType::InitialLogicalPlan, "...the plan...");
2517        assert!(stringified_plan.should_display(true));
2518        assert!(!stringified_plan.should_display(false)); // not in non verbose mode
2519
2520        let stringified_plan =
2521            StringifiedPlan::new(PlanType::FinalLogicalPlan, "...the plan...");
2522        assert!(stringified_plan.should_display(true));
2523        assert!(stringified_plan.should_display(false)); // display in non verbose mode too
2524
2525        let stringified_plan =
2526            StringifiedPlan::new(PlanType::InitialPhysicalPlan, "...the plan...");
2527        assert!(stringified_plan.should_display(true));
2528        assert!(!stringified_plan.should_display(false)); // not in non verbose mode
2529
2530        let stringified_plan =
2531            StringifiedPlan::new(PlanType::FinalPhysicalPlan, "...the plan...");
2532        assert!(stringified_plan.should_display(true));
2533        assert!(stringified_plan.should_display(false)); // display in non verbose mode
2534
2535        let stringified_plan = StringifiedPlan::new(
2536            PlanType::OptimizedLogicalPlan {
2537                optimizer_name: "random opt pass".into(),
2538            },
2539            "...the plan...",
2540        );
2541        assert!(stringified_plan.should_display(true));
2542        assert!(!stringified_plan.should_display(false));
2543    }
2544
2545    fn test_table_scan_with_name(name: &str) -> Result<LogicalPlan> {
2546        let schema = Schema::new(vec![
2547            Field::new("a", DataType::UInt32, false),
2548            Field::new("b", DataType::UInt32, false),
2549            Field::new("c", DataType::UInt32, false),
2550        ]);
2551        table_scan(Some(name), &schema, None)?.build()
2552    }
2553
2554    #[test]
2555    fn plan_builder_intersect_different_num_columns_error() -> Result<()> {
2556        let plan1 =
2557            table_scan(TableReference::none(), &employee_schema(), Some(vec![3]))?;
2558        let plan2 =
2559            table_scan(TableReference::none(), &employee_schema(), Some(vec![3, 4]))?;
2560
2561        let err_msg1 =
2562            LogicalPlanBuilder::intersect(plan1.build()?, plan2.build()?, true)
2563                .unwrap_err();
2564
2565        assert_snapshot!(err_msg1.strip_backtrace(), @"Error during planning: INTERSECT/EXCEPT query must have the same number of columns. Left is 1 and right is 2.");
2566
2567        Ok(())
2568    }
2569
2570    #[test]
2571    fn plan_builder_unnest() -> Result<()> {
2572        // Cannot unnest on a scalar column
2573        let err = nested_table_scan("test_table")?
2574            .unnest_column("scalar")
2575            .unwrap_err();
2576
2577        let DataFusionError::Internal(desc) = err else {
2578            return plan_err!("Plan should have returned an DataFusionError::Internal");
2579        };
2580
2581        let desc = (*desc
2582            .split(DataFusionError::BACK_TRACE_SEP)
2583            .collect::<Vec<&str>>()
2584            .first()
2585            .unwrap_or(&""))
2586        .to_string();
2587
2588        assert_snapshot!(desc, @"trying to unnest on invalid data type UInt32");
2589
2590        // Unnesting the strings list.
2591        let plan = nested_table_scan("test_table")?
2592            .unnest_column("strings")?
2593            .build()?;
2594
2595        assert_snapshot!(plan, @r"
2596        Unnest: lists[test_table.strings|depth=1] structs[]
2597          TableScan: test_table
2598        ");
2599
2600        // Check unnested field is a scalar
2601        let field = plan.schema().field_with_name(None, "strings").unwrap();
2602        assert_eq!(&DataType::Utf8, field.data_type());
2603
2604        // Unnesting the singular struct column result into 2 new columns for each subfield
2605        let plan = nested_table_scan("test_table")?
2606            .unnest_column("struct_singular")?
2607            .build()?;
2608
2609        assert_snapshot!(plan, @r"
2610        Unnest: lists[] structs[test_table.struct_singular]
2611          TableScan: test_table
2612        ");
2613
2614        for field_name in &["a", "b"] {
2615            // Check unnested struct field is a scalar
2616            let field = plan
2617                .schema()
2618                .field_with_name(None, &format!("struct_singular.{field_name}"))
2619                .unwrap();
2620            assert_eq!(&DataType::UInt32, field.data_type());
2621        }
2622
2623        // Unnesting multiple fields in separate plans
2624        let plan = nested_table_scan("test_table")?
2625            .unnest_column("strings")?
2626            .unnest_column("structs")?
2627            .unnest_column("struct_singular")?
2628            .build()?;
2629
2630        assert_snapshot!(plan, @r"
2631        Unnest: lists[] structs[test_table.struct_singular]
2632          Unnest: lists[test_table.structs|depth=1] structs[]
2633            Unnest: lists[test_table.strings|depth=1] structs[]
2634              TableScan: test_table
2635        ");
2636
2637        // Check unnested struct list field should be a struct.
2638        let field = plan.schema().field_with_name(None, "structs").unwrap();
2639        assert!(matches!(field.data_type(), DataType::Struct(_)));
2640
2641        // Unnesting multiple fields at the same time, using infer syntax
2642        let cols = vec!["strings", "structs", "struct_singular"]
2643            .into_iter()
2644            .map(|c| c.into())
2645            .collect();
2646
2647        let plan = nested_table_scan("test_table")?
2648            .unnest_columns_with_options(cols, UnnestOptions::default())?
2649            .build()?;
2650
2651        assert_snapshot!(plan, @r"
2652        Unnest: lists[test_table.strings|depth=1, test_table.structs|depth=1] structs[test_table.struct_singular]
2653          TableScan: test_table
2654        ");
2655
2656        // Unnesting missing column should fail.
2657        let plan = nested_table_scan("test_table")?.unnest_column("missing");
2658        assert!(plan.is_err());
2659
2660        // Simultaneously unnesting a list (with different depth) and a struct column
2661        let plan = nested_table_scan("test_table")?
2662            .unnest_columns_with_options(
2663                vec!["stringss".into(), "struct_singular".into()],
2664                UnnestOptions::default()
2665                    .with_recursions(RecursionUnnestOption {
2666                        input_column: "stringss".into(),
2667                        output_column: "stringss_depth_1".into(),
2668                        depth: 1,
2669                    })
2670                    .with_recursions(RecursionUnnestOption {
2671                        input_column: "stringss".into(),
2672                        output_column: "stringss_depth_2".into(),
2673                        depth: 2,
2674                    }),
2675            )?
2676            .build()?;
2677
2678        assert_snapshot!(plan, @r"
2679        Unnest: lists[test_table.stringss|depth=1, test_table.stringss|depth=2] structs[test_table.struct_singular]
2680          TableScan: test_table
2681        ");
2682
2683        // Check output columns has correct type
2684        let field = plan
2685            .schema()
2686            .field_with_name(None, "stringss_depth_1")
2687            .unwrap();
2688        assert_eq!(
2689            &DataType::new_list(DataType::Utf8, false),
2690            field.data_type()
2691        );
2692        let field = plan
2693            .schema()
2694            .field_with_name(None, "stringss_depth_2")
2695            .unwrap();
2696        assert_eq!(&DataType::Utf8, field.data_type());
2697        // unnesting struct is still correct
2698        for field_name in &["a", "b"] {
2699            let field = plan
2700                .schema()
2701                .field_with_name(None, &format!("struct_singular.{field_name}"))
2702                .unwrap();
2703            assert_eq!(&DataType::UInt32, field.data_type());
2704        }
2705
2706        Ok(())
2707    }
2708
2709    fn nested_table_scan(table_name: &str) -> Result<LogicalPlanBuilder> {
2710        // Create a schema with a scalar field, a list of strings, a list of structs
2711        // and a singular struct
2712        let struct_field_in_list = Field::new_struct(
2713            "item",
2714            vec![
2715                Field::new("a", DataType::UInt32, false),
2716                Field::new("b", DataType::UInt32, false),
2717            ],
2718            false,
2719        );
2720        let string_field = Field::new_list_field(DataType::Utf8, false);
2721        let strings_field = Field::new_list("item", string_field.clone(), false);
2722        let schema = Schema::new(vec![
2723            Field::new("scalar", DataType::UInt32, false),
2724            Field::new_list("strings", string_field, false),
2725            Field::new_list("structs", struct_field_in_list, false),
2726            Field::new(
2727                "struct_singular",
2728                DataType::Struct(Fields::from(vec![
2729                    Field::new("a", DataType::UInt32, false),
2730                    Field::new("b", DataType::UInt32, false),
2731                ])),
2732                false,
2733            ),
2734            Field::new_list("stringss", strings_field, false),
2735        ]);
2736
2737        table_scan(Some(table_name), &schema, None)
2738    }
2739
2740    #[test]
2741    fn test_union_after_join() -> Result<()> {
2742        let values = vec![vec![lit(1)]];
2743
2744        let left = LogicalPlanBuilder::values(values.clone())?
2745            .alias("left")?
2746            .build()?;
2747        let right = LogicalPlanBuilder::values(values)?
2748            .alias("right")?
2749            .build()?;
2750
2751        let join = LogicalPlanBuilder::from(left).cross_join(right)?.build()?;
2752
2753        let plan = LogicalPlanBuilder::from(join.clone())
2754            .union(join)?
2755            .build()?;
2756
2757        assert_snapshot!(plan, @r"
2758        Union
2759          Cross Join: 
2760            SubqueryAlias: left
2761              Values: (Int32(1))
2762            SubqueryAlias: right
2763              Values: (Int32(1))
2764          Cross Join: 
2765            SubqueryAlias: left
2766              Values: (Int32(1))
2767            SubqueryAlias: right
2768              Values: (Int32(1))
2769        ");
2770
2771        Ok(())
2772    }
2773
2774    #[test]
2775    fn plan_builder_from_logical_plan() -> Result<()> {
2776        let plan =
2777            table_scan(Some("employee_csv"), &employee_schema(), Some(vec![3, 4]))?
2778                .sort(vec![
2779                    expr::Sort::new(col("state"), true, true),
2780                    expr::Sort::new(col("salary"), false, false),
2781                ])?
2782                .build()?;
2783
2784        let plan_expected = format!("{plan}");
2785        let plan_builder: LogicalPlanBuilder = Arc::new(plan).into();
2786        assert_eq!(plan_expected, format!("{}", plan_builder.plan));
2787
2788        Ok(())
2789    }
2790
2791    #[test]
2792    fn plan_builder_aggregate_without_implicit_group_by_exprs() -> Result<()> {
2793        let constraints =
2794            Constraints::new_unverified(vec![Constraint::PrimaryKey(vec![0])]);
2795        let table_source = table_source_with_constraints(&employee_schema(), constraints);
2796
2797        let plan =
2798            LogicalPlanBuilder::scan("employee_csv", table_source, Some(vec![0, 3, 4]))?
2799                .aggregate(vec![col("id")], vec![sum(col("salary"))])?
2800                .build()?;
2801
2802        assert_snapshot!(plan, @r"
2803        Aggregate: groupBy=[[employee_csv.id]], aggr=[[sum(employee_csv.salary)]]
2804          TableScan: employee_csv projection=[id, state, salary]
2805        ");
2806
2807        Ok(())
2808    }
2809
2810    #[test]
2811    fn plan_builder_aggregate_with_implicit_group_by_exprs() -> Result<()> {
2812        let constraints =
2813            Constraints::new_unverified(vec![Constraint::PrimaryKey(vec![0])]);
2814        let table_source = table_source_with_constraints(&employee_schema(), constraints);
2815
2816        let options =
2817            LogicalPlanBuilderOptions::new().with_add_implicit_group_by_exprs(true);
2818        let plan =
2819            LogicalPlanBuilder::scan("employee_csv", table_source, Some(vec![0, 3, 4]))?
2820                .with_options(options)
2821                .aggregate(vec![col("id")], vec![sum(col("salary"))])?
2822                .build()?;
2823
2824        assert_snapshot!(plan, @r"
2825        Aggregate: groupBy=[[employee_csv.id, employee_csv.state, employee_csv.salary]], aggr=[[sum(employee_csv.salary)]]
2826          TableScan: employee_csv projection=[id, state, salary]
2827        ");
2828
2829        Ok(())
2830    }
2831
2832    #[test]
2833    fn test_join_metadata() -> Result<()> {
2834        let left_schema = DFSchema::new_with_metadata(
2835            vec![(None, Arc::new(Field::new("a", DataType::Int32, false)))],
2836            HashMap::from([("key".to_string(), "left".to_string())]),
2837        )?;
2838        let right_schema = DFSchema::new_with_metadata(
2839            vec![(None, Arc::new(Field::new("b", DataType::Int32, false)))],
2840            HashMap::from([("key".to_string(), "right".to_string())]),
2841        )?;
2842
2843        let join_schema =
2844            build_join_schema(&left_schema, &right_schema, &JoinType::Left)?;
2845        assert_eq!(
2846            join_schema.metadata(),
2847            &HashMap::from([("key".to_string(), "left".to_string())])
2848        );
2849        let join_schema =
2850            build_join_schema(&left_schema, &right_schema, &JoinType::Right)?;
2851        assert_eq!(
2852            join_schema.metadata(),
2853            &HashMap::from([("key".to_string(), "right".to_string())])
2854        );
2855
2856        Ok(())
2857    }
2858
2859    #[test]
2860    fn test_values_metadata() -> Result<()> {
2861        let metadata: HashMap<String, String> =
2862            [("ARROW:extension:metadata".to_string(), "test".to_string())]
2863                .into_iter()
2864                .collect();
2865        let metadata = FieldMetadata::from(metadata);
2866        let values = LogicalPlanBuilder::values(vec![
2867            vec![lit_with_metadata(1, Some(metadata.clone()))],
2868            vec![lit_with_metadata(2, Some(metadata.clone()))],
2869        ])?
2870        .build()?;
2871        assert_eq!(*values.schema().field(0).metadata(), metadata.to_hashmap());
2872
2873        // Do not allow VALUES with different metadata mixed together
2874        let metadata2: HashMap<String, String> =
2875            [("ARROW:extension:metadata".to_string(), "test2".to_string())]
2876                .into_iter()
2877                .collect();
2878        let metadata2 = FieldMetadata::from(metadata2);
2879        assert!(
2880            LogicalPlanBuilder::values(vec![
2881                vec![lit_with_metadata(1, Some(metadata.clone()))],
2882                vec![lit_with_metadata(2, Some(metadata2.clone()))],
2883            ])
2884            .is_err()
2885        );
2886
2887        Ok(())
2888    }
2889
2890    #[test]
2891    fn test_unique_field_aliases() {
2892        let t1_field_1 = Field::new("a", DataType::Int32, false);
2893        let t2_field_1 = Field::new("a", DataType::Int32, false);
2894        let t2_field_3 = Field::new("a", DataType::Int32, false);
2895        let t2_field_4 = Field::new("a:1", DataType::Int32, false);
2896        let t1_field_2 = Field::new("b", DataType::Int32, false);
2897        let t2_field_2 = Field::new("b", DataType::Int32, false);
2898
2899        let fields = vec![
2900            t1_field_1, t2_field_1, t1_field_2, t2_field_2, t2_field_3, t2_field_4,
2901        ];
2902        let fields = Fields::from(fields);
2903
2904        let remove_redundant = unique_field_aliases(&fields);
2905
2906        // Input [a, a, b, b, a, a:1] becomes [None, a:1, None, b:1, a:2, a:1:1]
2907        // First occurrence of each field name keeps original name (None), duplicates get
2908        // incremental suffixes (:1, :2, etc.).
2909        // Crucially in this case the 2nd occurrence of `a` gets rewritten to `a:1` which later
2910        // conflicts with the last column which is _actually_ called `a:1` so we need to rename it
2911        // as well to `a:1:1`.
2912        assert_eq!(
2913            remove_redundant,
2914            vec![
2915                None,
2916                Some("a:1".to_string()),
2917                None,
2918                Some("b:1".to_string()),
2919                Some("a:2".to_string()),
2920                Some("a:1:1".to_string()),
2921            ]
2922        );
2923    }
2924}