datafusion_expr/logical_plan/
builder.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! This module provides a builder for creating LogicalPlans
19
20use std::any::Any;
21use std::borrow::Cow;
22use std::cmp::Ordering;
23use std::collections::{HashMap, HashSet};
24use std::iter::once;
25use std::sync::Arc;
26
27use crate::dml::CopyTo;
28use crate::expr::{Alias, PlannedReplaceSelectItem, Sort as SortExpr};
29use crate::expr_rewriter::{
30    coerce_plan_expr_for_schema, normalize_col,
31    normalize_col_with_schemas_and_ambiguity_check, normalize_cols, normalize_sorts,
32    rewrite_sort_cols_by_aggs,
33};
34use crate::logical_plan::{
35    Aggregate, Analyze, Distinct, DistinctOn, EmptyRelation, Explain, Filter, Join,
36    JoinConstraint, JoinType, Limit, LogicalPlan, Partitioning, PlanType, Prepare,
37    Projection, Repartition, Sort, SubqueryAlias, TableScan, Union, Unnest, Values,
38    Window,
39};
40use crate::select_expr::SelectExpr;
41use crate::utils::{
42    can_hash, columnize_expr, compare_sort_expr, expand_qualified_wildcard,
43    expand_wildcard, expr_to_columns, find_valid_equijoin_key_pair,
44    group_window_expr_by_sort_keys,
45};
46use crate::{
47    and, binary_expr, lit, DmlStatement, ExplainOption, Expr, ExprSchemable, Operator,
48    RecursiveQuery, Statement, TableProviderFilterPushDown, TableSource, WriteOp,
49};
50
51use super::dml::InsertOp;
52use arrow::compute::can_cast_types;
53use arrow::datatypes::{DataType, Field, Fields, Schema, SchemaRef};
54use datafusion_common::display::ToStringifiedPlan;
55use datafusion_common::file_options::file_type::FileType;
56use datafusion_common::{
57    exec_err, get_target_functional_dependencies, not_impl_err, plan_datafusion_err,
58    plan_err, Column, Constraints, DFSchema, DFSchemaRef, DataFusionError, NullEquality,
59    Result, ScalarValue, TableReference, ToDFSchema, UnnestOptions,
60};
61use datafusion_expr_common::type_coercion::binary::type_union_resolution;
62
63use indexmap::IndexSet;
64
/// Table name used when a table is not given an explicit name.
pub const UNNAMED_TABLE: &'static str = "?table?";
67
/// Tunable behavior for a [`LogicalPlanBuilder`].
///
/// Every option starts out at its `Default` value (all flags `false`).
#[derive(Debug, Clone, Default)]
pub struct LogicalPlanBuilderOptions {
    /// Whether the plan builder should add functionally dependent
    /// expressions as additional aggregation groupings.
    add_implicit_group_by_exprs: bool,
}

impl LogicalPlanBuilderOptions {
    /// Create a set of options with every setting at its default.
    pub fn new() -> Self {
        Self::default()
    }

