datafusion_expr/logical_plan/
builder.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! This module provides a builder for creating LogicalPlans
19
20use std::any::Any;
21use std::cmp::Ordering;
22use std::collections::{HashMap, HashSet};
23use std::iter::once;
24use std::sync::Arc;
25
26use crate::dml::CopyTo;
27use crate::expr::{Alias, PlannedReplaceSelectItem, Sort as SortExpr};
28use crate::expr_rewriter::{
29    coerce_plan_expr_for_schema, normalize_col,
30    normalize_col_with_schemas_and_ambiguity_check, normalize_cols, normalize_sorts,
31    rewrite_sort_cols_by_aggs,
32};
33use crate::logical_plan::{
34    Aggregate, Analyze, Distinct, DistinctOn, EmptyRelation, Explain, Filter, Join,
35    JoinConstraint, JoinType, Limit, LogicalPlan, Partitioning, PlanType, Prepare,
36    Projection, Repartition, Sort, SubqueryAlias, TableScan, Union, Unnest, Values,
37    Window,
38};
39use crate::select_expr::SelectExpr;
40use crate::utils::{
41    can_hash, columnize_expr, compare_sort_expr, expand_qualified_wildcard,
42    expand_wildcard, expr_to_columns, find_valid_equijoin_key_pair,
43    group_window_expr_by_sort_keys,
44};
45use crate::{
46    and, binary_expr, lit, DmlStatement, Expr, ExprSchemable, Operator, RecursiveQuery,
47    Statement, TableProviderFilterPushDown, TableSource, WriteOp,
48};
49
50use super::dml::InsertOp;
51use super::plan::{ColumnUnnestList, ExplainFormat};
52use arrow::compute::can_cast_types;
53use arrow::datatypes::{DataType, Field, Fields, Schema, SchemaRef};
54use datafusion_common::display::ToStringifiedPlan;
55use datafusion_common::file_options::file_type::FileType;
56use datafusion_common::{
57    exec_err, get_target_functional_dependencies, internal_err, not_impl_err,
58    plan_datafusion_err, plan_err, Column, Constraints, DFSchema, DFSchemaRef,
59    DataFusionError, Result, ScalarValue, TableReference, ToDFSchema, UnnestOptions,
60};
61use datafusion_expr_common::type_coercion::binary::type_union_resolution;
62
63use indexmap::IndexSet;
64
/// Default table name used for a table that has not been given a name.
pub const UNNAMED_TABLE: &str = "?table?";
67
/// Configuration consumed by [`LogicalPlanBuilder`].
#[derive(Default, Debug, Clone)]
pub struct LogicalPlanBuilderOptions {
    /// When `true`, the plan builder adds functionally dependent
    /// expressions as additional aggregation groupings.
    add_implicit_group_by_exprs: bool,
}

impl LogicalPlanBuilderOptions {
    /// Create options with every setting at its default value.
    pub fn new() -> Self {
        Self::default()
    }

