datafusion_expr/logical_plan/
builder.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! This module provides a builder for creating LogicalPlans
19
20use std::any::Any;
21use std::cmp::Ordering;
22use std::collections::{HashMap, HashSet};
23use std::iter::once;
24use std::sync::Arc;
25
26use crate::dml::CopyTo;
27use crate::expr::{Alias, PlannedReplaceSelectItem, Sort as SortExpr};
28use crate::expr_rewriter::{
29    coerce_plan_expr_for_schema, normalize_col,
30    normalize_col_with_schemas_and_ambiguity_check, normalize_cols, normalize_sorts,
31    rewrite_sort_cols_by_aggs,
32};
33use crate::logical_plan::{
34    Aggregate, Analyze, Distinct, DistinctOn, EmptyRelation, Explain, Filter, Join,
35    JoinConstraint, JoinType, Limit, LogicalPlan, Partitioning, PlanType, Prepare,
36    Projection, Repartition, Sort, SubqueryAlias, TableScan, Union, Unnest, Values,
37    Window,
38};
39use crate::select_expr::SelectExpr;
40use crate::utils::{
41    can_hash, columnize_expr, compare_sort_expr, expand_qualified_wildcard,
42    expand_wildcard, expr_to_columns, find_valid_equijoin_key_pair,
43    group_window_expr_by_sort_keys,
44};
45use crate::{
46    and, binary_expr, lit, DmlStatement, ExplainOption, Expr, ExprSchemable, Operator,
47    RecursiveQuery, Statement, TableProviderFilterPushDown, TableSource, WriteOp,
48};
49
50use super::dml::InsertOp;
51use arrow::compute::can_cast_types;
52use arrow::datatypes::{DataType, Field, Fields, Schema, SchemaRef};
53use datafusion_common::display::ToStringifiedPlan;
54use datafusion_common::file_options::file_type::FileType;
55use datafusion_common::{
56    exec_err, get_target_functional_dependencies, not_impl_err, plan_datafusion_err,
57    plan_err, Column, Constraints, DFSchema, DFSchemaRef, DataFusionError, NullEquality,
58    Result, ScalarValue, TableReference, ToDFSchema, UnnestOptions,
59};
60use datafusion_expr_common::type_coercion::binary::type_union_resolution;
61
62use indexmap::IndexSet;
63
64/// Default table name for unnamed table
65pub const UNNAMED_TABLE: &str = "?table?";
66
67/// Options for [`LogicalPlanBuilder`]
68#[derive(Default, Debug, Clone)]
69pub struct LogicalPlanBuilderOptions {
70    /// Flag indicating whether the plan builder should add
71    /// functionally dependent expressions as additional aggregation groupings.
72    add_implicit_group_by_exprs: bool,
73}
74
75impl LogicalPlanBuilderOptions {
76    pub fn new() -> Self {
77        Default::default()
78    }
79
80    /// Should the builder add functionally dependent expressions as additional aggregation groupings.
81    pub fn with_add_implicit_group_by_exprs(mut self, add: bool) -> Self {
82        self.add_implicit_group_by_exprs = add;
83        self
84    }
85}
86
87/// Builder for logical plans
88///
89/// # Example building a simple plan
90/// ```
91/// # use datafusion_expr::{lit, col, LogicalPlanBuilder, logical_plan::table_scan};
92/// # use datafusion_common::Result;
93/// # use arrow::datatypes::{Schema, DataType, Field};
94/// #
95/// # fn main() -> Result<()> {
96/// #
97/// # fn employee_schema() -> Schema {
98/// #    Schema::new(vec![
99/// #           Field::new("id", DataType::Int32, false),
100/// #           Field::new("first_name", DataType::Utf8, false),
101/// #           Field::new("last_name", DataType::Utf8, false),
102/// #           Field::new("state", DataType::Utf8, false),
103/// #           Field::new("salary", DataType::Int32, false),
104/// #       ])
105/// #   }
106/// #
107/// // Create a plan similar to
108/// // SELECT last_name
109/// // FROM employees
110/// // WHERE salary < 1000
111/// let plan = table_scan(Some("employee"), &employee_schema(), None)?
112///  // Keep only rows where salary < 1000
113///  .filter(col("salary").lt(lit(1000)))?
114///  // only show "last_name" in the final results
115///  .project(vec![col("last_name")])?
116///  .build()?;
117///
118/// // Convert from plan back to builder
119/// let builder = LogicalPlanBuilder::from(plan);
120///
121/// # Ok(())
122/// # }
123/// ```
124#[derive(Debug, Clone)]
125pub struct LogicalPlanBuilder {
126    plan: Arc<LogicalPlan>,
127    options: LogicalPlanBuilderOptions,
128}
129
130impl LogicalPlanBuilder {
131    /// Create a builder from an existing plan
132    pub fn new(plan: LogicalPlan) -> Self {
133        Self {
134            plan: Arc::new(plan),
135            options: LogicalPlanBuilderOptions::default(),
136        }
137    }
138
139    /// Create a builder from an existing plan
140    pub fn new_from_arc(plan: Arc<LogicalPlan>) -> Self {
141        Self {
142            plan,
143            options: LogicalPlanBuilderOptions::default(),
144        }
145    }
146
147    pub fn with_options(mut self, options: LogicalPlanBuilderOptions) -> Self {
148        self.options = options;
149        self
150    }
151
152    /// Return the output schema of the plan build so far
153    pub fn schema(&self) -> &DFSchemaRef {
154        self.plan.schema()
155    }
156
157    /// Return the LogicalPlan of the plan build so far
158    pub fn plan(&self) -> &LogicalPlan {
159        &self.plan
160    }
161
162    /// Create an empty relation.
163    ///
164    /// `produce_one_row` set to true means this empty node needs to produce a placeholder row.
165    pub fn empty(produce_one_row: bool) -> Self {
166        Self::new(LogicalPlan::EmptyRelation(EmptyRelation {
167            produce_one_row,
168            schema: DFSchemaRef::new(DFSchema::empty()),
169        }))
170    }
171
172    /// Convert a regular plan into a recursive query.
173    /// `is_distinct` indicates whether the recursive term should be de-duplicated (`UNION`) after each iteration or not (`UNION ALL`).
174    pub fn to_recursive_query(
175        self,
176        name: String,
177        recursive_term: LogicalPlan,
178        is_distinct: bool,
179    ) -> Result<Self> {
180        // TODO: we need to do a bunch of validation here. Maybe more.
181        if is_distinct {
182            return not_impl_err!(
183                "Recursive queries with a distinct 'UNION' (in which the previous iteration's results will be de-duplicated) is not supported"
184            );
185        }
186        // Ensure that the static term and the recursive term have the same number of fields
187        let static_fields_len = self.plan.schema().fields().len();
188        let recursive_fields_len = recursive_term.schema().fields().len();
189        if static_fields_len != recursive_fields_len {
190            return plan_err!(
191                "Non-recursive term and recursive term must have the same number of columns ({} != {})",
192                static_fields_len, recursive_fields_len
193            );
194        }
195        // Ensure that the recursive term has the same field types as the static term
196        let coerced_recursive_term =
197            coerce_plan_expr_for_schema(recursive_term, self.plan.schema())?;
198        Ok(Self::from(LogicalPlan::RecursiveQuery(RecursiveQuery {
199            name,
200            static_term: self.plan,
201            recursive_term: Arc::new(coerced_recursive_term),
202            is_distinct,
203        })))
204    }
205
206    /// Create a values list based relation, and the schema is inferred from data, consuming
207    /// `value`. See the [Postgres VALUES](https://www.postgresql.org/docs/current/queries-values.html)
208    /// documentation for more details.
209    ///
210    /// so it's usually better to override the default names with a table alias list.
211    ///
212    /// If the values include params/binders such as $1, $2, $3, etc, then the `param_data_types` should be provided.
213    pub fn values(values: Vec<Vec<Expr>>) -> Result<Self> {
214        if values.is_empty() {
215            return plan_err!("Values list cannot be empty");
216        }
217        let n_cols = values[0].len();
218        if n_cols == 0 {
219            return plan_err!("Values list cannot be zero length");
220        }
221        for (i, row) in values.iter().enumerate() {
222            if row.len() != n_cols {
223                return plan_err!(
224                    "Inconsistent data length across values list: got {} values in row {} but expected {}",
225                    row.len(),
226                    i,
227                    n_cols
228                );
229            }
230        }
231
232        // Infer from data itself
233        Self::infer_data(values)
234    }
235
236    /// Create a values list based relation, and the schema is inferred from data itself or table schema if provided, consuming
237    /// `value`. See the [Postgres VALUES](https://www.postgresql.org/docs/current/queries-values.html)
238    /// documentation for more details.
239    ///
240    /// By default, it assigns the names column1, column2, etc. to the columns of a VALUES table.
241    /// The column names are not specified by the SQL standard and different database systems do it differently,
242    /// so it's usually better to override the default names with a table alias list.
243    ///
244    /// If the values include params/binders such as $1, $2, $3, etc, then the `param_data_types` should be provided.
245    pub fn values_with_schema(
246        values: Vec<Vec<Expr>>,
247        schema: &DFSchemaRef,
248    ) -> Result<Self> {
249        if values.is_empty() {
250            return plan_err!("Values list cannot be empty");
251        }
252        let n_cols = schema.fields().len();
253        if n_cols == 0 {
254            return plan_err!("Values list cannot be zero length");
255        }
256        for (i, row) in values.iter().enumerate() {
257            if row.len() != n_cols {
258                return plan_err!(
259                    "Inconsistent data length across values list: got {} values in row {} but expected {}",
260                    row.len(),
261                    i,
262                    n_cols
263                );
264            }
265        }
266
267        // Check the type of value against the schema
268        Self::infer_values_from_schema(values, schema)
269    }
270
271    fn infer_values_from_schema(
272        values: Vec<Vec<Expr>>,
273        schema: &DFSchema,
274    ) -> Result<Self> {
275        let n_cols = values[0].len();
276        let mut fields = ValuesFields::new();
277        for j in 0..n_cols {
278            let field_type = schema.field(j).data_type();
279            let field_nullable = schema.field(j).is_nullable();
280            for row in values.iter() {
281                let value = &row[j];
282                let data_type = value.get_type(schema)?;
283
284                if !data_type.equals_datatype(field_type) {
285                    if can_cast_types(&data_type, field_type) {
286                    } else {
287                        return exec_err!(
288                            "type mismatch and can't cast to got {} and {}",
289                            data_type,
290                            field_type
291                        );
292                    }
293                }
294            }
295            fields.push(field_type.to_owned(), field_nullable);
296        }
297
298        Self::infer_inner(values, fields, schema)
299    }
300
301    fn infer_data(values: Vec<Vec<Expr>>) -> Result<Self> {
302        let n_cols = values[0].len();
303        let schema = DFSchema::empty();
304        let mut fields = ValuesFields::new();
305
306        for j in 0..n_cols {
307            let mut common_type: Option<DataType> = None;
308            for (i, row) in values.iter().enumerate() {
309                let value = &row[j];
310                let data_type = value.get_type(&schema)?;
311                if data_type == DataType::Null {
312                    continue;
313                }
314
315                if let Some(prev_type) = common_type {
316                    // get common type of each column values.
317                    let data_types = vec![prev_type.clone(), data_type.clone()];
318                    let Some(new_type) = type_union_resolution(&data_types) else {
319                        return plan_err!("Inconsistent data type across values list at row {i} column {j}. Was {prev_type} but found {data_type}");
320                    };
321                    common_type = Some(new_type);
322                } else {
323                    common_type = Some(data_type);
324                }
325            }
326            // assuming common_type was not set, and no error, therefore the type should be NULL
327            // since the code loop skips NULL
328            fields.push(common_type.unwrap_or(DataType::Null), true);
329        }
330
331        Self::infer_inner(values, fields, &schema)
332    }
333
334    fn infer_inner(
335        mut values: Vec<Vec<Expr>>,
336        fields: ValuesFields,
337        schema: &DFSchema,
338    ) -> Result<Self> {
339        let fields = fields.into_fields();
340        // wrap cast if data type is not same as common type.
341        for row in &mut values {
342            for (j, field_type) in fields.iter().map(|f| f.data_type()).enumerate() {
343                if let Expr::Literal(ScalarValue::Null, metadata) = &row[j] {
344                    row[j] = Expr::Literal(
345                        ScalarValue::try_from(field_type)?,
346                        metadata.clone(),
347                    );
348                } else {
349                    row[j] = std::mem::take(&mut row[j]).cast_to(field_type, schema)?;
350                }
351            }
352        }
353
354        let dfschema = DFSchema::from_unqualified_fields(fields, HashMap::new())?;
355        let schema = DFSchemaRef::new(dfschema);
356
357        Ok(Self::new(LogicalPlan::Values(Values { schema, values })))
358    }
359
360    /// Convert a table provider into a builder with a TableScan
361    ///
362    /// Note that if you pass a string as `table_name`, it is treated
363    /// as a SQL identifier, as described on [`TableReference`] and
364    /// thus is normalized
365    ///
366    /// # Example:
367    /// ```
368    /// # use datafusion_expr::{lit, col, LogicalPlanBuilder,
369    /// #  logical_plan::builder::LogicalTableSource, logical_plan::table_scan
370    /// # };
371    /// # use std::sync::Arc;
372    /// # use arrow::datatypes::{Schema, DataType, Field};
373    /// # use datafusion_common::TableReference;
374    /// #
375    /// # let employee_schema = Arc::new(Schema::new(vec![
376    /// #           Field::new("id", DataType::Int32, false),
377    /// # ])) as _;
378    /// # let table_source = Arc::new(LogicalTableSource::new(employee_schema));
379    /// // Scan table_source with the name "mytable" (after normalization)
380    /// # let table = table_source.clone();
381    /// let scan = LogicalPlanBuilder::scan("MyTable", table, None);
382    ///
383    /// // Scan table_source with the name "MyTable" by enclosing in quotes
384    /// # let table = table_source.clone();
385    /// let scan = LogicalPlanBuilder::scan(r#""MyTable""#, table, None);
386    ///
387    /// // Scan table_source with the name "MyTable" by forming the table reference
388    /// # let table = table_source.clone();
389    /// let table_reference = TableReference::bare("MyTable");
390    /// let scan = LogicalPlanBuilder::scan(table_reference, table, None);
391    /// ```
392    pub fn scan(
393        table_name: impl Into<TableReference>,
394        table_source: Arc<dyn TableSource>,
395        projection: Option<Vec<usize>>,
396    ) -> Result<Self> {
397        Self::scan_with_filters(table_name, table_source, projection, vec![])
398    }
399
400    /// Create a [CopyTo] for copying the contents of this builder to the specified file(s)
401    pub fn copy_to(
402        input: LogicalPlan,
403        output_url: String,
404        file_type: Arc<dyn FileType>,
405        options: HashMap<String, String>,
406        partition_by: Vec<String>,
407    ) -> Result<Self> {
408        Ok(Self::new(LogicalPlan::Copy(CopyTo::new(
409            Arc::new(input),
410            output_url,
411            partition_by,
412            file_type,
413            options,
414        ))))
415    }
416
417    /// Create a [`DmlStatement`] for inserting the contents of this builder into the named table.
418    ///
419    /// Note,  use a [`DefaultTableSource`] to insert into a [`TableProvider`]
420    ///
421    /// [`DefaultTableSource`]: https://docs.rs/datafusion/latest/datafusion/datasource/default_table_source/struct.DefaultTableSource.html
422    /// [`TableProvider`]: https://docs.rs/datafusion/latest/datafusion/catalog/trait.TableProvider.html
423    ///
424    /// # Example:
425    /// ```
426    /// # use datafusion_expr::{lit, LogicalPlanBuilder,
427    /// #  logical_plan::builder::LogicalTableSource,
428    /// # };
429    /// # use std::sync::Arc;
430    /// # use arrow::datatypes::{Schema, DataType, Field};
431    /// # use datafusion_expr::dml::InsertOp;
432    /// #
433    /// # fn test() -> datafusion_common::Result<()> {
434    /// # let employee_schema = Arc::new(Schema::new(vec![
435    /// #     Field::new("id", DataType::Int32, false),
436    /// # ])) as _;
437    /// # let table_source = Arc::new(LogicalTableSource::new(employee_schema));
438    /// // VALUES (1), (2)
439    /// let input = LogicalPlanBuilder::values(vec![vec![lit(1)], vec![lit(2)]])?
440    ///   .build()?;
441    /// // INSERT INTO MyTable VALUES (1), (2)
442    /// let insert_plan = LogicalPlanBuilder::insert_into(
443    ///   input,
444    ///   "MyTable",
445    ///   table_source,
446    ///   InsertOp::Append,
447    /// )?;
448    /// # Ok(())
449    /// # }
450    /// ```
451    pub fn insert_into(
452        input: LogicalPlan,
453        table_name: impl Into<TableReference>,
454        target: Arc<dyn TableSource>,
455        insert_op: InsertOp,
456    ) -> Result<Self> {
457        Ok(Self::new(LogicalPlan::Dml(DmlStatement::new(
458            table_name.into(),
459            target,
460            WriteOp::Insert(insert_op),
461            Arc::new(input),
462        ))))
463    }
464
465    /// Convert a table provider into a builder with a TableScan
466    pub fn scan_with_filters(
467        table_name: impl Into<TableReference>,
468        table_source: Arc<dyn TableSource>,
469        projection: Option<Vec<usize>>,
470        filters: Vec<Expr>,
471    ) -> Result<Self> {
472        Self::scan_with_filters_inner(table_name, table_source, projection, filters, None)
473    }
474
475    /// Convert a table provider into a builder with a TableScan with filter and fetch
476    pub fn scan_with_filters_fetch(
477        table_name: impl Into<TableReference>,
478        table_source: Arc<dyn TableSource>,
479        projection: Option<Vec<usize>>,
480        filters: Vec<Expr>,
481        fetch: Option<usize>,
482    ) -> Result<Self> {
483        Self::scan_with_filters_inner(
484            table_name,
485            table_source,
486            projection,
487            filters,
488            fetch,
489        )
490    }
491
492    fn scan_with_filters_inner(
493        table_name: impl Into<TableReference>,
494        table_source: Arc<dyn TableSource>,
495        projection: Option<Vec<usize>>,
496        filters: Vec<Expr>,
497        fetch: Option<usize>,
498    ) -> Result<Self> {
499        let table_scan =
500            TableScan::try_new(table_name, table_source, projection, filters, fetch)?;
501
502        // Inline TableScan
503        if table_scan.filters.is_empty() {
504            if let Some(p) = table_scan.source.get_logical_plan() {
505                let sub_plan = p.into_owned();
506
507                if let Some(proj) = table_scan.projection {
508                    let projection_exprs = proj
509                        .into_iter()
510                        .map(|i| {
511                            Expr::Column(Column::from(
512                                sub_plan.schema().qualified_field(i),
513                            ))
514                        })
515                        .collect::<Vec<_>>();
516                    return Self::new(sub_plan)
517                        .project(projection_exprs)?
518                        .alias(table_scan.table_name);
519                }
520
521                // Ensures that the reference to the inlined table remains the
522                // same, meaning we don't have to change any of the parent nodes
523                // that reference this table.
524                return Self::new(sub_plan).alias(table_scan.table_name);
525            }
526        }
527
528        Ok(Self::new(LogicalPlan::TableScan(table_scan)))
529    }
530
531    /// Wrap a plan in a window
532    pub fn window_plan(
533        input: LogicalPlan,
534        window_exprs: impl IntoIterator<Item = Expr>,
535    ) -> Result<LogicalPlan> {
536        let mut plan = input;
537        let mut groups = group_window_expr_by_sort_keys(window_exprs)?;
538        // To align with the behavior of PostgreSQL, we want the sort_keys sorted as same rule as PostgreSQL that first
539        // we compare the sort key themselves and if one window's sort keys are a prefix of another
540        // put the window with more sort keys first. so more deeply sorted plans gets nested further down as children.
541        // The sort_by() implementation here is a stable sort.
542        // Note that by this rule if there's an empty over, it'll be at the top level
543        groups.sort_by(|(key_a, _), (key_b, _)| {
544            for ((first, _), (second, _)) in key_a.iter().zip(key_b.iter()) {
545                let key_ordering = compare_sort_expr(first, second, plan.schema());
546                match key_ordering {
547                    Ordering::Less => {
548                        return Ordering::Less;
549                    }
550                    Ordering::Greater => {
551                        return Ordering::Greater;
552                    }
553                    Ordering::Equal => {}
554                }
555            }
556            key_b.len().cmp(&key_a.len())
557        });
558        for (_, exprs) in groups {
559            let window_exprs = exprs.into_iter().collect::<Vec<_>>();
560            // Partition and sorting is done at physical level, see the EnforceDistribution
561            // and EnforceSorting rules.
562            plan = LogicalPlanBuilder::from(plan)
563                .window(window_exprs)?
564                .build()?;
565        }
566        Ok(plan)
567    }
568
569    /// Apply a projection without alias.
570    pub fn project(
571        self,
572        expr: impl IntoIterator<Item = impl Into<SelectExpr>>,
573    ) -> Result<Self> {
574        project(Arc::unwrap_or_clone(self.plan), expr).map(Self::new)
575    }
576
577    /// Apply a projection without alias with optional validation
578    /// (true to validate, false to not validate)
579    pub fn project_with_validation(
580        self,
581        expr: Vec<(impl Into<SelectExpr>, bool)>,
582    ) -> Result<Self> {
583        project_with_validation(Arc::unwrap_or_clone(self.plan), expr).map(Self::new)
584    }
585
586    /// Select the given column indices
587    pub fn select(self, indices: impl IntoIterator<Item = usize>) -> Result<Self> {
588        let exprs: Vec<_> = indices
589            .into_iter()
590            .map(|x| Expr::Column(Column::from(self.plan.schema().qualified_field(x))))
591            .collect();
592        self.project(exprs)
593    }
594
595    /// Apply a filter
596    pub fn filter(self, expr: impl Into<Expr>) -> Result<Self> {
597        let expr = normalize_col(expr.into(), &self.plan)?;
598        Filter::try_new(expr, self.plan)
599            .map(LogicalPlan::Filter)
600            .map(Self::new)
601    }
602
603    /// Apply a filter which is used for a having clause
604    pub fn having(self, expr: impl Into<Expr>) -> Result<Self> {
605        let expr = normalize_col(expr.into(), &self.plan)?;
606        Filter::try_new(expr, self.plan)
607            .map(LogicalPlan::Filter)
608            .map(Self::from)
609    }
610
611    /// Make a builder for a prepare logical plan from the builder's plan
612    pub fn prepare(self, name: String, data_types: Vec<DataType>) -> Result<Self> {
613        Ok(Self::new(LogicalPlan::Statement(Statement::Prepare(
614            Prepare {
615                name,
616                data_types,
617                input: self.plan,
618            },
619        ))))
620    }
621
622    /// Limit the number of rows returned
623    ///
624    /// `skip` - Number of rows to skip before fetch any row.
625    ///
626    /// `fetch` - Maximum number of rows to fetch, after skipping `skip` rows,
627    ///          if specified.
628    pub fn limit(self, skip: usize, fetch: Option<usize>) -> Result<Self> {
629        let skip_expr = if skip == 0 {
630            None
631        } else {
632            Some(lit(skip as i64))
633        };
634        let fetch_expr = fetch.map(|f| lit(f as i64));
635        self.limit_by_expr(skip_expr, fetch_expr)
636    }
637
638    /// Limit the number of rows returned
639    ///
640    /// Similar to `limit` but uses expressions for `skip` and `fetch`
641    pub fn limit_by_expr(self, skip: Option<Expr>, fetch: Option<Expr>) -> Result<Self> {
642        Ok(Self::new(LogicalPlan::Limit(Limit {
643            skip: skip.map(Box::new),
644            fetch: fetch.map(Box::new),
645            input: self.plan,
646        })))
647    }
648
649    /// Apply an alias
650    pub fn alias(self, alias: impl Into<TableReference>) -> Result<Self> {
651        subquery_alias(Arc::unwrap_or_clone(self.plan), alias).map(Self::new)
652    }
653
654    /// Add missing sort columns to all downstream projection
655    ///
656    /// Thus, if you have a LogicalPlan that selects A and B and have
657    /// not requested a sort by C, this code will add C recursively to
658    /// all input projections.
659    ///
660    /// Adding a new column is not correct if there is a `Distinct`
661    /// node, which produces only distinct values of its
662    /// inputs. Adding a new column to its input will result in
663    /// potentially different results than with the original column.
664    ///
665    /// For example, if the input is like:
666    ///
667    /// Distinct(A, B)
668    ///
669    /// If the input looks like
670    ///
671    /// a | b | c
672    /// --+---+---
673    /// 1 | 2 | 3
674    /// 1 | 2 | 4
675    ///
676    /// Distinct (A, B) --> (1,2)
677    ///
678    /// But Distinct (A, B, C) --> (1, 2, 3), (1, 2, 4)
679    ///  (which will appear as a (1, 2), (1, 2) if a and b are projected
680    ///
681    /// See <https://github.com/apache/datafusion/issues/5065> for more details
682    fn add_missing_columns(
683        curr_plan: LogicalPlan,
684        missing_cols: &IndexSet<Column>,
685        is_distinct: bool,
686    ) -> Result<LogicalPlan> {
687        match curr_plan {
688            LogicalPlan::Projection(Projection {
689                input,
690                mut expr,
691                schema: _,
692            }) if missing_cols.iter().all(|c| input.schema().has_column(c)) => {
693                let mut missing_exprs = missing_cols
694                    .iter()
695                    .map(|c| normalize_col(Expr::Column(c.clone()), &input))
696                    .collect::<Result<Vec<_>>>()?;
697
698                // Do not let duplicate columns to be added, some of the
699                // missing_cols may be already present but without the new
700                // projected alias.
701                missing_exprs.retain(|e| !expr.contains(e));
702                if is_distinct {
703                    Self::ambiguous_distinct_check(&missing_exprs, missing_cols, &expr)?;
704                }
705                expr.extend(missing_exprs);
706                project(Arc::unwrap_or_clone(input), expr)
707            }
708            _ => {
709                let is_distinct =
710                    is_distinct || matches!(curr_plan, LogicalPlan::Distinct(_));
711                let new_inputs = curr_plan
712                    .inputs()
713                    .into_iter()
714                    .map(|input_plan| {
715                        Self::add_missing_columns(
716                            (*input_plan).clone(),
717                            missing_cols,
718                            is_distinct,
719                        )
720                    })
721                    .collect::<Result<Vec<_>>>()?;
722                curr_plan.with_new_exprs(curr_plan.expressions(), new_inputs)
723            }
724        }
725    }
726
727    fn ambiguous_distinct_check(
728        missing_exprs: &[Expr],
729        missing_cols: &IndexSet<Column>,
730        projection_exprs: &[Expr],
731    ) -> Result<()> {
732        if missing_exprs.is_empty() {
733            return Ok(());
734        }
735
736        // if the missing columns are all only aliases for things in
737        // the existing select list, it is ok
738        //
739        // This handles the special case for
740        // SELECT col as <alias> ORDER BY <alias>
741        //
742        // As described in https://github.com/apache/datafusion/issues/5293
743        let all_aliases = missing_exprs.iter().all(|e| {
744            projection_exprs.iter().any(|proj_expr| {
745                if let Expr::Alias(Alias { expr, .. }) = proj_expr {
746                    e == expr.as_ref()
747                } else {
748                    false
749                }
750            })
751        });
752        if all_aliases {
753            return Ok(());
754        }
755
756        let missing_col_names = missing_cols
757            .iter()
758            .map(|col| col.flat_name())
759            .collect::<String>();
760
761        plan_err!("For SELECT DISTINCT, ORDER BY expressions {missing_col_names} must appear in select list")
762    }
763
764    /// Apply a sort by provided expressions with default direction
765    pub fn sort_by(
766        self,
767        expr: impl IntoIterator<Item = impl Into<Expr>> + Clone,
768    ) -> Result<Self> {
769        self.sort(
770            expr.into_iter()
771                .map(|e| e.into().sort(true, false))
772                .collect::<Vec<SortExpr>>(),
773        )
774    }
775
776    pub fn sort(
777        self,
778        sorts: impl IntoIterator<Item = impl Into<SortExpr>> + Clone,
779    ) -> Result<Self> {
780        self.sort_with_limit(sorts, None)
781    }
782
783    /// Apply a sort
784    pub fn sort_with_limit(
785        self,
786        sorts: impl IntoIterator<Item = impl Into<SortExpr>> + Clone,
787        fetch: Option<usize>,
788    ) -> Result<Self> {
789        let sorts = rewrite_sort_cols_by_aggs(sorts, &self.plan)?;
790
791        let schema = self.plan.schema();
792
793        // Collect sort columns that are missing in the input plan's schema
794        let mut missing_cols: IndexSet<Column> = IndexSet::new();
795        sorts.iter().try_for_each::<_, Result<()>>(|sort| {
796            let columns = sort.expr.column_refs();
797
798            missing_cols.extend(
799                columns
800                    .into_iter()
801                    .filter(|c| !schema.has_column(c))
802                    .cloned(),
803            );
804
805            Ok(())
806        })?;
807
808        if missing_cols.is_empty() {
809            return Ok(Self::new(LogicalPlan::Sort(Sort {
810                expr: normalize_sorts(sorts, &self.plan)?,
811                input: self.plan,
812                fetch,
813            })));
814        }
815
816        // remove pushed down sort columns
817        let new_expr = schema.columns().into_iter().map(Expr::Column).collect();
818
819        let is_distinct = false;
820        let plan = Self::add_missing_columns(
821            Arc::unwrap_or_clone(self.plan),
822            &missing_cols,
823            is_distinct,
824        )?;
825
826        let sort_plan = LogicalPlan::Sort(Sort {
827            expr: normalize_sorts(sorts, &plan)?,
828            input: Arc::new(plan),
829            fetch,
830        });
831
832        Projection::try_new(new_expr, Arc::new(sort_plan))
833            .map(LogicalPlan::Projection)
834            .map(Self::new)
835    }
836
837    /// Apply a union, preserving duplicate rows
838    pub fn union(self, plan: LogicalPlan) -> Result<Self> {
839        union(Arc::unwrap_or_clone(self.plan), plan).map(Self::new)
840    }
841
842    /// Apply a union by name, preserving duplicate rows
843    pub fn union_by_name(self, plan: LogicalPlan) -> Result<Self> {
844        union_by_name(Arc::unwrap_or_clone(self.plan), plan).map(Self::new)
845    }
846
847    /// Apply a union by name, removing duplicate rows
848    pub fn union_by_name_distinct(self, plan: LogicalPlan) -> Result<Self> {
849        let left_plan: LogicalPlan = Arc::unwrap_or_clone(self.plan);
850        let right_plan: LogicalPlan = plan;
851
852        Ok(Self::new(LogicalPlan::Distinct(Distinct::All(Arc::new(
853            union_by_name(left_plan, right_plan)?,
854        )))))
855    }
856
857    /// Apply a union, removing duplicate rows
858    pub fn union_distinct(self, plan: LogicalPlan) -> Result<Self> {
859        let left_plan: LogicalPlan = Arc::unwrap_or_clone(self.plan);
860        let right_plan: LogicalPlan = plan;
861
862        Ok(Self::new(LogicalPlan::Distinct(Distinct::All(Arc::new(
863            union(left_plan, right_plan)?,
864        )))))
865    }
866
867    /// Apply deduplication: Only distinct (different) values are returned)
868    pub fn distinct(self) -> Result<Self> {
869        Ok(Self::new(LogicalPlan::Distinct(Distinct::All(self.plan))))
870    }
871
872    /// Project first values of the specified expression list according to the provided
873    /// sorting expressions grouped by the `DISTINCT ON` clause expressions.
874    pub fn distinct_on(
875        self,
876        on_expr: Vec<Expr>,
877        select_expr: Vec<Expr>,
878        sort_expr: Option<Vec<SortExpr>>,
879    ) -> Result<Self> {
880        Ok(Self::new(LogicalPlan::Distinct(Distinct::On(
881            DistinctOn::try_new(on_expr, select_expr, sort_expr, self.plan)?,
882        ))))
883    }
884
885    /// Apply a join to `right` using explicitly specified columns and an
886    /// optional filter expression.
887    ///
888    /// See [`join_on`](Self::join_on) for a more concise way to specify the
889    /// join condition. Since DataFusion will automatically identify and
890    /// optimize equality predicates there is no performance difference between
891    /// this function and `join_on`
892    ///
893    /// `left_cols` and `right_cols` are used to form "equijoin" predicates (see
894    /// example below), which are then combined with the optional `filter`
895    /// expression.
896    ///
897    /// Note that in case of outer join, the `filter` is applied to only matched rows.
898    pub fn join(
899        self,
900        right: LogicalPlan,
901        join_type: JoinType,
902        join_keys: (Vec<impl Into<Column>>, Vec<impl Into<Column>>),
903        filter: Option<Expr>,
904    ) -> Result<Self> {
905        self.join_detailed(
906            right,
907            join_type,
908            join_keys,
909            filter,
910            NullEquality::NullEqualsNothing,
911        )
912    }
913
914    /// Apply a join using the specified expressions.
915    ///
916    /// Note that DataFusion automatically optimizes joins, including
917    /// identifying and optimizing equality predicates.
918    ///
919    /// # Example
920    ///
921    /// ```
922    /// # use datafusion_expr::{Expr, col, LogicalPlanBuilder,
923    /// #  logical_plan::builder::LogicalTableSource, logical_plan::JoinType,};
924    /// # use std::sync::Arc;
925    /// # use arrow::datatypes::{Schema, DataType, Field};
926    /// # use datafusion_common::Result;
927    /// # fn main() -> Result<()> {
928    /// let example_schema = Arc::new(Schema::new(vec![
929    ///     Field::new("a", DataType::Int32, false),
930    ///     Field::new("b", DataType::Int32, false),
931    ///     Field::new("c", DataType::Int32, false),
932    /// ]));
933    /// let table_source = Arc::new(LogicalTableSource::new(example_schema));
934    /// let left_table = table_source.clone();
935    /// let right_table = table_source.clone();
936    ///
937    /// let right_plan = LogicalPlanBuilder::scan("right", right_table, None)?.build()?;
938    ///
939    /// // Form the expression `(left.a != right.a)` AND `(left.b != right.b)`
940    /// let exprs = vec![
941    ///     col("left.a").eq(col("right.a")),
942    ///     col("left.b").not_eq(col("right.b"))
943    ///  ];
944    ///
945    /// // Perform the equivalent of `left INNER JOIN right ON (a != a2 AND b != b2)`
946    /// // finding all pairs of rows from `left` and `right` where
947    /// // where `a = a2` and `b != b2`.
948    /// let plan = LogicalPlanBuilder::scan("left", left_table, None)?
949    ///     .join_on(right_plan, JoinType::Inner, exprs)?
950    ///     .build()?;
951    /// # Ok(())
952    /// # }
953    /// ```
954    pub fn join_on(
955        self,
956        right: LogicalPlan,
957        join_type: JoinType,
958        on_exprs: impl IntoIterator<Item = Expr>,
959    ) -> Result<Self> {
960        let filter = on_exprs.into_iter().reduce(Expr::and);
961
962        self.join_detailed(
963            right,
964            join_type,
965            (Vec::<Column>::new(), Vec::<Column>::new()),
966            filter,
967            NullEquality::NullEqualsNothing,
968        )
969    }
970
971    pub(crate) fn normalize(plan: &LogicalPlan, column: Column) -> Result<Column> {
972        if column.relation.is_some() {
973            // column is already normalized
974            return Ok(column);
975        }
976
977        let schema = plan.schema();
978        let fallback_schemas = plan.fallback_normalize_schemas();
979        let using_columns = plan.using_columns()?;
980        column.normalize_with_schemas_and_ambiguity_check(
981            &[&[schema], &fallback_schemas],
982            &using_columns,
983        )
984    }
985
986    /// Apply a join with on constraint and specified null equality.
987    ///
988    /// The behavior is the same as [`join`](Self::join) except that it allows
989    /// specifying the null equality behavior.
990    ///
991    /// The `null_equality` dictates how `null` values are joined.
992    pub fn join_detailed(
993        self,
994        right: LogicalPlan,
995        join_type: JoinType,
996        join_keys: (Vec<impl Into<Column>>, Vec<impl Into<Column>>),
997        filter: Option<Expr>,
998        null_equality: NullEquality,
999    ) -> Result<Self> {
1000        if join_keys.0.len() != join_keys.1.len() {
1001            return plan_err!("left_keys and right_keys were not the same length");
1002        }
1003
1004        let filter = if let Some(expr) = filter {
1005            let filter = normalize_col_with_schemas_and_ambiguity_check(
1006                expr,
1007                &[&[self.schema(), right.schema()]],
1008                &[],
1009            )?;
1010            Some(filter)
1011        } else {
1012            None
1013        };
1014
1015        let (left_keys, right_keys): (Vec<Result<Column>>, Vec<Result<Column>>) =
1016            join_keys
1017                .0
1018                .into_iter()
1019                .zip(join_keys.1)
1020                .map(|(l, r)| {
1021                    let l = l.into();
1022                    let r = r.into();
1023
1024                    match (&l.relation, &r.relation) {
1025                        (Some(lr), Some(rr)) => {
1026                            let l_is_left =
1027                                self.plan.schema().field_with_qualified_name(lr, &l.name);
1028                            let l_is_right =
1029                                right.schema().field_with_qualified_name(lr, &l.name);
1030                            let r_is_left =
1031                                self.plan.schema().field_with_qualified_name(rr, &r.name);
1032                            let r_is_right =
1033                                right.schema().field_with_qualified_name(rr, &r.name);
1034
1035                            match (l_is_left, l_is_right, r_is_left, r_is_right) {
1036                                (_, Ok(_), Ok(_), _) => (Ok(r), Ok(l)),
1037                                (Ok(_), _, _, Ok(_)) => (Ok(l), Ok(r)),
1038                                _ => (
1039                                    Self::normalize(&self.plan, l),
1040                                    Self::normalize(&right, r),
1041                                ),
1042                            }
1043                        }
1044                        (Some(lr), None) => {
1045                            let l_is_left =
1046                                self.plan.schema().field_with_qualified_name(lr, &l.name);
1047                            let l_is_right =
1048                                right.schema().field_with_qualified_name(lr, &l.name);
1049
1050                            match (l_is_left, l_is_right) {
1051                                (Ok(_), _) => (Ok(l), Self::normalize(&right, r)),
1052                                (_, Ok(_)) => (Self::normalize(&self.plan, r), Ok(l)),
1053                                _ => (
1054                                    Self::normalize(&self.plan, l),
1055                                    Self::normalize(&right, r),
1056                                ),
1057                            }
1058                        }
1059                        (None, Some(rr)) => {
1060                            let r_is_left =
1061                                self.plan.schema().field_with_qualified_name(rr, &r.name);
1062                            let r_is_right =
1063                                right.schema().field_with_qualified_name(rr, &r.name);
1064
1065                            match (r_is_left, r_is_right) {
1066                                (Ok(_), _) => (Ok(r), Self::normalize(&right, l)),
1067                                (_, Ok(_)) => (Self::normalize(&self.plan, l), Ok(r)),
1068                                _ => (
1069                                    Self::normalize(&self.plan, l),
1070                                    Self::normalize(&right, r),
1071                                ),
1072                            }
1073                        }
1074                        (None, None) => {
1075                            let mut swap = false;
1076                            let left_key = Self::normalize(&self.plan, l.clone())
1077                                .or_else(|_| {
1078                                    swap = true;
1079                                    Self::normalize(&right, l)
1080                                });
1081                            if swap {
1082                                (Self::normalize(&self.plan, r), left_key)
1083                            } else {
1084                                (left_key, Self::normalize(&right, r))
1085                            }
1086                        }
1087                    }
1088                })
1089                .unzip();
1090
1091        let left_keys = left_keys.into_iter().collect::<Result<Vec<Column>>>()?;
1092        let right_keys = right_keys.into_iter().collect::<Result<Vec<Column>>>()?;
1093
1094        let on: Vec<_> = left_keys
1095            .into_iter()
1096            .zip(right_keys)
1097            .map(|(l, r)| (Expr::Column(l), Expr::Column(r)))
1098            .collect();
1099        let join_schema =
1100            build_join_schema(self.plan.schema(), right.schema(), &join_type)?;
1101
1102        // Inner type without join condition is cross join
1103        if join_type != JoinType::Inner && on.is_empty() && filter.is_none() {
1104            return plan_err!("join condition should not be empty");
1105        }
1106
1107        Ok(Self::new(LogicalPlan::Join(Join {
1108            left: self.plan,
1109            right: Arc::new(right),
1110            on,
1111            filter,
1112            join_type,
1113            join_constraint: JoinConstraint::On,
1114            schema: DFSchemaRef::new(join_schema),
1115            null_equality,
1116        })))
1117    }
1118
1119    /// Apply a join with using constraint, which duplicates all join columns in output schema.
1120    pub fn join_using(
1121        self,
1122        right: LogicalPlan,
1123        join_type: JoinType,
1124        using_keys: Vec<Column>,
1125    ) -> Result<Self> {
1126        let left_keys: Vec<Column> = using_keys
1127            .clone()
1128            .into_iter()
1129            .map(|c| Self::normalize(&self.plan, c))
1130            .collect::<Result<_>>()?;
1131        let right_keys: Vec<Column> = using_keys
1132            .into_iter()
1133            .map(|c| Self::normalize(&right, c))
1134            .collect::<Result<_>>()?;
1135
1136        let on: Vec<(_, _)> = left_keys.into_iter().zip(right_keys).collect();
1137        let mut join_on: Vec<(Expr, Expr)> = vec![];
1138        let mut filters: Option<Expr> = None;
1139        for (l, r) in &on {
1140            if self.plan.schema().has_column(l)
1141                && right.schema().has_column(r)
1142                && can_hash(
1143                    datafusion_common::ExprSchema::field_from_column(
1144                        self.plan.schema(),
1145                        l,
1146                    )?
1147                    .data_type(),
1148                )
1149            {
1150                join_on.push((Expr::Column(l.clone()), Expr::Column(r.clone())));
1151            } else if self.plan.schema().has_column(l)
1152                && right.schema().has_column(r)
1153                && can_hash(
1154                    datafusion_common::ExprSchema::field_from_column(
1155                        self.plan.schema(),
1156                        r,
1157                    )?
1158                    .data_type(),
1159                )
1160            {
1161                join_on.push((Expr::Column(r.clone()), Expr::Column(l.clone())));
1162            } else {
1163                let expr = binary_expr(
1164                    Expr::Column(l.clone()),
1165                    Operator::Eq,
1166                    Expr::Column(r.clone()),
1167                );
1168                match filters {
1169                    None => filters = Some(expr),
1170                    Some(filter_expr) => filters = Some(and(expr, filter_expr)),
1171                }
1172            }
1173        }
1174
1175        if join_on.is_empty() {
1176            let join = Self::from(self.plan).cross_join(right)?;
1177            join.filter(filters.ok_or_else(|| {
1178                DataFusionError::Internal("filters should not be None here".to_string())
1179            })?)
1180        } else {
1181            let join = Join::try_new(
1182                self.plan,
1183                Arc::new(right),
1184                join_on,
1185                filters,
1186                join_type,
1187                JoinConstraint::Using,
1188                NullEquality::NullEqualsNothing,
1189            )?;
1190
1191            Ok(Self::new(LogicalPlan::Join(join)))
1192        }
1193    }
1194
1195    /// Apply a cross join
1196    pub fn cross_join(self, right: LogicalPlan) -> Result<Self> {
1197        let join = Join::try_new(
1198            self.plan,
1199            Arc::new(right),
1200            vec![],
1201            None,
1202            JoinType::Inner,
1203            JoinConstraint::On,
1204            NullEquality::NullEqualsNothing,
1205        )?;
1206
1207        Ok(Self::new(LogicalPlan::Join(join)))
1208    }
1209
1210    /// Repartition
1211    pub fn repartition(self, partitioning_scheme: Partitioning) -> Result<Self> {
1212        Ok(Self::new(LogicalPlan::Repartition(Repartition {
1213            input: self.plan,
1214            partitioning_scheme,
1215        })))
1216    }
1217
1218    /// Apply a window functions to extend the schema
1219    pub fn window(
1220        self,
1221        window_expr: impl IntoIterator<Item = impl Into<Expr>>,
1222    ) -> Result<Self> {
1223        let window_expr = normalize_cols(window_expr, &self.plan)?;
1224        validate_unique_names("Windows", &window_expr)?;
1225        Ok(Self::new(LogicalPlan::Window(Window::try_new(
1226            window_expr,
1227            self.plan,
1228        )?)))
1229    }
1230
1231    /// Apply an aggregate: grouping on the `group_expr` expressions
1232    /// and calculating `aggr_expr` aggregates for each distinct
1233    /// value of the `group_expr`;
1234    pub fn aggregate(
1235        self,
1236        group_expr: impl IntoIterator<Item = impl Into<Expr>>,
1237        aggr_expr: impl IntoIterator<Item = impl Into<Expr>>,
1238    ) -> Result<Self> {
1239        let group_expr = normalize_cols(group_expr, &self.plan)?;
1240        let aggr_expr = normalize_cols(aggr_expr, &self.plan)?;
1241
1242        let group_expr = if self.options.add_implicit_group_by_exprs {
1243            add_group_by_exprs_from_dependencies(group_expr, self.plan.schema())?
1244        } else {
1245            group_expr
1246        };
1247
1248        Aggregate::try_new(self.plan, group_expr, aggr_expr)
1249            .map(LogicalPlan::Aggregate)
1250            .map(Self::new)
1251    }
1252
1253    /// Create an expression to represent the explanation of the plan
1254    ///
1255    /// if `analyze` is true, runs the actual plan and produces
1256    /// information about metrics during run.
1257    ///
1258    /// if `verbose` is true, prints out additional details.
1259    pub fn explain(self, verbose: bool, analyze: bool) -> Result<Self> {
1260        // Keep the format default to Indent
1261        self.explain_option_format(
1262            ExplainOption::default()
1263                .with_verbose(verbose)
1264                .with_analyze(analyze),
1265        )
1266    }
1267
1268    /// Create an expression to represent the explanation of the plan
1269    /// The`explain_option` is used to specify the format and verbosity of the explanation.
1270    /// Details see [`ExplainOption`].
1271    pub fn explain_option_format(self, explain_option: ExplainOption) -> Result<Self> {
1272        let schema = LogicalPlan::explain_schema();
1273        let schema = schema.to_dfschema_ref()?;
1274
1275        if explain_option.analyze {
1276            Ok(Self::new(LogicalPlan::Analyze(Analyze {
1277                verbose: explain_option.verbose,
1278                input: self.plan,
1279                schema,
1280            })))
1281        } else {
1282            let stringified_plans =
1283                vec![self.plan.to_stringified(PlanType::InitialLogicalPlan)];
1284
1285            Ok(Self::new(LogicalPlan::Explain(Explain {
1286                verbose: explain_option.verbose,
1287                plan: self.plan,
1288                explain_format: explain_option.format,
1289                stringified_plans,
1290                schema,
1291                logical_optimization_succeeded: false,
1292            })))
1293        }
1294    }
1295
1296    /// Process intersect set operator
1297    pub fn intersect(
1298        left_plan: LogicalPlan,
1299        right_plan: LogicalPlan,
1300        is_all: bool,
1301    ) -> Result<LogicalPlan> {
1302        LogicalPlanBuilder::intersect_or_except(
1303            left_plan,
1304            right_plan,
1305            JoinType::LeftSemi,
1306            is_all,
1307        )
1308    }
1309
1310    /// Process except set operator
1311    pub fn except(
1312        left_plan: LogicalPlan,
1313        right_plan: LogicalPlan,
1314        is_all: bool,
1315    ) -> Result<LogicalPlan> {
1316        LogicalPlanBuilder::intersect_or_except(
1317            left_plan,
1318            right_plan,
1319            JoinType::LeftAnti,
1320            is_all,
1321        )
1322    }
1323
1324    /// Process intersect or except
1325    fn intersect_or_except(
1326        left_plan: LogicalPlan,
1327        right_plan: LogicalPlan,
1328        join_type: JoinType,
1329        is_all: bool,
1330    ) -> Result<LogicalPlan> {
1331        let left_len = left_plan.schema().fields().len();
1332        let right_len = right_plan.schema().fields().len();
1333
1334        if left_len != right_len {
1335            return plan_err!(
1336                "INTERSECT/EXCEPT query must have the same number of columns. Left is {left_len} and right is {right_len}."
1337            );
1338        }
1339
1340        let join_keys = left_plan
1341            .schema()
1342            .fields()
1343            .iter()
1344            .zip(right_plan.schema().fields().iter())
1345            .map(|(left_field, right_field)| {
1346                (
1347                    (Column::from_name(left_field.name())),
1348                    (Column::from_name(right_field.name())),
1349                )
1350            })
1351            .unzip();
1352        if is_all {
1353            LogicalPlanBuilder::from(left_plan)
1354                .join_detailed(
1355                    right_plan,
1356                    join_type,
1357                    join_keys,
1358                    None,
1359                    NullEquality::NullEqualsNull,
1360                )?
1361                .build()
1362        } else {
1363            LogicalPlanBuilder::from(left_plan)
1364                .distinct()?
1365                .join_detailed(
1366                    right_plan,
1367                    join_type,
1368                    join_keys,
1369                    None,
1370                    NullEquality::NullEqualsNull,
1371                )?
1372                .build()
1373        }
1374    }
1375
1376    /// Build the plan
1377    pub fn build(self) -> Result<LogicalPlan> {
1378        Ok(Arc::unwrap_or_clone(self.plan))
1379    }
1380
1381    /// Apply a join with both explicit equijoin and non equijoin predicates.
1382    ///
1383    /// Note this is a low level API that requires identifying specific
1384    /// predicate types. Most users should use  [`join_on`](Self::join_on) that
1385    /// automatically identifies predicates appropriately.
1386    ///
1387    /// `equi_exprs` defines equijoin predicates, of the form `l = r)` for each
1388    /// `(l, r)` tuple. `l`, the first element of the tuple, must only refer
1389    /// to columns from the existing input. `r`, the second element of the tuple,
1390    /// must only refer to columns from the right input.
1391    ///
1392    /// `filter` contains any other filter expression to apply during the
1393    /// join. Note that `equi_exprs` predicates are evaluated more efficiently
1394    /// than the filter expressions, so they are preferred.
1395    pub fn join_with_expr_keys(
1396        self,
1397        right: LogicalPlan,
1398        join_type: JoinType,
1399        equi_exprs: (Vec<impl Into<Expr>>, Vec<impl Into<Expr>>),
1400        filter: Option<Expr>,
1401    ) -> Result<Self> {
1402        if equi_exprs.0.len() != equi_exprs.1.len() {
1403            return plan_err!("left_keys and right_keys were not the same length");
1404        }
1405
1406        let join_key_pairs = equi_exprs
1407            .0
1408            .into_iter()
1409            .zip(equi_exprs.1)
1410            .map(|(l, r)| {
1411                let left_key = l.into();
1412                let right_key = r.into();
1413                let mut left_using_columns  = HashSet::new();
1414                expr_to_columns(&left_key, &mut left_using_columns)?;
1415                let normalized_left_key = normalize_col_with_schemas_and_ambiguity_check(
1416                    left_key,
1417                    &[&[self.plan.schema()]],
1418                    &[],
1419                )?;
1420
1421                let mut right_using_columns = HashSet::new();
1422                expr_to_columns(&right_key, &mut right_using_columns)?;
1423                let normalized_right_key = normalize_col_with_schemas_and_ambiguity_check(
1424                    right_key,
1425                    &[&[right.schema()]],
1426                    &[],
1427                )?;
1428
1429                // find valid equijoin
1430                find_valid_equijoin_key_pair(
1431                        &normalized_left_key,
1432                        &normalized_right_key,
1433                        self.plan.schema(),
1434                        right.schema(),
1435                    )?.ok_or_else(||
1436                        plan_datafusion_err!(
1437                            "can't create join plan, join key should belong to one input, error key: ({normalized_left_key},{normalized_right_key})"
1438                        ))
1439            })
1440            .collect::<Result<Vec<_>>>()?;
1441
1442        let join = Join::try_new(
1443            self.plan,
1444            Arc::new(right),
1445            join_key_pairs,
1446            filter,
1447            join_type,
1448            JoinConstraint::On,
1449            NullEquality::NullEqualsNothing,
1450        )?;
1451
1452        Ok(Self::new(LogicalPlan::Join(join)))
1453    }
1454
1455    /// Unnest the given column.
1456    pub fn unnest_column(self, column: impl Into<Column>) -> Result<Self> {
1457        unnest(Arc::unwrap_or_clone(self.plan), vec![column.into()]).map(Self::new)
1458    }
1459
1460    /// Unnest the given column given [`UnnestOptions`]
1461    pub fn unnest_column_with_options(
1462        self,
1463        column: impl Into<Column>,
1464        options: UnnestOptions,
1465    ) -> Result<Self> {
1466        unnest_with_options(
1467            Arc::unwrap_or_clone(self.plan),
1468            vec![column.into()],
1469            options,
1470        )
1471        .map(Self::new)
1472    }
1473
1474    /// Unnest the given columns with the given [`UnnestOptions`]
1475    pub fn unnest_columns_with_options(
1476        self,
1477        columns: Vec<Column>,
1478        options: UnnestOptions,
1479    ) -> Result<Self> {
1480        unnest_with_options(Arc::unwrap_or_clone(self.plan), columns, options)
1481            .map(Self::new)
1482    }
1483}
1484
1485impl From<LogicalPlan> for LogicalPlanBuilder {
1486    fn from(plan: LogicalPlan) -> Self {
1487        LogicalPlanBuilder::new(plan)
1488    }
1489}
1490
1491impl From<Arc<LogicalPlan>> for LogicalPlanBuilder {
1492    fn from(plan: Arc<LogicalPlan>) -> Self {
1493        LogicalPlanBuilder::new_from_arc(plan)
1494    }
1495}
1496
1497/// Container used when building fields for a `VALUES` node.
1498#[derive(Default)]
1499struct ValuesFields {
1500    inner: Vec<Field>,
1501}
1502
1503impl ValuesFields {
1504    pub fn new() -> Self {
1505        Self::default()
1506    }
1507
1508    pub fn push(&mut self, data_type: DataType, nullable: bool) {
1509        // Naming follows the convention described here:
1510        // https://www.postgresql.org/docs/current/queries-values.html
1511        let name = format!("column{}", self.inner.len() + 1);
1512        self.inner.push(Field::new(name, data_type, nullable));
1513    }
1514
1515    pub fn into_fields(self) -> Fields {
1516        self.inner.into()
1517    }
1518}
1519
1520// `name_map` tracks a mapping between a field name and the number of appearances of that field.
1521//
1522// Some field names might already come to this function with the count (number of times it appeared)
1523// as a sufix e.g. id:1, so there's still a chance of name collisions, for example,
1524// if these three fields passed to this function: "col:1", "col" and "col", the function
1525// would rename them to -> col:1, col, col:1 causing a posteriror error when building the DFSchema.
1526// that's why we need the `seen` set, so the fields are always unique.
1527//
1528pub fn change_redundant_column(fields: &Fields) -> Vec<Field> {
1529    let mut name_map = HashMap::new();
1530    let mut seen: HashSet<String> = HashSet::new();
1531
1532    fields
1533        .into_iter()
1534        .map(|field| {
1535            let base_name = field.name();
1536            let count = name_map.entry(base_name.clone()).or_insert(0);
1537            let mut new_name = base_name.clone();
1538
1539            // Loop until we find a name that hasn't been used
1540            while seen.contains(&new_name) {
1541                *count += 1;
1542                new_name = format!("{base_name}:{count}");
1543            }
1544
1545            seen.insert(new_name.clone());
1546
1547            let mut modified_field =
1548                Field::new(&new_name, field.data_type().clone(), field.is_nullable());
1549            modified_field.set_metadata(field.metadata().clone());
1550            modified_field
1551        })
1552        .collect()
1553}
1554
1555fn mark_field(schema: &DFSchema) -> (Option<TableReference>, Arc<Field>) {
1556    let mut table_references = schema
1557        .iter()
1558        .filter_map(|(qualifier, _)| qualifier)
1559        .collect::<Vec<_>>();
1560    table_references.dedup();
1561    let table_reference = if table_references.len() == 1 {
1562        table_references.pop().cloned()
1563    } else {
1564        None
1565    };
1566
1567    (
1568        table_reference,
1569        Arc::new(Field::new("mark", DataType::Boolean, false)),
1570    )
1571}
1572
1573/// Creates a schema for a join operation.
1574/// The fields from the left side are first
1575pub fn build_join_schema(
1576    left: &DFSchema,
1577    right: &DFSchema,
1578    join_type: &JoinType,
1579) -> Result<DFSchema> {
1580    fn nullify_fields<'a>(
1581        fields: impl Iterator<Item = (Option<&'a TableReference>, &'a Arc<Field>)>,
1582    ) -> Vec<(Option<TableReference>, Arc<Field>)> {
1583        fields
1584            .map(|(q, f)| {
1585                // TODO: find a good way to do that
1586                let field = f.as_ref().clone().with_nullable(true);
1587                (q.cloned(), Arc::new(field))
1588            })
1589            .collect()
1590    }
1591
1592    let right_fields = right.iter();
1593    let left_fields = left.iter();
1594
1595    let qualified_fields: Vec<(Option<TableReference>, Arc<Field>)> = match join_type {
1596        JoinType::Inner => {
1597            // left then right
1598            let left_fields = left_fields
1599                .map(|(q, f)| (q.cloned(), Arc::clone(f)))
1600                .collect::<Vec<_>>();
1601            let right_fields = right_fields
1602                .map(|(q, f)| (q.cloned(), Arc::clone(f)))
1603                .collect::<Vec<_>>();
1604            left_fields.into_iter().chain(right_fields).collect()
1605        }
1606        JoinType::Left => {
1607            // left then right, right set to nullable in case of not matched scenario
1608            let left_fields = left_fields
1609                .map(|(q, f)| (q.cloned(), Arc::clone(f)))
1610                .collect::<Vec<_>>();
1611            left_fields
1612                .into_iter()
1613                .chain(nullify_fields(right_fields))
1614                .collect()
1615        }
1616        JoinType::Right => {
1617            // left then right, left set to nullable in case of not matched scenario
1618            let right_fields = right_fields
1619                .map(|(q, f)| (q.cloned(), Arc::clone(f)))
1620                .collect::<Vec<_>>();
1621            nullify_fields(left_fields)
1622                .into_iter()
1623                .chain(right_fields)
1624                .collect()
1625        }
1626        JoinType::Full => {
1627            // left then right, all set to nullable in case of not matched scenario
1628            nullify_fields(left_fields)
1629                .into_iter()
1630                .chain(nullify_fields(right_fields))
1631                .collect()
1632        }
1633        JoinType::LeftSemi | JoinType::LeftAnti => {
1634            // Only use the left side for the schema
1635            left_fields
1636                .map(|(q, f)| (q.cloned(), Arc::clone(f)))
1637                .collect()
1638        }
1639        JoinType::LeftMark => left_fields
1640            .map(|(q, f)| (q.cloned(), Arc::clone(f)))
1641            .chain(once(mark_field(right)))
1642            .collect(),
1643        JoinType::RightSemi | JoinType::RightAnti => {
1644            // Only use the right side for the schema
1645            right_fields
1646                .map(|(q, f)| (q.cloned(), Arc::clone(f)))
1647                .collect()
1648        }
1649        JoinType::RightMark => right_fields
1650            .map(|(q, f)| (q.cloned(), Arc::clone(f)))
1651            .chain(once(mark_field(left)))
1652            .collect(),
1653    };
1654    let func_dependencies = left.functional_dependencies().join(
1655        right.functional_dependencies(),
1656        join_type,
1657        left.fields().len(),
1658    );
1659
1660    let (schema1, schema2) = match join_type {
1661        JoinType::Right | JoinType::RightSemi | JoinType::RightAnti => (left, right),
1662        _ => (right, left),
1663    };
1664
1665    let metadata = schema1
1666        .metadata()
1667        .clone()
1668        .into_iter()
1669        .chain(schema2.metadata().clone())
1670        .collect();
1671
1672    let dfschema = DFSchema::new_with_metadata(qualified_fields, metadata)?;
1673    dfschema.with_functional_dependencies(func_dependencies)
1674}
1675
1676/// (Re)qualify the sides of a join if needed, i.e. if the columns from one side would otherwise
1677/// conflict with the columns from the other.
1678/// This is especially useful for queries that come as Substrait, since Substrait doesn't currently allow specifying
1679/// aliases, neither for columns nor for tables.  DataFusion requires columns to be uniquely identifiable, in some
1680/// places (see e.g. DFSchema::check_names).
1681/// The function returns:
1682/// - The requalified or original left logical plan
1683/// - The requalified or original right logical plan
1684/// - If a requalification was needed or not
1685pub fn requalify_sides_if_needed(
1686    left: LogicalPlanBuilder,
1687    right: LogicalPlanBuilder,
1688) -> Result<(LogicalPlanBuilder, LogicalPlanBuilder, bool)> {
1689    let left_cols = left.schema().columns();
1690    let right_cols = right.schema().columns();
1691    if left_cols.iter().any(|l| {
1692        right_cols.iter().any(|r| {
1693            l == r || (l.name == r.name && (l.relation.is_none() || r.relation.is_none()))
1694        })
1695    }) {
1696        // These names have no connection to the original plan, but they'll make the columns
1697        // (mostly) unique.
1698        Ok((
1699            left.alias(TableReference::bare("left"))?,
1700            right.alias(TableReference::bare("right"))?,
1701            true,
1702        ))
1703    } else {
1704        Ok((left, right, false))
1705    }
1706}
1707
1708/// Add additional "synthetic" group by expressions based on functional
1709/// dependencies.
1710///
1711/// For example, if we are grouping on `[c1]`, and we know from
1712/// functional dependencies that column `c1` determines `c2`, this function
1713/// adds `c2` to the group by list.
1714///
1715/// This allows MySQL style selects like
1716/// `SELECT col FROM t WHERE pk = 5` if col is unique
1717pub fn add_group_by_exprs_from_dependencies(
1718    mut group_expr: Vec<Expr>,
1719    schema: &DFSchemaRef,
1720) -> Result<Vec<Expr>> {
1721    // Names of the fields produced by the GROUP BY exprs for example, `GROUP BY
1722    // c1 + 1` produces an output field named `"c1 + 1"`
1723    let mut group_by_field_names = group_expr
1724        .iter()
1725        .map(|e| e.schema_name().to_string())
1726        .collect::<Vec<_>>();
1727
1728    if let Some(target_indices) =
1729        get_target_functional_dependencies(schema, &group_by_field_names)
1730    {
1731        for idx in target_indices {
1732            let expr = Expr::Column(Column::from(schema.qualified_field(idx)));
1733            let expr_name = expr.schema_name().to_string();
1734            if !group_by_field_names.contains(&expr_name) {
1735                group_by_field_names.push(expr_name);
1736                group_expr.push(expr);
1737            }
1738        }
1739    }
1740    Ok(group_expr)
1741}
1742
1743/// Errors if one or more expressions have equal names.
1744pub fn validate_unique_names<'a>(
1745    node_name: &str,
1746    expressions: impl IntoIterator<Item = &'a Expr>,
1747) -> Result<()> {
1748    let mut unique_names = HashMap::new();
1749
1750    expressions.into_iter().enumerate().try_for_each(|(position, expr)| {
1751        let name = expr.schema_name().to_string();
1752        match unique_names.get(&name) {
1753            None => {
1754                unique_names.insert(name, (position, expr));
1755                Ok(())
1756            },
1757            Some((existing_position, existing_expr)) => {
1758                plan_err!("{node_name} require unique expression names \
1759                             but the expression \"{existing_expr}\" at position {existing_position} and \"{expr}\" \
1760                             at position {position} have the same name. Consider aliasing (\"AS\") one of them."
1761                            )
1762            }
1763        }
1764    })
1765}
1766
1767/// Union two [`LogicalPlan`]s.
1768///
1769/// Constructs the UNION plan, but does not perform type-coercion. Therefore the
1770/// subtree expressions will not be properly typed until the optimizer pass.
1771///
1772/// If a properly typed UNION plan is needed, refer to [`TypeCoercionRewriter::coerce_union`]
1773/// or alternatively, merge the union input schema using [`coerce_union_schema`] and
1774/// apply the expression rewrite with [`coerce_plan_expr_for_schema`].
1775///
1776/// [`TypeCoercionRewriter::coerce_union`]: https://docs.rs/datafusion-optimizer/latest/datafusion_optimizer/analyzer/type_coercion/struct.TypeCoercionRewriter.html#method.coerce_union
1777/// [`coerce_union_schema`]: https://docs.rs/datafusion-optimizer/latest/datafusion_optimizer/analyzer/type_coercion/fn.coerce_union_schema.html
1778pub fn union(left_plan: LogicalPlan, right_plan: LogicalPlan) -> Result<LogicalPlan> {
1779    Ok(LogicalPlan::Union(Union::try_new_with_loose_types(vec![
1780        Arc::new(left_plan),
1781        Arc::new(right_plan),
1782    ])?))
1783}
1784
1785/// Like [`union`], but combine rows from different tables by name, rather than
1786/// by position.
1787pub fn union_by_name(
1788    left_plan: LogicalPlan,
1789    right_plan: LogicalPlan,
1790) -> Result<LogicalPlan> {
1791    Ok(LogicalPlan::Union(Union::try_new_by_name(vec![
1792        Arc::new(left_plan),
1793        Arc::new(right_plan),
1794    ])?))
1795}
1796
1797/// Create Projection
1798/// # Errors
1799/// This function errors under any of the following conditions:
1800/// * Two or more expressions have the same name
1801/// * An invalid expression is used (e.g. a `sort` expression)
1802pub fn project(
1803    plan: LogicalPlan,
1804    expr: impl IntoIterator<Item = impl Into<SelectExpr>>,
1805) -> Result<LogicalPlan> {
1806    project_with_validation(plan, expr.into_iter().map(|e| (e, true)))
1807}
1808
1809/// Create Projection. Similar to project except that the expressions
1810/// passed in have a flag to indicate if that expression requires
1811/// validation (normalize & columnize) (true) or not (false)
1812/// # Errors
1813/// This function errors under any of the following conditions:
1814/// * Two or more expressions have the same name
1815/// * An invalid expression is used (e.g. a `sort` expression)
1816fn project_with_validation(
1817    plan: LogicalPlan,
1818    expr: impl IntoIterator<Item = (impl Into<SelectExpr>, bool)>,
1819) -> Result<LogicalPlan> {
1820    let mut projected_expr = vec![];
1821    for (e, validate) in expr {
1822        let e = e.into();
1823        match e {
1824            SelectExpr::Wildcard(opt) => {
1825                let expanded = expand_wildcard(plan.schema(), &plan, Some(&opt))?;
1826
1827                // If there is a REPLACE statement, replace that column with the given
1828                // replace expression. Column name remains the same.
1829                let expanded = if let Some(replace) = opt.replace {
1830                    replace_columns(expanded, &replace)?
1831                } else {
1832                    expanded
1833                };
1834
1835                for e in expanded {
1836                    if validate {
1837                        projected_expr
1838                            .push(columnize_expr(normalize_col(e, &plan)?, &plan)?)
1839                    } else {
1840                        projected_expr.push(e)
1841                    }
1842                }
1843            }
1844            SelectExpr::QualifiedWildcard(table_ref, opt) => {
1845                let expanded =
1846                    expand_qualified_wildcard(&table_ref, plan.schema(), Some(&opt))?;
1847
1848                // If there is a REPLACE statement, replace that column with the given
1849                // replace expression. Column name remains the same.
1850                let expanded = if let Some(replace) = opt.replace {
1851                    replace_columns(expanded, &replace)?
1852                } else {
1853                    expanded
1854                };
1855
1856                for e in expanded {
1857                    if validate {
1858                        projected_expr
1859                            .push(columnize_expr(normalize_col(e, &plan)?, &plan)?)
1860                    } else {
1861                        projected_expr.push(e)
1862                    }
1863                }
1864            }
1865            SelectExpr::Expression(e) => {
1866                if validate {
1867                    projected_expr.push(columnize_expr(normalize_col(e, &plan)?, &plan)?)
1868                } else {
1869                    projected_expr.push(e)
1870                }
1871            }
1872        }
1873    }
1874    validate_unique_names("Projections", projected_expr.iter())?;
1875
1876    Projection::try_new(projected_expr, Arc::new(plan)).map(LogicalPlan::Projection)
1877}
1878
1879/// If there is a REPLACE statement in the projected expression in the form of
1880/// "REPLACE (some_column_within_an_expr AS some_column)", this function replaces
1881/// that column with the given replace expression. Column name remains the same.
1882/// Multiple REPLACEs are also possible with comma separations.
1883fn replace_columns(
1884    mut exprs: Vec<Expr>,
1885    replace: &PlannedReplaceSelectItem,
1886) -> Result<Vec<Expr>> {
1887    for expr in exprs.iter_mut() {
1888        if let Expr::Column(Column { name, .. }) = expr {
1889            if let Some((_, new_expr)) = replace
1890                .items()
1891                .iter()
1892                .zip(replace.expressions().iter())
1893                .find(|(item, _)| item.column_name.value == *name)
1894            {
1895                *expr = new_expr.clone().alias(name.clone())
1896            }
1897        }
1898    }
1899    Ok(exprs)
1900}
1901
1902/// Create a SubqueryAlias to wrap a LogicalPlan.
1903pub fn subquery_alias(
1904    plan: LogicalPlan,
1905    alias: impl Into<TableReference>,
1906) -> Result<LogicalPlan> {
1907    SubqueryAlias::try_new(Arc::new(plan), alias).map(LogicalPlan::SubqueryAlias)
1908}
1909
1910/// Create a LogicalPlanBuilder representing a scan of a table with the provided name and schema.
1911/// This is mostly used for testing and documentation.
1912pub fn table_scan(
1913    name: Option<impl Into<TableReference>>,
1914    table_schema: &Schema,
1915    projection: Option<Vec<usize>>,
1916) -> Result<LogicalPlanBuilder> {
1917    table_scan_with_filters(name, table_schema, projection, vec![])
1918}
1919
1920/// Create a LogicalPlanBuilder representing a scan of a table with the provided name and schema,
1921/// and inlined filters.
1922/// This is mostly used for testing and documentation.
1923pub fn table_scan_with_filters(
1924    name: Option<impl Into<TableReference>>,
1925    table_schema: &Schema,
1926    projection: Option<Vec<usize>>,
1927    filters: Vec<Expr>,
1928) -> Result<LogicalPlanBuilder> {
1929    let table_source = table_source(table_schema);
1930    let name = name
1931        .map(|n| n.into())
1932        .unwrap_or_else(|| TableReference::bare(UNNAMED_TABLE));
1933    LogicalPlanBuilder::scan_with_filters(name, table_source, projection, filters)
1934}
1935
1936/// Create a LogicalPlanBuilder representing a scan of a table with the provided name and schema,
1937/// filters, and inlined fetch.
1938/// This is mostly used for testing and documentation.
1939pub fn table_scan_with_filter_and_fetch(
1940    name: Option<impl Into<TableReference>>,
1941    table_schema: &Schema,
1942    projection: Option<Vec<usize>>,
1943    filters: Vec<Expr>,
1944    fetch: Option<usize>,
1945) -> Result<LogicalPlanBuilder> {
1946    let table_source = table_source(table_schema);
1947    let name = name
1948        .map(|n| n.into())
1949        .unwrap_or_else(|| TableReference::bare(UNNAMED_TABLE));
1950    LogicalPlanBuilder::scan_with_filters_fetch(
1951        name,
1952        table_source,
1953        projection,
1954        filters,
1955        fetch,
1956    )
1957}
1958
1959pub fn table_source(table_schema: &Schema) -> Arc<dyn TableSource> {
1960    let table_schema = Arc::new(table_schema.clone());
1961    Arc::new(LogicalTableSource {
1962        table_schema,
1963        constraints: Default::default(),
1964    })
1965}
1966
1967pub fn table_source_with_constraints(
1968    table_schema: &Schema,
1969    constraints: Constraints,
1970) -> Arc<dyn TableSource> {
1971    let table_schema = Arc::new(table_schema.clone());
1972    Arc::new(LogicalTableSource {
1973        table_schema,
1974        constraints,
1975    })
1976}
1977
1978/// Wrap projection for a plan, if the join keys contains normal expression.
1979pub fn wrap_projection_for_join_if_necessary(
1980    join_keys: &[Expr],
1981    input: LogicalPlan,
1982) -> Result<(LogicalPlan, Vec<Column>, bool)> {
1983    let input_schema = input.schema();
1984    let alias_join_keys: Vec<Expr> = join_keys
1985        .iter()
1986        .map(|key| {
1987            // The display_name() of cast expression will ignore the cast info, and show the inner expression name.
1988            // If we do not add alias, it will throw same field name error in the schema when adding projection.
1989            // For example:
1990            //    input scan : [a, b, c],
1991            //    join keys: [cast(a as int)]
1992            //
1993            //  then a and cast(a as int) will use the same field name - `a` in projection schema.
1994            //  https://github.com/apache/datafusion/issues/4478
1995            if matches!(key, Expr::Cast(_)) || matches!(key, Expr::TryCast(_)) {
1996                let alias = format!("{key}");
1997                key.clone().alias(alias)
1998            } else {
1999                key.clone()
2000            }
2001        })
2002        .collect::<Vec<_>>();
2003
2004    let need_project = join_keys.iter().any(|key| !matches!(key, Expr::Column(_)));
2005    let plan = if need_project {
2006        // Include all columns from the input and extend them with the join keys
2007        let mut projection = input_schema
2008            .columns()
2009            .into_iter()
2010            .map(Expr::Column)
2011            .collect::<Vec<_>>();
2012        let join_key_items = alias_join_keys
2013            .iter()
2014            .flat_map(|expr| expr.try_as_col().is_none().then_some(expr))
2015            .cloned()
2016            .collect::<HashSet<Expr>>();
2017        projection.extend(join_key_items);
2018
2019        LogicalPlanBuilder::from(input)
2020            .project(projection.into_iter().map(SelectExpr::from))?
2021            .build()?
2022    } else {
2023        input
2024    };
2025
2026    let join_on = alias_join_keys
2027        .into_iter()
2028        .map(|key| {
2029            if let Some(col) = key.try_as_col() {
2030                Ok(col.clone())
2031            } else {
2032                let name = key.schema_name().to_string();
2033                Ok(Column::from_name(name))
2034            }
2035        })
2036        .collect::<Result<Vec<_>>>()?;
2037
2038    Ok((plan, join_on, need_project))
2039}
2040
2041/// Basic TableSource implementation intended for use in tests and documentation. It is expected
2042/// that users will provide their own TableSource implementations or use DataFusion's
2043/// DefaultTableSource.
2044pub struct LogicalTableSource {
2045    table_schema: SchemaRef,
2046    constraints: Constraints,
2047}
2048
2049impl LogicalTableSource {
2050    /// Create a new LogicalTableSource
2051    pub fn new(table_schema: SchemaRef) -> Self {
2052        Self {
2053            table_schema,
2054            constraints: Constraints::default(),
2055        }
2056    }
2057
2058    pub fn with_constraints(mut self, constraints: Constraints) -> Self {
2059        self.constraints = constraints;
2060        self
2061    }
2062}
2063
2064impl TableSource for LogicalTableSource {
2065    fn as_any(&self) -> &dyn Any {
2066        self
2067    }
2068
2069    fn schema(&self) -> SchemaRef {
2070        Arc::clone(&self.table_schema)
2071    }
2072
2073    fn constraints(&self) -> Option<&Constraints> {
2074        Some(&self.constraints)
2075    }
2076
2077    fn supports_filters_pushdown(
2078        &self,
2079        filters: &[&Expr],
2080    ) -> Result<Vec<TableProviderFilterPushDown>> {
2081        Ok(vec![TableProviderFilterPushDown::Exact; filters.len()])
2082    }
2083}
2084
2085/// Create a [`LogicalPlan::Unnest`] plan
2086pub fn unnest(input: LogicalPlan, columns: Vec<Column>) -> Result<LogicalPlan> {
2087    unnest_with_options(input, columns, UnnestOptions::default())
2088}
2089
2090pub fn get_struct_unnested_columns(
2091    col_name: &String,
2092    inner_fields: &Fields,
2093) -> Vec<Column> {
2094    inner_fields
2095        .iter()
2096        .map(|f| Column::from_name(format!("{}.{}", col_name, f.name())))
2097        .collect()
2098}
2099
2100/// Create a [`LogicalPlan::Unnest`] plan with options
2101/// This function receive a list of columns to be unnested
2102/// because multiple unnest can be performed on the same column (e.g unnest with different depth)
2103/// The new schema will contains post-unnest fields replacing the original field
2104///
2105/// For example:
2106/// Input schema as
2107/// ```text
2108/// +---------------------+-------------------+
2109/// | col1                | col2              |
2110/// +---------------------+-------------------+
2111/// | Struct(INT64,INT32) | List(List(Int64)) |
2112/// +---------------------+-------------------+
2113/// ```
2114///
2115///
2116///
2117/// Then unnesting columns with:
2118/// - (col1,Struct)
2119/// - (col2,List(\[depth=1,depth=2\]))
2120///
2121/// will generate a new schema as
2122/// ```text
2123/// +---------+---------+---------------------+---------------------+
2124/// | col1.c0 | col1.c1 | unnest_col2_depth_1 | unnest_col2_depth_2 |
2125/// +---------+---------+---------------------+---------------------+
2126/// | Int64   | Int32   | List(Int64)         |  Int64              |
2127/// +---------+---------+---------------------+---------------------+
2128/// ```
2129pub fn unnest_with_options(
2130    input: LogicalPlan,
2131    columns_to_unnest: Vec<Column>,
2132    options: UnnestOptions,
2133) -> Result<LogicalPlan> {
2134    Ok(LogicalPlan::Unnest(Unnest::try_new(
2135        Arc::new(input),
2136        columns_to_unnest,
2137        options,
2138    )?))
2139}
2140
2141#[cfg(test)]
2142mod tests {
2143    use super::*;
2144    use crate::logical_plan::StringifiedPlan;
2145    use crate::{col, expr, expr_fn::exists, in_subquery, lit, scalar_subquery};
2146
2147    use crate::test::function_stub::sum;
2148    use datafusion_common::{Constraint, RecursionUnnestOption, SchemaError};
2149    use insta::assert_snapshot;
2150
2151    #[test]
2152    fn plan_builder_simple() -> Result<()> {
2153        let plan =
2154            table_scan(Some("employee_csv"), &employee_schema(), Some(vec![0, 3]))?
2155                .filter(col("state").eq(lit("CO")))?
2156                .project(vec![col("id")])?
2157                .build()?;
2158
2159        assert_snapshot!(plan, @r#"
2160        Projection: employee_csv.id
2161          Filter: employee_csv.state = Utf8("CO")
2162            TableScan: employee_csv projection=[id, state]
2163        "#);
2164
2165        Ok(())
2166    }
2167
2168    #[test]
2169    fn plan_builder_schema() {
2170        let schema = employee_schema();
2171        let projection = None;
2172        let plan =
2173            LogicalPlanBuilder::scan("employee_csv", table_source(&schema), projection)
2174                .unwrap();
2175        assert_snapshot!(plan.schema().as_ref(), @"fields:[employee_csv.id, employee_csv.first_name, employee_csv.last_name, employee_csv.state, employee_csv.salary], metadata:{}");
2176
2177        // Note scan of "EMPLOYEE_CSV" is treated as a SQL identifier
2178        // (and thus normalized to "employee"csv") as well
2179        let projection = None;
2180        let plan =
2181            LogicalPlanBuilder::scan("EMPLOYEE_CSV", table_source(&schema), projection)
2182                .unwrap();
2183        assert_snapshot!(plan.schema().as_ref(), @"fields:[employee_csv.id, employee_csv.first_name, employee_csv.last_name, employee_csv.state, employee_csv.salary], metadata:{}");
2184    }
2185
2186    #[test]
2187    fn plan_builder_empty_name() {
2188        let schema = employee_schema();
2189        let projection = None;
2190        let err =
2191            LogicalPlanBuilder::scan("", table_source(&schema), projection).unwrap_err();
2192        assert_snapshot!(
2193            err.strip_backtrace(),
2194            @"Error during planning: table_name cannot be empty"
2195        );
2196    }
2197
2198    #[test]
2199    fn plan_builder_sort() -> Result<()> {
2200        let plan =
2201            table_scan(Some("employee_csv"), &employee_schema(), Some(vec![3, 4]))?
2202                .sort(vec![
2203                    expr::Sort::new(col("state"), true, true),
2204                    expr::Sort::new(col("salary"), false, false),
2205                ])?
2206                .build()?;
2207
2208        assert_snapshot!(plan, @r"
2209        Sort: employee_csv.state ASC NULLS FIRST, employee_csv.salary DESC NULLS LAST
2210          TableScan: employee_csv projection=[state, salary]
2211        ");
2212
2213        Ok(())
2214    }
2215
2216    #[test]
2217    fn plan_builder_union() -> Result<()> {
2218        let plan =
2219            table_scan(Some("employee_csv"), &employee_schema(), Some(vec![3, 4]))?;
2220
2221        let plan = plan
2222            .clone()
2223            .union(plan.clone().build()?)?
2224            .union(plan.clone().build()?)?
2225            .union(plan.build()?)?
2226            .build()?;
2227
2228        assert_snapshot!(plan, @r"
2229        Union
2230          Union
2231            Union
2232              TableScan: employee_csv projection=[state, salary]
2233              TableScan: employee_csv projection=[state, salary]
2234            TableScan: employee_csv projection=[state, salary]
2235          TableScan: employee_csv projection=[state, salary]
2236        ");
2237
2238        Ok(())
2239    }
2240
2241    #[test]
2242    fn plan_builder_union_distinct() -> Result<()> {
2243        let plan =
2244            table_scan(Some("employee_csv"), &employee_schema(), Some(vec![3, 4]))?;
2245
2246        let plan = plan
2247            .clone()
2248            .union_distinct(plan.clone().build()?)?
2249            .union_distinct(plan.clone().build()?)?
2250            .union_distinct(plan.build()?)?
2251            .build()?;
2252
2253        assert_snapshot!(plan, @r"
2254        Distinct:
2255          Union
2256            Distinct:
2257              Union
2258                Distinct:
2259                  Union
2260                    TableScan: employee_csv projection=[state, salary]
2261                    TableScan: employee_csv projection=[state, salary]
2262                TableScan: employee_csv projection=[state, salary]
2263            TableScan: employee_csv projection=[state, salary]
2264        ");
2265
2266        Ok(())
2267    }
2268
2269    #[test]
2270    fn plan_builder_simple_distinct() -> Result<()> {
2271        let plan =
2272            table_scan(Some("employee_csv"), &employee_schema(), Some(vec![0, 3]))?
2273                .filter(col("state").eq(lit("CO")))?
2274                .project(vec![col("id")])?
2275                .distinct()?
2276                .build()?;
2277
2278        assert_snapshot!(plan, @r#"
2279        Distinct:
2280          Projection: employee_csv.id
2281            Filter: employee_csv.state = Utf8("CO")
2282              TableScan: employee_csv projection=[id, state]
2283        "#);
2284
2285        Ok(())
2286    }
2287
2288    #[test]
2289    fn exists_subquery() -> Result<()> {
2290        let foo = test_table_scan_with_name("foo")?;
2291        let bar = test_table_scan_with_name("bar")?;
2292
2293        let subquery = LogicalPlanBuilder::from(foo)
2294            .project(vec![col("a")])?
2295            .filter(col("a").eq(col("bar.a")))?
2296            .build()?;
2297
2298        let outer_query = LogicalPlanBuilder::from(bar)
2299            .project(vec![col("a")])?
2300            .filter(exists(Arc::new(subquery)))?
2301            .build()?;
2302
2303        assert_snapshot!(outer_query, @r"
2304        Filter: EXISTS (<subquery>)
2305          Subquery:
2306            Filter: foo.a = bar.a
2307              Projection: foo.a
2308                TableScan: foo
2309          Projection: bar.a
2310            TableScan: bar
2311        ");
2312
2313        Ok(())
2314    }
2315
2316    #[test]
2317    fn filter_in_subquery() -> Result<()> {
2318        let foo = test_table_scan_with_name("foo")?;
2319        let bar = test_table_scan_with_name("bar")?;
2320
2321        let subquery = LogicalPlanBuilder::from(foo)
2322            .project(vec![col("a")])?
2323            .filter(col("a").eq(col("bar.a")))?
2324            .build()?;
2325
2326        // SELECT a FROM bar WHERE a IN (SELECT a FROM foo WHERE a = bar.a)
2327        let outer_query = LogicalPlanBuilder::from(bar)
2328            .project(vec![col("a")])?
2329            .filter(in_subquery(col("a"), Arc::new(subquery)))?
2330            .build()?;
2331
2332        assert_snapshot!(outer_query, @r"
2333        Filter: bar.a IN (<subquery>)
2334          Subquery:
2335            Filter: foo.a = bar.a
2336              Projection: foo.a
2337                TableScan: foo
2338          Projection: bar.a
2339            TableScan: bar
2340        ");
2341
2342        Ok(())
2343    }
2344
2345    #[test]
2346    fn select_scalar_subquery() -> Result<()> {
2347        let foo = test_table_scan_with_name("foo")?;
2348        let bar = test_table_scan_with_name("bar")?;
2349
2350        let subquery = LogicalPlanBuilder::from(foo)
2351            .project(vec![col("b")])?
2352            .filter(col("a").eq(col("bar.a")))?
2353            .build()?;
2354
2355        // SELECT (SELECT a FROM foo WHERE a = bar.a) FROM bar
2356        let outer_query = LogicalPlanBuilder::from(bar)
2357            .project(vec![scalar_subquery(Arc::new(subquery))])?
2358            .build()?;
2359
2360        assert_snapshot!(outer_query, @r"
2361        Projection: (<subquery>)
2362          Subquery:
2363            Filter: foo.a = bar.a
2364              Projection: foo.b
2365                TableScan: foo
2366          TableScan: bar
2367        ");
2368
2369        Ok(())
2370    }
2371
2372    #[test]
2373    fn projection_non_unique_names() -> Result<()> {
2374        let plan = table_scan(
2375            Some("employee_csv"),
2376            &employee_schema(),
2377            // project id and first_name by column index
2378            Some(vec![0, 1]),
2379        )?
2380        // two columns with the same name => error
2381        .project(vec![col("id"), col("first_name").alias("id")]);
2382
2383        match plan {
2384            Err(DataFusionError::SchemaError(err, _)) => {
2385                if let SchemaError::AmbiguousReference { field } = *err {
2386                    let Column {
2387                        relation,
2388                        name,
2389                        spans: _,
2390                    } = *field;
2391                    let Some(TableReference::Bare { table }) = relation else {
2392                        return plan_err!(
2393                            "wrong relation: {relation:?}, expected table name"
2394                        );
2395                    };
2396                    assert_eq!(*"employee_csv", *table);
2397                    assert_eq!("id", &name);
2398                    Ok(())
2399                } else {
2400                    plan_err!("Plan should have returned an DataFusionError::SchemaError")
2401                }
2402            }
2403            _ => plan_err!("Plan should have returned an DataFusionError::SchemaError"),
2404        }
2405    }
2406
2407    fn employee_schema() -> Schema {
2408        Schema::new(vec![
2409            Field::new("id", DataType::Int32, false),
2410            Field::new("first_name", DataType::Utf8, false),
2411            Field::new("last_name", DataType::Utf8, false),
2412            Field::new("state", DataType::Utf8, false),
2413            Field::new("salary", DataType::Int32, false),
2414        ])
2415    }
2416
2417    #[test]
2418    fn stringified_plan() {
2419        let stringified_plan =
2420            StringifiedPlan::new(PlanType::InitialLogicalPlan, "...the plan...");
2421        assert!(stringified_plan.should_display(true));
2422        assert!(!stringified_plan.should_display(false)); // not in non verbose mode
2423
2424        let stringified_plan =
2425            StringifiedPlan::new(PlanType::FinalLogicalPlan, "...the plan...");
2426        assert!(stringified_plan.should_display(true));
2427        assert!(stringified_plan.should_display(false)); // display in non verbose mode too
2428
2429        let stringified_plan =
2430            StringifiedPlan::new(PlanType::InitialPhysicalPlan, "...the plan...");
2431        assert!(stringified_plan.should_display(true));
2432        assert!(!stringified_plan.should_display(false)); // not in non verbose mode
2433
2434        let stringified_plan =
2435            StringifiedPlan::new(PlanType::FinalPhysicalPlan, "...the plan...");
2436        assert!(stringified_plan.should_display(true));
2437        assert!(stringified_plan.should_display(false)); // display in non verbose mode
2438
2439        let stringified_plan = StringifiedPlan::new(
2440            PlanType::OptimizedLogicalPlan {
2441                optimizer_name: "random opt pass".into(),
2442            },
2443            "...the plan...",
2444        );
2445        assert!(stringified_plan.should_display(true));
2446        assert!(!stringified_plan.should_display(false));
2447    }
2448
2449    fn test_table_scan_with_name(name: &str) -> Result<LogicalPlan> {
2450        let schema = Schema::new(vec![
2451            Field::new("a", DataType::UInt32, false),
2452            Field::new("b", DataType::UInt32, false),
2453            Field::new("c", DataType::UInt32, false),
2454        ]);
2455        table_scan(Some(name), &schema, None)?.build()
2456    }
2457
2458    #[test]
2459    fn plan_builder_intersect_different_num_columns_error() -> Result<()> {
2460        let plan1 =
2461            table_scan(TableReference::none(), &employee_schema(), Some(vec![3]))?;
2462        let plan2 =
2463            table_scan(TableReference::none(), &employee_schema(), Some(vec![3, 4]))?;
2464
2465        let err_msg1 =
2466            LogicalPlanBuilder::intersect(plan1.build()?, plan2.build()?, true)
2467                .unwrap_err();
2468
2469        assert_snapshot!(err_msg1.strip_backtrace(), @"Error during planning: INTERSECT/EXCEPT query must have the same number of columns. Left is 1 and right is 2.");
2470
2471        Ok(())
2472    }
2473
2474    #[test]
2475    fn plan_builder_unnest() -> Result<()> {
2476        // Cannot unnest on a scalar column
2477        let err = nested_table_scan("test_table")?
2478            .unnest_column("scalar")
2479            .unwrap_err();
2480
2481        let DataFusionError::Internal(desc) = err else {
2482            return plan_err!("Plan should have returned an DataFusionError::Internal");
2483        };
2484
2485        let desc = desc
2486            .split(DataFusionError::BACK_TRACE_SEP)
2487            .collect::<Vec<&str>>()
2488            .first()
2489            .unwrap_or(&"")
2490            .to_string();
2491
2492        assert_snapshot!(desc, @"trying to unnest on invalid data type UInt32");
2493
2494        // Unnesting the strings list.
2495        let plan = nested_table_scan("test_table")?
2496            .unnest_column("strings")?
2497            .build()?;
2498
2499        assert_snapshot!(plan, @r"
2500        Unnest: lists[test_table.strings|depth=1] structs[]
2501          TableScan: test_table
2502        ");
2503
2504        // Check unnested field is a scalar
2505        let field = plan.schema().field_with_name(None, "strings").unwrap();
2506        assert_eq!(&DataType::Utf8, field.data_type());
2507
2508        // Unnesting the singular struct column result into 2 new columns for each subfield
2509        let plan = nested_table_scan("test_table")?
2510            .unnest_column("struct_singular")?
2511            .build()?;
2512
2513        assert_snapshot!(plan, @r"
2514        Unnest: lists[] structs[test_table.struct_singular]
2515          TableScan: test_table
2516        ");
2517
2518        for field_name in &["a", "b"] {
2519            // Check unnested struct field is a scalar
2520            let field = plan
2521                .schema()
2522                .field_with_name(None, &format!("struct_singular.{field_name}"))
2523                .unwrap();
2524            assert_eq!(&DataType::UInt32, field.data_type());
2525        }
2526
2527        // Unnesting multiple fields in separate plans
2528        let plan = nested_table_scan("test_table")?
2529            .unnest_column("strings")?
2530            .unnest_column("structs")?
2531            .unnest_column("struct_singular")?
2532            .build()?;
2533
2534        assert_snapshot!(plan, @r"
2535        Unnest: lists[] structs[test_table.struct_singular]
2536          Unnest: lists[test_table.structs|depth=1] structs[]
2537            Unnest: lists[test_table.strings|depth=1] structs[]
2538              TableScan: test_table
2539        ");
2540
2541        // Check unnested struct list field should be a struct.
2542        let field = plan.schema().field_with_name(None, "structs").unwrap();
2543        assert!(matches!(field.data_type(), DataType::Struct(_)));
2544
2545        // Unnesting multiple fields at the same time, using infer syntax
2546        let cols = vec!["strings", "structs", "struct_singular"]
2547            .into_iter()
2548            .map(|c| c.into())
2549            .collect();
2550
2551        let plan = nested_table_scan("test_table")?
2552            .unnest_columns_with_options(cols, UnnestOptions::default())?
2553            .build()?;
2554
2555        assert_snapshot!(plan, @r"
2556        Unnest: lists[test_table.strings|depth=1, test_table.structs|depth=1] structs[test_table.struct_singular]
2557          TableScan: test_table
2558        ");
2559
2560        // Unnesting missing column should fail.
2561        let plan = nested_table_scan("test_table")?.unnest_column("missing");
2562        assert!(plan.is_err());
2563
2564        // Simultaneously unnesting a list (with different depth) and a struct column
2565        let plan = nested_table_scan("test_table")?
2566            .unnest_columns_with_options(
2567                vec!["stringss".into(), "struct_singular".into()],
2568                UnnestOptions::default()
2569                    .with_recursions(RecursionUnnestOption {
2570                        input_column: "stringss".into(),
2571                        output_column: "stringss_depth_1".into(),
2572                        depth: 1,
2573                    })
2574                    .with_recursions(RecursionUnnestOption {
2575                        input_column: "stringss".into(),
2576                        output_column: "stringss_depth_2".into(),
2577                        depth: 2,
2578                    }),
2579            )?
2580            .build()?;
2581
2582        assert_snapshot!(plan, @r"
2583        Unnest: lists[test_table.stringss|depth=1, test_table.stringss|depth=2] structs[test_table.struct_singular]
2584          TableScan: test_table
2585        ");
2586
2587        // Check output columns has correct type
2588        let field = plan
2589            .schema()
2590            .field_with_name(None, "stringss_depth_1")
2591            .unwrap();
2592        assert_eq!(
2593            &DataType::new_list(DataType::Utf8, false),
2594            field.data_type()
2595        );
2596        let field = plan
2597            .schema()
2598            .field_with_name(None, "stringss_depth_2")
2599            .unwrap();
2600        assert_eq!(&DataType::Utf8, field.data_type());
2601        // unnesting struct is still correct
2602        for field_name in &["a", "b"] {
2603            let field = plan
2604                .schema()
2605                .field_with_name(None, &format!("struct_singular.{field_name}"))
2606                .unwrap();
2607            assert_eq!(&DataType::UInt32, field.data_type());
2608        }
2609
2610        Ok(())
2611    }
2612
2613    fn nested_table_scan(table_name: &str) -> Result<LogicalPlanBuilder> {
2614        // Create a schema with a scalar field, a list of strings, a list of structs
2615        // and a singular struct
2616        let struct_field_in_list = Field::new_struct(
2617            "item",
2618            vec![
2619                Field::new("a", DataType::UInt32, false),
2620                Field::new("b", DataType::UInt32, false),
2621            ],
2622            false,
2623        );
2624        let string_field = Field::new_list_field(DataType::Utf8, false);
2625        let strings_field = Field::new_list("item", string_field.clone(), false);
2626        let schema = Schema::new(vec![
2627            Field::new("scalar", DataType::UInt32, false),
2628            Field::new_list("strings", string_field, false),
2629            Field::new_list("structs", struct_field_in_list, false),
2630            Field::new(
2631                "struct_singular",
2632                DataType::Struct(Fields::from(vec![
2633                    Field::new("a", DataType::UInt32, false),
2634                    Field::new("b", DataType::UInt32, false),
2635                ])),
2636                false,
2637            ),
2638            Field::new_list("stringss", strings_field, false),
2639        ]);
2640
2641        table_scan(Some(table_name), &schema, None)
2642    }
2643
2644    #[test]
2645    fn test_union_after_join() -> Result<()> {
2646        let values = vec![vec![lit(1)]];
2647
2648        let left = LogicalPlanBuilder::values(values.clone())?
2649            .alias("left")?
2650            .build()?;
2651        let right = LogicalPlanBuilder::values(values)?
2652            .alias("right")?
2653            .build()?;
2654
2655        let join = LogicalPlanBuilder::from(left).cross_join(right)?.build()?;
2656
2657        let plan = LogicalPlanBuilder::from(join.clone())
2658            .union(join)?
2659            .build()?;
2660
2661        assert_snapshot!(plan, @r"
2662        Union
2663          Cross Join: 
2664            SubqueryAlias: left
2665              Values: (Int32(1))
2666            SubqueryAlias: right
2667              Values: (Int32(1))
2668          Cross Join: 
2669            SubqueryAlias: left
2670              Values: (Int32(1))
2671            SubqueryAlias: right
2672              Values: (Int32(1))
2673        ");
2674
2675        Ok(())
2676    }
2677
2678    #[test]
2679    fn test_change_redundant_column() -> Result<()> {
2680        let t1_field_1 = Field::new("a", DataType::Int32, false);
2681        let t2_field_1 = Field::new("a", DataType::Int32, false);
2682        let t2_field_3 = Field::new("a", DataType::Int32, false);
2683        let t2_field_4 = Field::new("a:1", DataType::Int32, false);
2684        let t1_field_2 = Field::new("b", DataType::Int32, false);
2685        let t2_field_2 = Field::new("b", DataType::Int32, false);
2686
2687        let field_vec = vec![
2688            t1_field_1, t2_field_1, t1_field_2, t2_field_2, t2_field_3, t2_field_4,
2689        ];
2690        let remove_redundant = change_redundant_column(&Fields::from(field_vec));
2691
2692        assert_eq!(
2693            remove_redundant,
2694            vec![
2695                Field::new("a", DataType::Int32, false),
2696                Field::new("a:1", DataType::Int32, false),
2697                Field::new("b", DataType::Int32, false),
2698                Field::new("b:1", DataType::Int32, false),
2699                Field::new("a:2", DataType::Int32, false),
2700                Field::new("a:1:1", DataType::Int32, false),
2701            ]
2702        );
2703        Ok(())
2704    }
2705
2706    #[test]
2707    fn plan_builder_from_logical_plan() -> Result<()> {
2708        let plan =
2709            table_scan(Some("employee_csv"), &employee_schema(), Some(vec![3, 4]))?
2710                .sort(vec![
2711                    expr::Sort::new(col("state"), true, true),
2712                    expr::Sort::new(col("salary"), false, false),
2713                ])?
2714                .build()?;
2715
2716        let plan_expected = format!("{plan}");
2717        let plan_builder: LogicalPlanBuilder = Arc::new(plan).into();
2718        assert_eq!(plan_expected, format!("{}", plan_builder.plan));
2719
2720        Ok(())
2721    }
2722
2723    #[test]
2724    fn plan_builder_aggregate_without_implicit_group_by_exprs() -> Result<()> {
2725        let constraints =
2726            Constraints::new_unverified(vec![Constraint::PrimaryKey(vec![0])]);
2727        let table_source = table_source_with_constraints(&employee_schema(), constraints);
2728
2729        let plan =
2730            LogicalPlanBuilder::scan("employee_csv", table_source, Some(vec![0, 3, 4]))?
2731                .aggregate(vec![col("id")], vec![sum(col("salary"))])?
2732                .build()?;
2733
2734        assert_snapshot!(plan, @r"
2735        Aggregate: groupBy=[[employee_csv.id]], aggr=[[sum(employee_csv.salary)]]
2736          TableScan: employee_csv projection=[id, state, salary]
2737        ");
2738
2739        Ok(())
2740    }
2741
2742    #[test]
2743    fn plan_builder_aggregate_with_implicit_group_by_exprs() -> Result<()> {
2744        let constraints =
2745            Constraints::new_unverified(vec![Constraint::PrimaryKey(vec![0])]);
2746        let table_source = table_source_with_constraints(&employee_schema(), constraints);
2747
2748        let options =
2749            LogicalPlanBuilderOptions::new().with_add_implicit_group_by_exprs(true);
2750        let plan =
2751            LogicalPlanBuilder::scan("employee_csv", table_source, Some(vec![0, 3, 4]))?
2752                .with_options(options)
2753                .aggregate(vec![col("id")], vec![sum(col("salary"))])?
2754                .build()?;
2755
2756        assert_snapshot!(plan, @r"
2757        Aggregate: groupBy=[[employee_csv.id, employee_csv.state, employee_csv.salary]], aggr=[[sum(employee_csv.salary)]]
2758          TableScan: employee_csv projection=[id, state, salary]
2759        ");
2760
2761        Ok(())
2762    }
2763
2764    #[test]
2765    fn test_join_metadata() -> Result<()> {
2766        let left_schema = DFSchema::new_with_metadata(
2767            vec![(None, Arc::new(Field::new("a", DataType::Int32, false)))],
2768            HashMap::from([("key".to_string(), "left".to_string())]),
2769        )?;
2770        let right_schema = DFSchema::new_with_metadata(
2771            vec![(None, Arc::new(Field::new("b", DataType::Int32, false)))],
2772            HashMap::from([("key".to_string(), "right".to_string())]),
2773        )?;
2774
2775        let join_schema =
2776            build_join_schema(&left_schema, &right_schema, &JoinType::Left)?;
2777        assert_eq!(
2778            join_schema.metadata(),
2779            &HashMap::from([("key".to_string(), "left".to_string())])
2780        );
2781        let join_schema =
2782            build_join_schema(&left_schema, &right_schema, &JoinType::Right)?;
2783        assert_eq!(
2784            join_schema.metadata(),
2785            &HashMap::from([("key".to_string(), "right".to_string())])
2786        );
2787
2788        Ok(())
2789    }
2790}