    /// Set whether the builder should add functionally dependent expressions
    /// as additional aggregation groupings, returning the updated options.
    pub fn with_add_implicit_group_by_exprs(self, add: bool) -> Self {
        let mut updated = self;
        updated.add_implicit_group_by_exprs = add;
        updated
    }
}
87
88/// Builder for logical plans
89///
90/// # Example building a simple plan
91/// ```
92/// # use datafusion_expr::{lit, col, LogicalPlanBuilder, logical_plan::table_scan};
93/// # use datafusion_common::Result;
94/// # use arrow::datatypes::{Schema, DataType, Field};
95/// #
96/// # fn main() -> Result<()> {
97/// #
98/// # fn employee_schema() -> Schema {
99/// #    Schema::new(vec![
100/// #           Field::new("id", DataType::Int32, false),
101/// #           Field::new("first_name", DataType::Utf8, false),
102/// #           Field::new("last_name", DataType::Utf8, false),
103/// #           Field::new("state", DataType::Utf8, false),
104/// #           Field::new("salary", DataType::Int32, false),
105/// #       ])
106/// #   }
107/// #
108/// // Create a plan similar to
109/// // SELECT last_name
110/// // FROM employees
111/// // WHERE salary < 1000
112/// let plan = table_scan(Some("employee"), &employee_schema(), None)?
113///  // Keep only rows where salary < 1000
114///  .filter(col("salary").lt(lit(1000)))?
115///  // only show "last_name" in the final results
116///  .project(vec![col("last_name")])?
117///  .build()?;
118///
119/// // Convert from plan back to builder
120/// let builder = LogicalPlanBuilder::from(plan);
121///
122/// # Ok(())
123/// # }
124/// ```
125#[derive(Debug, Clone)]
126pub struct LogicalPlanBuilder {
127    plan: Arc<LogicalPlan>,
128    options: LogicalPlanBuilderOptions,
129}
130
131impl LogicalPlanBuilder {
132    /// Create a builder from an existing plan
133    pub fn new(plan: LogicalPlan) -> Self {
134        Self {
135            plan: Arc::new(plan),
136            options: LogicalPlanBuilderOptions::default(),
137        }
138    }
139
140    /// Create a builder from an existing plan
141    pub fn new_from_arc(plan: Arc<LogicalPlan>) -> Self {
142        Self {
143            plan,
144            options: LogicalPlanBuilderOptions::default(),
145        }
146    }
147
148    pub fn with_options(mut self, options: LogicalPlanBuilderOptions) -> Self {
149        self.options = options;
150        self
151    }
152
153    /// Return the output schema of the plan build so far
154    pub fn schema(&self) -> &DFSchemaRef {
155        self.plan.schema()
156    }
157
158    /// Return the LogicalPlan of the plan build so far
159    pub fn plan(&self) -> &LogicalPlan {
160        &self.plan
161    }
162
163    /// Create an empty relation.
164    ///
165    /// `produce_one_row` set to true means this empty node needs to produce a placeholder row.
166    pub fn empty(produce_one_row: bool) -> Self {
167        Self::new(LogicalPlan::EmptyRelation(EmptyRelation {
168            produce_one_row,
169            schema: DFSchemaRef::new(DFSchema::empty()),
170        }))
171    }
172
173    /// Convert a regular plan into a recursive query.
174    /// `is_distinct` indicates whether the recursive term should be de-duplicated (`UNION`) after each iteration or not (`UNION ALL`).
175    pub fn to_recursive_query(
176        self,
177        name: String,
178        recursive_term: LogicalPlan,
179        is_distinct: bool,
180    ) -> Result<Self> {
181        // TODO: we need to do a bunch of validation here. Maybe more.
182        if is_distinct {
183            return not_impl_err!(
184                "Recursive queries with a distinct 'UNION' (in which the previous iteration's results will be de-duplicated) is not supported"
185            );
186        }
187        // Ensure that the static term and the recursive term have the same number of fields
188        let static_fields_len = self.plan.schema().fields().len();
189        let recursive_fields_len = recursive_term.schema().fields().len();
190        if static_fields_len != recursive_fields_len {
191            return plan_err!(
192                "Non-recursive term and recursive term must have the same number of columns ({} != {})",
193                static_fields_len, recursive_fields_len
194            );
195        }
196        // Ensure that the recursive term has the same field types as the static term
197        let coerced_recursive_term =
198            coerce_plan_expr_for_schema(recursive_term, self.plan.schema())?;
199        Ok(Self::from(LogicalPlan::RecursiveQuery(RecursiveQuery {
200            name,
201            static_term: self.plan,
202            recursive_term: Arc::new(coerced_recursive_term),
203            is_distinct,
204        })))
205    }
206
207    /// Create a values list based relation, and the schema is inferred from data, consuming
208    /// `value`. See the [Postgres VALUES](https://www.postgresql.org/docs/current/queries-values.html)
209    /// documentation for more details.
210    ///
211    /// so it's usually better to override the default names with a table alias list.
212    ///
213    /// If the values include params/binders such as $1, $2, $3, etc, then the `param_data_types` should be provided.
214    pub fn values(values: Vec<Vec<Expr>>) -> Result<Self> {
215        if values.is_empty() {
216            return plan_err!("Values list cannot be empty");
217        }
218        let n_cols = values[0].len();
219        if n_cols == 0 {
220            return plan_err!("Values list cannot be zero length");
221        }
222        for (i, row) in values.iter().enumerate() {
223            if row.len() != n_cols {
224                return plan_err!(
225                    "Inconsistent data length across values list: got {} values in row {} but expected {}",
226                    row.len(),
227                    i,
228                    n_cols
229                );
230            }
231        }
232
233        // Infer from data itself
234        Self::infer_data(values)
235    }
236
237    /// Create a values list based relation, and the schema is inferred from data itself or table schema if provided, consuming
238    /// `value`. See the [Postgres VALUES](https://www.postgresql.org/docs/current/queries-values.html)
239    /// documentation for more details.
240    ///
241    /// By default, it assigns the names column1, column2, etc. to the columns of a VALUES table.
242    /// The column names are not specified by the SQL standard and different database systems do it differently,
243    /// so it's usually better to override the default names with a table alias list.
244    ///
245    /// If the values include params/binders such as $1, $2, $3, etc, then the `param_data_types` should be provided.
246    pub fn values_with_schema(
247        values: Vec<Vec<Expr>>,
248        schema: &DFSchemaRef,
249    ) -> Result<Self> {
250        if values.is_empty() {
251            return plan_err!("Values list cannot be empty");
252        }
253        let n_cols = schema.fields().len();
254        if n_cols == 0 {
255            return plan_err!("Values list cannot be zero length");
256        }
257        for (i, row) in values.iter().enumerate() {
258            if row.len() != n_cols {
259                return plan_err!(
260                    "Inconsistent data length across values list: got {} values in row {} but expected {}",
261                    row.len(),
262                    i,
263                    n_cols
264                );
265            }
266        }
267
268        // Check the type of value against the schema
269        Self::infer_values_from_schema(values, schema)
270    }
271
272    fn infer_values_from_schema(
273        values: Vec<Vec<Expr>>,
274        schema: &DFSchema,
275    ) -> Result<Self> {
276        let n_cols = values[0].len();
277        let mut fields = ValuesFields::new();
278        for j in 0..n_cols {
279            let field_type = schema.field(j).data_type();
280            let field_nullable = schema.field(j).is_nullable();
281            for row in values.iter() {
282                let value = &row[j];
283                let data_type = value.get_type(schema)?;
284
285                if !data_type.equals_datatype(field_type) {
286                    if can_cast_types(&data_type, field_type) {
287                    } else {
288                        return exec_err!(
289                            "type mismatch and can't cast to got {} and {}",
290                            data_type,
291                            field_type
292                        );
293                    }
294                }
295            }
296            fields.push(field_type.to_owned(), field_nullable);
297        }
298
299        Self::infer_inner(values, fields, schema)
300    }
301
302    fn infer_data(values: Vec<Vec<Expr>>) -> Result<Self> {
303        let n_cols = values[0].len();
304        let schema = DFSchema::empty();
305        let mut fields = ValuesFields::new();
306
307        for j in 0..n_cols {
308            let mut common_type: Option<DataType> = None;
309            for (i, row) in values.iter().enumerate() {
310                let value = &row[j];
311                let data_type = value.get_type(&schema)?;
312                if data_type == DataType::Null {
313                    continue;
314                }
315
316                if let Some(prev_type) = common_type {
317                    // get common type of each column values.
318                    let data_types = vec![prev_type.clone(), data_type.clone()];
319                    let Some(new_type) = type_union_resolution(&data_types) else {
320                        return plan_err!("Inconsistent data type across values list at row {i} column {j}. Was {prev_type} but found {data_type}");
321                    };
322                    common_type = Some(new_type);
323                } else {
324                    common_type = Some(data_type);
325                }
326            }
327            // assuming common_type was not set, and no error, therefore the type should be NULL
328            // since the code loop skips NULL
329            fields.push(common_type.unwrap_or(DataType::Null), true);
330        }
331
332        Self::infer_inner(values, fields, &schema)
333    }
334
335    fn infer_inner(
336        mut values: Vec<Vec<Expr>>,
337        fields: ValuesFields,
338        schema: &DFSchema,
339    ) -> Result<Self> {
340        let fields = fields.into_fields();
341        // wrap cast if data type is not same as common type.
342        for row in &mut values {
343            for (j, field_type) in fields.iter().map(|f| f.data_type()).enumerate() {
344                if let Expr::Literal(ScalarValue::Null, metadata) = &row[j] {
345                    row[j] = Expr::Literal(
346                        ScalarValue::try_from(field_type)?,
347                        metadata.clone(),
348                    );
349                } else {
350                    row[j] = std::mem::take(&mut row[j]).cast_to(field_type, schema)?;
351                }
352            }
353        }
354
355        let dfschema = DFSchema::from_unqualified_fields(fields, HashMap::new())?;
356        let schema = DFSchemaRef::new(dfschema);
357
358        Ok(Self::new(LogicalPlan::Values(Values { schema, values })))
359    }
360
361    /// Convert a table provider into a builder with a TableScan
362    ///
363    /// Note that if you pass a string as `table_name`, it is treated
364    /// as a SQL identifier, as described on [`TableReference`] and
365    /// thus is normalized
366    ///
367    /// # Example:
368    /// ```
369    /// # use datafusion_expr::{lit, col, LogicalPlanBuilder,
370    /// #  logical_plan::builder::LogicalTableSource, logical_plan::table_scan
371    /// # };
372    /// # use std::sync::Arc;
373    /// # use arrow::datatypes::{Schema, DataType, Field};
374    /// # use datafusion_common::TableReference;
375    /// #
376    /// # let employee_schema = Arc::new(Schema::new(vec![
377    /// #           Field::new("id", DataType::Int32, false),
378    /// # ])) as _;
379    /// # let table_source = Arc::new(LogicalTableSource::new(employee_schema));
380    /// // Scan table_source with the name "mytable" (after normalization)
381    /// # let table = table_source.clone();
382    /// let scan = LogicalPlanBuilder::scan("MyTable", table, None);
383    ///
384    /// // Scan table_source with the name "MyTable" by enclosing in quotes
385    /// # let table = table_source.clone();
386    /// let scan = LogicalPlanBuilder::scan(r#""MyTable""#, table, None);
387    ///
388    /// // Scan table_source with the name "MyTable" by forming the table reference
389    /// # let table = table_source.clone();
390    /// let table_reference = TableReference::bare("MyTable");
391    /// let scan = LogicalPlanBuilder::scan(table_reference, table, None);
392    /// ```
393    pub fn scan(
394        table_name: impl Into<TableReference>,
395        table_source: Arc<dyn TableSource>,
396        projection: Option<Vec<usize>>,
397    ) -> Result<Self> {
398        Self::scan_with_filters(table_name, table_source, projection, vec![])
399    }
400
401    /// Create a [CopyTo] for copying the contents of this builder to the specified file(s)
402    pub fn copy_to(
403        input: LogicalPlan,
404        output_url: String,
405        file_type: Arc<dyn FileType>,
406        options: HashMap<String, String>,
407        partition_by: Vec<String>,
408    ) -> Result<Self> {
409        Ok(Self::new(LogicalPlan::Copy(CopyTo::new(
410            Arc::new(input),
411            output_url,
412            partition_by,
413            file_type,
414            options,
415        ))))
416    }
417
418    /// Create a [`DmlStatement`] for inserting the contents of this builder into the named table.
419    ///
420    /// Note,  use a [`DefaultTableSource`] to insert into a [`TableProvider`]
421    ///
422    /// [`DefaultTableSource`]: https://docs.rs/datafusion/latest/datafusion/datasource/default_table_source/struct.DefaultTableSource.html
423    /// [`TableProvider`]: https://docs.rs/datafusion/latest/datafusion/catalog/trait.TableProvider.html
424    ///
425    /// # Example:
426    /// ```
427    /// # use datafusion_expr::{lit, LogicalPlanBuilder,
428    /// #  logical_plan::builder::LogicalTableSource,
429    /// # };
430    /// # use std::sync::Arc;
431    /// # use arrow::datatypes::{Schema, DataType, Field};
432    /// # use datafusion_expr::dml::InsertOp;
433    /// #
434    /// # fn test() -> datafusion_common::Result<()> {
435    /// # let employee_schema = Arc::new(Schema::new(vec![
436    /// #     Field::new("id", DataType::Int32, false),
437    /// # ])) as _;
438    /// # let table_source = Arc::new(LogicalTableSource::new(employee_schema));
439    /// // VALUES (1), (2)
440    /// let input = LogicalPlanBuilder::values(vec![vec![lit(1)], vec![lit(2)]])?
441    ///   .build()?;
442    /// // INSERT INTO MyTable VALUES (1), (2)
443    /// let insert_plan = LogicalPlanBuilder::insert_into(
444    ///   input,
445    ///   "MyTable",
446    ///   table_source,
447    ///   InsertOp::Append,
448    /// )?;
449    /// # Ok(())
450    /// # }
451    /// ```
452    pub fn insert_into(
453        input: LogicalPlan,
454        table_name: impl Into<TableReference>,
455        target: Arc<dyn TableSource>,
456        insert_op: InsertOp,
457    ) -> Result<Self> {
458        Ok(Self::new(LogicalPlan::Dml(DmlStatement::new(
459            table_name.into(),
460            target,
461            WriteOp::Insert(insert_op),
462            Arc::new(input),
463        ))))
464    }
465
466    /// Convert a table provider into a builder with a TableScan
467    pub fn scan_with_filters(
468        table_name: impl Into<TableReference>,
469        table_source: Arc<dyn TableSource>,
470        projection: Option<Vec<usize>>,
471        filters: Vec<Expr>,
472    ) -> Result<Self> {
473        Self::scan_with_filters_inner(table_name, table_source, projection, filters, None)
474    }
475
476    /// Convert a table provider into a builder with a TableScan with filter and fetch
477    pub fn scan_with_filters_fetch(
478        table_name: impl Into<TableReference>,
479        table_source: Arc<dyn TableSource>,
480        projection: Option<Vec<usize>>,
481        filters: Vec<Expr>,
482        fetch: Option<usize>,
483    ) -> Result<Self> {
484        Self::scan_with_filters_inner(
485            table_name,
486            table_source,
487            projection,
488            filters,
489            fetch,
490        )
491    }
492
493    fn scan_with_filters_inner(
494        table_name: impl Into<TableReference>,
495        table_source: Arc<dyn TableSource>,
496        projection: Option<Vec<usize>>,
497        filters: Vec<Expr>,
498        fetch: Option<usize>,
499    ) -> Result<Self> {
500        let table_scan =
501            TableScan::try_new(table_name, table_source, projection, filters, fetch)?;
502
503        // Inline TableScan
504        if table_scan.filters.is_empty() {
505            if let Some(p) = table_scan.source.get_logical_plan() {
506                let sub_plan = p.into_owned();
507
508                if let Some(proj) = table_scan.projection {
509                    let projection_exprs = proj
510                        .into_iter()
511                        .map(|i| {
512                            Expr::Column(Column::from(
513                                sub_plan.schema().qualified_field(i),
514                            ))
515                        })
516                        .collect::<Vec<_>>();
517                    return Self::new(sub_plan)
518                        .project(projection_exprs)?
519                        .alias(table_scan.table_name);
520                }
521
522                // Ensures that the reference to the inlined table remains the
523                // same, meaning we don't have to change any of the parent nodes
524                // that reference this table.
525                return Self::new(sub_plan).alias(table_scan.table_name);
526            }
527        }
528
529        Ok(Self::new(LogicalPlan::TableScan(table_scan)))
530    }
531
532    /// Wrap a plan in a window
533    pub fn window_plan(
534        input: LogicalPlan,
535        window_exprs: impl IntoIterator<Item = Expr>,
536    ) -> Result<LogicalPlan> {
537        let mut plan = input;
538        let mut groups = group_window_expr_by_sort_keys(window_exprs)?;
539        // To align with the behavior of PostgreSQL, we want the sort_keys sorted as same rule as PostgreSQL that first
540        // we compare the sort key themselves and if one window's sort keys are a prefix of another
541        // put the window with more sort keys first. so more deeply sorted plans gets nested further down as children.
542        // The sort_by() implementation here is a stable sort.
543        // Note that by this rule if there's an empty over, it'll be at the top level
544        groups.sort_by(|(key_a, _), (key_b, _)| {
545            for ((first, _), (second, _)) in key_a.iter().zip(key_b.iter()) {
546                let key_ordering = compare_sort_expr(first, second, plan.schema());
547                match key_ordering {
548                    Ordering::Less => {
549                        return Ordering::Less;
550                    }
551                    Ordering::Greater => {
552                        return Ordering::Greater;
553                    }
554                    Ordering::Equal => {}
555                }
556            }
557            key_b.len().cmp(&key_a.len())
558        });
559        for (_, exprs) in groups {
560            let window_exprs = exprs.into_iter().collect::<Vec<_>>();
561            // Partition and sorting is done at physical level, see the EnforceDistribution
562            // and EnforceSorting rules.
563            plan = LogicalPlanBuilder::from(plan)
564                .window(window_exprs)?
565                .build()?;
566        }
567        Ok(plan)
568    }
569
570    /// Apply a projection without alias.
571    pub fn project(
572        self,
573        expr: impl IntoIterator<Item = impl Into<SelectExpr>>,
574    ) -> Result<Self> {
575        project(Arc::unwrap_or_clone(self.plan), expr).map(Self::new)
576    }
577
578    /// Apply a projection without alias with optional validation
579    /// (true to validate, false to not validate)
580    pub fn project_with_validation(
581        self,
582        expr: Vec<(impl Into<SelectExpr>, bool)>,
583    ) -> Result<Self> {
584        project_with_validation(Arc::unwrap_or_clone(self.plan), expr).map(Self::new)
585    }
586
587    /// Select the given column indices
588    pub fn select(self, indices: impl IntoIterator<Item = usize>) -> Result<Self> {
589        let exprs: Vec<_> = indices
590            .into_iter()
591            .map(|x| Expr::Column(Column::from(self.plan.schema().qualified_field(x))))
592            .collect();
593        self.project(exprs)
594    }
595
596    /// Apply a filter
597    pub fn filter(self, expr: impl Into<Expr>) -> Result<Self> {
598        let expr = normalize_col(expr.into(), &self.plan)?;
599        Filter::try_new(expr, self.plan)
600            .map(LogicalPlan::Filter)
601            .map(Self::new)
602    }
603
604    /// Apply a filter which is used for a having clause
605    pub fn having(self, expr: impl Into<Expr>) -> Result<Self> {
606        let expr = normalize_col(expr.into(), &self.plan)?;
607        Filter::try_new(expr, self.plan)
608            .map(LogicalPlan::Filter)
609            .map(Self::from)
610    }
611
612    /// Make a builder for a prepare logical plan from the builder's plan
613    pub fn prepare(self, name: String, data_types: Vec<DataType>) -> Result<Self> {
614        Ok(Self::new(LogicalPlan::Statement(Statement::Prepare(
615            Prepare {
616                name,
617                data_types,
618                input: self.plan,
619            },
620        ))))
621    }
622
623    /// Limit the number of rows returned
624    ///
625    /// `skip` - Number of rows to skip before fetch any row.
626    ///
627    /// `fetch` - Maximum number of rows to fetch, after skipping `skip` rows,
628    ///          if specified.
629    pub fn limit(self, skip: usize, fetch: Option<usize>) -> Result<Self> {
630        let skip_expr = if skip == 0 {
631            None
632        } else {
633            Some(lit(skip as i64))
634        };
635        let fetch_expr = fetch.map(|f| lit(f as i64));
636        self.limit_by_expr(skip_expr, fetch_expr)
637    }
638
639    /// Limit the number of rows returned
640    ///
641    /// Similar to `limit` but uses expressions for `skip` and `fetch`
642    pub fn limit_by_expr(self, skip: Option<Expr>, fetch: Option<Expr>) -> Result<Self> {
643        Ok(Self::new(LogicalPlan::Limit(Limit {
644            skip: skip.map(Box::new),
645            fetch: fetch.map(Box::new),
646            input: self.plan,
647        })))
648    }
649
650    /// Apply an alias
651    pub fn alias(self, alias: impl Into<TableReference>) -> Result<Self> {
652        subquery_alias(Arc::unwrap_or_clone(self.plan), alias).map(Self::new)
653    }
654
655    /// Add missing sort columns to all downstream projection
656    ///
657    /// Thus, if you have a LogicalPlan that selects A and B and have
658    /// not requested a sort by C, this code will add C recursively to
659    /// all input projections.
660    ///
661    /// Adding a new column is not correct if there is a `Distinct`
662    /// node, which produces only distinct values of its
663    /// inputs. Adding a new column to its input will result in
664    /// potentially different results than with the original column.
665    ///
666    /// For example, if the input is like:
667    ///
668    /// Distinct(A, B)
669    ///
670    /// If the input looks like
671    ///
672    /// a | b | c
673    /// --+---+---
674    /// 1 | 2 | 3
675    /// 1 | 2 | 4
676    ///
677    /// Distinct (A, B) --> (1,2)
678    ///
679    /// But Distinct (A, B, C) --> (1, 2, 3), (1, 2, 4)
680    ///  (which will appear as a (1, 2), (1, 2) if a and b are projected
681    ///
682    /// See <https://github.com/apache/datafusion/issues/5065> for more details
683    fn add_missing_columns(
684        curr_plan: LogicalPlan,
685        missing_cols: &IndexSet<Column>,
686        is_distinct: bool,
687    ) -> Result<LogicalPlan> {
688        match curr_plan {
689            LogicalPlan::Projection(Projection {
690                input,
691                mut expr,
692                schema: _,
693            }) if missing_cols.iter().all(|c| input.schema().has_column(c)) => {
694                let mut missing_exprs = missing_cols
695                    .iter()
696                    .map(|c| normalize_col(Expr::Column(c.clone()), &input))
697                    .collect::<Result<Vec<_>>>()?;
698
699                // Do not let duplicate columns to be added, some of the
700                // missing_cols may be already present but without the new
701                // projected alias.
702                missing_exprs.retain(|e| !expr.contains(e));
703                if is_distinct {
704                    Self::ambiguous_distinct_check(&missing_exprs, missing_cols, &expr)?;
705                }
706                expr.extend(missing_exprs);
707                project(Arc::unwrap_or_clone(input), expr)
708            }
709            _ => {
710                let is_distinct =
711                    is_distinct || matches!(curr_plan, LogicalPlan::Distinct(_));
712                let new_inputs = curr_plan
713                    .inputs()
714                    .into_iter()
715                    .map(|input_plan| {
716                        Self::add_missing_columns(
717                            (*input_plan).clone(),
718                            missing_cols,
719                            is_distinct,
720                        )
721                    })
722                    .collect::<Result<Vec<_>>>()?;
723                curr_plan.with_new_exprs(curr_plan.expressions(), new_inputs)
724            }
725        }
726    }
727
728    fn ambiguous_distinct_check(
729        missing_exprs: &[Expr],
730        missing_cols: &IndexSet<Column>,
731        projection_exprs: &[Expr],
732    ) -> Result<()> {
733        if missing_exprs.is_empty() {
734            return Ok(());
735        }
736
737        // if the missing columns are all only aliases for things in
738        // the existing select list, it is ok
739        //
740        // This handles the special case for
741        // SELECT col as <alias> ORDER BY <alias>
742        //
743        // As described in https://github.com/apache/datafusion/issues/5293
744        let all_aliases = missing_exprs.iter().all(|e| {
745            projection_exprs.iter().any(|proj_expr| {
746                if let Expr::Alias(Alias { expr, .. }) = proj_expr {
747                    e == expr.as_ref()
748                } else {
749                    false
750                }
751            })
752        });
753        if all_aliases {
754            return Ok(());
755        }
756
757        let missing_col_names = missing_cols
758            .iter()
759            .map(|col| col.flat_name())
760            .collect::<String>();
761
762        plan_err!("For SELECT DISTINCT, ORDER BY expressions {missing_col_names} must appear in select list")
763    }
764
765    /// Apply a sort by provided expressions with default direction
766    pub fn sort_by(
767        self,
768        expr: impl IntoIterator<Item = impl Into<Expr>> + Clone,
769    ) -> Result<Self> {
770        self.sort(
771            expr.into_iter()
772                .map(|e| e.into().sort(true, false))
773                .collect::<Vec<SortExpr>>(),
774        )
775    }
776
777    pub fn sort(
778        self,
779        sorts: impl IntoIterator<Item = impl Into<SortExpr>> + Clone,
780    ) -> Result<Self> {
781        self.sort_with_limit(sorts, None)
782    }
783
784    /// Apply a sort
785    pub fn sort_with_limit(
786        self,
787        sorts: impl IntoIterator<Item = impl Into<SortExpr>> + Clone,
788        fetch: Option<usize>,
789    ) -> Result<Self> {
790        let sorts = rewrite_sort_cols_by_aggs(sorts, &self.plan)?;
791
792        let schema = self.plan.schema();
793
794        // Collect sort columns that are missing in the input plan's schema
795        let mut missing_cols: IndexSet<Column> = IndexSet::new();
796        sorts.iter().try_for_each::<_, Result<()>>(|sort| {
797            let columns = sort.expr.column_refs();
798
799            missing_cols.extend(
800                columns
801                    .into_iter()
802                    .filter(|c| !schema.has_column(c))
803                    .cloned(),
804            );
805
806            Ok(())
807        })?;
808
809        if missing_cols.is_empty() {
810            return Ok(Self::new(LogicalPlan::Sort(Sort {
811                expr: normalize_sorts(sorts, &self.plan)?,
812                input: self.plan,
813                fetch,
814            })));
815        }
816
817        // remove pushed down sort columns
818        let new_expr = schema.columns().into_iter().map(Expr::Column).collect();
819
820        let is_distinct = false;
821        let plan = Self::add_missing_columns(
822            Arc::unwrap_or_clone(self.plan),
823            &missing_cols,
824            is_distinct,
825        )?;
826
827        let sort_plan = LogicalPlan::Sort(Sort {
828            expr: normalize_sorts(sorts, &plan)?,
829            input: Arc::new(plan),
830            fetch,
831        });
832
833        Projection::try_new(new_expr, Arc::new(sort_plan))
834            .map(LogicalPlan::Projection)
835            .map(Self::new)
836    }
837
838    /// Apply a union, preserving duplicate rows
839    pub fn union(self, plan: LogicalPlan) -> Result<Self> {
840        union(Arc::unwrap_or_clone(self.plan), plan).map(Self::new)
841    }
842
843    /// Apply a union by name, preserving duplicate rows
844    pub fn union_by_name(self, plan: LogicalPlan) -> Result<Self> {
845        union_by_name(Arc::unwrap_or_clone(self.plan), plan).map(Self::new)
846    }
847
848    /// Apply a union by name, removing duplicate rows
849    pub fn union_by_name_distinct(self, plan: LogicalPlan) -> Result<Self> {
850        let left_plan: LogicalPlan = Arc::unwrap_or_clone(self.plan);
851        let right_plan: LogicalPlan = plan;
852
853        Ok(Self::new(LogicalPlan::Distinct(Distinct::All(Arc::new(
854            union_by_name(left_plan, right_plan)?,
855        )))))
856    }
857
858    /// Apply a union, removing duplicate rows
859    pub fn union_distinct(self, plan: LogicalPlan) -> Result<Self> {
860        let left_plan: LogicalPlan = Arc::unwrap_or_clone(self.plan);
861        let right_plan: LogicalPlan = plan;
862
863        Ok(Self::new(LogicalPlan::Distinct(Distinct::All(Arc::new(
864            union(left_plan, right_plan)?,
865        )))))
866    }
867
868    /// Apply deduplication: Only distinct (different) values are returned)
869    pub fn distinct(self) -> Result<Self> {
870        Ok(Self::new(LogicalPlan::Distinct(Distinct::All(self.plan))))
871    }
872
873    /// Project first values of the specified expression list according to the provided
874    /// sorting expressions grouped by the `DISTINCT ON` clause expressions.
875    pub fn distinct_on(
876        self,
877        on_expr: Vec<Expr>,
878        select_expr: Vec<Expr>,
879        sort_expr: Option<Vec<SortExpr>>,
880    ) -> Result<Self> {
881        Ok(Self::new(LogicalPlan::Distinct(Distinct::On(
882            DistinctOn::try_new(on_expr, select_expr, sort_expr, self.plan)?,
883        ))))
884    }
885
886    /// Apply a join to `right` using explicitly specified columns and an
887    /// optional filter expression.
888    ///
889    /// See [`join_on`](Self::join_on) for a more concise way to specify the
890    /// join condition. Since DataFusion will automatically identify and
891    /// optimize equality predicates there is no performance difference between
892    /// this function and `join_on`
893    ///
894    /// `left_cols` and `right_cols` are used to form "equijoin" predicates (see
895    /// example below), which are then combined with the optional `filter`
896    /// expression.
897    ///
898    /// Note that in case of outer join, the `filter` is applied to only matched rows.
899    pub fn join(
900        self,
901        right: LogicalPlan,
902        join_type: JoinType,
903        join_keys: (Vec<impl Into<Column>>, Vec<impl Into<Column>>),
904        filter: Option<Expr>,
905    ) -> Result<Self> {
906        self.join_detailed(
907            right,
908            join_type,
909            join_keys,
910            filter,
911            NullEquality::NullEqualsNothing,
912        )
913    }
914
915    /// Apply a join using the specified expressions.
916    ///
917    /// Note that DataFusion automatically optimizes joins, including
918    /// identifying and optimizing equality predicates.
919    ///
920    /// # Example
921    ///
922    /// ```
923    /// # use datafusion_expr::{Expr, col, LogicalPlanBuilder,
924    /// #  logical_plan::builder::LogicalTableSource, logical_plan::JoinType,};
925    /// # use std::sync::Arc;
926    /// # use arrow::datatypes::{Schema, DataType, Field};
927    /// # use datafusion_common::Result;
928    /// # fn main() -> Result<()> {
929    /// let example_schema = Arc::new(Schema::new(vec![
930    ///     Field::new("a", DataType::Int32, false),
931    ///     Field::new("b", DataType::Int32, false),
932    ///     Field::new("c", DataType::Int32, false),
933    /// ]));
934    /// let table_source = Arc::new(LogicalTableSource::new(example_schema));
935    /// let left_table = table_source.clone();
936    /// let right_table = table_source.clone();
937    ///
938    /// let right_plan = LogicalPlanBuilder::scan("right", right_table, None)?.build()?;
939    ///
940    /// // Form the expression `(left.a != right.a)` AND `(left.b != right.b)`
941    /// let exprs = vec![
942    ///     col("left.a").eq(col("right.a")),
943    ///     col("left.b").not_eq(col("right.b"))
944    ///  ];
945    ///
946    /// // Perform the equivalent of `left INNER JOIN right ON (a != a2 AND b != b2)`
947    /// // finding all pairs of rows from `left` and `right` where
948    /// // where `a = a2` and `b != b2`.
949    /// let plan = LogicalPlanBuilder::scan("left", left_table, None)?
950    ///     .join_on(right_plan, JoinType::Inner, exprs)?
951    ///     .build()?;
952    /// # Ok(())
953    /// # }
954    /// ```
955    pub fn join_on(
956        self,
957        right: LogicalPlan,
958        join_type: JoinType,
959        on_exprs: impl IntoIterator<Item = Expr>,
960    ) -> Result<Self> {
961        let filter = on_exprs.into_iter().reduce(Expr::and);
962
963        self.join_detailed(
964            right,
965            join_type,
966            (Vec::<Column>::new(), Vec::<Column>::new()),
967            filter,
968            NullEquality::NullEqualsNothing,
969        )
970    }
971
972    pub(crate) fn normalize(plan: &LogicalPlan, column: Column) -> Result<Column> {
973        if column.relation.is_some() {
974            // column is already normalized
975            return Ok(column);
976        }
977
978        let schema = plan.schema();
979        let fallback_schemas = plan.fallback_normalize_schemas();
980        let using_columns = plan.using_columns()?;
981        column.normalize_with_schemas_and_ambiguity_check(
982            &[&[schema], &fallback_schemas],
983            &using_columns,
984        )
985    }
986
987    /// Apply a join with on constraint and specified null equality.
988    ///
989    /// The behavior is the same as [`join`](Self::join) except that it allows
990    /// specifying the null equality behavior.
991    ///
992    /// The `null_equality` dictates how `null` values are joined.
993    pub fn join_detailed(
994        self,
995        right: LogicalPlan,
996        join_type: JoinType,
997        join_keys: (Vec<impl Into<Column>>, Vec<impl Into<Column>>),
998        filter: Option<Expr>,
999        null_equality: NullEquality,
1000    ) -> Result<Self> {
1001        if join_keys.0.len() != join_keys.1.len() {
1002            return plan_err!("left_keys and right_keys were not the same length");
1003        }
1004
1005        let filter = if let Some(expr) = filter {
1006            let filter = normalize_col_with_schemas_and_ambiguity_check(
1007                expr,
1008                &[&[self.schema(), right.schema()]],
1009                &[],
1010            )?;
1011            Some(filter)
1012        } else {
1013            None
1014        };
1015
1016        let (left_keys, right_keys): (Vec<Result<Column>>, Vec<Result<Column>>) =
1017            join_keys
1018                .0
1019                .into_iter()
1020                .zip(join_keys.1)
1021                .map(|(l, r)| {
1022                    let l = l.into();
1023                    let r = r.into();
1024
1025                    match (&l.relation, &r.relation) {
1026                        (Some(lr), Some(rr)) => {
1027                            let l_is_left =
1028                                self.plan.schema().field_with_qualified_name(lr, &l.name);
1029                            let l_is_right =
1030                                right.schema().field_with_qualified_name(lr, &l.name);
1031                            let r_is_left =
1032                                self.plan.schema().field_with_qualified_name(rr, &r.name);
1033                            let r_is_right =
1034                                right.schema().field_with_qualified_name(rr, &r.name);
1035
1036                            match (l_is_left, l_is_right, r_is_left, r_is_right) {
1037                                (_, Ok(_), Ok(_), _) => (Ok(r), Ok(l)),
1038                                (Ok(_), _, _, Ok(_)) => (Ok(l), Ok(r)),
1039                                _ => (
1040                                    Self::normalize(&self.plan, l),
1041                                    Self::normalize(&right, r),
1042                                ),
1043                            }
1044                        }
1045                        (Some(lr), None) => {
1046                            let l_is_left =
1047                                self.plan.schema().field_with_qualified_name(lr, &l.name);
1048                            let l_is_right =
1049                                right.schema().field_with_qualified_name(lr, &l.name);
1050
1051                            match (l_is_left, l_is_right) {
1052                                (Ok(_), _) => (Ok(l), Self::normalize(&right, r)),
1053                                (_, Ok(_)) => (Self::normalize(&self.plan, r), Ok(l)),
1054                                _ => (
1055                                    Self::normalize(&self.plan, l),
1056                                    Self::normalize(&right, r),
1057                                ),
1058                            }
1059                        }
1060                        (None, Some(rr)) => {
1061                            let r_is_left =
1062                                self.plan.schema().field_with_qualified_name(rr, &r.name);
1063                            let r_is_right =
1064                                right.schema().field_with_qualified_name(rr, &r.name);
1065
1066                            match (r_is_left, r_is_right) {
1067                                (Ok(_), _) => (Ok(r), Self::normalize(&right, l)),
1068                                (_, Ok(_)) => (Self::normalize(&self.plan, l), Ok(r)),
1069                                _ => (
1070                                    Self::normalize(&self.plan, l),
1071                                    Self::normalize(&right, r),
1072                                ),
1073                            }
1074                        }
1075                        (None, None) => {
1076                            let mut swap = false;
1077                            let left_key = Self::normalize(&self.plan, l.clone())
1078                                .or_else(|_| {
1079                                    swap = true;
1080                                    Self::normalize(&right, l)
1081                                });
1082                            if swap {
1083                                (Self::normalize(&self.plan, r), left_key)
1084                            } else {
1085                                (left_key, Self::normalize(&right, r))
1086                            }
1087                        }
1088                    }
1089                })
1090                .unzip();
1091
1092        let left_keys = left_keys.into_iter().collect::<Result<Vec<Column>>>()?;
1093        let right_keys = right_keys.into_iter().collect::<Result<Vec<Column>>>()?;
1094
1095        let on: Vec<_> = left_keys
1096            .into_iter()
1097            .zip(right_keys)
1098            .map(|(l, r)| (Expr::Column(l), Expr::Column(r)))
1099            .collect();
1100        let join_schema =
1101            build_join_schema(self.plan.schema(), right.schema(), &join_type)?;
1102
1103        // Inner type without join condition is cross join
1104        if join_type != JoinType::Inner && on.is_empty() && filter.is_none() {
1105            return plan_err!("join condition should not be empty");
1106        }
1107
1108        Ok(Self::new(LogicalPlan::Join(Join {
1109            left: self.plan,
1110            right: Arc::new(right),
1111            on,
1112            filter,
1113            join_type,
1114            join_constraint: JoinConstraint::On,
1115            schema: DFSchemaRef::new(join_schema),
1116            null_equality,
1117        })))
1118    }
1119
1120    /// Apply a join with using constraint, which duplicates all join columns in output schema.
1121    pub fn join_using(
1122        self,
1123        right: LogicalPlan,
1124        join_type: JoinType,
1125        using_keys: Vec<Column>,
1126    ) -> Result<Self> {
1127        let left_keys: Vec<Column> = using_keys
1128            .clone()
1129            .into_iter()
1130            .map(|c| Self::normalize(&self.plan, c))
1131            .collect::<Result<_>>()?;
1132        let right_keys: Vec<Column> = using_keys
1133            .into_iter()
1134            .map(|c| Self::normalize(&right, c))
1135            .collect::<Result<_>>()?;
1136
1137        let on: Vec<(_, _)> = left_keys.into_iter().zip(right_keys).collect();
1138        let mut join_on: Vec<(Expr, Expr)> = vec![];
1139        let mut filters: Option<Expr> = None;
1140        for (l, r) in &on {
1141            if self.plan.schema().has_column(l)
1142                && right.schema().has_column(r)
1143                && can_hash(
1144                    datafusion_common::ExprSchema::field_from_column(
1145                        self.plan.schema(),
1146                        l,
1147                    )?
1148                    .data_type(),
1149                )
1150            {
1151                join_on.push((Expr::Column(l.clone()), Expr::Column(r.clone())));
1152            } else if self.plan.schema().has_column(l)
1153                && right.schema().has_column(r)
1154                && can_hash(
1155                    datafusion_common::ExprSchema::field_from_column(
1156                        self.plan.schema(),
1157                        r,
1158                    )?
1159                    .data_type(),
1160                )
1161            {
1162                join_on.push((Expr::Column(r.clone()), Expr::Column(l.clone())));
1163            } else {
1164                let expr = binary_expr(
1165                    Expr::Column(l.clone()),
1166                    Operator::Eq,
1167                    Expr::Column(r.clone()),
1168                );
1169                match filters {
1170                    None => filters = Some(expr),
1171                    Some(filter_expr) => filters = Some(and(expr, filter_expr)),
1172                }
1173            }
1174        }
1175
1176        if join_on.is_empty() {
1177            let join = Self::from(self.plan).cross_join(right)?;
1178            join.filter(filters.ok_or_else(|| {
1179                DataFusionError::Internal("filters should not be None here".to_string())
1180            })?)
1181        } else {
1182            let join = Join::try_new(
1183                self.plan,
1184                Arc::new(right),
1185                join_on,
1186                filters,
1187                join_type,
1188                JoinConstraint::Using,
1189                NullEquality::NullEqualsNothing,
1190            )?;
1191
1192            Ok(Self::new(LogicalPlan::Join(join)))
1193        }
1194    }
1195
1196    /// Apply a cross join
1197    pub fn cross_join(self, right: LogicalPlan) -> Result<Self> {
1198        let join = Join::try_new(
1199            self.plan,
1200            Arc::new(right),
1201            vec![],
1202            None,
1203            JoinType::Inner,
1204            JoinConstraint::On,
1205            NullEquality::NullEqualsNothing,
1206        )?;
1207
1208        Ok(Self::new(LogicalPlan::Join(join)))
1209    }
1210
1211    /// Repartition
1212    pub fn repartition(self, partitioning_scheme: Partitioning) -> Result<Self> {
1213        Ok(Self::new(LogicalPlan::Repartition(Repartition {
1214            input: self.plan,
1215            partitioning_scheme,
1216        })))
1217    }
1218
1219    /// Apply a window functions to extend the schema
1220    pub fn window(
1221        self,
1222        window_expr: impl IntoIterator<Item = impl Into<Expr>>,
1223    ) -> Result<Self> {
1224        let window_expr = normalize_cols(window_expr, &self.plan)?;
1225        validate_unique_names("Windows", &window_expr)?;
1226        Ok(Self::new(LogicalPlan::Window(Window::try_new(
1227            window_expr,
1228            self.plan,
1229        )?)))
1230    }
1231
1232    /// Apply an aggregate: grouping on the `group_expr` expressions
1233    /// and calculating `aggr_expr` aggregates for each distinct
1234    /// value of the `group_expr`;
1235    pub fn aggregate(
1236        self,
1237        group_expr: impl IntoIterator<Item = impl Into<Expr>>,
1238        aggr_expr: impl IntoIterator<Item = impl Into<Expr>>,
1239    ) -> Result<Self> {
1240        let group_expr = normalize_cols(group_expr, &self.plan)?;
1241        let aggr_expr = normalize_cols(aggr_expr, &self.plan)?;
1242
1243        let group_expr = if self.options.add_implicit_group_by_exprs {
1244            add_group_by_exprs_from_dependencies(group_expr, self.plan.schema())?
1245        } else {
1246            group_expr
1247        };
1248
1249        Aggregate::try_new(self.plan, group_expr, aggr_expr)
1250            .map(LogicalPlan::Aggregate)
1251            .map(Self::new)
1252    }
1253
1254    /// Create an expression to represent the explanation of the plan
1255    ///
1256    /// if `analyze` is true, runs the actual plan and produces
1257    /// information about metrics during run.
1258    ///
1259    /// if `verbose` is true, prints out additional details.
1260    pub fn explain(self, verbose: bool, analyze: bool) -> Result<Self> {
1261        // Keep the format default to Indent
1262        self.explain_option_format(
1263            ExplainOption::default()
1264                .with_verbose(verbose)
1265                .with_analyze(analyze),
1266        )
1267    }
1268
1269    /// Create an expression to represent the explanation of the plan
1270    /// The`explain_option` is used to specify the format and verbosity of the explanation.
1271    /// Details see [`ExplainOption`].
1272    pub fn explain_option_format(self, explain_option: ExplainOption) -> Result<Self> {
1273        let schema = LogicalPlan::explain_schema();
1274        let schema = schema.to_dfschema_ref()?;
1275
1276        if explain_option.analyze {
1277            Ok(Self::new(LogicalPlan::Analyze(Analyze {
1278                verbose: explain_option.verbose,
1279                input: self.plan,
1280                schema,
1281            })))
1282        } else {
1283            let stringified_plans =
1284                vec![self.plan.to_stringified(PlanType::InitialLogicalPlan)];
1285
1286            Ok(Self::new(LogicalPlan::Explain(Explain {
1287                verbose: explain_option.verbose,
1288                plan: self.plan,
1289                explain_format: explain_option.format,
1290                stringified_plans,
1291                schema,
1292                logical_optimization_succeeded: false,
1293            })))
1294        }
1295    }
1296
1297    /// Process intersect set operator
1298    pub fn intersect(
1299        left_plan: LogicalPlan,
1300        right_plan: LogicalPlan,
1301        is_all: bool,
1302    ) -> Result<LogicalPlan> {
1303        LogicalPlanBuilder::intersect_or_except(
1304            left_plan,
1305            right_plan,
1306            JoinType::LeftSemi,
1307            is_all,
1308        )
1309    }
1310
1311    /// Process except set operator
1312    pub fn except(
1313        left_plan: LogicalPlan,
1314        right_plan: LogicalPlan,
1315        is_all: bool,
1316    ) -> Result<LogicalPlan> {
1317        LogicalPlanBuilder::intersect_or_except(
1318            left_plan,
1319            right_plan,
1320            JoinType::LeftAnti,
1321            is_all,
1322        )
1323    }
1324
1325    /// Process intersect or except
1326    fn intersect_or_except(
1327        left_plan: LogicalPlan,
1328        right_plan: LogicalPlan,
1329        join_type: JoinType,
1330        is_all: bool,
1331    ) -> Result<LogicalPlan> {
1332        let left_len = left_plan.schema().fields().len();
1333        let right_len = right_plan.schema().fields().len();
1334
1335        if left_len != right_len {
1336            return plan_err!(
1337                "INTERSECT/EXCEPT query must have the same number of columns. Left is {left_len} and right is {right_len}."
1338            );
1339        }
1340
1341        let join_keys = left_plan
1342            .schema()
1343            .fields()
1344            .iter()
1345            .zip(right_plan.schema().fields().iter())
1346            .map(|(left_field, right_field)| {
1347                (
1348                    (Column::from_name(left_field.name())),
1349                    (Column::from_name(right_field.name())),
1350                )
1351            })
1352            .unzip();
1353        if is_all {
1354            LogicalPlanBuilder::from(left_plan)
1355                .join_detailed(
1356                    right_plan,
1357                    join_type,
1358                    join_keys,
1359                    None,
1360                    NullEquality::NullEqualsNull,
1361                )?
1362                .build()
1363        } else {
1364            LogicalPlanBuilder::from(left_plan)
1365                .distinct()?
1366                .join_detailed(
1367                    right_plan,
1368                    join_type,
1369                    join_keys,
1370                    None,
1371                    NullEquality::NullEqualsNull,
1372                )?
1373                .build()
1374        }
1375    }
1376
1377    /// Build the plan
1378    pub fn build(self) -> Result<LogicalPlan> {
1379        Ok(Arc::unwrap_or_clone(self.plan))
1380    }
1381
1382    /// Apply a join with both explicit equijoin and non equijoin predicates.
1383    ///
1384    /// Note this is a low level API that requires identifying specific
1385    /// predicate types. Most users should use  [`join_on`](Self::join_on) that
1386    /// automatically identifies predicates appropriately.
1387    ///
1388    /// `equi_exprs` defines equijoin predicates, of the form `l = r)` for each
1389    /// `(l, r)` tuple. `l`, the first element of the tuple, must only refer
1390    /// to columns from the existing input. `r`, the second element of the tuple,
1391    /// must only refer to columns from the right input.
1392    ///
1393    /// `filter` contains any other filter expression to apply during the
1394    /// join. Note that `equi_exprs` predicates are evaluated more efficiently
1395    /// than the filter expressions, so they are preferred.
1396    pub fn join_with_expr_keys(
1397        self,
1398        right: LogicalPlan,
1399        join_type: JoinType,
1400        equi_exprs: (Vec<impl Into<Expr>>, Vec<impl Into<Expr>>),
1401        filter: Option<Expr>,
1402    ) -> Result<Self> {
1403        if equi_exprs.0.len() != equi_exprs.1.len() {
1404            return plan_err!("left_keys and right_keys were not the same length");
1405        }
1406
1407        let join_key_pairs = equi_exprs
1408            .0
1409            .into_iter()
1410            .zip(equi_exprs.1)
1411            .map(|(l, r)| {
1412                let left_key = l.into();
1413                let right_key = r.into();
1414                let mut left_using_columns  = HashSet::new();
1415                expr_to_columns(&left_key, &mut left_using_columns)?;
1416                let normalized_left_key = normalize_col_with_schemas_and_ambiguity_check(
1417                    left_key,
1418                    &[&[self.plan.schema()]],
1419                    &[],
1420                )?;
1421
1422                let mut right_using_columns = HashSet::new();
1423                expr_to_columns(&right_key, &mut right_using_columns)?;
1424                let normalized_right_key = normalize_col_with_schemas_and_ambiguity_check(
1425                    right_key,
1426                    &[&[right.schema()]],
1427                    &[],
1428                )?;
1429
1430                // find valid equijoin
1431                find_valid_equijoin_key_pair(
1432                        &normalized_left_key,
1433                        &normalized_right_key,
1434                        self.plan.schema(),
1435                        right.schema(),
1436                    )?.ok_or_else(||
1437                        plan_datafusion_err!(
1438                            "can't create join plan, join key should belong to one input, error key: ({normalized_left_key},{normalized_right_key})"
1439                        ))
1440            })
1441            .collect::<Result<Vec<_>>>()?;
1442
1443        let join = Join::try_new(
1444            self.plan,
1445            Arc::new(right),
1446            join_key_pairs,
1447            filter,
1448            join_type,
1449            JoinConstraint::On,
1450            NullEquality::NullEqualsNothing,
1451        )?;
1452
1453        Ok(Self::new(LogicalPlan::Join(join)))
1454    }
1455
1456    /// Unnest the given column.
1457    pub fn unnest_column(self, column: impl Into<Column>) -> Result<Self> {
1458        unnest(Arc::unwrap_or_clone(self.plan), vec![column.into()]).map(Self::new)
1459    }
1460
1461    /// Unnest the given column given [`UnnestOptions`]
1462    pub fn unnest_column_with_options(
1463        self,
1464        column: impl Into<Column>,
1465        options: UnnestOptions,
1466    ) -> Result<Self> {
1467        unnest_with_options(
1468            Arc::unwrap_or_clone(self.plan),
1469            vec![column.into()],
1470            options,
1471        )
1472        .map(Self::new)
1473    }
1474
1475    /// Unnest the given columns with the given [`UnnestOptions`]
1476    pub fn unnest_columns_with_options(
1477        self,
1478        columns: Vec<Column>,
1479        options: UnnestOptions,
1480    ) -> Result<Self> {
1481        unnest_with_options(Arc::unwrap_or_clone(self.plan), columns, options)
1482            .map(Self::new)
1483    }
1484}
1485
1486impl From<LogicalPlan> for LogicalPlanBuilder {
1487    fn from(plan: LogicalPlan) -> Self {
1488        LogicalPlanBuilder::new(plan)
1489    }
1490}
1491
1492impl From<Arc<LogicalPlan>> for LogicalPlanBuilder {
1493    fn from(plan: Arc<LogicalPlan>) -> Self {
1494        LogicalPlanBuilder::new_from_arc(plan)
1495    }
1496}
1497
1498/// Container used when building fields for a `VALUES` node.
1499#[derive(Default)]
1500struct ValuesFields {
1501    inner: Vec<Field>,
1502}
1503
1504impl ValuesFields {
1505    pub fn new() -> Self {
1506        Self::default()
1507    }
1508
1509    pub fn push(&mut self, data_type: DataType, nullable: bool) {
1510        // Naming follows the convention described here:
1511        // https://www.postgresql.org/docs/current/queries-values.html
1512        let name = format!("column{}", self.inner.len() + 1);
1513        self.inner.push(Field::new(name, data_type, nullable));
1514    }
1515
1516    pub fn into_fields(self) -> Fields {
1517        self.inner.into()
1518    }
1519}
1520
1521/// Returns aliases to make field names unique.
1522///
1523/// Returns a vector of optional aliases, one per input field. `None` means keep the original name,
1524/// `Some(alias)` means rename to the alias to ensure uniqueness.
1525///
1526/// Used when creating [`SubqueryAlias`] or similar operations that strip table qualifiers but need
1527/// to maintain unique column names.
1528///
1529/// # Example
1530/// Input fields: `[a, a, b, b, a, a:1]` ([`DFSchema`] valid when duplicate fields have different qualifiers)
1531/// Returns: `[None, Some("a:1"), None, Some("b:1"), Some("a:2"), Some("a:1:1")]`
1532pub fn unique_field_aliases(fields: &Fields) -> Vec<Option<String>> {
1533    // Some field names might already come to this function with the count (number of times it appeared)
1534    // as a suffix e.g. id:1, so there's still a chance of name collisions, for example,
1535    // if these three fields passed to this function: "col:1", "col" and "col", the function
1536    // would rename them to -> col:1, col, col:1 causing a posterior error when building the DFSchema.
1537    // That's why we need the `seen` set, so the fields are always unique.
1538
1539    // Tracks a mapping between a field name and the number of appearances of that field.
1540    let mut name_map = HashMap::<&str, usize>::new();
1541    // Tracks all the fields and aliases that were previously seen.
1542    let mut seen = HashSet::<Cow<String>>::new();
1543
1544    fields
1545        .iter()
1546        .map(|field| {
1547            let original_name = field.name();
1548            let mut name = Cow::Borrowed(original_name);
1549
1550            let count = name_map.entry(original_name).or_insert(0);
1551
1552            // Loop until we find a name that hasn't been used.
1553            while seen.contains(&name) {
1554                *count += 1;
1555                name = Cow::Owned(format!("{original_name}:{count}"));
1556            }
1557
1558            seen.insert(name.clone());
1559
1560            match name {
1561                Cow::Borrowed(_) => None,
1562                Cow::Owned(alias) => Some(alias),
1563            }
1564        })
1565        .collect()
1566}
1567
1568fn mark_field(schema: &DFSchema) -> (Option<TableReference>, Arc<Field>) {
1569    let mut table_references = schema
1570        .iter()
1571        .filter_map(|(qualifier, _)| qualifier)
1572        .collect::<Vec<_>>();
1573    table_references.dedup();
1574    let table_reference = if table_references.len() == 1 {
1575        table_references.pop().cloned()
1576    } else {
1577        None
1578    };
1579
1580    (
1581        table_reference,
1582        Arc::new(Field::new("mark", DataType::Boolean, false)),
1583    )
1584}
1585
1586/// Creates a schema for a join operation.
1587/// The fields from the left side are first
1588pub fn build_join_schema(
1589    left: &DFSchema,
1590    right: &DFSchema,
1591    join_type: &JoinType,
1592) -> Result<DFSchema> {
1593    fn nullify_fields<'a>(
1594        fields: impl Iterator<Item = (Option<&'a TableReference>, &'a Arc<Field>)>,
1595    ) -> Vec<(Option<TableReference>, Arc<Field>)> {
1596        fields
1597            .map(|(q, f)| {
1598                // TODO: find a good way to do that
1599                let field = f.as_ref().clone().with_nullable(true);
1600                (q.cloned(), Arc::new(field))
1601            })
1602            .collect()
1603    }
1604
1605    let right_fields = right.iter();
1606    let left_fields = left.iter();
1607
1608    let qualified_fields: Vec<(Option<TableReference>, Arc<Field>)> = match join_type {
1609        JoinType::Inner => {
1610            // left then right
1611            let left_fields = left_fields
1612                .map(|(q, f)| (q.cloned(), Arc::clone(f)))
1613                .collect::<Vec<_>>();
1614            let right_fields = right_fields
1615                .map(|(q, f)| (q.cloned(), Arc::clone(f)))
1616                .collect::<Vec<_>>();
1617            left_fields.into_iter().chain(right_fields).collect()
1618        }
1619        JoinType::Left => {
1620            // left then right, right set to nullable in case of not matched scenario
1621            let left_fields = left_fields
1622                .map(|(q, f)| (q.cloned(), Arc::clone(f)))
1623                .collect::<Vec<_>>();
1624            left_fields
1625                .into_iter()
1626                .chain(nullify_fields(right_fields))
1627                .collect()
1628        }
1629        JoinType::Right => {
1630            // left then right, left set to nullable in case of not matched scenario
1631            let right_fields = right_fields
1632                .map(|(q, f)| (q.cloned(), Arc::clone(f)))
1633                .collect::<Vec<_>>();
1634            nullify_fields(left_fields)
1635                .into_iter()
1636                .chain(right_fields)
1637                .collect()
1638        }
1639        JoinType::Full => {
1640            // left then right, all set to nullable in case of not matched scenario
1641            nullify_fields(left_fields)
1642                .into_iter()
1643                .chain(nullify_fields(right_fields))
1644                .collect()
1645        }
1646        JoinType::LeftSemi | JoinType::LeftAnti => {
1647            // Only use the left side for the schema
1648            left_fields
1649                .map(|(q, f)| (q.cloned(), Arc::clone(f)))
1650                .collect()
1651        }
1652        JoinType::LeftMark => left_fields
1653            .map(|(q, f)| (q.cloned(), Arc::clone(f)))
1654            .chain(once(mark_field(right)))
1655            .collect(),
1656        JoinType::RightSemi | JoinType::RightAnti => {
1657            // Only use the right side for the schema
1658            right_fields
1659                .map(|(q, f)| (q.cloned(), Arc::clone(f)))
1660                .collect()
1661        }
1662        JoinType::RightMark => right_fields
1663            .map(|(q, f)| (q.cloned(), Arc::clone(f)))
1664            .chain(once(mark_field(left)))
1665            .collect(),
1666    };
1667    let func_dependencies = left.functional_dependencies().join(
1668        right.functional_dependencies(),
1669        join_type,
1670        left.fields().len(),
1671    );
1672
1673    let (schema1, schema2) = match join_type {
1674        JoinType::Right | JoinType::RightSemi | JoinType::RightAnti => (left, right),
1675        _ => (right, left),
1676    };
1677
1678    let metadata = schema1
1679        .metadata()
1680        .clone()
1681        .into_iter()
1682        .chain(schema2.metadata().clone())
1683        .collect();
1684
1685    let dfschema = DFSchema::new_with_metadata(qualified_fields, metadata)?;
1686    dfschema.with_functional_dependencies(func_dependencies)
1687}
1688
1689/// (Re)qualify the sides of a join if needed, i.e. if the columns from one side would otherwise
1690/// conflict with the columns from the other.
1691/// This is especially useful for queries that come as Substrait, since Substrait doesn't currently allow specifying
1692/// aliases, neither for columns nor for tables.  DataFusion requires columns to be uniquely identifiable, in some
1693/// places (see e.g. DFSchema::check_names).
1694/// The function returns:
1695/// - The requalified or original left logical plan
1696/// - The requalified or original right logical plan
1697/// - If a requalification was needed or not
1698pub fn requalify_sides_if_needed(
1699    left: LogicalPlanBuilder,
1700    right: LogicalPlanBuilder,
1701) -> Result<(LogicalPlanBuilder, LogicalPlanBuilder, bool)> {
1702    let left_cols = left.schema().columns();
1703    let right_cols = right.schema().columns();
1704    if left_cols.iter().any(|l| {
1705        right_cols.iter().any(|r| {
1706            l == r || (l.name == r.name && (l.relation.is_none() || r.relation.is_none()))
1707        })
1708    }) {
1709        // These names have no connection to the original plan, but they'll make the columns
1710        // (mostly) unique.
1711        Ok((
1712            left.alias(TableReference::bare("left"))?,
1713            right.alias(TableReference::bare("right"))?,
1714            true,
1715        ))
1716    } else {
1717        Ok((left, right, false))
1718    }
1719}
1720
1721/// Add additional "synthetic" group by expressions based on functional
1722/// dependencies.
1723///
1724/// For example, if we are grouping on `[c1]`, and we know from
1725/// functional dependencies that column `c1` determines `c2`, this function
1726/// adds `c2` to the group by list.
1727///
1728/// This allows MySQL style selects like
1729/// `SELECT col FROM t WHERE pk = 5` if col is unique
1730pub fn add_group_by_exprs_from_dependencies(
1731    mut group_expr: Vec<Expr>,
1732    schema: &DFSchemaRef,
1733) -> Result<Vec<Expr>> {
1734    // Names of the fields produced by the GROUP BY exprs for example, `GROUP BY
1735    // c1 + 1` produces an output field named `"c1 + 1"`
1736    let mut group_by_field_names = group_expr
1737        .iter()
1738        .map(|e| e.schema_name().to_string())
1739        .collect::<Vec<_>>();
1740
1741    if let Some(target_indices) =
1742        get_target_functional_dependencies(schema, &group_by_field_names)
1743    {
1744        for idx in target_indices {
1745            let expr = Expr::Column(Column::from(schema.qualified_field(idx)));
1746            let expr_name = expr.schema_name().to_string();
1747            if !group_by_field_names.contains(&expr_name) {
1748                group_by_field_names.push(expr_name);
1749                group_expr.push(expr);
1750            }
1751        }
1752    }
1753    Ok(group_expr)
1754}
1755
1756/// Errors if one or more expressions have equal names.
1757pub fn validate_unique_names<'a>(
1758    node_name: &str,
1759    expressions: impl IntoIterator<Item = &'a Expr>,
1760) -> Result<()> {
1761    let mut unique_names = HashMap::new();
1762
1763    expressions.into_iter().enumerate().try_for_each(|(position, expr)| {
1764        let name = expr.schema_name().to_string();
1765        match unique_names.get(&name) {
1766            None => {
1767                unique_names.insert(name, (position, expr));
1768                Ok(())
1769            },
1770            Some((existing_position, existing_expr)) => {
1771                plan_err!("{node_name} require unique expression names \
1772                             but the expression \"{existing_expr}\" at position {existing_position} and \"{expr}\" \
1773                             at position {position} have the same name. Consider aliasing (\"AS\") one of them."
1774                            )
1775            }
1776        }
1777    })
1778}
1779
1780/// Union two [`LogicalPlan`]s.
1781///
1782/// Constructs the UNION plan, but does not perform type-coercion. Therefore the
1783/// subtree expressions will not be properly typed until the optimizer pass.
1784///
1785/// If a properly typed UNION plan is needed, refer to [`TypeCoercionRewriter::coerce_union`]
1786/// or alternatively, merge the union input schema using [`coerce_union_schema`] and
1787/// apply the expression rewrite with [`coerce_plan_expr_for_schema`].
1788///
1789/// [`TypeCoercionRewriter::coerce_union`]: https://docs.rs/datafusion-optimizer/latest/datafusion_optimizer/analyzer/type_coercion/struct.TypeCoercionRewriter.html#method.coerce_union
1790/// [`coerce_union_schema`]: https://docs.rs/datafusion-optimizer/latest/datafusion_optimizer/analyzer/type_coercion/fn.coerce_union_schema.html
1791pub fn union(left_plan: LogicalPlan, right_plan: LogicalPlan) -> Result<LogicalPlan> {
1792    Ok(LogicalPlan::Union(Union::try_new_with_loose_types(vec![
1793        Arc::new(left_plan),
1794        Arc::new(right_plan),
1795    ])?))
1796}
1797
1798/// Like [`union`], but combine rows from different tables by name, rather than
1799/// by position.
1800pub fn union_by_name(
1801    left_plan: LogicalPlan,
1802    right_plan: LogicalPlan,
1803) -> Result<LogicalPlan> {
1804    Ok(LogicalPlan::Union(Union::try_new_by_name(vec![
1805        Arc::new(left_plan),
1806        Arc::new(right_plan),
1807    ])?))
1808}
1809
1810/// Create Projection
1811/// # Errors
1812/// This function errors under any of the following conditions:
1813/// * Two or more expressions have the same name
1814/// * An invalid expression is used (e.g. a `sort` expression)
1815pub fn project(
1816    plan: LogicalPlan,
1817    expr: impl IntoIterator<Item = impl Into<SelectExpr>>,
1818) -> Result<LogicalPlan> {
1819    project_with_validation(plan, expr.into_iter().map(|e| (e, true)))
1820}
1821
1822/// Create Projection. Similar to project except that the expressions
1823/// passed in have a flag to indicate if that expression requires
1824/// validation (normalize & columnize) (true) or not (false)
1825/// # Errors
1826/// This function errors under any of the following conditions:
1827/// * Two or more expressions have the same name
1828/// * An invalid expression is used (e.g. a `sort` expression)
1829fn project_with_validation(
1830    plan: LogicalPlan,
1831    expr: impl IntoIterator<Item = (impl Into<SelectExpr>, bool)>,
1832) -> Result<LogicalPlan> {
1833    let mut projected_expr = vec![];
1834    for (e, validate) in expr {
1835        let e = e.into();
1836        match e {
1837            SelectExpr::Wildcard(opt) => {
1838                let expanded = expand_wildcard(plan.schema(), &plan, Some(&opt))?;
1839
1840                // If there is a REPLACE statement, replace that column with the given
1841                // replace expression. Column name remains the same.
1842                let expanded = if let Some(replace) = opt.replace {
1843                    replace_columns(expanded, &replace)?
1844                } else {
1845                    expanded
1846                };
1847
1848                for e in expanded {
1849                    if validate {
1850                        projected_expr
1851                            .push(columnize_expr(normalize_col(e, &plan)?, &plan)?)
1852                    } else {
1853                        projected_expr.push(e)
1854                    }
1855                }
1856            }
1857            SelectExpr::QualifiedWildcard(table_ref, opt) => {
1858                let expanded =
1859                    expand_qualified_wildcard(&table_ref, plan.schema(), Some(&opt))?;
1860
1861                // If there is a REPLACE statement, replace that column with the given
1862                // replace expression. Column name remains the same.
1863                let expanded = if let Some(replace) = opt.replace {
1864                    replace_columns(expanded, &replace)?
1865                } else {
1866                    expanded
1867                };
1868
1869                for e in expanded {
1870                    if validate {
1871                        projected_expr
1872                            .push(columnize_expr(normalize_col(e, &plan)?, &plan)?)
1873                    } else {
1874                        projected_expr.push(e)
1875                    }
1876                }
1877            }
1878            SelectExpr::Expression(e) => {
1879                if validate {
1880                    projected_expr.push(columnize_expr(normalize_col(e, &plan)?, &plan)?)
1881                } else {
1882                    projected_expr.push(e)
1883                }
1884            }
1885        }
1886    }
1887    validate_unique_names("Projections", projected_expr.iter())?;
1888
1889    Projection::try_new(projected_expr, Arc::new(plan)).map(LogicalPlan::Projection)
1890}
1891
1892/// If there is a REPLACE statement in the projected expression in the form of
1893/// "REPLACE (some_column_within_an_expr AS some_column)", this function replaces
1894/// that column with the given replace expression. Column name remains the same.
1895/// Multiple REPLACEs are also possible with comma separations.
1896fn replace_columns(
1897    mut exprs: Vec<Expr>,
1898    replace: &PlannedReplaceSelectItem,
1899) -> Result<Vec<Expr>> {
1900    for expr in exprs.iter_mut() {
1901        if let Expr::Column(Column { name, .. }) = expr {
1902            if let Some((_, new_expr)) = replace
1903                .items()
1904                .iter()
1905                .zip(replace.expressions().iter())
1906                .find(|(item, _)| item.column_name.value == *name)
1907            {
1908                *expr = new_expr.clone().alias(name.clone())
1909            }
1910        }
1911    }
1912    Ok(exprs)
1913}
1914
1915/// Create a SubqueryAlias to wrap a LogicalPlan.
1916pub fn subquery_alias(
1917    plan: LogicalPlan,
1918    alias: impl Into<TableReference>,
1919) -> Result<LogicalPlan> {
1920    SubqueryAlias::try_new(Arc::new(plan), alias).map(LogicalPlan::SubqueryAlias)
1921}
1922
1923/// Create a LogicalPlanBuilder representing a scan of a table with the provided name and schema.
1924/// This is mostly used for testing and documentation.
1925pub fn table_scan(
1926    name: Option<impl Into<TableReference>>,
1927    table_schema: &Schema,
1928    projection: Option<Vec<usize>>,
1929) -> Result<LogicalPlanBuilder> {
1930    table_scan_with_filters(name, table_schema, projection, vec![])
1931}
1932
1933/// Create a LogicalPlanBuilder representing a scan of a table with the provided name and schema,
1934/// and inlined filters.
1935/// This is mostly used for testing and documentation.
1936pub fn table_scan_with_filters(
1937    name: Option<impl Into<TableReference>>,
1938    table_schema: &Schema,
1939    projection: Option<Vec<usize>>,
1940    filters: Vec<Expr>,
1941) -> Result<LogicalPlanBuilder> {
1942    let table_source = table_source(table_schema);
1943    let name = name
1944        .map(|n| n.into())
1945        .unwrap_or_else(|| TableReference::bare(UNNAMED_TABLE));
1946    LogicalPlanBuilder::scan_with_filters(name, table_source, projection, filters)
1947}
1948
1949/// Create a LogicalPlanBuilder representing a scan of a table with the provided name and schema,
1950/// filters, and inlined fetch.
1951/// This is mostly used for testing and documentation.
1952pub fn table_scan_with_filter_and_fetch(
1953    name: Option<impl Into<TableReference>>,
1954    table_schema: &Schema,
1955    projection: Option<Vec<usize>>,
1956    filters: Vec<Expr>,
1957    fetch: Option<usize>,
1958) -> Result<LogicalPlanBuilder> {
1959    let table_source = table_source(table_schema);
1960    let name = name
1961        .map(|n| n.into())
1962        .unwrap_or_else(|| TableReference::bare(UNNAMED_TABLE));
1963    LogicalPlanBuilder::scan_with_filters_fetch(
1964        name,
1965        table_source,
1966        projection,
1967        filters,
1968        fetch,
1969    )
1970}
1971
1972pub fn table_source(table_schema: &Schema) -> Arc<dyn TableSource> {
1973    let table_schema = Arc::new(table_schema.clone());
1974    Arc::new(LogicalTableSource {
1975        table_schema,
1976        constraints: Default::default(),
1977    })
1978}
1979
1980pub fn table_source_with_constraints(
1981    table_schema: &Schema,
1982    constraints: Constraints,
1983) -> Arc<dyn TableSource> {
1984    let table_schema = Arc::new(table_schema.clone());
1985    Arc::new(LogicalTableSource {
1986        table_schema,
1987        constraints,
1988    })
1989}
1990
1991/// Wrap projection for a plan, if the join keys contains normal expression.
1992pub fn wrap_projection_for_join_if_necessary(
1993    join_keys: &[Expr],
1994    input: LogicalPlan,
1995) -> Result<(LogicalPlan, Vec<Column>, bool)> {
1996    let input_schema = input.schema();
1997    let alias_join_keys: Vec<Expr> = join_keys
1998        .iter()
1999        .map(|key| {
2000            // The display_name() of cast expression will ignore the cast info, and show the inner expression name.
2001            // If we do not add alias, it will throw same field name error in the schema when adding projection.
2002            // For example:
2003            //    input scan : [a, b, c],
2004            //    join keys: [cast(a as int)]
2005            //
2006            //  then a and cast(a as int) will use the same field name - `a` in projection schema.
2007            //  https://github.com/apache/datafusion/issues/4478
2008            if matches!(key, Expr::Cast(_)) || matches!(key, Expr::TryCast(_)) {
2009                let alias = format!("{key}");
2010                key.clone().alias(alias)
2011            } else {
2012                key.clone()
2013            }
2014        })
2015        .collect::<Vec<_>>();
2016
2017    let need_project = join_keys.iter().any(|key| !matches!(key, Expr::Column(_)));
2018    let plan = if need_project {
2019        // Include all columns from the input and extend them with the join keys
2020        let mut projection = input_schema
2021            .columns()
2022            .into_iter()
2023            .map(Expr::Column)
2024            .collect::<Vec<_>>();
2025        let join_key_items = alias_join_keys
2026            .iter()
2027            .flat_map(|expr| expr.try_as_col().is_none().then_some(expr))
2028            .cloned()
2029            .collect::<HashSet<Expr>>();
2030        projection.extend(join_key_items);
2031
2032        LogicalPlanBuilder::from(input)
2033            .project(projection.into_iter().map(SelectExpr::from))?
2034            .build()?
2035    } else {
2036        input
2037    };
2038
2039    let join_on = alias_join_keys
2040        .into_iter()
2041        .map(|key| {
2042            if let Some(col) = key.try_as_col() {
2043                Ok(col.clone())
2044            } else {
2045                let name = key.schema_name().to_string();
2046                Ok(Column::from_name(name))
2047            }
2048        })
2049        .collect::<Result<Vec<_>>>()?;
2050
2051    Ok((plan, join_on, need_project))
2052}
2053
/// Basic TableSource implementation intended for use in tests and documentation. It is expected
/// that users will provide their own TableSource implementations or use DataFusion's
/// DefaultTableSource.
pub struct LogicalTableSource {
    /// Schema returned by the `TableSource::schema` implementation
    table_schema: SchemaRef,
    /// Constraints returned by the `TableSource::constraints` implementation
    constraints: Constraints,
}
2061
2062impl LogicalTableSource {
2063    /// Create a new LogicalTableSource
2064    pub fn new(table_schema: SchemaRef) -> Self {
2065        Self {
2066            table_schema,
2067            constraints: Constraints::default(),
2068        }
2069    }
2070
2071    pub fn with_constraints(mut self, constraints: Constraints) -> Self {
2072        self.constraints = constraints;
2073        self
2074    }
2075}
2076
2077impl TableSource for LogicalTableSource {
2078    fn as_any(&self) -> &dyn Any {
2079        self
2080    }
2081
2082    fn schema(&self) -> SchemaRef {
2083        Arc::clone(&self.table_schema)
2084    }
2085
2086    fn constraints(&self) -> Option<&Constraints> {
2087        Some(&self.constraints)
2088    }
2089
2090    fn supports_filters_pushdown(
2091        &self,
2092        filters: &[&Expr],
2093    ) -> Result<Vec<TableProviderFilterPushDown>> {
2094        Ok(vec![TableProviderFilterPushDown::Exact; filters.len()])
2095    }
2096}
2097
2098/// Create a [`LogicalPlan::Unnest`] plan
2099pub fn unnest(input: LogicalPlan, columns: Vec<Column>) -> Result<LogicalPlan> {
2100    unnest_with_options(input, columns, UnnestOptions::default())
2101}
2102
2103pub fn get_struct_unnested_columns(
2104    col_name: &String,
2105    inner_fields: &Fields,
2106) -> Vec<Column> {
2107    inner_fields
2108        .iter()
2109        .map(|f| Column::from_name(format!("{}.{}", col_name, f.name())))
2110        .collect()
2111}
2112
2113/// Create a [`LogicalPlan::Unnest`] plan with options
2114/// This function receive a list of columns to be unnested
2115/// because multiple unnest can be performed on the same column (e.g unnest with different depth)
2116/// The new schema will contains post-unnest fields replacing the original field
2117///
2118/// For example:
2119/// Input schema as
2120/// ```text
2121/// +---------------------+-------------------+
2122/// | col1                | col2              |
2123/// +---------------------+-------------------+
2124/// | Struct(INT64,INT32) | List(List(Int64)) |
2125/// +---------------------+-------------------+
2126/// ```
2127///
2128///
2129///
2130/// Then unnesting columns with:
2131/// - (col1,Struct)
2132/// - (col2,List(\[depth=1,depth=2\]))
2133///
2134/// will generate a new schema as
2135/// ```text
2136/// +---------+---------+---------------------+---------------------+
2137/// | col1.c0 | col1.c1 | unnest_col2_depth_1 | unnest_col2_depth_2 |
2138/// +---------+---------+---------------------+---------------------+
2139/// | Int64   | Int32   | List(Int64)         |  Int64              |
2140/// +---------+---------+---------------------+---------------------+
2141/// ```
2142pub fn unnest_with_options(
2143    input: LogicalPlan,
2144    columns_to_unnest: Vec<Column>,
2145    options: UnnestOptions,
2146) -> Result<LogicalPlan> {
2147    Ok(LogicalPlan::Unnest(Unnest::try_new(
2148        Arc::new(input),
2149        columns_to_unnest,
2150        options,
2151    )?))
2152}
2153
2154#[cfg(test)]
2155mod tests {
2156    use super::*;
2157    use crate::logical_plan::StringifiedPlan;
2158    use crate::{col, expr, expr_fn::exists, in_subquery, lit, scalar_subquery};
2159
2160    use crate::test::function_stub::sum;
2161    use datafusion_common::{Constraint, RecursionUnnestOption, SchemaError};
2162    use insta::assert_snapshot;
2163
    /// A scan followed by a filter and a projection renders as the expected
    /// three-node plan.
    #[test]
    fn plan_builder_simple() -> Result<()> {
        // Scan only columns 0 and 3 (`id` and `state`) of the employee schema
        let plan =
            table_scan(Some("employee_csv"), &employee_schema(), Some(vec![0, 3]))?
                .filter(col("state").eq(lit("CO")))?
                .project(vec![col("id")])?
                .build()?;

        assert_snapshot!(plan, @r#"
        Projection: employee_csv.id
          Filter: employee_csv.state = Utf8("CO")
            TableScan: employee_csv projection=[id, state]
        "#);

        Ok(())
    }
2180
    /// The schema of a scan qualifies every field with the table name, and
    /// the table name is treated as a SQL identifier.
    #[test]
    fn plan_builder_schema() {
        let schema = employee_schema();
        let projection = None;
        let plan =
            LogicalPlanBuilder::scan("employee_csv", table_source(&schema), projection)
                .unwrap();
        assert_snapshot!(plan.schema().as_ref(), @"fields:[employee_csv.id, employee_csv.first_name, employee_csv.last_name, employee_csv.state, employee_csv.salary], metadata:{}");

        // Note scan of "EMPLOYEE_CSV" is treated as a SQL identifier
        // (and thus normalized to "employee_csv") as well
        let projection = None;
        let plan =
            LogicalPlanBuilder::scan("EMPLOYEE_CSV", table_source(&schema), projection)
                .unwrap();
        assert_snapshot!(plan.schema().as_ref(), @"fields:[employee_csv.id, employee_csv.first_name, employee_csv.last_name, employee_csv.state, employee_csv.salary], metadata:{}");
    }
2198
    /// Scanning a table with an empty name is rejected with a planning error.
    #[test]
    fn plan_builder_empty_name() {
        let schema = employee_schema();
        let projection = None;
        let err =
            LogicalPlanBuilder::scan("", table_source(&schema), projection).unwrap_err();
        assert_snapshot!(
            err.strip_backtrace(),
            @"Error during planning: table_name cannot be empty"
        );
    }
2210
    /// A sort over two columns renders both sort keys with their direction
    /// and null ordering.
    #[test]
    fn plan_builder_sort() -> Result<()> {
        // Scan only columns 3 and 4 (`state` and `salary`) of the employee schema
        let plan =
            table_scan(Some("employee_csv"), &employee_schema(), Some(vec![3, 4]))?
                .sort(vec![
                    expr::Sort::new(col("state"), true, true),
                    expr::Sort::new(col("salary"), false, false),
                ])?
                .build()?;

        assert_snapshot!(plan, @r"
        Sort: employee_csv.state ASC NULLS FIRST, employee_csv.salary DESC NULLS LAST
          TableScan: employee_csv projection=[state, salary]
        ");

        Ok(())
    }
2228
    /// Chaining `union` three times produces left-nested Union nodes.
    #[test]
    fn plan_builder_union() -> Result<()> {
        let plan =
            table_scan(Some("employee_csv"), &employee_schema(), Some(vec![3, 4]))?;

        // Union the same scan with itself three times
        let plan = plan
            .clone()
            .union(plan.clone().build()?)?
            .union(plan.clone().build()?)?
            .union(plan.build()?)?
            .build()?;

        assert_snapshot!(plan, @r"
        Union
          Union
            Union
              TableScan: employee_csv projection=[state, salary]
              TableScan: employee_csv projection=[state, salary]
            TableScan: employee_csv projection=[state, salary]
          TableScan: employee_csv projection=[state, salary]
        ");

        Ok(())
    }
2253
    /// Chaining `union_distinct` three times wraps every Union node in a
    /// Distinct node.
    #[test]
    fn plan_builder_union_distinct() -> Result<()> {
        let plan =
            table_scan(Some("employee_csv"), &employee_schema(), Some(vec![3, 4]))?;

        // Distinct-union the same scan with itself three times
        let plan = plan
            .clone()
            .union_distinct(plan.clone().build()?)?
            .union_distinct(plan.clone().build()?)?
            .union_distinct(plan.build()?)?
            .build()?;

        assert_snapshot!(plan, @r"
        Distinct:
          Union
            Distinct:
              Union
                Distinct:
                  Union
                    TableScan: employee_csv projection=[state, salary]
                    TableScan: employee_csv projection=[state, salary]
                TableScan: employee_csv projection=[state, salary]
            TableScan: employee_csv projection=[state, salary]
        ");

        Ok(())
    }
2281
    /// `distinct` adds a Distinct node on top of the scan / filter /
    /// projection plan.
    #[test]
    fn plan_builder_simple_distinct() -> Result<()> {
        let plan =
            table_scan(Some("employee_csv"), &employee_schema(), Some(vec![0, 3]))?
                .filter(col("state").eq(lit("CO")))?
                .project(vec![col("id")])?
                .distinct()?
                .build()?;

        assert_snapshot!(plan, @r#"
        Distinct:
          Projection: employee_csv.id
            Filter: employee_csv.state = Utf8("CO")
              TableScan: employee_csv projection=[id, state]
        "#);

        Ok(())
    }
2300
    /// A filter on an `EXISTS` subquery that refers to a column of the outer
    /// table renders the subquery beneath the filter.
    #[test]
    fn exists_subquery() -> Result<()> {
        let foo = test_table_scan_with_name("foo")?;
        let bar = test_table_scan_with_name("bar")?;

        let subquery = LogicalPlanBuilder::from(foo)
            .project(vec![col("a")])?
            .filter(col("a").eq(col("bar.a")))?
            .build()?;

        // SELECT a FROM bar WHERE EXISTS (SELECT a FROM foo WHERE a = bar.a)
        let outer_query = LogicalPlanBuilder::from(bar)
            .project(vec![col("a")])?
            .filter(exists(Arc::new(subquery)))?
            .build()?;

        assert_snapshot!(outer_query, @r"
        Filter: EXISTS (<subquery>)
          Subquery:
            Filter: foo.a = bar.a
              Projection: foo.a
                TableScan: foo
          Projection: bar.a
            TableScan: bar
        ");

        Ok(())
    }
2328
    /// A filter on an `IN` subquery that refers to a column of the outer
    /// table renders the subquery beneath the filter.
    #[test]
    fn filter_in_subquery() -> Result<()> {
        let foo = test_table_scan_with_name("foo")?;
        let bar = test_table_scan_with_name("bar")?;

        let subquery = LogicalPlanBuilder::from(foo)
            .project(vec![col("a")])?
            .filter(col("a").eq(col("bar.a")))?
            .build()?;

        // SELECT a FROM bar WHERE a IN (SELECT a FROM foo WHERE a = bar.a)
        let outer_query = LogicalPlanBuilder::from(bar)
            .project(vec![col("a")])?
            .filter(in_subquery(col("a"), Arc::new(subquery)))?
            .build()?;

        assert_snapshot!(outer_query, @r"
        Filter: bar.a IN (<subquery>)
          Subquery:
            Filter: foo.a = bar.a
              Projection: foo.a
                TableScan: foo
          Projection: bar.a
            TableScan: bar
        ");

        Ok(())
    }
2357
    /// Projecting a scalar subquery that refers to a column of the outer
    /// table renders the subquery beneath the projection.
    #[test]
    fn select_scalar_subquery() -> Result<()> {
        let foo = test_table_scan_with_name("foo")?;
        let bar = test_table_scan_with_name("bar")?;

        let subquery = LogicalPlanBuilder::from(foo)
            .project(vec![col("b")])?
            .filter(col("a").eq(col("bar.a")))?
            .build()?;

        // SELECT (SELECT b FROM foo WHERE a = bar.a) FROM bar
        let outer_query = LogicalPlanBuilder::from(bar)
            .project(vec![scalar_subquery(Arc::new(subquery))])?
            .build()?;

        assert_snapshot!(outer_query, @r"
        Projection: (<subquery>)
          Subquery:
            Filter: foo.a = bar.a
              Projection: foo.b
                TableScan: foo
          TableScan: bar
        ");

        Ok(())
    }
2384
    /// Projecting two expressions with the same output name fails with an
    /// ambiguous-reference schema error that names the offending column.
    #[test]
    fn projection_non_unique_names() -> Result<()> {
        let plan = table_scan(
            Some("employee_csv"),
            &employee_schema(),
            // project id and first_name by column index
            Some(vec![0, 1]),
        )?
        // two columns with the same name => error
        .project(vec![col("id"), col("first_name").alias("id")]);

        match plan {
            Err(DataFusionError::SchemaError(err, _)) => {
                if let SchemaError::AmbiguousReference { field } = *err {
                    let Column {
                        relation,
                        name,
                        spans: _,
                    } = *field;
                    // The reported column must be qualified with a bare table name
                    let Some(TableReference::Bare { table }) = relation else {
                        return plan_err!(
                            "wrong relation: {relation:?}, expected table name"
                        );
                    };
                    assert_eq!(*"employee_csv", *table);
                    assert_eq!("id", &name);
                    Ok(())
                } else {
                    plan_err!("Plan should have returned an DataFusionError::SchemaError")
                }
            }
            _ => plan_err!("Plan should have returned an DataFusionError::SchemaError"),
        }
    }
2419
2420    fn employee_schema() -> Schema {
2421        Schema::new(vec![
2422            Field::new("id", DataType::Int32, false),
2423            Field::new("first_name", DataType::Utf8, false),
2424            Field::new("last_name", DataType::Utf8, false),
2425            Field::new("state", DataType::Utf8, false),
2426            Field::new("salary", DataType::Int32, false),
2427        ])
2428    }
2429
#[test]
fn stringified_plan() {
    // Each case pairs a plan type with whether it is expected to be displayed
    // in non-verbose mode; every plan type must be displayed in verbose mode.
    let cases = [
        (PlanType::InitialLogicalPlan, false),
        (PlanType::FinalLogicalPlan, true),
        (PlanType::InitialPhysicalPlan, false),
        (PlanType::FinalPhysicalPlan, true),
        (
            PlanType::OptimizedLogicalPlan {
                optimizer_name: "random opt pass".into(),
            },
            false,
        ),
    ];

    for (plan_type, shown_when_not_verbose) in cases {
        let rendered = StringifiedPlan::new(plan_type, "...the plan...");
        assert!(rendered.should_display(true));
        assert_eq!(shown_when_not_verbose, rendered.should_display(false));
    }
}
2461
2462    fn test_table_scan_with_name(name: &str) -> Result<LogicalPlan> {
2463        let schema = Schema::new(vec![
2464            Field::new("a", DataType::UInt32, false),
2465            Field::new("b", DataType::UInt32, false),
2466            Field::new("c", DataType::UInt32, false),
2467        ]);
2468        table_scan(Some(name), &schema, None)?.build()
2469    }
2470
#[test]
fn plan_builder_intersect_different_num_columns_error() -> Result<()> {
    // Two unnamed scans over the same schema that project a different number
    // of columns (one vs. two).
    let schema = employee_schema();
    let one_column =
        table_scan(TableReference::none(), &schema, Some(vec![3]))?.build()?;
    let two_columns =
        table_scan(TableReference::none(), &schema, Some(vec![3, 4]))?.build()?;

    // INTERSECT over inputs of different widths must be rejected.
    let error =
        LogicalPlanBuilder::intersect(one_column, two_columns, true).unwrap_err();

    assert_snapshot!(error.strip_backtrace(), @"Error during planning: INTERSECT/EXCEPT query must have the same number of columns. Left is 1 and right is 2.");

    Ok(())
}
2486
#[test]
fn plan_builder_unnest() -> Result<()> {
    // Shared check: after unnesting `struct_singular`, its sub-fields `a` and
    // `b` must be present as `UInt32` columns named `struct_singular.<name>`.
    let assert_struct_singular_flattened = |unnested: &LogicalPlan| {
        for subfield in ["a", "b"] {
            let column_name = format!("struct_singular.{subfield}");
            let field = unnested
                .schema()
                .field_with_name(None, &column_name)
                .unwrap();
            assert_eq!(&DataType::UInt32, field.data_type());
        }
    };

    // A scalar column cannot be unnested.
    let scalar_error = nested_table_scan("test_table")?
        .unnest_column("scalar")
        .unwrap_err();

    let DataFusionError::Internal(raw_message) = scalar_error else {
        return plan_err!("Plan should have returned an DataFusionError::Internal");
    };

    // Keep only the text in front of the backtrace separator.
    let message = raw_message
        .split(DataFusionError::BACK_TRACE_SEP)
        .next()
        .unwrap_or("")
        .to_string();

    assert_snapshot!(message, @"trying to unnest on invalid data type UInt32");

    // Unnest the list of strings.
    let strings_plan = nested_table_scan("test_table")?
        .unnest_column("strings")?
        .build()?;

    assert_snapshot!(strings_plan, @r"
        Unnest: lists[test_table.strings|depth=1] structs[]
          TableScan: test_table
        ");

    // The unnested list column now holds the scalar element type.
    let strings_field = strings_plan
        .schema()
        .field_with_name(None, "strings")
        .unwrap();
    assert_eq!(&DataType::Utf8, strings_field.data_type());

    // Unnest the singular struct column: one new column per sub-field.
    let struct_plan = nested_table_scan("test_table")?
        .unnest_column("struct_singular")?
        .build()?;

    assert_snapshot!(struct_plan, @r"
        Unnest: lists[] structs[test_table.struct_singular]
          TableScan: test_table
        ");

    assert_struct_singular_flattened(&struct_plan);

    // Unnest several columns one after another, each in its own plan node.
    let chained_plan = nested_table_scan("test_table")?
        .unnest_column("strings")?
        .unnest_column("structs")?
        .unnest_column("struct_singular")?
        .build()?;

    assert_snapshot!(chained_plan, @r"
        Unnest: lists[] structs[test_table.struct_singular]
          Unnest: lists[test_table.structs|depth=1] structs[]
            Unnest: lists[test_table.strings|depth=1] structs[]
              TableScan: test_table
        ");

    // Unnesting a list of structs leaves a struct column behind.
    let structs_field = chained_plan
        .schema()
        .field_with_name(None, "structs")
        .unwrap();
    assert!(matches!(structs_field.data_type(), DataType::Struct(_)));

    // Unnest several columns in a single node; the column type is inferred.
    let all_columns = ["strings", "structs", "struct_singular"]
        .into_iter()
        .map(|name| name.into())
        .collect();

    let combined_plan = nested_table_scan("test_table")?
        .unnest_columns_with_options(all_columns, UnnestOptions::default())?
        .build()?;

    assert_snapshot!(combined_plan, @r"
        Unnest: lists[test_table.strings|depth=1, test_table.structs|depth=1] structs[test_table.struct_singular]
          TableScan: test_table
        ");

    // A column that does not exist cannot be unnested.
    let missing = nested_table_scan("test_table")?.unnest_column("missing");
    assert!(missing.is_err());

    // Unnest the nested list at two different depths together with a struct
    // column, all in the same node.
    let recursive_options = UnnestOptions::default()
        .with_recursions(RecursionUnnestOption {
            input_column: "stringss".into(),
            output_column: "stringss_depth_1".into(),
            depth: 1,
        })
        .with_recursions(RecursionUnnestOption {
            input_column: "stringss".into(),
            output_column: "stringss_depth_2".into(),
            depth: 2,
        });

    let recursive_plan = nested_table_scan("test_table")?
        .unnest_columns_with_options(
            vec!["stringss".into(), "struct_singular".into()],
            recursive_options,
        )?
        .build()?;

    assert_snapshot!(recursive_plan, @r"
        Unnest: lists[test_table.stringss|depth=1, test_table.stringss|depth=2] structs[test_table.struct_singular]
          TableScan: test_table
        ");

    // Depth 1 peels one list level, depth 2 reaches the string elements.
    let depth_1_field = recursive_plan
        .schema()
        .field_with_name(None, "stringss_depth_1")
        .unwrap();
    assert_eq!(
        &DataType::new_list(DataType::Utf8, false),
        depth_1_field.data_type()
    );

    let depth_2_field = recursive_plan
        .schema()
        .field_with_name(None, "stringss_depth_2")
        .unwrap();
    assert_eq!(&DataType::Utf8, depth_2_field.data_type());

    // The struct column is still flattened correctly alongside the lists.
    assert_struct_singular_flattened(&recursive_plan);

    Ok(())
}
2625
2626    fn nested_table_scan(table_name: &str) -> Result<LogicalPlanBuilder> {
2627        // Create a schema with a scalar field, a list of strings, a list of structs
2628        // and a singular struct
2629        let struct_field_in_list = Field::new_struct(
2630            "item",
2631            vec![
2632                Field::new("a", DataType::UInt32, false),
2633                Field::new("b", DataType::UInt32, false),
2634            ],
2635            false,
2636        );
2637        let string_field = Field::new_list_field(DataType::Utf8, false);
2638        let strings_field = Field::new_list("item", string_field.clone(), false);
2639        let schema = Schema::new(vec![
2640            Field::new("scalar", DataType::UInt32, false),
2641            Field::new_list("strings", string_field, false),
2642            Field::new_list("structs", struct_field_in_list, false),
2643            Field::new(
2644                "struct_singular",
2645                DataType::Struct(Fields::from(vec![
2646                    Field::new("a", DataType::UInt32, false),
2647                    Field::new("b", DataType::UInt32, false),
2648                ])),
2649                false,
2650            ),
2651            Field::new_list("stringss", strings_field, false),
2652        ]);
2653
2654        table_scan(Some(table_name), &schema, None)
2655    }
2656
#[test]
fn test_union_after_join() -> Result<()> {
    // Single-row VALUES relation wrapped in the given alias.
    let aliased_values = |alias: &str| -> Result<LogicalPlan> {
        LogicalPlanBuilder::values(vec![vec![lit(1)]])?
            .alias(alias)?
            .build()
    };

    let left = aliased_values("left")?;
    let right = aliased_values("right")?;

    let cross = LogicalPlanBuilder::from(left).cross_join(right)?.build()?;

    // Union the cross join with an identical copy of itself.
    let unioned = LogicalPlanBuilder::from(cross.clone())
        .union(cross)?
        .build()?;

    assert_snapshot!(unioned, @r"
        Union
          Cross Join: 
            SubqueryAlias: left
              Values: (Int32(1))
            SubqueryAlias: right
              Values: (Int32(1))
          Cross Join: 
            SubqueryAlias: left
              Values: (Int32(1))
            SubqueryAlias: right
              Values: (Int32(1))
        ");

    Ok(())
}
2690
#[test]
fn plan_builder_from_logical_plan() -> Result<()> {
    let sort_keys = vec![
        expr::Sort::new(col("state"), true, true),
        expr::Sort::new(col("salary"), false, false),
    ];
    let sorted_scan =
        table_scan(Some("employee_csv"), &employee_schema(), Some(vec![3, 4]))?
            .sort(sort_keys)?
            .build()?;

    // Converting an `Arc<LogicalPlan>` into a builder must keep the plan
    // rendering unchanged.
    let rendered_before = sorted_scan.to_string();
    let builder = LogicalPlanBuilder::from(Arc::new(sorted_scan));
    assert_eq!(rendered_before, builder.plan.to_string());

    Ok(())
}
2707
#[test]
fn plan_builder_aggregate_without_implicit_group_by_exprs() -> Result<()> {
    // Table source declaring a primary key on column index 0 (`id`).
    let primary_key =
        Constraints::new_unverified(vec![Constraint::PrimaryKey(vec![0])]);
    let source = table_source_with_constraints(&employee_schema(), primary_key);

    let scan = LogicalPlanBuilder::scan("employee_csv", source, Some(vec![0, 3, 4]))?;
    let aggregated = scan
        .aggregate(vec![col("id")], vec![sum(col("salary"))])?
        .build()?;

    // With default builder options only the explicit group-by expression is
    // present in the aggregate.
    assert_snapshot!(aggregated, @r"
        Aggregate: groupBy=[[employee_csv.id]], aggr=[[sum(employee_csv.salary)]]
          TableScan: employee_csv projection=[id, state, salary]
        ");

    Ok(())
}
2726
#[test]
fn plan_builder_aggregate_with_implicit_group_by_exprs() -> Result<()> {
    // Table source declaring a primary key on column index 0 (`id`).
    let primary_key =
        Constraints::new_unverified(vec![Constraint::PrimaryKey(vec![0])]);
    let source = table_source_with_constraints(&employee_schema(), primary_key);

    let implicit_group_by =
        LogicalPlanBuilderOptions::new().with_add_implicit_group_by_exprs(true);

    let scan = LogicalPlanBuilder::scan("employee_csv", source, Some(vec![0, 3, 4]))?
        .with_options(implicit_group_by);
    let aggregated = scan
        .aggregate(vec![col("id")], vec![sum(col("salary"))])?
        .build()?;

    // With the option enabled, the other scanned columns are added to the
    // group-by list next to the explicit `id` expression.
    assert_snapshot!(aggregated, @r"
        Aggregate: groupBy=[[employee_csv.id, employee_csv.state, employee_csv.salary]], aggr=[[sum(employee_csv.salary)]]
          TableScan: employee_csv projection=[id, state, salary]
        ");

    Ok(())
}
2748
#[test]
fn test_join_metadata() -> Result<()> {
    // Schema metadata consisting of the single entry `key -> <side>`.
    let metadata_for =
        |side: &str| HashMap::from([("key".to_string(), side.to_string())]);

    // Unqualified single-column `Int32` schema tagged with that metadata.
    let tagged_schema = |column: &str, side: &str| {
        DFSchema::new_with_metadata(
            vec![(None, Arc::new(Field::new(column, DataType::Int32, false)))],
            metadata_for(side),
        )
    };

    let left_schema = tagged_schema("a", "left")?;
    let right_schema = tagged_schema("b", "right")?;

    // For the conflicting `key`, a left join is expected to keep the left
    // value and a right join the right value.
    for (join_type, expected_side) in
        [(JoinType::Left, "left"), (JoinType::Right, "right")]
    {
        let join_schema = build_join_schema(&left_schema, &right_schema, &join_type)?;
        assert_eq!(join_schema.metadata(), &metadata_for(expected_side));
    }

    Ok(())
}
2775
#[test]
fn test_unique_field_aliases() {
    // Field names in input order; note the last one is literally named `a:1`.
    let input: Vec<Field> = ["a", "a", "b", "b", "a", "a:1"]
        .into_iter()
        .map(|name| Field::new(name, DataType::Int32, false))
        .collect();
    let fields = Fields::from(input);

    let aliases = unique_field_aliases(&fields);

    // Input [a, a, b, b, a, a:1] becomes [None, a:1, None, b:1, a:2, a:1:1].
    // The first occurrence of each name keeps its original name (None) and
    // later duplicates receive incremental suffixes (:1, :2, ...).
    // The important case: the second `a` is rewritten to `a:1`, which then
    // collides with the final column that is _actually_ called `a:1`, so that
    // column has to be renamed too, to `a:1:1`.
    let expected: Vec<Option<String>> = [
        None,
        Some("a:1"),
        None,
        Some("b:1"),
        Some("a:2"),
        Some("a:1:1"),
    ]
    .into_iter()
    .map(|alias| alias.map(String::from))
    .collect();

    assert_eq!(aliases, expected);
}
2810}