    /// Choose whether the builder should add functionally dependent
    /// expressions as additional aggregation groupings, returning the
    /// updated options.
    pub fn with_add_implicit_group_by_exprs(mut self, add: bool) -> Self {
        self.add_implicit_group_by_exprs = add;
        self
    }
}
87
88/// Builder for logical plans
89///
90/// # Example building a simple plan
91/// ```
92/// # use datafusion_expr::{lit, col, LogicalPlanBuilder, logical_plan::table_scan};
93/// # use datafusion_common::Result;
94/// # use arrow::datatypes::{Schema, DataType, Field};
95/// #
96/// # fn main() -> Result<()> {
97/// #
98/// # fn employee_schema() -> Schema {
99/// #    Schema::new(vec![
100/// #           Field::new("id", DataType::Int32, false),
101/// #           Field::new("first_name", DataType::Utf8, false),
102/// #           Field::new("last_name", DataType::Utf8, false),
103/// #           Field::new("state", DataType::Utf8, false),
104/// #           Field::new("salary", DataType::Int32, false),
105/// #       ])
106/// #   }
107/// #
108/// // Create a plan similar to
109/// // SELECT last_name
110/// // FROM employees
111/// // WHERE salary < 1000
112/// let plan = table_scan(Some("employee"), &employee_schema(), None)?
113///  // Keep only rows where salary < 1000
114///  .filter(col("salary").lt(lit(1000)))?
115///  // only show "last_name" in the final results
116///  .project(vec![col("last_name")])?
117///  .build()?;
118///
119/// // Convert from plan back to builder
120/// let builder = LogicalPlanBuilder::from(plan);
121///
122/// # Ok(())
123/// # }
124/// ```
#[derive(Debug, Clone)]
pub struct LogicalPlanBuilder {
    /// The logical plan assembled so far.
    plan: Arc<LogicalPlan>,
    /// Options that were supplied for this builder (defaults unless
    /// replaced through `with_options`).
    options: LogicalPlanBuilderOptions,
}
130
131impl LogicalPlanBuilder {
132    /// Create a builder from an existing plan
133    pub fn new(plan: LogicalPlan) -> Self {
134        Self {
135            plan: Arc::new(plan),
136            options: LogicalPlanBuilderOptions::default(),
137        }
138    }
139
140    /// Create a builder from an existing plan
141    pub fn new_from_arc(plan: Arc<LogicalPlan>) -> Self {
142        Self {
143            plan,
144            options: LogicalPlanBuilderOptions::default(),
145        }
146    }
147
148    pub fn with_options(mut self, options: LogicalPlanBuilderOptions) -> Self {
149        self.options = options;
150        self
151    }
152
    /// Return the output schema of the plan built so far.
    ///
    /// This is the schema reported by the wrapped plan's own `schema()`.
    pub fn schema(&self) -> &DFSchemaRef {
        self.plan.schema()
    }
157
    /// Return a reference to the [`LogicalPlan`] built so far, without
    /// consuming the builder.
    pub fn plan(&self) -> &LogicalPlan {
        &self.plan
    }
162
163    /// Create an empty relation.
164    ///
165    /// `produce_one_row` set to true means this empty node needs to produce a placeholder row.
166    pub fn empty(produce_one_row: bool) -> Self {
167        Self::new(LogicalPlan::EmptyRelation(EmptyRelation {
168            produce_one_row,
169            schema: DFSchemaRef::new(DFSchema::empty()),
170        }))
171    }
172
173    /// Convert a regular plan into a recursive query.
174    /// `is_distinct` indicates whether the recursive term should be de-duplicated (`UNION`) after each iteration or not (`UNION ALL`).
175    pub fn to_recursive_query(
176        self,
177        name: String,
178        recursive_term: LogicalPlan,
179        is_distinct: bool,
180    ) -> Result<Self> {
181        // TODO: we need to do a bunch of validation here. Maybe more.
182        if is_distinct {
183            return not_impl_err!(
184                "Recursive queries with a distinct 'UNION' (in which the previous iteration's results will be de-duplicated) is not supported"
185            );
186        }
187        // Ensure that the static term and the recursive term have the same number of fields
188        let static_fields_len = self.plan.schema().fields().len();
189        let recursive_fields_len = recursive_term.schema().fields().len();
190        if static_fields_len != recursive_fields_len {
191            return plan_err!(
192                "Non-recursive term and recursive term must have the same number of columns ({} != {})",
193                static_fields_len, recursive_fields_len
194            );
195        }
196        // Ensure that the recursive term has the same field types as the static term
197        let coerced_recursive_term =
198            coerce_plan_expr_for_schema(recursive_term, self.plan.schema())?;
199        Ok(Self::from(LogicalPlan::RecursiveQuery(RecursiveQuery {
200            name,
201            static_term: self.plan,
202            recursive_term: Arc::new(coerced_recursive_term),
203            is_distinct,
204        })))
205    }
206
207    /// Create a values list based relation, and the schema is inferred from data, consuming
208    /// `value`. See the [Postgres VALUES](https://www.postgresql.org/docs/current/queries-values.html)
209    /// documentation for more details.
210    ///
211    /// so it's usually better to override the default names with a table alias list.
212    ///
213    /// If the values include params/binders such as $1, $2, $3, etc, then the `param_data_types` should be provided.
214    pub fn values(values: Vec<Vec<Expr>>) -> Result<Self> {
215        if values.is_empty() {
216            return plan_err!("Values list cannot be empty");
217        }
218        let n_cols = values[0].len();
219        if n_cols == 0 {
220            return plan_err!("Values list cannot be zero length");
221        }
222        for (i, row) in values.iter().enumerate() {
223            if row.len() != n_cols {
224                return plan_err!(
225                    "Inconsistent data length across values list: got {} values in row {} but expected {}",
226                    row.len(),
227                    i,
228                    n_cols
229                );
230            }
231        }
232
233        // Infer from data itself
234        Self::infer_data(values)
235    }
236
237    /// Create a values list based relation, and the schema is inferred from data itself or table schema if provided, consuming
238    /// `value`. See the [Postgres VALUES](https://www.postgresql.org/docs/current/queries-values.html)
239    /// documentation for more details.
240    ///
241    /// By default, it assigns the names column1, column2, etc. to the columns of a VALUES table.
242    /// The column names are not specified by the SQL standard and different database systems do it differently,
243    /// so it's usually better to override the default names with a table alias list.
244    ///
245    /// If the values include params/binders such as $1, $2, $3, etc, then the `param_data_types` should be provided.
246    pub fn values_with_schema(
247        values: Vec<Vec<Expr>>,
248        schema: &DFSchemaRef,
249    ) -> Result<Self> {
250        if values.is_empty() {
251            return plan_err!("Values list cannot be empty");
252        }
253        let n_cols = schema.fields().len();
254        if n_cols == 0 {
255            return plan_err!("Values list cannot be zero length");
256        }
257        for (i, row) in values.iter().enumerate() {
258            if row.len() != n_cols {
259                return plan_err!(
260                    "Inconsistent data length across values list: got {} values in row {} but expected {}",
261                    row.len(),
262                    i,
263                    n_cols
264                );
265            }
266        }
267
268        // Check the type of value against the schema
269        Self::infer_values_from_schema(values, schema)
270    }
271
272    fn infer_values_from_schema(
273        values: Vec<Vec<Expr>>,
274        schema: &DFSchema,
275    ) -> Result<Self> {
276        let n_cols = values[0].len();
277        let mut fields = ValuesFields::new();
278        for j in 0..n_cols {
279            let field_type = schema.field(j).data_type();
280            let field_nullable = schema.field(j).is_nullable();
281            for row in values.iter() {
282                let value = &row[j];
283                let data_type = value.get_type(schema)?;
284
285                if !data_type.equals_datatype(field_type) {
286                    if can_cast_types(&data_type, field_type) {
287                    } else {
288                        return exec_err!(
289                            "type mismatch and can't cast to got {} and {}",
290                            data_type,
291                            field_type
292                        );
293                    }
294                }
295            }
296            fields.push(field_type.to_owned(), field_nullable);
297        }
298
299        Self::infer_inner(values, fields, schema)
300    }
301
302    fn infer_data(values: Vec<Vec<Expr>>) -> Result<Self> {
303        let n_cols = values[0].len();
304        let schema = DFSchema::empty();
305        let mut fields = ValuesFields::new();
306
307        for j in 0..n_cols {
308            let mut common_type: Option<DataType> = None;
309            for (i, row) in values.iter().enumerate() {
310                let value = &row[j];
311                let data_type = value.get_type(&schema)?;
312                if data_type == DataType::Null {
313                    continue;
314                }
315
316                if let Some(prev_type) = common_type {
317                    // get common type of each column values.
318                    let data_types = vec![prev_type.clone(), data_type.clone()];
319                    let Some(new_type) = type_union_resolution(&data_types) else {
320                        return plan_err!("Inconsistent data type across values list at row {i} column {j}. Was {prev_type} but found {data_type}");
321                    };
322                    common_type = Some(new_type);
323                } else {
324                    common_type = Some(data_type);
325                }
326            }
327            // assuming common_type was not set, and no error, therefore the type should be NULL
328            // since the code loop skips NULL
329            fields.push(common_type.unwrap_or(DataType::Null), true);
330        }
331
332        Self::infer_inner(values, fields, &schema)
333    }
334
335    fn infer_inner(
336        mut values: Vec<Vec<Expr>>,
337        fields: ValuesFields,
338        schema: &DFSchema,
339    ) -> Result<Self> {
340        let fields = fields.into_fields();
341        // wrap cast if data type is not same as common type.
342        for row in &mut values {
343            for (j, field_type) in fields.iter().map(|f| f.data_type()).enumerate() {
344                if let Expr::Literal(ScalarValue::Null, metadata) = &row[j] {
345                    row[j] = Expr::Literal(
346                        ScalarValue::try_from(field_type)?,
347                        metadata.clone(),
348                    );
349                } else {
350                    row[j] = std::mem::take(&mut row[j]).cast_to(field_type, schema)?;
351                }
352            }
353        }
354
355        let dfschema = DFSchema::from_unqualified_fields(fields, HashMap::new())?;
356        let schema = DFSchemaRef::new(dfschema);
357
358        Ok(Self::new(LogicalPlan::Values(Values { schema, values })))
359    }
360
361    /// Convert a table provider into a builder with a TableScan
362    ///
363    /// Note that if you pass a string as `table_name`, it is treated
364    /// as a SQL identifier, as described on [`TableReference`] and
365    /// thus is normalized
366    ///
367    /// # Example:
368    /// ```
369    /// # use datafusion_expr::{lit, col, LogicalPlanBuilder,
370    /// #  logical_plan::builder::LogicalTableSource, logical_plan::table_scan
371    /// # };
372    /// # use std::sync::Arc;
373    /// # use arrow::datatypes::{Schema, DataType, Field};
374    /// # use datafusion_common::TableReference;
375    /// #
376    /// # let employee_schema = Arc::new(Schema::new(vec![
377    /// #           Field::new("id", DataType::Int32, false),
378    /// # ])) as _;
379    /// # let table_source = Arc::new(LogicalTableSource::new(employee_schema));
380    /// // Scan table_source with the name "mytable" (after normalization)
381    /// # let table = table_source.clone();
382    /// let scan = LogicalPlanBuilder::scan("MyTable", table, None);
383    ///
384    /// // Scan table_source with the name "MyTable" by enclosing in quotes
385    /// # let table = table_source.clone();
386    /// let scan = LogicalPlanBuilder::scan(r#""MyTable""#, table, None);
387    ///
388    /// // Scan table_source with the name "MyTable" by forming the table reference
389    /// # let table = table_source.clone();
390    /// let table_reference = TableReference::bare("MyTable");
391    /// let scan = LogicalPlanBuilder::scan(table_reference, table, None);
392    /// ```
393    pub fn scan(
394        table_name: impl Into<TableReference>,
395        table_source: Arc<dyn TableSource>,
396        projection: Option<Vec<usize>>,
397    ) -> Result<Self> {
398        Self::scan_with_filters(table_name, table_source, projection, vec![])
399    }
400
401    /// Create a [CopyTo] for copying the contents of this builder to the specified file(s)
402    pub fn copy_to(
403        input: LogicalPlan,
404        output_url: String,
405        file_type: Arc<dyn FileType>,
406        options: HashMap<String, String>,
407        partition_by: Vec<String>,
408    ) -> Result<Self> {
409        Ok(Self::new(LogicalPlan::Copy(CopyTo {
410            input: Arc::new(input),
411            output_url,
412            partition_by,
413            file_type,
414            options,
415        })))
416    }
417
418    /// Create a [`DmlStatement`] for inserting the contents of this builder into the named table.
419    ///
420    /// Note,  use a [`DefaultTableSource`] to insert into a [`TableProvider`]
421    ///
422    /// [`DefaultTableSource`]: https://docs.rs/datafusion/latest/datafusion/datasource/default_table_source/struct.DefaultTableSource.html
423    /// [`TableProvider`]: https://docs.rs/datafusion/latest/datafusion/catalog/trait.TableProvider.html
424    ///
425    /// # Example:
426    /// ```
427    /// # use datafusion_expr::{lit, LogicalPlanBuilder,
428    /// #  logical_plan::builder::LogicalTableSource,
429    /// # };
430    /// # use std::sync::Arc;
431    /// # use arrow::datatypes::{Schema, DataType, Field};
432    /// # use datafusion_expr::dml::InsertOp;
433    /// #
434    /// # fn test() -> datafusion_common::Result<()> {
435    /// # let employee_schema = Arc::new(Schema::new(vec![
436    /// #     Field::new("id", DataType::Int32, false),
437    /// # ])) as _;
438    /// # let table_source = Arc::new(LogicalTableSource::new(employee_schema));
439    /// // VALUES (1), (2)
440    /// let input = LogicalPlanBuilder::values(vec![vec![lit(1)], vec![lit(2)]])?
441    ///   .build()?;
442    /// // INSERT INTO MyTable VALUES (1), (2)
443    /// let insert_plan = LogicalPlanBuilder::insert_into(
444    ///   input,
445    ///   "MyTable",
446    ///   table_source,
447    ///   InsertOp::Append,
448    /// )?;
449    /// # Ok(())
450    /// # }
451    /// ```
452    pub fn insert_into(
453        input: LogicalPlan,
454        table_name: impl Into<TableReference>,
455        target: Arc<dyn TableSource>,
456        insert_op: InsertOp,
457    ) -> Result<Self> {
458        Ok(Self::new(LogicalPlan::Dml(DmlStatement::new(
459            table_name.into(),
460            target,
461            WriteOp::Insert(insert_op),
462            Arc::new(input),
463        ))))
464    }
465
    /// Convert a table provider into a builder with a TableScan that carries
    /// the given `filters`.
    ///
    /// No fetch limit is applied: `None` is passed as the fetch value.
    pub fn scan_with_filters(
        table_name: impl Into<TableReference>,
        table_source: Arc<dyn TableSource>,
        projection: Option<Vec<usize>>,
        filters: Vec<Expr>,
    ) -> Result<Self> {
        Self::scan_with_filters_inner(table_name, table_source, projection, filters, None)
    }
475
    /// Convert a table provider into a builder with a TableScan that carries
    /// both `filters` and an optional `fetch` value.
    ///
    /// All arguments are forwarded unchanged to the shared scan
    /// implementation.
    pub fn scan_with_filters_fetch(
        table_name: impl Into<TableReference>,
        table_source: Arc<dyn TableSource>,
        projection: Option<Vec<usize>>,
        filters: Vec<Expr>,
        fetch: Option<usize>,
    ) -> Result<Self> {
        Self::scan_with_filters_inner(
            table_name,
            table_source,
            projection,
            filters,
            fetch,
        )
    }
492
493    fn scan_with_filters_inner(
494        table_name: impl Into<TableReference>,
495        table_source: Arc<dyn TableSource>,
496        projection: Option<Vec<usize>>,
497        filters: Vec<Expr>,
498        fetch: Option<usize>,
499    ) -> Result<Self> {
500        let table_scan =
501            TableScan::try_new(table_name, table_source, projection, filters, fetch)?;
502
503        // Inline TableScan
504        if table_scan.filters.is_empty() {
505            if let Some(p) = table_scan.source.get_logical_plan() {
506                let sub_plan = p.into_owned();
507
508                if let Some(proj) = table_scan.projection {
509                    let projection_exprs = proj
510                        .into_iter()
511                        .map(|i| {
512                            Expr::Column(Column::from(
513                                sub_plan.schema().qualified_field(i),
514                            ))
515                        })
516                        .collect::<Vec<_>>();
517                    return Self::new(sub_plan)
518                        .project(projection_exprs)?
519                        .alias(table_scan.table_name);
520                }
521
522                // Ensures that the reference to the inlined table remains the
523                // same, meaning we don't have to change any of the parent nodes
524                // that reference this table.
525                return Self::new(sub_plan).alias(table_scan.table_name);
526            }
527        }
528
529        Ok(Self::new(LogicalPlan::TableScan(table_scan)))
530    }
531
532    /// Wrap a plan in a window
533    pub fn window_plan(
534        input: LogicalPlan,
535        window_exprs: impl IntoIterator<Item = Expr>,
536    ) -> Result<LogicalPlan> {
537        let mut plan = input;
538        let mut groups = group_window_expr_by_sort_keys(window_exprs)?;
539        // To align with the behavior of PostgreSQL, we want the sort_keys sorted as same rule as PostgreSQL that first
540        // we compare the sort key themselves and if one window's sort keys are a prefix of another
541        // put the window with more sort keys first. so more deeply sorted plans gets nested further down as children.
542        // The sort_by() implementation here is a stable sort.
543        // Note that by this rule if there's an empty over, it'll be at the top level
544        groups.sort_by(|(key_a, _), (key_b, _)| {
545            for ((first, _), (second, _)) in key_a.iter().zip(key_b.iter()) {
546                let key_ordering = compare_sort_expr(first, second, plan.schema());
547                match key_ordering {
548                    Ordering::Less => {
549                        return Ordering::Less;
550                    }
551                    Ordering::Greater => {
552                        return Ordering::Greater;
553                    }
554                    Ordering::Equal => {}
555                }
556            }
557            key_b.len().cmp(&key_a.len())
558        });
559        for (_, exprs) in groups {
560            let window_exprs = exprs.into_iter().collect::<Vec<_>>();
561            // Partition and sorting is done at physical level, see the EnforceDistribution
562            // and EnforceSorting rules.
563            plan = LogicalPlanBuilder::from(plan)
564                .window(window_exprs)?
565                .build()?;
566        }
567        Ok(plan)
568    }
569
    /// Apply a projection without alias.
    ///
    /// Takes the plan out of its `Arc` (cloning it only if the `Arc` is
    /// shared) and hands it, together with `expr`, to the module-level
    /// `project` function.
    pub fn project(
        self,
        expr: impl IntoIterator<Item = impl Into<SelectExpr>>,
    ) -> Result<Self> {
        project(Arc::unwrap_or_clone(self.plan), expr).map(Self::new)
    }
577
    /// Apply a projection without alias, with optional validation per
    /// expression.
    ///
    /// Each entry of `expr` pairs an expression with a flag: `true` to
    /// validate it, `false` to skip validation. The work is done by the
    /// module-level `project_with_validation` function.
    pub fn project_with_validation(
        self,
        expr: Vec<(impl Into<SelectExpr>, bool)>,
    ) -> Result<Self> {
        project_with_validation(Arc::unwrap_or_clone(self.plan), expr).map(Self::new)
    }
586
587    /// Select the given column indices
588    pub fn select(self, indices: impl IntoIterator<Item = usize>) -> Result<Self> {
589        let exprs: Vec<_> = indices
590            .into_iter()
591            .map(|x| Expr::Column(Column::from(self.plan.schema().qualified_field(x))))
592            .collect();
593        self.project(exprs)
594    }
595
596    /// Apply a filter
597    pub fn filter(self, expr: impl Into<Expr>) -> Result<Self> {
598        let expr = normalize_col(expr.into(), &self.plan)?;
599        Filter::try_new(expr, self.plan)
600            .map(LogicalPlan::Filter)
601            .map(Self::new)
602    }
603
604    /// Apply a filter which is used for a having clause
605    pub fn having(self, expr: impl Into<Expr>) -> Result<Self> {
606        let expr = normalize_col(expr.into(), &self.plan)?;
607        Filter::try_new(expr, self.plan)
608            .map(LogicalPlan::Filter)
609            .map(Self::from)
610    }
611
612    /// Make a builder for a prepare logical plan from the builder's plan
613    pub fn prepare(self, name: String, data_types: Vec<DataType>) -> Result<Self> {
614        Ok(Self::new(LogicalPlan::Statement(Statement::Prepare(
615            Prepare {
616                name,
617                data_types,
618                input: self.plan,
619            },
620        ))))
621    }
622
623    /// Limit the number of rows returned
624    ///
625    /// `skip` - Number of rows to skip before fetch any row.
626    ///
627    /// `fetch` - Maximum number of rows to fetch, after skipping `skip` rows,
628    ///          if specified.
629    pub fn limit(self, skip: usize, fetch: Option<usize>) -> Result<Self> {
630        let skip_expr = if skip == 0 {
631            None
632        } else {
633            Some(lit(skip as i64))
634        };
635        let fetch_expr = fetch.map(|f| lit(f as i64));
636        self.limit_by_expr(skip_expr, fetch_expr)
637    }
638
639    /// Limit the number of rows returned
640    ///
641    /// Similar to `limit` but uses expressions for `skip` and `fetch`
642    pub fn limit_by_expr(self, skip: Option<Expr>, fetch: Option<Expr>) -> Result<Self> {
643        Ok(Self::new(LogicalPlan::Limit(Limit {
644            skip: skip.map(Box::new),
645            fetch: fetch.map(Box::new),
646            input: self.plan,
647        })))
648    }
649
    /// Apply an alias to the plan built so far.
    ///
    /// Takes the plan out of its `Arc` (cloning it only if the `Arc` is
    /// shared) and delegates to the module-level `subquery_alias` function.
    pub fn alias(self, alias: impl Into<TableReference>) -> Result<Self> {
        subquery_alias(Arc::unwrap_or_clone(self.plan), alias).map(Self::new)
    }
654
    /// Add missing sort columns to all downstream projections.
    ///
    /// For example, if a plan projects only `A` and `B` but a sort refers to
    /// `C`, this code adds `C` recursively to the input projections so that
    /// the sort can reference it.
    ///
    /// Adding a new column is not correct if there is a `Distinct`
    /// node, which produces only distinct values of its
    /// inputs. Adding a new column to its input will result in
    /// potentially different results than with the original column.
    ///
    /// For example, if the plan is `Distinct(A, B)` and the input looks like
    ///
    /// ```text
    /// a | b | c
    /// --+---+---
    /// 1 | 2 | 3
    /// 1 | 2 | 4
    /// ```
    ///
    /// Distinct (A, B) --> (1,2)
    ///
    /// But Distinct (A, B, C) --> (1, 2, 3), (1, 2, 4)
    ///  (which will appear as (1, 2), (1, 2) if only a and b are projected)
    ///
    /// `is_distinct` records whether a `Distinct` node has been seen on the
    /// way down; when set, additions are checked by
    /// `ambiguous_distinct_check` before being applied.
    ///
    /// See <https://github.com/apache/datafusion/issues/5065> for more details
    fn add_missing_columns(
        curr_plan: LogicalPlan,
        missing_cols: &IndexSet<Column>,
        is_distinct: bool,
    ) -> Result<LogicalPlan> {
        match curr_plan {
            // A projection whose input can supply every missing column: extend
            // this projection rather than recursing further.
            LogicalPlan::Projection(Projection {
                input,
                mut expr,
                schema: _,
            }) if missing_cols.iter().all(|c| input.schema().has_column(c)) => {
                let mut missing_exprs = missing_cols
                    .iter()
                    .map(|c| normalize_col(Expr::Column(c.clone()), &input))
                    .collect::<Result<Vec<_>>>()?;

                // Do not let duplicate columns to be added, some of the
                // missing_cols may be already present but without the new
                // projected alias.
                missing_exprs.retain(|e| !expr.contains(e));
                if is_distinct {
                    Self::ambiguous_distinct_check(&missing_exprs, missing_cols, &expr)?;
                }
                expr.extend(missing_exprs);
                project(Arc::unwrap_or_clone(input), expr)
            }
            // Any other node: recurse into each input and rebuild this node
            // with the rewritten inputs and its original expressions.
            _ => {
                // Once a Distinct node is passed, everything below it is
                // treated as being under a distinct.
                let is_distinct =
                    is_distinct || matches!(curr_plan, LogicalPlan::Distinct(_));
                let new_inputs = curr_plan
                    .inputs()
                    .into_iter()
                    .map(|input_plan| {
                        Self::add_missing_columns(
                            (*input_plan).clone(),
                            missing_cols,
                            is_distinct,
                        )
                    })
                    .collect::<Result<Vec<_>>>()?;
                curr_plan.with_new_exprs(curr_plan.expressions(), new_inputs)
            }
        }
    }
727
728    fn ambiguous_distinct_check(
729        missing_exprs: &[Expr],
730        missing_cols: &IndexSet<Column>,
731        projection_exprs: &[Expr],
732    ) -> Result<()> {
733        if missing_exprs.is_empty() {
734            return Ok(());
735        }
736
737        // if the missing columns are all only aliases for things in
738        // the existing select list, it is ok
739        //
740        // This handles the special case for
741        // SELECT col as <alias> ORDER BY <alias>
742        //
743        // As described in https://github.com/apache/datafusion/issues/5293
744        let all_aliases = missing_exprs.iter().all(|e| {
745            projection_exprs.iter().any(|proj_expr| {
746                if let Expr::Alias(Alias { expr, .. }) = proj_expr {
747                    e == expr.as_ref()
748                } else {
749                    false
750                }
751            })
752        });
753        if all_aliases {
754            return Ok(());
755        }
756
757        let missing_col_names = missing_cols
758            .iter()
759            .map(|col| col.flat_name())
760            .collect::<String>();
761
762        plan_err!("For SELECT DISTINCT, ORDER BY expressions {missing_col_names} must appear in select list")
763    }
764
765    /// Apply a sort by provided expressions with default direction
766    pub fn sort_by(
767        self,
768        expr: impl IntoIterator<Item = impl Into<Expr>> + Clone,
769    ) -> Result<Self> {
770        self.sort(
771            expr.into_iter()
772                .map(|e| e.into().sort(true, false))
773                .collect::<Vec<SortExpr>>(),
774        )
775    }
776
    /// Apply a sort by the given sort expressions, without a fetch limit.
    ///
    /// Equivalent to calling `sort_with_limit` with `fetch` set to `None`.
    pub fn sort(
        self,
        sorts: impl IntoIterator<Item = impl Into<SortExpr>> + Clone,
    ) -> Result<Self> {
        self.sort_with_limit(sorts, None)
    }
783
784    /// Apply a sort
785    pub fn sort_with_limit(
786        self,
787        sorts: impl IntoIterator<Item = impl Into<SortExpr>> + Clone,
788        fetch: Option<usize>,
789    ) -> Result<Self> {
790        let sorts = rewrite_sort_cols_by_aggs(sorts, &self.plan)?;
791
792        let schema = self.plan.schema();
793
794        // Collect sort columns that are missing in the input plan's schema
795        let mut missing_cols: IndexSet<Column> = IndexSet::new();
796        sorts.iter().try_for_each::<_, Result<()>>(|sort| {
797            let columns = sort.expr.column_refs();
798
799            missing_cols.extend(
800                columns
801                    .into_iter()
802                    .filter(|c| !schema.has_column(c))
803                    .cloned(),
804            );
805
806            Ok(())
807        })?;
808
809        if missing_cols.is_empty() {
810            return Ok(Self::new(LogicalPlan::Sort(Sort {
811                expr: normalize_sorts(sorts, &self.plan)?,
812                input: self.plan,
813                fetch,
814            })));
815        }
816
817        // remove pushed down sort columns
818        let new_expr = schema.columns().into_iter().map(Expr::Column).collect();
819
820        let is_distinct = false;
821        let plan = Self::add_missing_columns(
822            Arc::unwrap_or_clone(self.plan),
823            &missing_cols,
824            is_distinct,
825        )?;
826
827        let sort_plan = LogicalPlan::Sort(Sort {
828            expr: normalize_sorts(sorts, &plan)?,
829            input: Arc::new(plan),
830            fetch,
831        });
832
833        Projection::try_new(new_expr, Arc::new(sort_plan))
834            .map(LogicalPlan::Projection)
835            .map(Self::new)
836    }
837
838    /// Apply a union, preserving duplicate rows
839    pub fn union(self, plan: LogicalPlan) -> Result<Self> {
840        union(Arc::unwrap_or_clone(self.plan), plan).map(Self::new)
841    }
842
843    /// Apply a union by name, preserving duplicate rows
844    pub fn union_by_name(self, plan: LogicalPlan) -> Result<Self> {
845        union_by_name(Arc::unwrap_or_clone(self.plan), plan).map(Self::new)
846    }
847
848    /// Apply a union by name, removing duplicate rows
849    pub fn union_by_name_distinct(self, plan: LogicalPlan) -> Result<Self> {
850        let left_plan: LogicalPlan = Arc::unwrap_or_clone(self.plan);
851        let right_plan: LogicalPlan = plan;
852
853        Ok(Self::new(LogicalPlan::Distinct(Distinct::All(Arc::new(
854            union_by_name(left_plan, right_plan)?,
855        )))))
856    }
857
858    /// Apply a union, removing duplicate rows
859    pub fn union_distinct(self, plan: LogicalPlan) -> Result<Self> {
860        let left_plan: LogicalPlan = Arc::unwrap_or_clone(self.plan);
861        let right_plan: LogicalPlan = plan;
862
863        Ok(Self::new(LogicalPlan::Distinct(Distinct::All(Arc::new(
864            union(left_plan, right_plan)?,
865        )))))
866    }
867
868    /// Apply deduplication: Only distinct (different) values are returned)
869    pub fn distinct(self) -> Result<Self> {
870        Ok(Self::new(LogicalPlan::Distinct(Distinct::All(self.plan))))
871    }
872
873    /// Project first values of the specified expression list according to the provided
874    /// sorting expressions grouped by the `DISTINCT ON` clause expressions.
875    pub fn distinct_on(
876        self,
877        on_expr: Vec<Expr>,
878        select_expr: Vec<Expr>,
879        sort_expr: Option<Vec<SortExpr>>,
880    ) -> Result<Self> {
881        Ok(Self::new(LogicalPlan::Distinct(Distinct::On(
882            DistinctOn::try_new(on_expr, select_expr, sort_expr, self.plan)?,
883        ))))
884    }
885
886    /// Apply a join to `right` using explicitly specified columns and an
887    /// optional filter expression.
888    ///
889    /// See [`join_on`](Self::join_on) for a more concise way to specify the
890    /// join condition. Since DataFusion will automatically identify and
891    /// optimize equality predicates there is no performance difference between
892    /// this function and `join_on`
893    ///
894    /// `left_cols` and `right_cols` are used to form "equijoin" predicates (see
895    /// example below), which are then combined with the optional `filter`
896    /// expression.
897    ///
898    /// Note that in case of outer join, the `filter` is applied to only matched rows.
899    pub fn join(
900        self,
901        right: LogicalPlan,
902        join_type: JoinType,
903        join_keys: (Vec<impl Into<Column>>, Vec<impl Into<Column>>),
904        filter: Option<Expr>,
905    ) -> Result<Self> {
906        self.join_detailed(right, join_type, join_keys, filter, false)
907    }
908
909    /// Apply a join using the specified expressions.
910    ///
911    /// Note that DataFusion automatically optimizes joins, including
912    /// identifying and optimizing equality predicates.
913    ///
914    /// # Example
915    ///
916    /// ```
917    /// # use datafusion_expr::{Expr, col, LogicalPlanBuilder,
918    /// #  logical_plan::builder::LogicalTableSource, logical_plan::JoinType,};
919    /// # use std::sync::Arc;
920    /// # use arrow::datatypes::{Schema, DataType, Field};
921    /// # use datafusion_common::Result;
922    /// # fn main() -> Result<()> {
923    /// let example_schema = Arc::new(Schema::new(vec![
924    ///     Field::new("a", DataType::Int32, false),
925    ///     Field::new("b", DataType::Int32, false),
926    ///     Field::new("c", DataType::Int32, false),
927    /// ]));
928    /// let table_source = Arc::new(LogicalTableSource::new(example_schema));
929    /// let left_table = table_source.clone();
930    /// let right_table = table_source.clone();
931    ///
932    /// let right_plan = LogicalPlanBuilder::scan("right", right_table, None)?.build()?;
933    ///
934    /// // Form the expression `(left.a != right.a)` AND `(left.b != right.b)`
935    /// let exprs = vec![
936    ///     col("left.a").eq(col("right.a")),
937    ///     col("left.b").not_eq(col("right.b"))
938    ///  ];
939    ///
940    /// // Perform the equivalent of `left INNER JOIN right ON (a != a2 AND b != b2)`
941    /// // finding all pairs of rows from `left` and `right` where
942    /// // where `a = a2` and `b != b2`.
943    /// let plan = LogicalPlanBuilder::scan("left", left_table, None)?
944    ///     .join_on(right_plan, JoinType::Inner, exprs)?
945    ///     .build()?;
946    /// # Ok(())
947    /// # }
948    /// ```
949    pub fn join_on(
950        self,
951        right: LogicalPlan,
952        join_type: JoinType,
953        on_exprs: impl IntoIterator<Item = Expr>,
954    ) -> Result<Self> {
955        let filter = on_exprs.into_iter().reduce(Expr::and);
956
957        self.join_detailed(
958            right,
959            join_type,
960            (Vec::<Column>::new(), Vec::<Column>::new()),
961            filter,
962            false,
963        )
964    }
965
966    pub(crate) fn normalize(
967        plan: &LogicalPlan,
968        column: impl Into<Column>,
969    ) -> Result<Column> {
970        let column = column.into();
971        if column.relation.is_some() {
972            // column is already normalized
973            return Ok(column);
974        }
975
976        let schema = plan.schema();
977        let fallback_schemas = plan.fallback_normalize_schemas();
978        let using_columns = plan.using_columns()?;
979        column.normalize_with_schemas_and_ambiguity_check(
980            &[&[schema], &fallback_schemas],
981            &using_columns,
982        )
983    }
984
    /// Apply a join with on constraint and specified null equality.
    ///
    /// The behavior is the same as [`join`](Self::join) except that it allows
    /// specifying the null equality behavior.
    ///
    /// If `null_equals_null=true`, rows where both join keys are `null` will be
    /// emitted. Otherwise rows where either or both join keys are `null` will be
    /// omitted.
    ///
    /// `join_keys.0[i]` and `join_keys.1[i]` together form one equijoin pair.
    /// The keys of a pair may be given in either order: qualified keys are
    /// looked up in both input schemas and the pair is swapped when the
    /// lookups show that the first key belongs to `right` (and, if qualified,
    /// the second one to this plan).
    ///
    /// # Errors
    ///
    /// Returns a plan error if the two key lists have different lengths, or if
    /// `join_type` is not `Inner` and there are neither keys nor a `filter`.
    /// Errors from normalizing the filter or the keys and from
    /// `build_join_schema` are propagated.
    pub fn join_detailed(
        self,
        right: LogicalPlan,
        join_type: JoinType,
        join_keys: (Vec<impl Into<Column>>, Vec<impl Into<Column>>),
        filter: Option<Expr>,
        null_equals_null: bool,
    ) -> Result<Self> {
        if join_keys.0.len() != join_keys.1.len() {
            return plan_err!("left_keys and right_keys were not the same length");
        }

        // Normalize the filter's columns against both input schemas at once
        let filter = if let Some(expr) = filter {
            let filter = normalize_col_with_schemas_and_ambiguity_check(
                expr,
                &[&[self.schema(), right.schema()]],
                &[],
            )?;
            Some(filter)
        } else {
            None
        };

        // Resolve every key pair into (left column, right column). Results are
        // kept as `Result`s here and checked after the `unzip` below.
        let (left_keys, right_keys): (Vec<Result<Column>>, Vec<Result<Column>>) =
            join_keys
                .0
                .into_iter()
                .zip(join_keys.1)
                .map(|(l, r)| {
                    let l = l.into();
                    let r = r.into();

                    match (&l.relation, &r.relation) {
                        // Both keys are qualified: look each one up in both inputs
                        (Some(lr), Some(rr)) => {
                            let l_is_left =
                                self.plan.schema().field_with_qualified_name(lr, &l.name);
                            let l_is_right =
                                right.schema().field_with_qualified_name(lr, &l.name);
                            let r_is_left =
                                self.plan.schema().field_with_qualified_name(rr, &r.name);
                            let r_is_right =
                                right.schema().field_with_qualified_name(rr, &r.name);

                            match (l_is_left, l_is_right, r_is_left, r_is_right) {
                                // `l` is in the right input and `r` in the left one: swap
                                (_, Ok(_), Ok(_), _) => (Ok(r), Ok(l)),
                                // keys were given in (left, right) order
                                (Ok(_), _, _, Ok(_)) => (Ok(l), Ok(r)),
                                // no usable match: fall back to normalization
                                _ => (
                                    Self::normalize(&self.plan, l),
                                    Self::normalize(&right, r),
                                ),
                            }
                        }
                        // Only `l` is qualified: its side decides where `r` is normalized
                        (Some(lr), None) => {
                            let l_is_left =
                                self.plan.schema().field_with_qualified_name(lr, &l.name);
                            let l_is_right =
                                right.schema().field_with_qualified_name(lr, &l.name);

                            match (l_is_left, l_is_right) {
                                (Ok(_), _) => (Ok(l), Self::normalize(&right, r)),
                                (_, Ok(_)) => (Self::normalize(&self.plan, r), Ok(l)),
                                _ => (
                                    Self::normalize(&self.plan, l),
                                    Self::normalize(&right, r),
                                ),
                            }
                        }
                        // Only `r` is qualified: its side decides where `l` is normalized
                        (None, Some(rr)) => {
                            let r_is_left =
                                self.plan.schema().field_with_qualified_name(rr, &r.name);
                            let r_is_right =
                                right.schema().field_with_qualified_name(rr, &r.name);

                            match (r_is_left, r_is_right) {
                                (Ok(_), _) => (Ok(r), Self::normalize(&right, l)),
                                (_, Ok(_)) => (Self::normalize(&self.plan, l), Ok(r)),
                                _ => (
                                    Self::normalize(&self.plan, l),
                                    Self::normalize(&right, r),
                                ),
                            }
                        }
                        // Neither key is qualified: try `l` on the left input first;
                        // if that fails, retry it on the right input and swap the pair
                        (None, None) => {
                            let mut swap = false;
                            let left_key = Self::normalize(&self.plan, l.clone())
                                .or_else(|_| {
                                    swap = true;
                                    Self::normalize(&right, l)
                                });
                            if swap {
                                (Self::normalize(&self.plan, r), left_key)
                            } else {
                                (left_key, Self::normalize(&right, r))
                            }
                        }
                    }
                })
                .unzip();

        // Surface the first resolution error of each side, left side first
        let left_keys = left_keys.into_iter().collect::<Result<Vec<Column>>>()?;
        let right_keys = right_keys.into_iter().collect::<Result<Vec<Column>>>()?;

        let on: Vec<_> = left_keys
            .into_iter()
            .zip(right_keys)
            .map(|(l, r)| (Expr::Column(l), Expr::Column(r)))
            .collect();
        let join_schema =
            build_join_schema(self.plan.schema(), right.schema(), &join_type)?;

        // Inner type without join condition is cross join
        if join_type != JoinType::Inner && on.is_empty() && filter.is_none() {
            return plan_err!("join condition should not be empty");
        }

        Ok(Self::new(LogicalPlan::Join(Join {
            left: self.plan,
            right: Arc::new(right),
            on,
            filter,
            join_type,
            join_constraint: JoinConstraint::On,
            schema: DFSchemaRef::new(join_schema),
            null_equals_null,
        })))
    }
1119
    /// Apply a join with using constraint, which duplicates all join columns in output schema.
    ///
    /// Every entry of `using_keys` is normalized once against this plan and
    /// once against `right`, giving a `(left, right)` column pair. Pairs that
    /// pass the `can_hash` check become equijoin keys, the remaining pairs are
    /// turned into `left = right` filter expressions.
    ///
    /// If no pair ends up as an equijoin key, the result is a
    /// [`cross_join`](Self::cross_join) followed by a filter; `join_type` is
    /// not used on that path. In that situation an internal error is returned
    /// when there is no filter expression either (e.g. `using_keys` is empty).
    pub fn join_using(
        self,
        right: LogicalPlan,
        join_type: JoinType,
        using_keys: Vec<impl Into<Column> + Clone>,
    ) -> Result<Self> {
        // Normalize each key against the left input ...
        let left_keys: Vec<Column> = using_keys
            .clone()
            .into_iter()
            .map(|c| Self::normalize(&self.plan, c))
            .collect::<Result<_>>()?;
        // ... and against the right input
        let right_keys: Vec<Column> = using_keys
            .into_iter()
            .map(|c| Self::normalize(&right, c))
            .collect::<Result<_>>()?;

        let on: Vec<(_, _)> = left_keys.into_iter().zip(right_keys).collect();
        let mut join_on: Vec<(Expr, Expr)> = vec![];
        let mut filters: Option<Expr> = None;
        for (l, r) in &on {
            // Both columns exist on their side and the left column's data type
            // passes `can_hash`: use the pair as an equijoin key
            if self.plan.schema().has_column(l)
                && right.schema().has_column(r)
                && can_hash(
                    datafusion_common::ExprSchema::field_from_column(
                        self.plan.schema(),
                        l,
                    )?
                    .data_type(),
                )
            {
                join_on.push((Expr::Column(l.clone()), Expr::Column(r.clone())));
            // NOTE(review): this branch looks `r` up in the *left* plan's
            // schema and pushes the pair in swapped order; a failed lookup
            // returns an error through `?` instead of falling through to the
            // filter branch - confirm this is intended
            } else if self.plan.schema().has_column(l)
                && right.schema().has_column(r)
                && can_hash(
                    datafusion_common::ExprSchema::field_from_column(
                        self.plan.schema(),
                        r,
                    )?
                    .data_type(),
                )
            {
                join_on.push((Expr::Column(r.clone()), Expr::Column(l.clone())));
            } else {
                // Not usable as an equijoin key: express the pair as `l = r`
                // and AND it in front of the filters collected so far
                let expr = binary_expr(
                    Expr::Column(l.clone()),
                    Operator::Eq,
                    Expr::Column(r.clone()),
                );
                match filters {
                    None => filters = Some(expr),
                    Some(filter_expr) => filters = Some(and(expr, filter_expr)),
                }
            }
        }

        if join_on.is_empty() {
            // No equijoin keys at all: cross join, then filter
            let join = Self::from(self.plan).cross_join(right)?;
            join.filter(filters.ok_or_else(|| {
                DataFusionError::Internal("filters should not be None here".to_string())
            })?)
        } else {
            let join = Join::try_new(
                self.plan,
                Arc::new(right),
                join_on,
                filters,
                join_type,
                JoinConstraint::Using,
                false,
            )?;

            Ok(Self::new(LogicalPlan::Join(join)))
        }
    }
1195
1196    /// Apply a cross join
1197    pub fn cross_join(self, right: LogicalPlan) -> Result<Self> {
1198        let join = Join::try_new(
1199            self.plan,
1200            Arc::new(right),
1201            vec![],
1202            None,
1203            JoinType::Inner,
1204            JoinConstraint::On,
1205            false,
1206        )?;
1207
1208        Ok(Self::new(LogicalPlan::Join(join)))
1209    }
1210
1211    /// Repartition
1212    pub fn repartition(self, partitioning_scheme: Partitioning) -> Result<Self> {
1213        Ok(Self::new(LogicalPlan::Repartition(Repartition {
1214            input: self.plan,
1215            partitioning_scheme,
1216        })))
1217    }
1218
1219    /// Apply a window functions to extend the schema
1220    pub fn window(
1221        self,
1222        window_expr: impl IntoIterator<Item = impl Into<Expr>>,
1223    ) -> Result<Self> {
1224        let window_expr = normalize_cols(window_expr, &self.plan)?;
1225        validate_unique_names("Windows", &window_expr)?;
1226        Ok(Self::new(LogicalPlan::Window(Window::try_new(
1227            window_expr,
1228            self.plan,
1229        )?)))
1230    }
1231
1232    /// Apply an aggregate: grouping on the `group_expr` expressions
1233    /// and calculating `aggr_expr` aggregates for each distinct
1234    /// value of the `group_expr`;
1235    pub fn aggregate(
1236        self,
1237        group_expr: impl IntoIterator<Item = impl Into<Expr>>,
1238        aggr_expr: impl IntoIterator<Item = impl Into<Expr>>,
1239    ) -> Result<Self> {
1240        let group_expr = normalize_cols(group_expr, &self.plan)?;
1241        let aggr_expr = normalize_cols(aggr_expr, &self.plan)?;
1242
1243        let group_expr = if self.options.add_implicit_group_by_exprs {
1244            add_group_by_exprs_from_dependencies(group_expr, self.plan.schema())?
1245        } else {
1246            group_expr
1247        };
1248
1249        Aggregate::try_new(self.plan, group_expr, aggr_expr)
1250            .map(LogicalPlan::Aggregate)
1251            .map(Self::new)
1252    }
1253
1254    /// Create an expression to represent the explanation of the plan
1255    ///
1256    /// if `analyze` is true, runs the actual plan and produces
1257    /// information about metrics during run.
1258    ///
1259    /// if `verbose` is true, prints out additional details.
1260    pub fn explain(self, verbose: bool, analyze: bool) -> Result<Self> {
1261        let schema = LogicalPlan::explain_schema();
1262        let schema = schema.to_dfschema_ref()?;
1263
1264        if analyze {
1265            Ok(Self::new(LogicalPlan::Analyze(Analyze {
1266                verbose,
1267                input: self.plan,
1268                schema,
1269            })))
1270        } else {
1271            let stringified_plans =
1272                vec![self.plan.to_stringified(PlanType::InitialLogicalPlan)];
1273
1274            Ok(Self::new(LogicalPlan::Explain(Explain {
1275                verbose,
1276                plan: self.plan,
1277                explain_format: ExplainFormat::Indent,
1278                stringified_plans,
1279                schema,
1280                logical_optimization_succeeded: false,
1281            })))
1282        }
1283    }
1284
1285    /// Process intersect set operator
1286    pub fn intersect(
1287        left_plan: LogicalPlan,
1288        right_plan: LogicalPlan,
1289        is_all: bool,
1290    ) -> Result<LogicalPlan> {
1291        LogicalPlanBuilder::intersect_or_except(
1292            left_plan,
1293            right_plan,
1294            JoinType::LeftSemi,
1295            is_all,
1296        )
1297    }
1298
1299    /// Process except set operator
1300    pub fn except(
1301        left_plan: LogicalPlan,
1302        right_plan: LogicalPlan,
1303        is_all: bool,
1304    ) -> Result<LogicalPlan> {
1305        LogicalPlanBuilder::intersect_or_except(
1306            left_plan,
1307            right_plan,
1308            JoinType::LeftAnti,
1309            is_all,
1310        )
1311    }
1312
1313    /// Process intersect or except
1314    fn intersect_or_except(
1315        left_plan: LogicalPlan,
1316        right_plan: LogicalPlan,
1317        join_type: JoinType,
1318        is_all: bool,
1319    ) -> Result<LogicalPlan> {
1320        let left_len = left_plan.schema().fields().len();
1321        let right_len = right_plan.schema().fields().len();
1322
1323        if left_len != right_len {
1324            return plan_err!(
1325                "INTERSECT/EXCEPT query must have the same number of columns. Left is {left_len} and right is {right_len}."
1326            );
1327        }
1328
1329        let join_keys = left_plan
1330            .schema()
1331            .fields()
1332            .iter()
1333            .zip(right_plan.schema().fields().iter())
1334            .map(|(left_field, right_field)| {
1335                (
1336                    (Column::from_name(left_field.name())),
1337                    (Column::from_name(right_field.name())),
1338                )
1339            })
1340            .unzip();
1341        if is_all {
1342            LogicalPlanBuilder::from(left_plan)
1343                .join_detailed(right_plan, join_type, join_keys, None, true)?
1344                .build()
1345        } else {
1346            LogicalPlanBuilder::from(left_plan)
1347                .distinct()?
1348                .join_detailed(right_plan, join_type, join_keys, None, true)?
1349                .build()
1350        }
1351    }
1352
1353    /// Build the plan
1354    pub fn build(self) -> Result<LogicalPlan> {
1355        Ok(Arc::unwrap_or_clone(self.plan))
1356    }
1357
1358    /// Apply a join with both explicit equijoin and non equijoin predicates.
1359    ///
1360    /// Note this is a low level API that requires identifying specific
1361    /// predicate types. Most users should use  [`join_on`](Self::join_on) that
1362    /// automatically identifies predicates appropriately.
1363    ///
1364    /// `equi_exprs` defines equijoin predicates, of the form `l = r)` for each
1365    /// `(l, r)` tuple. `l`, the first element of the tuple, must only refer
1366    /// to columns from the existing input. `r`, the second element of the tuple,
1367    /// must only refer to columns from the right input.
1368    ///
1369    /// `filter` contains any other filter expression to apply during the
1370    /// join. Note that `equi_exprs` predicates are evaluated more efficiently
1371    /// than the filter expressions, so they are preferred.
1372    pub fn join_with_expr_keys(
1373        self,
1374        right: LogicalPlan,
1375        join_type: JoinType,
1376        equi_exprs: (Vec<impl Into<Expr>>, Vec<impl Into<Expr>>),
1377        filter: Option<Expr>,
1378    ) -> Result<Self> {
1379        if equi_exprs.0.len() != equi_exprs.1.len() {
1380            return plan_err!("left_keys and right_keys were not the same length");
1381        }
1382
1383        let join_key_pairs = equi_exprs
1384            .0
1385            .into_iter()
1386            .zip(equi_exprs.1)
1387            .map(|(l, r)| {
1388                let left_key = l.into();
1389                let right_key = r.into();
1390                let mut left_using_columns  = HashSet::new();
1391                expr_to_columns(&left_key, &mut left_using_columns)?;
1392                let normalized_left_key = normalize_col_with_schemas_and_ambiguity_check(
1393                    left_key,
1394                    &[&[self.plan.schema()]],
1395                    &[],
1396                )?;
1397
1398                let mut right_using_columns = HashSet::new();
1399                expr_to_columns(&right_key, &mut right_using_columns)?;
1400                let normalized_right_key = normalize_col_with_schemas_and_ambiguity_check(
1401                    right_key,
1402                    &[&[right.schema()]],
1403                    &[],
1404                )?;
1405
1406                // find valid equijoin
1407                find_valid_equijoin_key_pair(
1408                        &normalized_left_key,
1409                        &normalized_right_key,
1410                        self.plan.schema(),
1411                        right.schema(),
1412                    )?.ok_or_else(||
1413                        plan_datafusion_err!(
1414                            "can't create join plan, join key should belong to one input, error key: ({normalized_left_key},{normalized_right_key})"
1415                        ))
1416            })
1417            .collect::<Result<Vec<_>>>()?;
1418
1419        let join = Join::try_new(
1420            self.plan,
1421            Arc::new(right),
1422            join_key_pairs,
1423            filter,
1424            join_type,
1425            JoinConstraint::On,
1426            false,
1427        )?;
1428
1429        Ok(Self::new(LogicalPlan::Join(join)))
1430    }
1431
1432    /// Unnest the given column.
1433    pub fn unnest_column(self, column: impl Into<Column>) -> Result<Self> {
1434        unnest(Arc::unwrap_or_clone(self.plan), vec![column.into()]).map(Self::new)
1435    }
1436
1437    /// Unnest the given column given [`UnnestOptions`]
1438    pub fn unnest_column_with_options(
1439        self,
1440        column: impl Into<Column>,
1441        options: UnnestOptions,
1442    ) -> Result<Self> {
1443        unnest_with_options(
1444            Arc::unwrap_or_clone(self.plan),
1445            vec![column.into()],
1446            options,
1447        )
1448        .map(Self::new)
1449    }
1450
1451    /// Unnest the given columns with the given [`UnnestOptions`]
1452    pub fn unnest_columns_with_options(
1453        self,
1454        columns: Vec<Column>,
1455        options: UnnestOptions,
1456    ) -> Result<Self> {
1457        unnest_with_options(Arc::unwrap_or_clone(self.plan), columns, options)
1458            .map(Self::new)
1459    }
1460}
1461
1462impl From<LogicalPlan> for LogicalPlanBuilder {
1463    fn from(plan: LogicalPlan) -> Self {
1464        LogicalPlanBuilder::new(plan)
1465    }
1466}
1467
1468impl From<Arc<LogicalPlan>> for LogicalPlanBuilder {
1469    fn from(plan: Arc<LogicalPlan>) -> Self {
1470        LogicalPlanBuilder::new_from_arc(plan)
1471    }
1472}
1473
1474/// Container used when building fields for a `VALUES` node.
1475#[derive(Default)]
1476struct ValuesFields {
1477    inner: Vec<Field>,
1478}
1479
1480impl ValuesFields {
1481    pub fn new() -> Self {
1482        Self::default()
1483    }
1484
1485    pub fn push(&mut self, data_type: DataType, nullable: bool) {
1486        // Naming follows the convention described here:
1487        // https://www.postgresql.org/docs/current/queries-values.html
1488        let name = format!("column{}", self.inner.len() + 1);
1489        self.inner.push(Field::new(name, data_type, nullable));
1490    }
1491
1492    pub fn into_fields(self) -> Fields {
1493        self.inner.into()
1494    }
1495}
1496
1497// `name_map` tracks a mapping between a field name and the number of appearances of that field.
1498//
1499// Some field names might already come to this function with the count (number of times it appeared)
1500// as a sufix e.g. id:1, so there's still a chance of name collisions, for example,
1501// if these three fields passed to this function: "col:1", "col" and "col", the function
1502// would rename them to -> col:1, col, col:1 causing a posteriror error when building the DFSchema.
1503// that's why we need the `seen` set, so the fields are always unique.
1504//
1505pub fn change_redundant_column(fields: &Fields) -> Vec<Field> {
1506    let mut name_map = HashMap::new();
1507    let mut seen: HashSet<String> = HashSet::new();
1508
1509    fields
1510        .into_iter()
1511        .map(|field| {
1512            let base_name = field.name();
1513            let count = name_map.entry(base_name.clone()).or_insert(0);
1514            let mut new_name = base_name.clone();
1515
1516            // Loop until we find a name that hasn't been used
1517            while seen.contains(&new_name) {
1518                *count += 1;
1519                new_name = format!("{base_name}:{count}");
1520            }
1521
1522            seen.insert(new_name.clone());
1523
1524            let mut modified_field =
1525                Field::new(&new_name, field.data_type().clone(), field.is_nullable());
1526            modified_field.set_metadata(field.metadata().clone());
1527            modified_field
1528        })
1529        .collect()
1530}
1531
1532fn mark_field(schema: &DFSchema) -> (Option<TableReference>, Arc<Field>) {
1533    let mut table_references = schema
1534        .iter()
1535        .filter_map(|(qualifier, _)| qualifier)
1536        .collect::<Vec<_>>();
1537    table_references.dedup();
1538    let table_reference = if table_references.len() == 1 {
1539        table_references.pop().cloned()
1540    } else {
1541        None
1542    };
1543
1544    (
1545        table_reference,
1546        Arc::new(Field::new("mark", DataType::Boolean, false)),
1547    )
1548}
1549
1550/// Creates a schema for a join operation.
1551/// The fields from the left side are first
1552pub fn build_join_schema(
1553    left: &DFSchema,
1554    right: &DFSchema,
1555    join_type: &JoinType,
1556) -> Result<DFSchema> {
1557    fn nullify_fields<'a>(
1558        fields: impl Iterator<Item = (Option<&'a TableReference>, &'a Arc<Field>)>,
1559    ) -> Vec<(Option<TableReference>, Arc<Field>)> {
1560        fields
1561            .map(|(q, f)| {
1562                // TODO: find a good way to do that
1563                let field = f.as_ref().clone().with_nullable(true);
1564                (q.cloned(), Arc::new(field))
1565            })
1566            .collect()
1567    }
1568
1569    let right_fields = right.iter();
1570    let left_fields = left.iter();
1571
1572    let qualified_fields: Vec<(Option<TableReference>, Arc<Field>)> = match join_type {
1573        JoinType::Inner => {
1574            // left then right
1575            let left_fields = left_fields
1576                .map(|(q, f)| (q.cloned(), Arc::clone(f)))
1577                .collect::<Vec<_>>();
1578            let right_fields = right_fields
1579                .map(|(q, f)| (q.cloned(), Arc::clone(f)))
1580                .collect::<Vec<_>>();
1581            left_fields.into_iter().chain(right_fields).collect()
1582        }
1583        JoinType::Left => {
1584            // left then right, right set to nullable in case of not matched scenario
1585            let left_fields = left_fields
1586                .map(|(q, f)| (q.cloned(), Arc::clone(f)))
1587                .collect::<Vec<_>>();
1588            left_fields
1589                .into_iter()
1590                .chain(nullify_fields(right_fields))
1591                .collect()
1592        }
1593        JoinType::Right => {
1594            // left then right, left set to nullable in case of not matched scenario
1595            let right_fields = right_fields
1596                .map(|(q, f)| (q.cloned(), Arc::clone(f)))
1597                .collect::<Vec<_>>();
1598            nullify_fields(left_fields)
1599                .into_iter()
1600                .chain(right_fields)
1601                .collect()
1602        }
1603        JoinType::Full => {
1604            // left then right, all set to nullable in case of not matched scenario
1605            nullify_fields(left_fields)
1606                .into_iter()
1607                .chain(nullify_fields(right_fields))
1608                .collect()
1609        }
1610        JoinType::LeftSemi | JoinType::LeftAnti => {
1611            // Only use the left side for the schema
1612            left_fields
1613                .map(|(q, f)| (q.cloned(), Arc::clone(f)))
1614                .collect()
1615        }
1616        JoinType::LeftMark => left_fields
1617            .map(|(q, f)| (q.cloned(), Arc::clone(f)))
1618            .chain(once(mark_field(right)))
1619            .collect(),
1620        JoinType::RightSemi | JoinType::RightAnti => {
1621            // Only use the right side for the schema
1622            right_fields
1623                .map(|(q, f)| (q.cloned(), Arc::clone(f)))
1624                .collect()
1625        }
1626    };
1627    let func_dependencies = left.functional_dependencies().join(
1628        right.functional_dependencies(),
1629        join_type,
1630        left.fields().len(),
1631    );
1632
1633    let (schema1, schema2) = match join_type {
1634        JoinType::Right | JoinType::RightSemi | JoinType::RightAnti => (left, right),
1635        _ => (right, left),
1636    };
1637
1638    let metadata = schema1
1639        .metadata()
1640        .clone()
1641        .into_iter()
1642        .chain(schema2.metadata().clone())
1643        .collect();
1644
1645    let dfschema = DFSchema::new_with_metadata(qualified_fields, metadata)?;
1646    dfschema.with_functional_dependencies(func_dependencies)
1647}
1648
1649/// Add additional "synthetic" group by expressions based on functional
1650/// dependencies.
1651///
1652/// For example, if we are grouping on `[c1]`, and we know from
1653/// functional dependencies that column `c1` determines `c2`, this function
1654/// adds `c2` to the group by list.
1655///
1656/// This allows MySQL style selects like
1657/// `SELECT col FROM t WHERE pk = 5` if col is unique
1658pub fn add_group_by_exprs_from_dependencies(
1659    mut group_expr: Vec<Expr>,
1660    schema: &DFSchemaRef,
1661) -> Result<Vec<Expr>> {
1662    // Names of the fields produced by the GROUP BY exprs for example, `GROUP BY
1663    // c1 + 1` produces an output field named `"c1 + 1"`
1664    let mut group_by_field_names = group_expr
1665        .iter()
1666        .map(|e| e.schema_name().to_string())
1667        .collect::<Vec<_>>();
1668
1669    if let Some(target_indices) =
1670        get_target_functional_dependencies(schema, &group_by_field_names)
1671    {
1672        for idx in target_indices {
1673            let expr = Expr::Column(Column::from(schema.qualified_field(idx)));
1674            let expr_name = expr.schema_name().to_string();
1675            if !group_by_field_names.contains(&expr_name) {
1676                group_by_field_names.push(expr_name);
1677                group_expr.push(expr);
1678            }
1679        }
1680    }
1681    Ok(group_expr)
1682}
1683
1684/// Errors if one or more expressions have equal names.
1685pub fn validate_unique_names<'a>(
1686    node_name: &str,
1687    expressions: impl IntoIterator<Item = &'a Expr>,
1688) -> Result<()> {
1689    let mut unique_names = HashMap::new();
1690
1691    expressions.into_iter().enumerate().try_for_each(|(position, expr)| {
1692        let name = expr.schema_name().to_string();
1693        match unique_names.get(&name) {
1694            None => {
1695                unique_names.insert(name, (position, expr));
1696                Ok(())
1697            },
1698            Some((existing_position, existing_expr)) => {
1699                plan_err!("{node_name} require unique expression names \
1700                             but the expression \"{existing_expr}\" at position {existing_position} and \"{expr}\" \
1701                             at position {position} have the same name. Consider aliasing (\"AS\") one of them."
1702                            )
1703            }
1704        }
1705    })
1706}
1707
1708/// Union two [`LogicalPlan`]s.
1709///
1710/// Constructs the UNION plan, but does not perform type-coercion. Therefore the
1711/// subtree expressions will not be properly typed until the optimizer pass.
1712///
1713/// If a properly typed UNION plan is needed, refer to [`TypeCoercionRewriter::coerce_union`]
1714/// or alternatively, merge the union input schema using [`coerce_union_schema`] and
1715/// apply the expression rewrite with [`coerce_plan_expr_for_schema`].
1716///
1717/// [`TypeCoercionRewriter::coerce_union`]: https://docs.rs/datafusion-optimizer/latest/datafusion_optimizer/analyzer/type_coercion/struct.TypeCoercionRewriter.html#method.coerce_union
1718/// [`coerce_union_schema`]: https://docs.rs/datafusion-optimizer/latest/datafusion_optimizer/analyzer/type_coercion/fn.coerce_union_schema.html
1719pub fn union(left_plan: LogicalPlan, right_plan: LogicalPlan) -> Result<LogicalPlan> {
1720    Ok(LogicalPlan::Union(Union::try_new_with_loose_types(vec![
1721        Arc::new(left_plan),
1722        Arc::new(right_plan),
1723    ])?))
1724}
1725
1726/// Like [`union`], but combine rows from different tables by name, rather than
1727/// by position.
1728pub fn union_by_name(
1729    left_plan: LogicalPlan,
1730    right_plan: LogicalPlan,
1731) -> Result<LogicalPlan> {
1732    Ok(LogicalPlan::Union(Union::try_new_by_name(vec![
1733        Arc::new(left_plan),
1734        Arc::new(right_plan),
1735    ])?))
1736}
1737
1738/// Create Projection
1739/// # Errors
1740/// This function errors under any of the following conditions:
1741/// * Two or more expressions have the same name
1742/// * An invalid expression is used (e.g. a `sort` expression)
1743pub fn project(
1744    plan: LogicalPlan,
1745    expr: impl IntoIterator<Item = impl Into<SelectExpr>>,
1746) -> Result<LogicalPlan> {
1747    project_with_validation(plan, expr.into_iter().map(|e| (e, true)))
1748}
1749
1750/// Create Projection. Similar to project except that the expressions
1751/// passed in have a flag to indicate if that expression requires
1752/// validation (normalize & columnize) (true) or not (false)
1753/// # Errors
1754/// This function errors under any of the following conditions:
1755/// * Two or more expressions have the same name
1756/// * An invalid expression is used (e.g. a `sort` expression)
1757fn project_with_validation(
1758    plan: LogicalPlan,
1759    expr: impl IntoIterator<Item = (impl Into<SelectExpr>, bool)>,
1760) -> Result<LogicalPlan> {
1761    let mut projected_expr = vec![];
1762    for (e, validate) in expr {
1763        let e = e.into();
1764        match e {
1765            SelectExpr::Wildcard(opt) => {
1766                let expanded = expand_wildcard(plan.schema(), &plan, Some(&opt))?;
1767
1768                // If there is a REPLACE statement, replace that column with the given
1769                // replace expression. Column name remains the same.
1770                let expanded = if let Some(replace) = opt.replace {
1771                    replace_columns(expanded, &replace)?
1772                } else {
1773                    expanded
1774                };
1775
1776                for e in expanded {
1777                    if validate {
1778                        projected_expr
1779                            .push(columnize_expr(normalize_col(e, &plan)?, &plan)?)
1780                    } else {
1781                        projected_expr.push(e)
1782                    }
1783                }
1784            }
1785            SelectExpr::QualifiedWildcard(table_ref, opt) => {
1786                let expanded =
1787                    expand_qualified_wildcard(&table_ref, plan.schema(), Some(&opt))?;
1788
1789                // If there is a REPLACE statement, replace that column with the given
1790                // replace expression. Column name remains the same.
1791                let expanded = if let Some(replace) = opt.replace {
1792                    replace_columns(expanded, &replace)?
1793                } else {
1794                    expanded
1795                };
1796
1797                for e in expanded {
1798                    if validate {
1799                        projected_expr
1800                            .push(columnize_expr(normalize_col(e, &plan)?, &plan)?)
1801                    } else {
1802                        projected_expr.push(e)
1803                    }
1804                }
1805            }
1806            SelectExpr::Expression(e) => {
1807                if validate {
1808                    projected_expr.push(columnize_expr(normalize_col(e, &plan)?, &plan)?)
1809                } else {
1810                    projected_expr.push(e)
1811                }
1812            }
1813        }
1814    }
1815    validate_unique_names("Projections", projected_expr.iter())?;
1816
1817    Projection::try_new(projected_expr, Arc::new(plan)).map(LogicalPlan::Projection)
1818}
1819
1820/// If there is a REPLACE statement in the projected expression in the form of
1821/// "REPLACE (some_column_within_an_expr AS some_column)", this function replaces
1822/// that column with the given replace expression. Column name remains the same.
1823/// Multiple REPLACEs are also possible with comma separations.
1824fn replace_columns(
1825    mut exprs: Vec<Expr>,
1826    replace: &PlannedReplaceSelectItem,
1827) -> Result<Vec<Expr>> {
1828    for expr in exprs.iter_mut() {
1829        if let Expr::Column(Column { name, .. }) = expr {
1830            if let Some((_, new_expr)) = replace
1831                .items()
1832                .iter()
1833                .zip(replace.expressions().iter())
1834                .find(|(item, _)| item.column_name.value == *name)
1835            {
1836                *expr = new_expr.clone().alias(name.clone())
1837            }
1838        }
1839    }
1840    Ok(exprs)
1841}
1842
1843/// Create a SubqueryAlias to wrap a LogicalPlan.
1844pub fn subquery_alias(
1845    plan: LogicalPlan,
1846    alias: impl Into<TableReference>,
1847) -> Result<LogicalPlan> {
1848    SubqueryAlias::try_new(Arc::new(plan), alias).map(LogicalPlan::SubqueryAlias)
1849}
1850
1851/// Create a LogicalPlanBuilder representing a scan of a table with the provided name and schema.
1852/// This is mostly used for testing and documentation.
1853pub fn table_scan(
1854    name: Option<impl Into<TableReference>>,
1855    table_schema: &Schema,
1856    projection: Option<Vec<usize>>,
1857) -> Result<LogicalPlanBuilder> {
1858    table_scan_with_filters(name, table_schema, projection, vec![])
1859}
1860
1861/// Create a LogicalPlanBuilder representing a scan of a table with the provided name and schema,
1862/// and inlined filters.
1863/// This is mostly used for testing and documentation.
1864pub fn table_scan_with_filters(
1865    name: Option<impl Into<TableReference>>,
1866    table_schema: &Schema,
1867    projection: Option<Vec<usize>>,
1868    filters: Vec<Expr>,
1869) -> Result<LogicalPlanBuilder> {
1870    let table_source = table_source(table_schema);
1871    let name = name
1872        .map(|n| n.into())
1873        .unwrap_or_else(|| TableReference::bare(UNNAMED_TABLE));
1874    LogicalPlanBuilder::scan_with_filters(name, table_source, projection, filters)
1875}
1876
1877/// Create a LogicalPlanBuilder representing a scan of a table with the provided name and schema,
1878/// filters, and inlined fetch.
1879/// This is mostly used for testing and documentation.
1880pub fn table_scan_with_filter_and_fetch(
1881    name: Option<impl Into<TableReference>>,
1882    table_schema: &Schema,
1883    projection: Option<Vec<usize>>,
1884    filters: Vec<Expr>,
1885    fetch: Option<usize>,
1886) -> Result<LogicalPlanBuilder> {
1887    let table_source = table_source(table_schema);
1888    let name = name
1889        .map(|n| n.into())
1890        .unwrap_or_else(|| TableReference::bare(UNNAMED_TABLE));
1891    LogicalPlanBuilder::scan_with_filters_fetch(
1892        name,
1893        table_source,
1894        projection,
1895        filters,
1896        fetch,
1897    )
1898}
1899
1900pub fn table_source(table_schema: &Schema) -> Arc<dyn TableSource> {
1901    let table_schema = Arc::new(table_schema.clone());
1902    Arc::new(LogicalTableSource {
1903        table_schema,
1904        constraints: Default::default(),
1905    })
1906}
1907
1908pub fn table_source_with_constraints(
1909    table_schema: &Schema,
1910    constraints: Constraints,
1911) -> Arc<dyn TableSource> {
1912    let table_schema = Arc::new(table_schema.clone());
1913    Arc::new(LogicalTableSource {
1914        table_schema,
1915        constraints,
1916    })
1917}
1918
1919/// Wrap projection for a plan, if the join keys contains normal expression.
1920pub fn wrap_projection_for_join_if_necessary(
1921    join_keys: &[Expr],
1922    input: LogicalPlan,
1923) -> Result<(LogicalPlan, Vec<Column>, bool)> {
1924    let input_schema = input.schema();
1925    let alias_join_keys: Vec<Expr> = join_keys
1926        .iter()
1927        .map(|key| {
1928            // The display_name() of cast expression will ignore the cast info, and show the inner expression name.
1929            // If we do not add alias, it will throw same field name error in the schema when adding projection.
1930            // For example:
1931            //    input scan : [a, b, c],
1932            //    join keys: [cast(a as int)]
1933            //
1934            //  then a and cast(a as int) will use the same field name - `a` in projection schema.
1935            //  https://github.com/apache/datafusion/issues/4478
1936            if matches!(key, Expr::Cast(_)) || matches!(key, Expr::TryCast(_)) {
1937                let alias = format!("{key}");
1938                key.clone().alias(alias)
1939            } else {
1940                key.clone()
1941            }
1942        })
1943        .collect::<Vec<_>>();
1944
1945    let need_project = join_keys.iter().any(|key| !matches!(key, Expr::Column(_)));
1946    let plan = if need_project {
1947        // Include all columns from the input and extend them with the join keys
1948        let mut projection = input_schema
1949            .columns()
1950            .into_iter()
1951            .map(Expr::Column)
1952            .collect::<Vec<_>>();
1953        let join_key_items = alias_join_keys
1954            .iter()
1955            .flat_map(|expr| expr.try_as_col().is_none().then_some(expr))
1956            .cloned()
1957            .collect::<HashSet<Expr>>();
1958        projection.extend(join_key_items);
1959
1960        LogicalPlanBuilder::from(input)
1961            .project(projection.into_iter().map(SelectExpr::from))?
1962            .build()?
1963    } else {
1964        input
1965    };
1966
1967    let join_on = alias_join_keys
1968        .into_iter()
1969        .map(|key| {
1970            if let Some(col) = key.try_as_col() {
1971                Ok(col.clone())
1972            } else {
1973                let name = key.schema_name().to_string();
1974                Ok(Column::from_name(name))
1975            }
1976        })
1977        .collect::<Result<Vec<_>>>()?;
1978
1979    Ok((plan, join_on, need_project))
1980}
1981
/// Basic TableSource implementation intended for use in tests and documentation. It is expected
/// that users will provide their own TableSource implementations or use DataFusion's
/// DefaultTableSource.
pub struct LogicalTableSource {
    /// Schema reported by [`TableSource::schema`].
    table_schema: SchemaRef,
    /// Constraints reported by [`TableSource::constraints`].
    constraints: Constraints,
}
1989
1990impl LogicalTableSource {
1991    /// Create a new LogicalTableSource
1992    pub fn new(table_schema: SchemaRef) -> Self {
1993        Self {
1994            table_schema,
1995            constraints: Constraints::default(),
1996        }
1997    }
1998
1999    pub fn with_constraints(mut self, constraints: Constraints) -> Self {
2000        self.constraints = constraints;
2001        self
2002    }
2003}
2004
2005impl TableSource for LogicalTableSource {
2006    fn as_any(&self) -> &dyn Any {
2007        self
2008    }
2009
2010    fn schema(&self) -> SchemaRef {
2011        Arc::clone(&self.table_schema)
2012    }
2013
2014    fn constraints(&self) -> Option<&Constraints> {
2015        Some(&self.constraints)
2016    }
2017
2018    fn supports_filters_pushdown(
2019        &self,
2020        filters: &[&Expr],
2021    ) -> Result<Vec<TableProviderFilterPushDown>> {
2022        Ok(vec![TableProviderFilterPushDown::Exact; filters.len()])
2023    }
2024}
2025
2026/// Create a [`LogicalPlan::Unnest`] plan
2027pub fn unnest(input: LogicalPlan, columns: Vec<Column>) -> Result<LogicalPlan> {
2028    unnest_with_options(input, columns, UnnestOptions::default())
2029}
2030
2031// Get the data type of a multi-dimensional type after unnesting it
2032// with a given depth
2033fn get_unnested_list_datatype_recursive(
2034    data_type: &DataType,
2035    depth: usize,
2036) -> Result<DataType> {
2037    match data_type {
2038        DataType::List(field)
2039        | DataType::FixedSizeList(field, _)
2040        | DataType::LargeList(field) => {
2041            if depth == 1 {
2042                return Ok(field.data_type().clone());
2043            }
2044            return get_unnested_list_datatype_recursive(field.data_type(), depth - 1);
2045        }
2046        _ => {}
2047    };
2048
2049    internal_err!("trying to unnest on invalid data type {:?}", data_type)
2050}
2051
2052pub fn get_struct_unnested_columns(
2053    col_name: &String,
2054    inner_fields: &Fields,
2055) -> Vec<Column> {
2056    inner_fields
2057        .iter()
2058        .map(|f| Column::from_name(format!("{}.{}", col_name, f.name())))
2059        .collect()
2060}
2061
2062// Based on data type, either struct or a variant of list
2063// return a set of columns as the result of unnesting
2064// the input columns.
2065// For example, given a column with name "a",
2066// - List(Element) returns ["a"] with data type Element
2067// - Struct(field1, field2) returns ["a.field1","a.field2"]
2068// For list data type, an argument depth is used to specify
2069// the recursion level
2070pub fn get_unnested_columns(
2071    col_name: &String,
2072    data_type: &DataType,
2073    depth: usize,
2074) -> Result<Vec<(Column, Arc<Field>)>> {
2075    let mut qualified_columns = Vec::with_capacity(1);
2076
2077    match data_type {
2078        DataType::List(_) | DataType::FixedSizeList(_, _) | DataType::LargeList(_) => {
2079            let data_type = get_unnested_list_datatype_recursive(data_type, depth)?;
2080            let new_field = Arc::new(Field::new(
2081                col_name, data_type,
2082                // Unnesting may produce NULLs even if the list is not null.
2083                // For example: unnest([1], []) -> 1, null
2084                true,
2085            ));
2086            let column = Column::from_name(col_name);
2087            // let column = Column::from((None, &new_field));
2088            qualified_columns.push((column, new_field));
2089        }
2090        DataType::Struct(fields) => {
2091            qualified_columns.extend(fields.iter().map(|f| {
2092                let new_name = format!("{}.{}", col_name, f.name());
2093                let column = Column::from_name(&new_name);
2094                let new_field = f.as_ref().clone().with_name(new_name);
2095                // let column = Column::from((None, &f));
2096                (column, Arc::new(new_field))
2097            }))
2098        }
2099        _ => {
2100            return internal_err!(
2101                "trying to unnest on invalid data type {:?}",
2102                data_type
2103            );
2104        }
2105    };
2106    Ok(qualified_columns)
2107}
2108
/// Create a [`LogicalPlan::Unnest`] plan with options.
///
/// This function receives a list of columns to be unnested. The same column
/// can be unnested several times (e.g. with different depths) through
/// `options.recursions`. The new schema contains the post-unnest fields in
/// place of the original field.
///
/// For example:
/// Input schema as
/// ```text
/// +---------------------+-------------------+
/// | col1                | col2              |
/// +---------------------+-------------------+
/// | Struct(INT64,INT32) | List(List(Int64)) |
/// +---------------------+-------------------+
/// ```
///
/// Then unnesting columns with:
/// - (col1,Struct)
/// - (col2,List(\[depth=1,depth=2\]))
///
/// will generate a new schema as
/// ```text
/// +---------+---------+---------------------+---------------------+
/// | col1.c0 | col1.c1 | unnest_col2_depth_1 | unnest_col2_depth_2 |
/// +---------+---------+---------------------+---------------------+
/// | Int64   | Int32   | List(Int64)         |  Int64              |
/// +---------+---------+---------------------+---------------------+
/// ```
///
/// # Errors
/// Returns an error when a column of `columns_to_unnest` cannot be found in
/// the schema of `input`, when a column to unnest is neither a struct nor a
/// variant of list, or when the output schema cannot be built.
pub fn unnest_with_options(
    input: LogicalPlan,
    columns_to_unnest: Vec<Column>,
    options: UnnestOptions,
) -> Result<LogicalPlan> {
    // (input field index, unnest description) for every list unnest to perform
    let mut list_columns: Vec<(usize, ColumnUnnestList)> = vec![];
    // input field indices of the struct columns to unnest
    let mut struct_columns = vec![];
    // Input field index -> column to unnest. The map is keyed by index, so a
    // column listed several times ends up as a single entry.
    let indices_to_unnest = columns_to_unnest
        .iter()
        .map(|c| Ok((input.schema().index_of_column(c)?, c)))
        .collect::<Result<HashMap<usize, &Column>>>()?;

    let input_schema = input.schema();

    // For every output field, the index of the input field it originates from
    let mut dependency_indices = vec![];
    // Transform input schema into new schema
    // Given this comprehensive example
    //
    // input schema:
    // 1.col1_unnest_placeholder: list[list[int]],
    // 2.col1: list[list[int]]
    // 3.col2: list[int]
    // with unnest on unnest(col1,depth=2), unnest(col1,depth=1) and unnest(col2,depth=1)
    // output schema:
    // 1.unnest_col1_depth_2: int
    // 2.unnest_col1_depth_1: list[int]
    // 3.col1: list[list[int]]
    // 4.unnest_col2_depth_1: int
    // Meaning the placeholder column will be replaced by its unnested variation(s), note
    // the plural.
    let fields = input_schema
        .iter()
        .enumerate()
        .map(|(index, (original_qualifier, original_field))| {
            match indices_to_unnest.get(&index) {
                Some(column_to_unnest) => {
                    // Explicit recursion options targeting this input column
                    let recursions_on_column = options
                        .recursions
                        .iter()
                        .filter(|p| -> bool { &p.input_column == *column_to_unnest })
                        .collect::<Vec<_>>();
                    // One output column per recursion option
                    let mut transformed_columns = recursions_on_column
                        .iter()
                        .map(|r| {
                            list_columns.push((
                                index,
                                ColumnUnnestList {
                                    output_column: r.output_column.clone(),
                                    depth: r.depth,
                                },
                            ));
                            // NOTE(review): only the first returned column is kept. For a
                            // struct-typed column that is just its first field, and a struct
                            // without fields would make the `unwrap` panic - confirm that
                            // recursion options are only supplied for list columns.
                            Ok(get_unnested_columns(
                                &r.output_column.name,
                                original_field.data_type(),
                                r.depth,
                            )?
                            .into_iter()
                            .next()
                            .unwrap()) // because unnesting a list column always results in exactly one column
                        })
                        .collect::<Result<Vec<(Column, Arc<Field>)>>>()?;
                    // No recursion option for this column: unnest a single level
                    if transformed_columns.is_empty() {
                        transformed_columns = get_unnested_columns(
                            &column_to_unnest.name,
                            original_field.data_type(),
                            1,
                        )?;
                        match original_field.data_type() {
                            DataType::Struct(_) => {
                                struct_columns.push(index);
                            }
                            DataType::List(_)
                            | DataType::FixedSizeList(_, _)
                            | DataType::LargeList(_) => {
                                list_columns.push((
                                    index,
                                    ColumnUnnestList {
                                        output_column: Column::from_name(
                                            &column_to_unnest.name,
                                        ),
                                        depth: 1,
                                    },
                                ));
                            }
                            // Any other type was already rejected by the
                            // `get_unnested_columns` call above.
                            _ => {}
                        };
                    }

                    // new columns dependent on the same original index
                    dependency_indices
                        .extend(std::iter::repeat_n(index, transformed_columns.len()));
                    Ok(transformed_columns
                        .iter()
                        .map(|(col, field)| (col.relation.to_owned(), field.to_owned()))
                        .collect())
                }
                None => {
                    // Not unnested: the field is carried over unchanged
                    dependency_indices.push(index);
                    Ok(vec![(
                        original_qualifier.cloned(),
                        Arc::clone(original_field),
                    )])
                }
            }
        })
        .collect::<Result<Vec<_>>>()?
        .into_iter()
        .flatten()
        .collect::<Vec<_>>();

    let metadata = input_schema.metadata().clone();
    let df_schema = DFSchema::new_with_metadata(fields, metadata)?;
    // We can use the existing functional dependencies:
    let deps = input_schema.functional_dependencies().clone();
    let schema = Arc::new(df_schema.with_functional_dependencies(deps)?);

    Ok(LogicalPlan::Unnest(Unnest {
        input: Arc::new(input),
        exec_columns: columns_to_unnest,
        list_type_columns: list_columns,
        struct_type_columns: struct_columns,
        dependency_indices,
        schema,
        options,
    }))
}
2264
2265#[cfg(test)]
2266mod tests {
2267    use super::*;
2268    use crate::logical_plan::StringifiedPlan;
2269    use crate::{col, expr, expr_fn::exists, in_subquery, lit, scalar_subquery};
2270
2271    use crate::test::function_stub::sum;
2272    use datafusion_common::{Constraint, RecursionUnnestOption, SchemaError};
2273    use insta::assert_snapshot;
2274
2275    #[test]
2276    fn plan_builder_simple() -> Result<()> {
2277        let plan =
2278            table_scan(Some("employee_csv"), &employee_schema(), Some(vec![0, 3]))?
2279                .filter(col("state").eq(lit("CO")))?
2280                .project(vec![col("id")])?
2281                .build()?;
2282
2283        assert_snapshot!(plan, @r#"
2284        Projection: employee_csv.id
2285          Filter: employee_csv.state = Utf8("CO")
2286            TableScan: employee_csv projection=[id, state]
2287        "#);
2288
2289        Ok(())
2290    }
2291
2292    #[test]
2293    fn plan_builder_schema() {
2294        let schema = employee_schema();
2295        let projection = None;
2296        let plan =
2297            LogicalPlanBuilder::scan("employee_csv", table_source(&schema), projection)
2298                .unwrap();
2299        assert_snapshot!(plan.schema().as_ref(), @"fields:[employee_csv.id, employee_csv.first_name, employee_csv.last_name, employee_csv.state, employee_csv.salary], metadata:{}");
2300
2301        // Note scan of "EMPLOYEE_CSV" is treated as a SQL identifier
2302        // (and thus normalized to "employee"csv") as well
2303        let projection = None;
2304        let plan =
2305            LogicalPlanBuilder::scan("EMPLOYEE_CSV", table_source(&schema), projection)
2306                .unwrap();
2307        assert_snapshot!(plan.schema().as_ref(), @"fields:[employee_csv.id, employee_csv.first_name, employee_csv.last_name, employee_csv.state, employee_csv.salary], metadata:{}");
2308    }
2309
2310    #[test]
2311    fn plan_builder_empty_name() {
2312        let schema = employee_schema();
2313        let projection = None;
2314        let err =
2315            LogicalPlanBuilder::scan("", table_source(&schema), projection).unwrap_err();
2316        assert_snapshot!(
2317            err.strip_backtrace(),
2318            @"Error during planning: table_name cannot be empty"
2319        );
2320    }
2321
2322    #[test]
2323    fn plan_builder_sort() -> Result<()> {
2324        let plan =
2325            table_scan(Some("employee_csv"), &employee_schema(), Some(vec![3, 4]))?
2326                .sort(vec![
2327                    expr::Sort::new(col("state"), true, true),
2328                    expr::Sort::new(col("salary"), false, false),
2329                ])?
2330                .build()?;
2331
2332        assert_snapshot!(plan, @r"
2333        Sort: employee_csv.state ASC NULLS FIRST, employee_csv.salary DESC NULLS LAST
2334          TableScan: employee_csv projection=[state, salary]
2335        ");
2336
2337        Ok(())
2338    }
2339
2340    #[test]
2341    fn plan_builder_union() -> Result<()> {
2342        let plan =
2343            table_scan(Some("employee_csv"), &employee_schema(), Some(vec![3, 4]))?;
2344
2345        let plan = plan
2346            .clone()
2347            .union(plan.clone().build()?)?
2348            .union(plan.clone().build()?)?
2349            .union(plan.build()?)?
2350            .build()?;
2351
2352        assert_snapshot!(plan, @r"
2353        Union
2354          Union
2355            Union
2356              TableScan: employee_csv projection=[state, salary]
2357              TableScan: employee_csv projection=[state, salary]
2358            TableScan: employee_csv projection=[state, salary]
2359          TableScan: employee_csv projection=[state, salary]
2360        ");
2361
2362        Ok(())
2363    }
2364
2365    #[test]
2366    fn plan_builder_union_distinct() -> Result<()> {
2367        let plan =
2368            table_scan(Some("employee_csv"), &employee_schema(), Some(vec![3, 4]))?;
2369
2370        let plan = plan
2371            .clone()
2372            .union_distinct(plan.clone().build()?)?
2373            .union_distinct(plan.clone().build()?)?
2374            .union_distinct(plan.build()?)?
2375            .build()?;
2376
2377        assert_snapshot!(plan, @r"
2378        Distinct:
2379          Union
2380            Distinct:
2381              Union
2382                Distinct:
2383                  Union
2384                    TableScan: employee_csv projection=[state, salary]
2385                    TableScan: employee_csv projection=[state, salary]
2386                TableScan: employee_csv projection=[state, salary]
2387            TableScan: employee_csv projection=[state, salary]
2388        ");
2389
2390        Ok(())
2391    }
2392
2393    #[test]
2394    fn plan_builder_simple_distinct() -> Result<()> {
2395        let plan =
2396            table_scan(Some("employee_csv"), &employee_schema(), Some(vec![0, 3]))?
2397                .filter(col("state").eq(lit("CO")))?
2398                .project(vec![col("id")])?
2399                .distinct()?
2400                .build()?;
2401
2402        assert_snapshot!(plan, @r#"
2403        Distinct:
2404          Projection: employee_csv.id
2405            Filter: employee_csv.state = Utf8("CO")
2406              TableScan: employee_csv projection=[id, state]
2407        "#);
2408
2409        Ok(())
2410    }
2411
2412    #[test]
2413    fn exists_subquery() -> Result<()> {
2414        let foo = test_table_scan_with_name("foo")?;
2415        let bar = test_table_scan_with_name("bar")?;
2416
2417        let subquery = LogicalPlanBuilder::from(foo)
2418            .project(vec![col("a")])?
2419            .filter(col("a").eq(col("bar.a")))?
2420            .build()?;
2421
2422        let outer_query = LogicalPlanBuilder::from(bar)
2423            .project(vec![col("a")])?
2424            .filter(exists(Arc::new(subquery)))?
2425            .build()?;
2426
2427        assert_snapshot!(outer_query, @r"
2428        Filter: EXISTS (<subquery>)
2429          Subquery:
2430            Filter: foo.a = bar.a
2431              Projection: foo.a
2432                TableScan: foo
2433          Projection: bar.a
2434            TableScan: bar
2435        ");
2436
2437        Ok(())
2438    }
2439
2440    #[test]
2441    fn filter_in_subquery() -> Result<()> {
2442        let foo = test_table_scan_with_name("foo")?;
2443        let bar = test_table_scan_with_name("bar")?;
2444
2445        let subquery = LogicalPlanBuilder::from(foo)
2446            .project(vec![col("a")])?
2447            .filter(col("a").eq(col("bar.a")))?
2448            .build()?;
2449
2450        // SELECT a FROM bar WHERE a IN (SELECT a FROM foo WHERE a = bar.a)
2451        let outer_query = LogicalPlanBuilder::from(bar)
2452            .project(vec![col("a")])?
2453            .filter(in_subquery(col("a"), Arc::new(subquery)))?
2454            .build()?;
2455
2456        assert_snapshot!(outer_query, @r"
2457        Filter: bar.a IN (<subquery>)
2458          Subquery:
2459            Filter: foo.a = bar.a
2460              Projection: foo.a
2461                TableScan: foo
2462          Projection: bar.a
2463            TableScan: bar
2464        ");
2465
2466        Ok(())
2467    }
2468
2469    #[test]
2470    fn select_scalar_subquery() -> Result<()> {
2471        let foo = test_table_scan_with_name("foo")?;
2472        let bar = test_table_scan_with_name("bar")?;
2473
2474        let subquery = LogicalPlanBuilder::from(foo)
2475            .project(vec![col("b")])?
2476            .filter(col("a").eq(col("bar.a")))?
2477            .build()?;
2478
2479        // SELECT (SELECT a FROM foo WHERE a = bar.a) FROM bar
2480        let outer_query = LogicalPlanBuilder::from(bar)
2481            .project(vec![scalar_subquery(Arc::new(subquery))])?
2482            .build()?;
2483
2484        assert_snapshot!(outer_query, @r"
2485        Projection: (<subquery>)
2486          Subquery:
2487            Filter: foo.a = bar.a
2488              Projection: foo.b
2489                TableScan: foo
2490          TableScan: bar
2491        ");
2492
2493        Ok(())
2494    }
2495
2496    #[test]
2497    fn projection_non_unique_names() -> Result<()> {
2498        let plan = table_scan(
2499            Some("employee_csv"),
2500            &employee_schema(),
2501            // project id and first_name by column index
2502            Some(vec![0, 1]),
2503        )?
2504        // two columns with the same name => error
2505        .project(vec![col("id"), col("first_name").alias("id")]);
2506
2507        match plan {
2508            Err(DataFusionError::SchemaError(
2509                SchemaError::AmbiguousReference {
2510                    field:
2511                        Column {
2512                            relation: Some(TableReference::Bare { table }),
2513                            name,
2514                            spans: _,
2515                        },
2516                },
2517                _,
2518            )) => {
2519                assert_eq!(*"employee_csv", *table);
2520                assert_eq!("id", &name);
2521                Ok(())
2522            }
2523            _ => plan_err!("Plan should have returned an DataFusionError::SchemaError"),
2524        }
2525    }
2526
2527    fn employee_schema() -> Schema {
2528        Schema::new(vec![
2529            Field::new("id", DataType::Int32, false),
2530            Field::new("first_name", DataType::Utf8, false),
2531            Field::new("last_name", DataType::Utf8, false),
2532            Field::new("state", DataType::Utf8, false),
2533            Field::new("salary", DataType::Int32, false),
2534        ])
2535    }
2536
2537    #[test]
2538    fn stringified_plan() {
2539        let stringified_plan =
2540            StringifiedPlan::new(PlanType::InitialLogicalPlan, "...the plan...");
2541        assert!(stringified_plan.should_display(true));
2542        assert!(!stringified_plan.should_display(false)); // not in non verbose mode
2543
2544        let stringified_plan =
2545            StringifiedPlan::new(PlanType::FinalLogicalPlan, "...the plan...");
2546        assert!(stringified_plan.should_display(true));
2547        assert!(stringified_plan.should_display(false)); // display in non verbose mode too
2548
2549        let stringified_plan =
2550            StringifiedPlan::new(PlanType::InitialPhysicalPlan, "...the plan...");
2551        assert!(stringified_plan.should_display(true));
2552        assert!(!stringified_plan.should_display(false)); // not in non verbose mode
2553
2554        let stringified_plan =
2555            StringifiedPlan::new(PlanType::FinalPhysicalPlan, "...the plan...");
2556        assert!(stringified_plan.should_display(true));
2557        assert!(stringified_plan.should_display(false)); // display in non verbose mode
2558
2559        let stringified_plan = StringifiedPlan::new(
2560            PlanType::OptimizedLogicalPlan {
2561                optimizer_name: "random opt pass".into(),
2562            },
2563            "...the plan...",
2564        );
2565        assert!(stringified_plan.should_display(true));
2566        assert!(!stringified_plan.should_display(false));
2567    }
2568
2569    fn test_table_scan_with_name(name: &str) -> Result<LogicalPlan> {
2570        let schema = Schema::new(vec![
2571            Field::new("a", DataType::UInt32, false),
2572            Field::new("b", DataType::UInt32, false),
2573            Field::new("c", DataType::UInt32, false),
2574        ]);
2575        table_scan(Some(name), &schema, None)?.build()
2576    }
2577
2578    #[test]
2579    fn plan_builder_intersect_different_num_columns_error() -> Result<()> {
2580        let plan1 =
2581            table_scan(TableReference::none(), &employee_schema(), Some(vec![3]))?;
2582        let plan2 =
2583            table_scan(TableReference::none(), &employee_schema(), Some(vec![3, 4]))?;
2584
2585        let err_msg1 =
2586            LogicalPlanBuilder::intersect(plan1.build()?, plan2.build()?, true)
2587                .unwrap_err();
2588
2589        assert_snapshot!(err_msg1.strip_backtrace(), @"Error during planning: INTERSECT/EXCEPT query must have the same number of columns. Left is 1 and right is 2.");
2590
2591        Ok(())
2592    }
2593
2594    #[test]
2595    fn plan_builder_unnest() -> Result<()> {
2596        // Cannot unnest on a scalar column
2597        let err = nested_table_scan("test_table")?
2598            .unnest_column("scalar")
2599            .unwrap_err();
2600
2601        let DataFusionError::Internal(desc) = err else {
2602            return plan_err!("Plan should have returned an DataFusionError::Internal");
2603        };
2604
2605        let desc = desc
2606            .split(DataFusionError::BACK_TRACE_SEP)
2607            .collect::<Vec<&str>>()
2608            .first()
2609            .unwrap_or(&"")
2610            .to_string();
2611
2612        assert_snapshot!(desc, @"trying to unnest on invalid data type UInt32");
2613
2614        // Unnesting the strings list.
2615        let plan = nested_table_scan("test_table")?
2616            .unnest_column("strings")?
2617            .build()?;
2618
2619        assert_snapshot!(plan, @r"
2620        Unnest: lists[test_table.strings|depth=1] structs[]
2621          TableScan: test_table
2622        ");
2623
2624        // Check unnested field is a scalar
2625        let field = plan.schema().field_with_name(None, "strings").unwrap();
2626        assert_eq!(&DataType::Utf8, field.data_type());
2627
2628        // Unnesting the singular struct column result into 2 new columns for each subfield
2629        let plan = nested_table_scan("test_table")?
2630            .unnest_column("struct_singular")?
2631            .build()?;
2632
2633        assert_snapshot!(plan, @r"
2634        Unnest: lists[] structs[test_table.struct_singular]
2635          TableScan: test_table
2636        ");
2637
2638        for field_name in &["a", "b"] {
2639            // Check unnested struct field is a scalar
2640            let field = plan
2641                .schema()
2642                .field_with_name(None, &format!("struct_singular.{field_name}"))
2643                .unwrap();
2644            assert_eq!(&DataType::UInt32, field.data_type());
2645        }
2646
2647        // Unnesting multiple fields in separate plans
2648        let plan = nested_table_scan("test_table")?
2649            .unnest_column("strings")?
2650            .unnest_column("structs")?
2651            .unnest_column("struct_singular")?
2652            .build()?;
2653
2654        assert_snapshot!(plan, @r"
2655        Unnest: lists[] structs[test_table.struct_singular]
2656          Unnest: lists[test_table.structs|depth=1] structs[]
2657            Unnest: lists[test_table.strings|depth=1] structs[]
2658              TableScan: test_table
2659        ");
2660
2661        // Check unnested struct list field should be a struct.
2662        let field = plan.schema().field_with_name(None, "structs").unwrap();
2663        assert!(matches!(field.data_type(), DataType::Struct(_)));
2664
2665        // Unnesting multiple fields at the same time, using infer syntax
2666        let cols = vec!["strings", "structs", "struct_singular"]
2667            .into_iter()
2668            .map(|c| c.into())
2669            .collect();
2670
2671        let plan = nested_table_scan("test_table")?
2672            .unnest_columns_with_options(cols, UnnestOptions::default())?
2673            .build()?;
2674
2675        assert_snapshot!(plan, @r"
2676        Unnest: lists[test_table.strings|depth=1, test_table.structs|depth=1] structs[test_table.struct_singular]
2677          TableScan: test_table
2678        ");
2679
2680        // Unnesting missing column should fail.
2681        let plan = nested_table_scan("test_table")?.unnest_column("missing");
2682        assert!(plan.is_err());
2683
2684        // Simultaneously unnesting a list (with different depth) and a struct column
2685        let plan = nested_table_scan("test_table")?
2686            .unnest_columns_with_options(
2687                vec!["stringss".into(), "struct_singular".into()],
2688                UnnestOptions::default()
2689                    .with_recursions(RecursionUnnestOption {
2690                        input_column: "stringss".into(),
2691                        output_column: "stringss_depth_1".into(),
2692                        depth: 1,
2693                    })
2694                    .with_recursions(RecursionUnnestOption {
2695                        input_column: "stringss".into(),
2696                        output_column: "stringss_depth_2".into(),
2697                        depth: 2,
2698                    }),
2699            )?
2700            .build()?;
2701
2702        assert_snapshot!(plan, @r"
2703        Unnest: lists[test_table.stringss|depth=1, test_table.stringss|depth=2] structs[test_table.struct_singular]
2704          TableScan: test_table
2705        ");
2706
2707        // Check output columns has correct type
2708        let field = plan
2709            .schema()
2710            .field_with_name(None, "stringss_depth_1")
2711            .unwrap();
2712        assert_eq!(
2713            &DataType::new_list(DataType::Utf8, false),
2714            field.data_type()
2715        );
2716        let field = plan
2717            .schema()
2718            .field_with_name(None, "stringss_depth_2")
2719            .unwrap();
2720        assert_eq!(&DataType::Utf8, field.data_type());
2721        // unnesting struct is still correct
2722        for field_name in &["a", "b"] {
2723            let field = plan
2724                .schema()
2725                .field_with_name(None, &format!("struct_singular.{field_name}"))
2726                .unwrap();
2727            assert_eq!(&DataType::UInt32, field.data_type());
2728        }
2729
2730        Ok(())
2731    }
2732
2733    fn nested_table_scan(table_name: &str) -> Result<LogicalPlanBuilder> {
2734        // Create a schema with a scalar field, a list of strings, a list of structs
2735        // and a singular struct
2736        let struct_field_in_list = Field::new_struct(
2737            "item",
2738            vec![
2739                Field::new("a", DataType::UInt32, false),
2740                Field::new("b", DataType::UInt32, false),
2741            ],
2742            false,
2743        );
2744        let string_field = Field::new_list_field(DataType::Utf8, false);
2745        let strings_field = Field::new_list("item", string_field.clone(), false);
2746        let schema = Schema::new(vec![
2747            Field::new("scalar", DataType::UInt32, false),
2748            Field::new_list("strings", string_field, false),
2749            Field::new_list("structs", struct_field_in_list, false),
2750            Field::new(
2751                "struct_singular",
2752                DataType::Struct(Fields::from(vec![
2753                    Field::new("a", DataType::UInt32, false),
2754                    Field::new("b", DataType::UInt32, false),
2755                ])),
2756                false,
2757            ),
2758            Field::new_list("stringss", strings_field, false),
2759        ]);
2760
2761        table_scan(Some(table_name), &schema, None)
2762    }
2763
2764    #[test]
2765    fn test_union_after_join() -> Result<()> {
2766        let values = vec![vec![lit(1)]];
2767
2768        let left = LogicalPlanBuilder::values(values.clone())?
2769            .alias("left")?
2770            .build()?;
2771        let right = LogicalPlanBuilder::values(values)?
2772            .alias("right")?
2773            .build()?;
2774
2775        let join = LogicalPlanBuilder::from(left).cross_join(right)?.build()?;
2776
2777        let plan = LogicalPlanBuilder::from(join.clone())
2778            .union(join)?
2779            .build()?;
2780
2781        assert_snapshot!(plan, @r"
2782        Union
2783          Cross Join: 
2784            SubqueryAlias: left
2785              Values: (Int32(1))
2786            SubqueryAlias: right
2787              Values: (Int32(1))
2788          Cross Join: 
2789            SubqueryAlias: left
2790              Values: (Int32(1))
2791            SubqueryAlias: right
2792              Values: (Int32(1))
2793        ");
2794
2795        Ok(())
2796    }
2797
2798    #[test]
2799    fn test_change_redundant_column() -> Result<()> {
2800        let t1_field_1 = Field::new("a", DataType::Int32, false);
2801        let t2_field_1 = Field::new("a", DataType::Int32, false);
2802        let t2_field_3 = Field::new("a", DataType::Int32, false);
2803        let t2_field_4 = Field::new("a:1", DataType::Int32, false);
2804        let t1_field_2 = Field::new("b", DataType::Int32, false);
2805        let t2_field_2 = Field::new("b", DataType::Int32, false);
2806
2807        let field_vec = vec![
2808            t1_field_1, t2_field_1, t1_field_2, t2_field_2, t2_field_3, t2_field_4,
2809        ];
2810        let remove_redundant = change_redundant_column(&Fields::from(field_vec));
2811
2812        assert_eq!(
2813            remove_redundant,
2814            vec![
2815                Field::new("a", DataType::Int32, false),
2816                Field::new("a:1", DataType::Int32, false),
2817                Field::new("b", DataType::Int32, false),
2818                Field::new("b:1", DataType::Int32, false),
2819                Field::new("a:2", DataType::Int32, false),
2820                Field::new("a:1:1", DataType::Int32, false),
2821            ]
2822        );
2823        Ok(())
2824    }
2825
2826    #[test]
2827    fn plan_builder_from_logical_plan() -> Result<()> {
2828        let plan =
2829            table_scan(Some("employee_csv"), &employee_schema(), Some(vec![3, 4]))?
2830                .sort(vec![
2831                    expr::Sort::new(col("state"), true, true),
2832                    expr::Sort::new(col("salary"), false, false),
2833                ])?
2834                .build()?;
2835
2836        let plan_expected = format!("{plan}");
2837        let plan_builder: LogicalPlanBuilder = Arc::new(plan).into();
2838        assert_eq!(plan_expected, format!("{}", plan_builder.plan));
2839
2840        Ok(())
2841    }
2842
2843    #[test]
2844    fn plan_builder_aggregate_without_implicit_group_by_exprs() -> Result<()> {
2845        let constraints =
2846            Constraints::new_unverified(vec![Constraint::PrimaryKey(vec![0])]);
2847        let table_source = table_source_with_constraints(&employee_schema(), constraints);
2848
2849        let plan =
2850            LogicalPlanBuilder::scan("employee_csv", table_source, Some(vec![0, 3, 4]))?
2851                .aggregate(vec![col("id")], vec![sum(col("salary"))])?
2852                .build()?;
2853
2854        assert_snapshot!(plan, @r"
2855        Aggregate: groupBy=[[employee_csv.id]], aggr=[[sum(employee_csv.salary)]]
2856          TableScan: employee_csv projection=[id, state, salary]
2857        ");
2858
2859        Ok(())
2860    }
2861
2862    #[test]
2863    fn plan_builder_aggregate_with_implicit_group_by_exprs() -> Result<()> {
2864        let constraints =
2865            Constraints::new_unverified(vec![Constraint::PrimaryKey(vec![0])]);
2866        let table_source = table_source_with_constraints(&employee_schema(), constraints);
2867
2868        let options =
2869            LogicalPlanBuilderOptions::new().with_add_implicit_group_by_exprs(true);
2870        let plan =
2871            LogicalPlanBuilder::scan("employee_csv", table_source, Some(vec![0, 3, 4]))?
2872                .with_options(options)
2873                .aggregate(vec![col("id")], vec![sum(col("salary"))])?
2874                .build()?;
2875
2876        assert_snapshot!(plan, @r"
2877        Aggregate: groupBy=[[employee_csv.id, employee_csv.state, employee_csv.salary]], aggr=[[sum(employee_csv.salary)]]
2878          TableScan: employee_csv projection=[id, state, salary]
2879        ");
2880
2881        Ok(())
2882    }
2883
2884    #[test]
2885    fn test_join_metadata() -> Result<()> {
2886        let left_schema = DFSchema::new_with_metadata(
2887            vec![(None, Arc::new(Field::new("a", DataType::Int32, false)))],
2888            HashMap::from([("key".to_string(), "left".to_string())]),
2889        )?;
2890        let right_schema = DFSchema::new_with_metadata(
2891            vec![(None, Arc::new(Field::new("b", DataType::Int32, false)))],
2892            HashMap::from([("key".to_string(), "right".to_string())]),
2893        )?;
2894
2895        let join_schema =
2896            build_join_schema(&left_schema, &right_schema, &JoinType::Left)?;
2897        assert_eq!(
2898            join_schema.metadata(),
2899            &HashMap::from([("key".to_string(), "left".to_string())])
2900        );
2901        let join_schema =
2902            build_join_schema(&left_schema, &right_schema, &JoinType::Right)?;
2903        assert_eq!(
2904            join_schema.metadata(),
2905            &HashMap::from([("key".to_string(), "right".to_string())])
2906        );
2907
2908        Ok(())
2909    }
2910}