// datafusion_expr/logical_plan/builder.rs
1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! This module provides a builder for creating LogicalPlans
19
20use std::any::Any;
21use std::borrow::Cow;
22use std::cmp::Ordering;
23use std::collections::{HashMap, HashSet};
24use std::iter::once;
25use std::sync::Arc;
26
27use crate::dml::CopyTo;
28use crate::expr::{Alias, PlannedReplaceSelectItem, Sort as SortExpr};
29use crate::expr_rewriter::{
30    coerce_plan_expr_for_schema, normalize_col,
31    normalize_col_with_schemas_and_ambiguity_check, normalize_cols, normalize_sorts,
32    rewrite_sort_cols_by_aggs,
33};
34use crate::logical_plan::{
35    Aggregate, Analyze, Distinct, DistinctOn, EmptyRelation, Explain, Filter, Join,
36    JoinConstraint, JoinType, Limit, LogicalPlan, Partitioning, PlanType, Prepare,
37    Projection, Repartition, Sort, SubqueryAlias, TableScan, Union, Unnest, Values,
38    Window,
39};
40use crate::select_expr::SelectExpr;
41use crate::utils::{
42    can_hash, columnize_expr, compare_sort_expr, expand_qualified_wildcard,
43    expand_wildcard, expr_to_columns, find_valid_equijoin_key_pair,
44    group_window_expr_by_sort_keys,
45};
46use crate::{
47    DmlStatement, ExplainOption, Expr, ExprSchemable, Operator, RecursiveQuery,
48    Statement, TableProviderFilterPushDown, TableSource, WriteOp, and, binary_expr, lit,
49};
50
51use super::dml::InsertOp;
52use arrow::compute::can_cast_types;
53use arrow::datatypes::{DataType, Field, FieldRef, Fields, Schema, SchemaRef};
54use datafusion_common::display::ToStringifiedPlan;
55use datafusion_common::file_options::file_type::FileType;
56use datafusion_common::metadata::FieldMetadata;
57use datafusion_common::{
58    Column, Constraints, DFSchema, DFSchemaRef, NullEquality, Result, ScalarValue,
59    TableReference, ToDFSchema, UnnestOptions, exec_err,
60    get_target_functional_dependencies, internal_datafusion_err, plan_datafusion_err,
61    plan_err,
62};
63use datafusion_expr_common::type_coercion::binary::type_union_resolution;
64
65use indexmap::IndexSet;
66
67/// Default table name for unnamed table
68pub const UNNAMED_TABLE: &str = "?table?";
69
/// Options for [`LogicalPlanBuilder`]
#[derive(Default, Debug, Clone)]
pub struct LogicalPlanBuilderOptions {
    /// Flag indicating whether the plan builder should add
    /// functionally dependent expressions as additional aggregation groupings.
    /// Defaults to `false`; enable via [`LogicalPlanBuilderOptions::with_add_implicit_group_by_exprs`].
    add_implicit_group_by_exprs: bool,
}
77
78impl LogicalPlanBuilderOptions {
79    pub fn new() -> Self {
80        Default::default()
81    }
82
83    /// Should the builder add functionally dependent expressions as additional aggregation groupings.
84    pub fn with_add_implicit_group_by_exprs(mut self, add: bool) -> Self {
85        self.add_implicit_group_by_exprs = add;
86        self
87    }
88}
89
/// Builder for logical plans
///
/// # Example building a simple plan
/// ```
/// # use datafusion_expr::{lit, col, LogicalPlanBuilder, logical_plan::table_scan};
/// # use datafusion_common::Result;
/// # use arrow::datatypes::{Schema, DataType, Field};
/// #
/// # fn main() -> Result<()> {
/// #
/// # fn employee_schema() -> Schema {
/// #    Schema::new(vec![
/// #           Field::new("id", DataType::Int32, false),
/// #           Field::new("first_name", DataType::Utf8, false),
/// #           Field::new("last_name", DataType::Utf8, false),
/// #           Field::new("state", DataType::Utf8, false),
/// #           Field::new("salary", DataType::Int32, false),
/// #       ])
/// #   }
/// #
/// // Create a plan similar to
/// // SELECT last_name
/// // FROM employees
/// // WHERE salary < 1000
/// let plan = table_scan(Some("employee"), &employee_schema(), None)?
///  // Keep only rows where salary < 1000
///  .filter(col("salary").lt(lit(1000)))?
///  // only show "last_name" in the final results
///  .project(vec![col("last_name")])?
///  .build()?;
///
/// // Convert from plan back to builder
/// let builder = LogicalPlanBuilder::from(plan);
///
/// # Ok(())
/// # }
/// ```
#[derive(Debug, Clone)]
pub struct LogicalPlanBuilder {
    /// The current root of the plan being built; each builder method wraps it
    /// in a new node.
    plan: Arc<LogicalPlan>,
    /// Options controlling how subsequent builder methods construct nodes.
    options: LogicalPlanBuilderOptions,
}
132
133impl LogicalPlanBuilder {
134    /// Create a builder from an existing plan
135    pub fn new(plan: LogicalPlan) -> Self {
136        Self {
137            plan: Arc::new(plan),
138            options: LogicalPlanBuilderOptions::default(),
139        }
140    }
141
    /// Create a builder from an existing plan that is already reference
    /// counted, avoiding a re-wrap when the caller holds an
    /// `Arc<LogicalPlan>`.
    pub fn new_from_arc(plan: Arc<LogicalPlan>) -> Self {
        Self {
            plan,
            options: LogicalPlanBuilderOptions::default(),
        }
    }
149
150    pub fn with_options(mut self, options: LogicalPlanBuilderOptions) -> Self {
151        self.options = options;
152        self
153    }
154
    /// Return the output schema of the plan built so far
    pub fn schema(&self) -> &DFSchemaRef {
        self.plan.schema()
    }
159
    /// Return a reference to the LogicalPlan built so far
    pub fn plan(&self) -> &LogicalPlan {
        &self.plan
    }
164
165    /// Create an empty relation.
166    ///
167    /// `produce_one_row` set to true means this empty node needs to produce a placeholder row.
168    pub fn empty(produce_one_row: bool) -> Self {
169        Self::new(LogicalPlan::EmptyRelation(EmptyRelation {
170            produce_one_row,
171            schema: DFSchemaRef::new(DFSchema::empty()),
172        }))
173    }
174
    /// Convert a regular plan into a recursive query.
    /// `is_distinct` indicates whether the recursive term should be de-duplicated (`UNION`) after each iteration or not (`UNION ALL`).
    ///
    /// The builder's current plan becomes the static (non-recursive) term.
    ///
    /// # Errors
    /// Returns a plan error if the static and recursive terms do not have the
    /// same number of columns, or if the recursive term's expressions cannot
    /// be coerced to the static term's schema.
    pub fn to_recursive_query(
        self,
        name: String,
        recursive_term: LogicalPlan,
        is_distinct: bool,
    ) -> Result<Self> {
        // Ensure that the static term and the recursive term have the same number of fields
        let static_fields_len = self.plan.schema().fields().len();
        let recursive_fields_len = recursive_term.schema().fields().len();
        if static_fields_len != recursive_fields_len {
            return plan_err!(
                "Non-recursive term and recursive term must have the same number of columns ({} != {})",
                static_fields_len,
                recursive_fields_len
            );
        }
        // Ensure that the recursive term has the same field types as the static term
        let coerced_recursive_term =
            coerce_plan_expr_for_schema(recursive_term, self.plan.schema())?;
        Ok(Self::from(LogicalPlan::RecursiveQuery(RecursiveQuery {
            name,
            static_term: self.plan,
            recursive_term: Arc::new(coerced_recursive_term),
            is_distinct,
        })))
    }
203
    /// Create a values list based relation, and the schema is inferred from data, consuming
    /// `value`. See the [Postgres VALUES](https://www.postgresql.org/docs/current/queries-values.html)
    /// documentation for more details.
    ///
    /// By default, it assigns the names column1, column2, etc. to the columns of a VALUES table.
    /// The column names are not specified by the SQL standard and different database systems do it differently,
    /// so it's usually better to override the default names with a table alias list.
    ///
    /// If the values include params/binders such as $1, $2, $3, etc, then the `param_data_types` should be provided.
    pub fn values(values: Vec<Vec<Expr>>) -> Result<Self> {
        if values.is_empty() {
            return plan_err!("Values list cannot be empty");
        }
        // All rows must have the same arity as the first row.
        let n_cols = values[0].len();
        if n_cols == 0 {
            return plan_err!("Values list cannot be zero length");
        }
        for (i, row) in values.iter().enumerate() {
            if row.len() != n_cols {
                return plan_err!(
                    "Inconsistent data length across values list: got {} values in row {} but expected {}",
                    row.len(),
                    i,
                    n_cols
                );
            }
        }

        // Infer from data itself
        Self::infer_data(values)
    }
233
234    /// Create a values list based relation, and the schema is inferred from data itself or table schema if provided, consuming
235    /// `value`. See the [Postgres VALUES](https://www.postgresql.org/docs/current/queries-values.html)
236    /// documentation for more details.
237    ///
238    /// By default, it assigns the names column1, column2, etc. to the columns of a VALUES table.
239    /// The column names are not specified by the SQL standard and different database systems do it differently,
240    /// so it's usually better to override the default names with a table alias list.
241    ///
242    /// If the values include params/binders such as $1, $2, $3, etc, then the `param_data_types` should be provided.
243    pub fn values_with_schema(
244        values: Vec<Vec<Expr>>,
245        schema: &DFSchemaRef,
246    ) -> Result<Self> {
247        if values.is_empty() {
248            return plan_err!("Values list cannot be empty");
249        }
250        let n_cols = schema.fields().len();
251        if n_cols == 0 {
252            return plan_err!("Values list cannot be zero length");
253        }
254        for (i, row) in values.iter().enumerate() {
255            if row.len() != n_cols {
256                return plan_err!(
257                    "Inconsistent data length across values list: got {} values in row {} but expected {}",
258                    row.len(),
259                    i,
260                    n_cols
261                );
262            }
263        }
264
265        // Check the type of value against the schema
266        Self::infer_values_from_schema(values, schema)
267    }
268
    /// Build a `Values` plan whose column types come from the provided `schema`.
    ///
    /// Each value expression is checked against the corresponding schema
    /// field: it must either already have the field's type or be castable to
    /// it (the actual cast is added later in `infer_inner`); otherwise an
    /// error is returned.
    ///
    /// Callers have already verified that `values` is non-empty and that all
    /// rows have the same length.
    fn infer_values_from_schema(
        values: Vec<Vec<Expr>>,
        schema: &DFSchema,
    ) -> Result<Self> {
        let n_cols = values[0].len();
        let mut fields = ValuesFields::new();
        for j in 0..n_cols {
            let field_type = schema.field(j).data_type();
            let field_nullable = schema.field(j).is_nullable();
            for row in values.iter() {
                let value = &row[j];
                let data_type = value.get_type(schema)?;

                // Accept an exact type match or anything castable to the
                // declared field type; reject everything else.
                if !data_type.equals_datatype(field_type)
                    && !can_cast_types(&data_type, field_type)
                {
                    return exec_err!(
                        "type mismatch and can't cast to got {} and {}",
                        data_type,
                        field_type
                    );
                }
            }
            fields.push(field_type.to_owned(), field_nullable);
        }

        Self::infer_inner(values, fields, schema)
    }
297
    /// Build a `Values` plan by inferring each column's type and field
    /// metadata from the value expressions themselves.
    ///
    /// For each column, the type is the union (via `type_union_resolution`) of
    /// all non-NULL value types; NULL literals are skipped so a column of all
    /// NULLs gets `DataType::Null`. All values in a column must carry
    /// identical field metadata.
    ///
    /// Callers have already verified that `values` is non-empty and rectangular.
    fn infer_data(values: Vec<Vec<Expr>>) -> Result<Self> {
        let n_cols = values[0].len();
        // Empty schema: the value expressions may not reference any columns.
        let schema = DFSchema::empty();
        let mut fields = ValuesFields::new();

        for j in 0..n_cols {
            let mut common_type: Option<DataType> = None;
            let mut common_metadata: Option<FieldMetadata> = None;
            for (i, row) in values.iter().enumerate() {
                let value = &row[j];
                let metadata = value.metadata(&schema)?;
                // Metadata must agree across all rows of the column.
                if let Some(ref cm) = common_metadata {
                    if &metadata != cm {
                        return plan_err!(
                            "Inconsistent metadata across values list at row {i} column {j}. Was {:?} but found {:?}",
                            cm,
                            metadata
                        );
                    }
                } else {
                    common_metadata = Some(metadata.clone());
                }
                let data_type = value.get_type(&schema)?;
                // NULLs do not constrain the column type.
                if data_type == DataType::Null {
                    continue;
                }

                if let Some(prev_type) = common_type {
                    // get common type of each column values.
                    let data_types = vec![prev_type.clone(), data_type.clone()];
                    let Some(new_type) = type_union_resolution(&data_types) else {
                        return plan_err!(
                            "Inconsistent data type across values list at row {i} column {j}. Was {prev_type} but found {data_type}"
                        );
                    };
                    common_type = Some(new_type);
                } else {
                    common_type = Some(data_type);
                }
            }
            // assuming common_type was not set, and no error, therefore the type should be NULL
            // since the code loop skips NULL
            fields.push_with_metadata(
                common_type.unwrap_or(DataType::Null),
                true,
                common_metadata,
            );
        }

        Self::infer_inner(values, fields, &schema)
    }
349
    /// Finalize a `Values` plan: cast every value expression to its column's
    /// resolved type and assemble the output schema from `fields`.
    fn infer_inner(
        mut values: Vec<Vec<Expr>>,
        fields: ValuesFields,
        schema: &DFSchema,
    ) -> Result<Self> {
        let fields = fields.into_fields();
        // wrap cast if data type is not same as common type.
        for row in &mut values {
            for (j, field_type) in fields.iter().map(|f| f.data_type()).enumerate() {
                if let Expr::Literal(ScalarValue::Null, metadata) = &row[j] {
                    // Replace untyped NULL literals with a typed NULL of the
                    // target column type rather than wrapping them in a cast.
                    row[j] = Expr::Literal(
                        ScalarValue::try_from(field_type)?,
                        metadata.clone(),
                    );
                } else {
                    // `mem::take` moves the expression out so `cast_to` can
                    // consume it, leaving a default Expr behind momentarily.
                    row[j] = std::mem::take(&mut row[j]).cast_to(field_type, schema)?;
                }
            }
        }

        let dfschema = DFSchema::from_unqualified_fields(fields, HashMap::new())?;
        let schema = DFSchemaRef::new(dfschema);

        Ok(Self::new(LogicalPlan::Values(Values { schema, values })))
    }
375
    /// Convert a table provider into a builder with a TableScan
    ///
    /// Note that if you pass a string as `table_name`, it is treated
    /// as a SQL identifier, as described on [`TableReference`] and
    /// thus is normalized
    ///
    /// Equivalent to [`Self::scan_with_filters`] with no filters.
    ///
    /// # Example:
    /// ```
    /// # use datafusion_expr::{lit, col, LogicalPlanBuilder,
    /// #  logical_plan::builder::LogicalTableSource, logical_plan::table_scan
    /// # };
    /// # use std::sync::Arc;
    /// # use arrow::datatypes::{Schema, DataType, Field};
    /// # use datafusion_common::TableReference;
    /// #
    /// # let employee_schema = Arc::new(Schema::new(vec![
    /// #           Field::new("id", DataType::Int32, false),
    /// # ])) as _;
    /// # let table_source = Arc::new(LogicalTableSource::new(employee_schema));
    /// // Scan table_source with the name "mytable" (after normalization)
    /// # let table = table_source.clone();
    /// let scan = LogicalPlanBuilder::scan("MyTable", table, None);
    ///
    /// // Scan table_source with the name "MyTable" by enclosing in quotes
    /// # let table = table_source.clone();
    /// let scan = LogicalPlanBuilder::scan(r#""MyTable""#, table, None);
    ///
    /// // Scan table_source with the name "MyTable" by forming the table reference
    /// # let table = table_source.clone();
    /// let table_reference = TableReference::bare("MyTable");
    /// let scan = LogicalPlanBuilder::scan(table_reference, table, None);
    /// ```
    pub fn scan(
        table_name: impl Into<TableReference>,
        table_source: Arc<dyn TableSource>,
        projection: Option<Vec<usize>>,
    ) -> Result<Self> {
        Self::scan_with_filters(table_name, table_source, projection, vec![])
    }
415
    /// Create a [CopyTo] for copying the contents of this builder to the specified file(s)
    ///
    /// * `input` - the plan producing the rows to copy
    /// * `output_url` - destination URL/path for the output
    /// * `file_type` - output file format
    /// * `options` - format-specific writer options as key/value pairs
    /// * `partition_by` - partition column names (as interpreted by [CopyTo])
    pub fn copy_to(
        input: LogicalPlan,
        output_url: String,
        file_type: Arc<dyn FileType>,
        options: HashMap<String, String>,
        partition_by: Vec<String>,
    ) -> Result<Self> {
        Ok(Self::new(LogicalPlan::Copy(CopyTo::new(
            Arc::new(input),
            output_url,
            partition_by,
            file_type,
            options,
        ))))
    }
432
    /// Create a [`DmlStatement`] for inserting the contents of this builder into the named table.
    ///
    /// Note: use a [`DefaultTableSource`] to insert into a [`TableProvider`]
    ///
    /// [`DefaultTableSource`]: https://docs.rs/datafusion/latest/datafusion/datasource/default_table_source/struct.DefaultTableSource.html
    /// [`TableProvider`]: https://docs.rs/datafusion/latest/datafusion/catalog/trait.TableProvider.html
    ///
    /// # Example:
    /// ```
    /// # use datafusion_expr::{lit, LogicalPlanBuilder,
    /// #  logical_plan::builder::LogicalTableSource,
    /// # };
    /// # use std::sync::Arc;
    /// # use arrow::datatypes::{Schema, DataType, Field};
    /// # use datafusion_expr::dml::InsertOp;
    /// #
    /// # fn test() -> datafusion_common::Result<()> {
    /// # let employee_schema = Arc::new(Schema::new(vec![
    /// #     Field::new("id", DataType::Int32, false),
    /// # ])) as _;
    /// # let table_source = Arc::new(LogicalTableSource::new(employee_schema));
    /// // VALUES (1), (2)
    /// let input = LogicalPlanBuilder::values(vec![vec![lit(1)], vec![lit(2)]])?.build()?;
    /// // INSERT INTO MyTable VALUES (1), (2)
    /// let insert_plan = LogicalPlanBuilder::insert_into(
    ///     input,
    ///     "MyTable",
    ///     table_source,
    ///     InsertOp::Append,
    /// )?;
    /// # Ok(())
    /// # }
    /// ```
    pub fn insert_into(
        input: LogicalPlan,
        table_name: impl Into<TableReference>,
        target: Arc<dyn TableSource>,
        insert_op: InsertOp,
    ) -> Result<Self> {
        Ok(Self::new(LogicalPlan::Dml(DmlStatement::new(
            table_name.into(),
            target,
            WriteOp::Insert(insert_op),
            Arc::new(input),
        ))))
    }
479
    /// Convert a table provider into a builder with a TableScan
    ///
    /// `filters` are pushed down into the scan; see
    /// [`Self::scan_with_filters_fetch`] to additionally limit the number of
    /// rows fetched.
    pub fn scan_with_filters(
        table_name: impl Into<TableReference>,
        table_source: Arc<dyn TableSource>,
        projection: Option<Vec<usize>>,
        filters: Vec<Expr>,
    ) -> Result<Self> {
        Self::scan_with_filters_inner(table_name, table_source, projection, filters, None)
    }
489
490    /// Convert a table provider into a builder with a TableScan with filter and fetch
491    pub fn scan_with_filters_fetch(
492        table_name: impl Into<TableReference>,
493        table_source: Arc<dyn TableSource>,
494        projection: Option<Vec<usize>>,
495        filters: Vec<Expr>,
496        fetch: Option<usize>,
497    ) -> Result<Self> {
498        Self::scan_with_filters_inner(
499            table_name,
500            table_source,
501            projection,
502            filters,
503            fetch,
504        )
505    }
506
    /// Shared implementation for the `scan*` constructors: build a
    /// [`TableScan`], inlining the source's own logical plan when one is
    /// available and no filters are present.
    fn scan_with_filters_inner(
        table_name: impl Into<TableReference>,
        table_source: Arc<dyn TableSource>,
        projection: Option<Vec<usize>>,
        filters: Vec<Expr>,
        fetch: Option<usize>,
    ) -> Result<Self> {
        let table_scan =
            TableScan::try_new(table_name, table_source, projection, filters, fetch)?;

        // Inline TableScan
        if table_scan.filters.is_empty()
            && let Some(p) = table_scan.source.get_logical_plan()
        {
            let sub_plan = p.into_owned();

            if let Some(proj) = table_scan.projection {
                // Re-apply the scan's projection on top of the inlined plan by
                // projecting the corresponding columns of its schema.
                let projection_exprs = proj
                    .into_iter()
                    .map(|i| {
                        Expr::Column(Column::from(sub_plan.schema().qualified_field(i)))
                    })
                    .collect::<Vec<_>>();
                return Self::new(sub_plan)
                    .project(projection_exprs)?
                    .alias(table_scan.table_name);
            }

            // Ensures that the reference to the inlined table remains the
            // same, meaning we don't have to change any of the parent nodes
            // that reference this table.
            return Self::new(sub_plan).alias(table_scan.table_name);
        }

        Ok(Self::new(LogicalPlan::TableScan(table_scan)))
    }
543
    /// Wrap a plan in a window
    ///
    /// Groups `window_exprs` by their sort keys and stacks one `Window` node
    /// per group on top of `input`, most deeply sorted group first.
    pub fn window_plan(
        input: LogicalPlan,
        window_exprs: impl IntoIterator<Item = Expr>,
    ) -> Result<LogicalPlan> {
        let mut plan = input;
        let mut groups = group_window_expr_by_sort_keys(window_exprs)?;
        // To align with the behavior of PostgreSQL, we want the sort_keys sorted as same rule as PostgreSQL that first
        // we compare the sort key themselves and if one window's sort keys are a prefix of another
        // put the window with more sort keys first. so more deeply sorted plans gets nested further down as children.
        // The sort_by() implementation here is a stable sort.
        // Note that by this rule if there's an empty over, it'll be at the top level
        groups.sort_by(|(key_a, _), (key_b, _)| {
            // Compare element-wise; the first non-equal key decides.
            for ((first, _), (second, _)) in key_a.iter().zip(key_b.iter()) {
                let key_ordering = compare_sort_expr(first, second, plan.schema());
                match key_ordering {
                    Ordering::Less => {
                        return Ordering::Less;
                    }
                    Ordering::Greater => {
                        return Ordering::Greater;
                    }
                    Ordering::Equal => {}
                }
            }
            // Prefix case: the longer key list sorts first.
            key_b.len().cmp(&key_a.len())
        });
        for (_, exprs) in groups {
            let window_exprs = exprs.into_iter().collect::<Vec<_>>();
            // Partition and sorting is done at physical level, see the EnforceDistribution
            // and EnforceSorting rules.
            plan = LogicalPlanBuilder::from(plan)
                .window(window_exprs)?
                .build()?;
        }
        Ok(plan)
    }
581
    /// Apply a projection without alias.
    ///
    /// Delegates to the free function `project` after un-sharing the current
    /// plan.
    pub fn project(
        self,
        expr: impl IntoIterator<Item = impl Into<SelectExpr>>,
    ) -> Result<Self> {
        project(Arc::unwrap_or_clone(self.plan), expr).map(Self::new)
    }
589
    /// Apply a projection without alias with optional validation
    /// (true to validate, false to not validate)
    ///
    /// Each element of `expr` pairs a select expression with its per-expression
    /// validation flag.
    pub fn project_with_validation(
        self,
        expr: Vec<(impl Into<SelectExpr>, bool)>,
    ) -> Result<Self> {
        project_with_validation(Arc::unwrap_or_clone(self.plan), expr).map(Self::new)
    }
598
599    /// Select the given column indices
600    pub fn select(self, indices: impl IntoIterator<Item = usize>) -> Result<Self> {
601        let exprs: Vec<_> = indices
602            .into_iter()
603            .map(|x| Expr::Column(Column::from(self.plan.schema().qualified_field(x))))
604            .collect();
605        self.project(exprs)
606    }
607
608    /// Apply a filter
609    pub fn filter(self, expr: impl Into<Expr>) -> Result<Self> {
610        let expr = normalize_col(expr.into(), &self.plan)?;
611        Filter::try_new(expr, self.plan)
612            .map(LogicalPlan::Filter)
613            .map(Self::new)
614    }
615
616    /// Apply a filter which is used for a having clause
617    pub fn having(self, expr: impl Into<Expr>) -> Result<Self> {
618        let expr = normalize_col(expr.into(), &self.plan)?;
619        Filter::try_new(expr, self.plan)
620            .map(LogicalPlan::Filter)
621            .map(Self::from)
622    }
623
624    /// Make a builder for a prepare logical plan from the builder's plan
625    pub fn prepare(self, name: String, fields: Vec<FieldRef>) -> Result<Self> {
626        Ok(Self::new(LogicalPlan::Statement(Statement::Prepare(
627            Prepare {
628                name,
629                fields,
630                input: self.plan,
631            },
632        ))))
633    }
634
635    /// Limit the number of rows returned
636    ///
637    /// `skip` - Number of rows to skip before fetch any row.
638    ///
639    /// `fetch` - Maximum number of rows to fetch, after skipping `skip` rows,
640    ///          if specified.
641    pub fn limit(self, skip: usize, fetch: Option<usize>) -> Result<Self> {
642        let skip_expr = if skip == 0 {
643            None
644        } else {
645            Some(lit(skip as i64))
646        };
647        let fetch_expr = fetch.map(|f| lit(f as i64));
648        self.limit_by_expr(skip_expr, fetch_expr)
649    }
650
651    /// Limit the number of rows returned
652    ///
653    /// Similar to `limit` but uses expressions for `skip` and `fetch`
654    pub fn limit_by_expr(self, skip: Option<Expr>, fetch: Option<Expr>) -> Result<Self> {
655        Ok(Self::new(LogicalPlan::Limit(Limit {
656            skip: skip.map(Box::new),
657            fetch: fetch.map(Box::new),
658            input: self.plan,
659        })))
660    }
661
    /// Apply an alias to the plan built so far, delegating to the
    /// `subquery_alias` helper.
    pub fn alias(self, alias: impl Into<TableReference>) -> Result<Self> {
        subquery_alias(Arc::unwrap_or_clone(self.plan), alias).map(Self::new)
    }
666
    /// Add missing sort columns to all downstream projection
    ///
    /// Thus, if you have a LogicalPlan that selects A and B and have
    /// not requested a sort by C, this code will add C recursively to
    /// all input projections.
    ///
    /// Adding a new column is not correct if there is a `Distinct`
    /// node, which produces only distinct values of its
    /// inputs. Adding a new column to its input will result in
    /// potentially different results than with the original column.
    ///
    /// For example, if the input is like:
    ///
    /// Distinct(A, B)
    ///
    /// If the input looks like
    ///
    /// a | b | c
    /// --+---+---
    /// 1 | 2 | 3
    /// 1 | 2 | 4
    ///
    /// Distinct (A, B) --> (1,2)
    ///
    /// But Distinct (A, B, C) --> (1, 2, 3), (1, 2, 4)
    ///  (which will appear as a (1, 2), (1, 2) if a and b are projected
    ///
    /// See <https://github.com/apache/datafusion/issues/5065> for more details
    fn add_missing_columns(
        curr_plan: LogicalPlan,
        missing_cols: &IndexSet<Column>,
        is_distinct: bool,
    ) -> Result<LogicalPlan> {
        match curr_plan {
            // A projection whose input already provides every missing column:
            // extend its expression list in place.
            LogicalPlan::Projection(Projection {
                input,
                mut expr,
                schema: _,
            }) if missing_cols.iter().all(|c| input.schema().has_column(c)) => {
                let mut missing_exprs = missing_cols
                    .iter()
                    .map(|c| normalize_col(Expr::Column(c.clone()), &input))
                    .collect::<Result<Vec<_>>>()?;

                // Do not let duplicate columns to be added, some of the
                // missing_cols may be already present but without the new
                // projected alias.
                missing_exprs.retain(|e| !expr.contains(e));
                if is_distinct {
                    Self::ambiguous_distinct_check(&missing_exprs, missing_cols, &expr)?;
                }
                expr.extend(missing_exprs);
                project(Arc::unwrap_or_clone(input), expr)
            }
            // Otherwise recurse into the inputs, remembering whether we have
            // passed through a Distinct node on the way down.
            _ => {
                let is_distinct =
                    is_distinct || matches!(curr_plan, LogicalPlan::Distinct(_));
                let new_inputs = curr_plan
                    .inputs()
                    .into_iter()
                    .map(|input_plan| {
                        Self::add_missing_columns(
                            (*input_plan).clone(),
                            missing_cols,
                            is_distinct,
                        )
                    })
                    .collect::<Result<Vec<_>>>()?;
                curr_plan.with_new_exprs(curr_plan.expressions(), new_inputs)
            }
        }
    }
739
740    fn ambiguous_distinct_check(
741        missing_exprs: &[Expr],
742        missing_cols: &IndexSet<Column>,
743        projection_exprs: &[Expr],
744    ) -> Result<()> {
745        if missing_exprs.is_empty() {
746            return Ok(());
747        }
748
749        // if the missing columns are all only aliases for things in
750        // the existing select list, it is ok
751        //
752        // This handles the special case for
753        // SELECT col as <alias> ORDER BY <alias>
754        //
755        // As described in https://github.com/apache/datafusion/issues/5293
756        let all_aliases = missing_exprs.iter().all(|e| {
757            projection_exprs.iter().any(|proj_expr| {
758                if let Expr::Alias(Alias { expr, .. }) = proj_expr {
759                    e == expr.as_ref()
760                } else {
761                    false
762                }
763            })
764        });
765        if all_aliases {
766            return Ok(());
767        }
768
769        let missing_col_names = missing_cols
770            .iter()
771            .map(|col| col.flat_name())
772            .collect::<String>();
773
774        plan_err!(
775            "For SELECT DISTINCT, ORDER BY expressions {missing_col_names} must appear in select list"
776        )
777    }
778
779    /// Apply a sort by provided expressions with default direction
780    pub fn sort_by(
781        self,
782        expr: impl IntoIterator<Item = impl Into<Expr>> + Clone,
783    ) -> Result<Self> {
784        self.sort(
785            expr.into_iter()
786                .map(|e| e.into().sort(true, false))
787                .collect::<Vec<SortExpr>>(),
788        )
789    }
790
    /// Apply a sort over the provided sort expressions, with no fetch limit.
    ///
    /// Equivalent to [`Self::sort_with_limit`] with `fetch` set to `None`.
    pub fn sort(
        self,
        sorts: impl IntoIterator<Item = impl Into<SortExpr>> + Clone,
    ) -> Result<Self> {
        self.sort_with_limit(sorts, None)
    }
797
    /// Apply a sort
    ///
    /// If the sort expressions reference columns not present in the current
    /// schema, those columns are pushed down into the input projections (see
    /// [`Self::add_missing_columns`]) and a final projection restores the
    /// original column set.
    pub fn sort_with_limit(
        self,
        sorts: impl IntoIterator<Item = impl Into<SortExpr>> + Clone,
        fetch: Option<usize>,
    ) -> Result<Self> {
        let sorts = rewrite_sort_cols_by_aggs(sorts, &self.plan)?;

        let schema = self.plan.schema();

        // Collect sort columns that are missing in the input plan's schema
        let mut missing_cols: IndexSet<Column> = IndexSet::new();
        sorts.iter().try_for_each::<_, Result<()>>(|sort| {
            let columns = sort.expr.column_refs();

            missing_cols.extend(
                columns
                    .into_iter()
                    .filter(|c| !schema.has_column(c))
                    .cloned(),
            );

            Ok(())
        })?;

        // Fast path: every referenced column exists, just add the Sort node.
        if missing_cols.is_empty() {
            return Ok(Self::new(LogicalPlan::Sort(Sort {
                expr: normalize_sorts(sorts, &self.plan)?,
                input: self.plan,
                fetch,
            })));
        }

        // remove pushed down sort columns
        // (a final projection over the original columns hides the extras)
        let new_expr = schema.columns().into_iter().map(Expr::Column).collect();

        let is_distinct = false;
        let plan = Self::add_missing_columns(
            Arc::unwrap_or_clone(self.plan),
            &missing_cols,
            is_distinct,
        )?;

        let sort_plan = LogicalPlan::Sort(Sort {
            expr: normalize_sorts(sorts, &plan)?,
            input: Arc::new(plan),
            fetch,
        });

        Projection::try_new(new_expr, Arc::new(sort_plan))
            .map(LogicalPlan::Projection)
            .map(Self::new)
    }
851
852    /// Apply a union, preserving duplicate rows
853    pub fn union(self, plan: LogicalPlan) -> Result<Self> {
854        union(Arc::unwrap_or_clone(self.plan), plan).map(Self::new)
855    }
856
857    /// Apply a union by name, preserving duplicate rows
858    pub fn union_by_name(self, plan: LogicalPlan) -> Result<Self> {
859        union_by_name(Arc::unwrap_or_clone(self.plan), plan).map(Self::new)
860    }
861
862    /// Apply a union by name, removing duplicate rows
863    pub fn union_by_name_distinct(self, plan: LogicalPlan) -> Result<Self> {
864        let left_plan: LogicalPlan = Arc::unwrap_or_clone(self.plan);
865        let right_plan: LogicalPlan = plan;
866
867        Ok(Self::new(LogicalPlan::Distinct(Distinct::All(Arc::new(
868            union_by_name(left_plan, right_plan)?,
869        )))))
870    }
871
872    /// Apply a union, removing duplicate rows
873    pub fn union_distinct(self, plan: LogicalPlan) -> Result<Self> {
874        let left_plan: LogicalPlan = Arc::unwrap_or_clone(self.plan);
875        let right_plan: LogicalPlan = plan;
876
877        Ok(Self::new(LogicalPlan::Distinct(Distinct::All(Arc::new(
878            union(left_plan, right_plan)?,
879        )))))
880    }
881
882    /// Apply deduplication: Only distinct (different) values are returned)
883    pub fn distinct(self) -> Result<Self> {
884        Ok(Self::new(LogicalPlan::Distinct(Distinct::All(self.plan))))
885    }
886
887    /// Project first values of the specified expression list according to the provided
888    /// sorting expressions grouped by the `DISTINCT ON` clause expressions.
889    pub fn distinct_on(
890        self,
891        on_expr: Vec<Expr>,
892        select_expr: Vec<Expr>,
893        sort_expr: Option<Vec<SortExpr>>,
894    ) -> Result<Self> {
895        Ok(Self::new(LogicalPlan::Distinct(Distinct::On(
896            DistinctOn::try_new(on_expr, select_expr, sort_expr, self.plan)?,
897        ))))
898    }
899
900    /// Apply a join to `right` using explicitly specified columns and an
901    /// optional filter expression.
902    ///
903    /// See [`join_on`](Self::join_on) for a more concise way to specify the
904    /// join condition. Since DataFusion will automatically identify and
905    /// optimize equality predicates there is no performance difference between
906    /// this function and `join_on`
907    ///
908    /// `left_cols` and `right_cols` are used to form "equijoin" predicates (see
909    /// example below), which are then combined with the optional `filter`
910    /// expression.
911    ///
912    /// Note that in case of outer join, the `filter` is applied to only matched rows.
913    pub fn join(
914        self,
915        right: LogicalPlan,
916        join_type: JoinType,
917        join_keys: (Vec<impl Into<Column>>, Vec<impl Into<Column>>),
918        filter: Option<Expr>,
919    ) -> Result<Self> {
920        self.join_detailed(
921            right,
922            join_type,
923            join_keys,
924            filter,
925            NullEquality::NullEqualsNothing,
926        )
927    }
928
929    /// Apply a join using the specified expressions.
930    ///
931    /// Note that DataFusion automatically optimizes joins, including
932    /// identifying and optimizing equality predicates.
933    ///
934    /// # Example
935    ///
936    /// ```
937    /// # use datafusion_expr::{Expr, col, LogicalPlanBuilder,
938    /// #  logical_plan::builder::LogicalTableSource, logical_plan::JoinType,};
939    /// # use std::sync::Arc;
940    /// # use arrow::datatypes::{Schema, DataType, Field};
941    /// # use datafusion_common::Result;
942    /// # fn main() -> Result<()> {
943    /// let example_schema = Arc::new(Schema::new(vec![
944    ///     Field::new("a", DataType::Int32, false),
945    ///     Field::new("b", DataType::Int32, false),
946    ///     Field::new("c", DataType::Int32, false),
947    /// ]));
948    /// let table_source = Arc::new(LogicalTableSource::new(example_schema));
949    /// let left_table = table_source.clone();
950    /// let right_table = table_source.clone();
951    ///
952    /// let right_plan = LogicalPlanBuilder::scan("right", right_table, None)?.build()?;
953    ///
954    /// // Form the expression `(left.a != right.a)` AND `(left.b != right.b)`
955    /// let exprs = vec![
956    ///     col("left.a").eq(col("right.a")),
957    ///     col("left.b").not_eq(col("right.b")),
958    /// ];
959    ///
960    /// // Perform the equivalent of `left INNER JOIN right ON (a != a2 AND b != b2)`
961    /// // finding all pairs of rows from `left` and `right` where
962    /// // where `a = a2` and `b != b2`.
963    /// let plan = LogicalPlanBuilder::scan("left", left_table, None)?
964    ///     .join_on(right_plan, JoinType::Inner, exprs)?
965    ///     .build()?;
966    /// # Ok(())
967    /// # }
968    /// ```
969    pub fn join_on(
970        self,
971        right: LogicalPlan,
972        join_type: JoinType,
973        on_exprs: impl IntoIterator<Item = Expr>,
974    ) -> Result<Self> {
975        let filter = on_exprs.into_iter().reduce(Expr::and);
976
977        self.join_detailed(
978            right,
979            join_type,
980            (Vec::<Column>::new(), Vec::<Column>::new()),
981            filter,
982            NullEquality::NullEqualsNothing,
983        )
984    }
985
986    pub(crate) fn normalize(plan: &LogicalPlan, column: Column) -> Result<Column> {
987        if column.relation.is_some() {
988            // column is already normalized
989            return Ok(column);
990        }
991
992        let schema = plan.schema();
993        let fallback_schemas = plan.fallback_normalize_schemas();
994        let using_columns = plan.using_columns()?;
995        column.normalize_with_schemas_and_ambiguity_check(
996            &[&[schema], &fallback_schemas],
997            &using_columns,
998        )
999    }
1000
1001    /// Apply a join with on constraint and specified null equality.
1002    ///
1003    /// The behavior is the same as [`join`](Self::join) except that it allows
1004    /// specifying the null equality behavior.
1005    ///
1006    /// The `null_equality` dictates how `null` values are joined.
1007    pub fn join_detailed(
1008        self,
1009        right: LogicalPlan,
1010        join_type: JoinType,
1011        join_keys: (Vec<impl Into<Column>>, Vec<impl Into<Column>>),
1012        filter: Option<Expr>,
1013        null_equality: NullEquality,
1014    ) -> Result<Self> {
1015        self.join_detailed_with_options(
1016            right,
1017            join_type,
1018            join_keys,
1019            filter,
1020            null_equality,
1021            false,
1022        )
1023    }
1024
    /// Like [`join_detailed`](Self::join_detailed), but additionally allows
    /// marking the join as `null_aware`.
    ///
    /// Unqualified join keys are resolved against both inputs; keys that
    /// resolve on the "wrong" side are swapped so the first element of each
    /// `on` pair refers to the left input and the second to the right.
    ///
    /// # Errors
    /// Returns an error when the key vectors differ in length, when a key
    /// cannot be resolved against either input, or when a non-inner join has
    /// neither keys nor a filter.
    pub fn join_detailed_with_options(
        self,
        right: LogicalPlan,
        join_type: JoinType,
        join_keys: (Vec<impl Into<Column>>, Vec<impl Into<Column>>),
        filter: Option<Expr>,
        null_equality: NullEquality,
        null_aware: bool,
    ) -> Result<Self> {
        if join_keys.0.len() != join_keys.1.len() {
            return plan_err!("left_keys and right_keys were not the same length");
        }

        // Normalize the optional filter against the combined left + right
        // schema so unqualified columns resolve unambiguously.
        let filter = if let Some(expr) = filter {
            let filter = normalize_col_with_schemas_and_ambiguity_check(
                expr,
                &[&[self.schema(), right.schema()]],
                &[],
            )?;
            Some(filter)
        } else {
            None
        };

        // Resolve each (l, r) key pair to (left-side column, right-side
        // column), swapping the pair if the keys were given in the opposite
        // order. Resolution failures are kept as `Err` and surfaced below.
        let (left_keys, right_keys): (Vec<Result<Column>>, Vec<Result<Column>>) =
            join_keys
                .0
                .into_iter()
                .zip(join_keys.1)
                .map(|(l, r)| {
                    let l = l.into();
                    let r = r.into();

                    match (&l.relation, &r.relation) {
                        // Both keys qualified: probe each against both sides.
                        (Some(lr), Some(rr)) => {
                            let l_is_left =
                                self.plan.schema().field_with_qualified_name(lr, &l.name);
                            let l_is_right =
                                right.schema().field_with_qualified_name(lr, &l.name);
                            let r_is_left =
                                self.plan.schema().field_with_qualified_name(rr, &r.name);
                            let r_is_right =
                                right.schema().field_with_qualified_name(rr, &r.name);

                            match (l_is_left, l_is_right, r_is_left, r_is_right) {
                                // Keys resolved on opposite sides: swap them.
                                (_, Ok(_), Ok(_), _) => (Ok(r), Ok(l)),
                                (Ok(_), _, _, Ok(_)) => (Ok(l), Ok(r)),
                                // Otherwise fall back to full normalization.
                                _ => (
                                    Self::normalize(&self.plan, l),
                                    Self::normalize(&right, r),
                                ),
                            }
                        }
                        // Only the left key is qualified.
                        (Some(lr), None) => {
                            let l_is_left =
                                self.plan.schema().field_with_qualified_name(lr, &l.name);
                            let l_is_right =
                                right.schema().field_with_qualified_name(lr, &l.name);

                            match (l_is_left, l_is_right) {
                                (Ok(_), _) => (Ok(l), Self::normalize(&right, r)),
                                // `l` belongs to the right side: swap.
                                (_, Ok(_)) => (Self::normalize(&self.plan, r), Ok(l)),
                                _ => (
                                    Self::normalize(&self.plan, l),
                                    Self::normalize(&right, r),
                                ),
                            }
                        }
                        // Only the right key is qualified.
                        (None, Some(rr)) => {
                            let r_is_left =
                                self.plan.schema().field_with_qualified_name(rr, &r.name);
                            let r_is_right =
                                right.schema().field_with_qualified_name(rr, &r.name);

                            match (r_is_left, r_is_right) {
                                // `r` belongs to the left side: swap.
                                (Ok(_), _) => (Ok(r), Self::normalize(&right, l)),
                                (_, Ok(_)) => (Self::normalize(&self.plan, l), Ok(r)),
                                _ => (
                                    Self::normalize(&self.plan, l),
                                    Self::normalize(&right, r),
                                ),
                            }
                        }
                        // Neither key qualified: try `l` against the left
                        // side first; if that fails, assume the pair is
                        // swapped and resolve `l` against the right side.
                        (None, None) => {
                            let mut swap = false;
                            let left_key = Self::normalize(&self.plan, l.clone())
                                .or_else(|_| {
                                    swap = true;
                                    Self::normalize(&right, l)
                                });
                            if swap {
                                (Self::normalize(&self.plan, r), left_key)
                            } else {
                                (left_key, Self::normalize(&right, r))
                            }
                        }
                    }
                })
                .unzip();

        // Surface the first resolution error, if any.
        let left_keys = left_keys.into_iter().collect::<Result<Vec<Column>>>()?;
        let right_keys = right_keys.into_iter().collect::<Result<Vec<Column>>>()?;

        let on: Vec<_> = left_keys
            .into_iter()
            .zip(right_keys)
            .map(|(l, r)| (Expr::Column(l), Expr::Column(r)))
            .collect();
        let join_schema =
            build_join_schema(self.plan.schema(), right.schema(), &join_type)?;

        // Inner type without join condition is cross join
        if join_type != JoinType::Inner && on.is_empty() && filter.is_none() {
            return plan_err!("join condition should not be empty");
        }

        Ok(Self::new(LogicalPlan::Join(Join {
            left: self.plan,
            right: Arc::new(right),
            on,
            filter,
            join_type,
            join_constraint: JoinConstraint::On,
            schema: DFSchemaRef::new(join_schema),
            null_equality,
            null_aware,
        })))
    }
1153
    /// Apply a join with using constraint, which duplicates all join columns in output schema.
    ///
    /// Keys whose data type supports hashing become equijoin pairs; any
    /// remaining keys are turned into `l = r` filter predicates. When no
    /// equijoin pair can be formed at all, the join degenerates to a cross
    /// join followed by the accumulated filter.
    pub fn join_using(
        self,
        right: LogicalPlan,
        join_type: JoinType,
        using_keys: Vec<Column>,
    ) -> Result<Self> {
        // Resolve every USING key against each side independently.
        let left_keys: Vec<Column> = using_keys
            .clone()
            .into_iter()
            .map(|c| Self::normalize(&self.plan, c))
            .collect::<Result<_>>()?;
        let right_keys: Vec<Column> = using_keys
            .into_iter()
            .map(|c| Self::normalize(&right, c))
            .collect::<Result<_>>()?;

        let on: Vec<(_, _)> = left_keys.into_iter().zip(right_keys).collect();
        let mut join_on: Vec<(Expr, Expr)> = vec![];
        let mut filters: Option<Expr> = None;
        for (l, r) in &on {
            // Hashable left-side key type: use as an equijoin pair.
            if self.plan.schema().has_column(l)
                && right.schema().has_column(r)
                && can_hash(
                    datafusion_common::ExprSchema::field_from_column(
                        self.plan.schema(),
                        l,
                    )?
                    .data_type(),
                )
            {
                join_on.push((Expr::Column(l.clone()), Expr::Column(r.clone())));
            // NOTE(review): this branch looks up column `r` in the *left*
            // (self.plan) schema and pushes the pair as (r, l) — confirm the
            // schema choice and the swapped ordering are intentional. If `r`
            // is absent from the left schema, `field_from_column` errors and
            // `?` propagates it instead of falling through to the filter
            // case below.
            } else if self.plan.schema().has_column(l)
                && right.schema().has_column(r)
                && can_hash(
                    datafusion_common::ExprSchema::field_from_column(
                        self.plan.schema(),
                        r,
                    )?
                    .data_type(),
                )
            {
                join_on.push((Expr::Column(r.clone()), Expr::Column(l.clone())));
            } else {
                // Not usable as an equijoin key: fold `l = r` into the
                // accumulated filter conjunction instead.
                let expr = binary_expr(
                    Expr::Column(l.clone()),
                    Operator::Eq,
                    Expr::Column(r.clone()),
                );
                match filters {
                    None => filters = Some(expr),
                    Some(filter_expr) => filters = Some(and(expr, filter_expr)),
                }
            }
        }

        if join_on.is_empty() {
            // No equijoin keys at all: cross join + filter. `filters` must
            // be Some here because every key landed in the filter branch.
            let join = Self::from(self.plan).cross_join(right)?;
            join.filter(filters.ok_or_else(|| {
                internal_datafusion_err!("filters should not be None here")
            })?)
        } else {
            let join = Join::try_new(
                self.plan,
                Arc::new(right),
                join_on,
                filters,
                join_type,
                JoinConstraint::Using,
                NullEquality::NullEqualsNothing,
                false, // null_aware
            )?;

            Ok(Self::new(LogicalPlan::Join(join)))
        }
    }
1230
1231    /// Apply a cross join
1232    pub fn cross_join(self, right: LogicalPlan) -> Result<Self> {
1233        let join = Join::try_new(
1234            self.plan,
1235            Arc::new(right),
1236            vec![],
1237            None,
1238            JoinType::Inner,
1239            JoinConstraint::On,
1240            NullEquality::NullEqualsNothing,
1241            false, // null_aware
1242        )?;
1243
1244        Ok(Self::new(LogicalPlan::Join(join)))
1245    }
1246
1247    /// Repartition
1248    pub fn repartition(self, partitioning_scheme: Partitioning) -> Result<Self> {
1249        Ok(Self::new(LogicalPlan::Repartition(Repartition {
1250            input: self.plan,
1251            partitioning_scheme,
1252        })))
1253    }
1254
1255    /// Apply a window functions to extend the schema
1256    pub fn window(
1257        self,
1258        window_expr: impl IntoIterator<Item = impl Into<Expr>>,
1259    ) -> Result<Self> {
1260        let window_expr = normalize_cols(window_expr, &self.plan)?;
1261        validate_unique_names("Windows", &window_expr)?;
1262        Ok(Self::new(LogicalPlan::Window(Window::try_new(
1263            window_expr,
1264            self.plan,
1265        )?)))
1266    }
1267
1268    /// Apply an aggregate: grouping on the `group_expr` expressions
1269    /// and calculating `aggr_expr` aggregates for each distinct
1270    /// value of the `group_expr`;
1271    pub fn aggregate(
1272        self,
1273        group_expr: impl IntoIterator<Item = impl Into<Expr>>,
1274        aggr_expr: impl IntoIterator<Item = impl Into<Expr>>,
1275    ) -> Result<Self> {
1276        let group_expr = normalize_cols(group_expr, &self.plan)?;
1277        let aggr_expr = normalize_cols(aggr_expr, &self.plan)?;
1278
1279        let group_expr = if self.options.add_implicit_group_by_exprs {
1280            add_group_by_exprs_from_dependencies(group_expr, self.plan.schema())?
1281        } else {
1282            group_expr
1283        };
1284
1285        Aggregate::try_new(self.plan, group_expr, aggr_expr)
1286            .map(LogicalPlan::Aggregate)
1287            .map(Self::new)
1288    }
1289
1290    /// Create an expression to represent the explanation of the plan
1291    ///
1292    /// if `analyze` is true, runs the actual plan and produces
1293    /// information about metrics during run.
1294    ///
1295    /// if `verbose` is true, prints out additional details.
1296    pub fn explain(self, verbose: bool, analyze: bool) -> Result<Self> {
1297        // Keep the format default to Indent
1298        self.explain_option_format(
1299            ExplainOption::default()
1300                .with_verbose(verbose)
1301                .with_analyze(analyze),
1302        )
1303    }
1304
1305    /// Create an expression to represent the explanation of the plan
1306    /// The`explain_option` is used to specify the format and verbosity of the explanation.
1307    /// Details see [`ExplainOption`].
1308    pub fn explain_option_format(self, explain_option: ExplainOption) -> Result<Self> {
1309        let schema = LogicalPlan::explain_schema();
1310        let schema = schema.to_dfschema_ref()?;
1311
1312        if explain_option.analyze {
1313            Ok(Self::new(LogicalPlan::Analyze(Analyze {
1314                verbose: explain_option.verbose,
1315                input: self.plan,
1316                schema,
1317            })))
1318        } else {
1319            let stringified_plans =
1320                vec![self.plan.to_stringified(PlanType::InitialLogicalPlan)];
1321
1322            Ok(Self::new(LogicalPlan::Explain(Explain {
1323                verbose: explain_option.verbose,
1324                plan: self.plan,
1325                explain_format: explain_option.format,
1326                stringified_plans,
1327                schema,
1328                logical_optimization_succeeded: false,
1329            })))
1330        }
1331    }
1332
1333    /// Process intersect set operator
1334    pub fn intersect(
1335        left_plan: LogicalPlan,
1336        right_plan: LogicalPlan,
1337        is_all: bool,
1338    ) -> Result<LogicalPlan> {
1339        LogicalPlanBuilder::intersect_or_except(
1340            left_plan,
1341            right_plan,
1342            JoinType::LeftSemi,
1343            is_all,
1344        )
1345    }
1346
1347    /// Process except set operator
1348    pub fn except(
1349        left_plan: LogicalPlan,
1350        right_plan: LogicalPlan,
1351        is_all: bool,
1352    ) -> Result<LogicalPlan> {
1353        LogicalPlanBuilder::intersect_or_except(
1354            left_plan,
1355            right_plan,
1356            JoinType::LeftAnti,
1357            is_all,
1358        )
1359    }
1360
1361    /// Process intersect or except
1362    fn intersect_or_except(
1363        left_plan: LogicalPlan,
1364        right_plan: LogicalPlan,
1365        join_type: JoinType,
1366        is_all: bool,
1367    ) -> Result<LogicalPlan> {
1368        let left_len = left_plan.schema().fields().len();
1369        let right_len = right_plan.schema().fields().len();
1370
1371        if left_len != right_len {
1372            return plan_err!(
1373                "INTERSECT/EXCEPT query must have the same number of columns. Left is {left_len} and right is {right_len}."
1374            );
1375        }
1376
1377        // Requalify sides if needed to avoid duplicate qualified field names
1378        // (e.g., when both sides reference the same table)
1379        let left_builder = LogicalPlanBuilder::from(left_plan);
1380        let right_builder = LogicalPlanBuilder::from(right_plan);
1381        let (left_builder, right_builder, _requalified) =
1382            requalify_sides_if_needed(left_builder, right_builder)?;
1383        let left_plan = left_builder.build()?;
1384        let right_plan = right_builder.build()?;
1385
1386        let join_keys = left_plan
1387            .schema()
1388            .fields()
1389            .iter()
1390            .zip(right_plan.schema().fields().iter())
1391            .map(|(left_field, right_field)| {
1392                (
1393                    (Column::from_name(left_field.name())),
1394                    (Column::from_name(right_field.name())),
1395                )
1396            })
1397            .unzip();
1398        if is_all {
1399            LogicalPlanBuilder::from(left_plan)
1400                .join_detailed(
1401                    right_plan,
1402                    join_type,
1403                    join_keys,
1404                    None,
1405                    NullEquality::NullEqualsNull,
1406                )?
1407                .build()
1408        } else {
1409            LogicalPlanBuilder::from(left_plan)
1410                .distinct()?
1411                .join_detailed(
1412                    right_plan,
1413                    join_type,
1414                    join_keys,
1415                    None,
1416                    NullEquality::NullEqualsNull,
1417                )?
1418                .build()
1419        }
1420    }
1421
1422    /// Build the plan
1423    pub fn build(self) -> Result<LogicalPlan> {
1424        Ok(Arc::unwrap_or_clone(self.plan))
1425    }
1426
    /// Apply a join with both explicit equijoin and non equijoin predicates.
    ///
    /// Note this is a low level API that requires identifying specific
    /// predicate types. Most users should use  [`join_on`](Self::join_on) that
    /// automatically identifies predicates appropriately.
    ///
    /// `equi_exprs` defines equijoin predicates, of the form `l = r` for each
    /// `(l, r)` tuple. `l`, the first element of the tuple, must only refer
    /// to columns from the existing input. `r`, the second element of the tuple,
    /// must only refer to columns from the right input.
    ///
    /// `filter` contains any other filter expression to apply during the
    /// join. Note that `equi_exprs` predicates are evaluated more efficiently
    /// than the filter expressions, so they are preferred.
    pub fn join_with_expr_keys(
        self,
        right: LogicalPlan,
        join_type: JoinType,
        equi_exprs: (Vec<impl Into<Expr>>, Vec<impl Into<Expr>>),
        filter: Option<Expr>,
    ) -> Result<Self> {
        if equi_exprs.0.len() != equi_exprs.1.len() {
            return plan_err!("left_keys and right_keys were not the same length");
        }

        let join_key_pairs = equi_exprs
            .0
            .into_iter()
            .zip(equi_exprs.1)
            .map(|(l, r)| {
                let left_key = l.into();
                let right_key = r.into();
                // NOTE(review): `left_using_columns`/`right_using_columns`
                // are populated but never read afterwards; only the error
                // propagation from `expr_to_columns` has effect — confirm
                // whether the collection is still needed.
                let mut left_using_columns  = HashSet::new();
                expr_to_columns(&left_key, &mut left_using_columns)?;
                // Normalize each key against its own side's schema only.
                let normalized_left_key = normalize_col_with_schemas_and_ambiguity_check(
                    left_key,
                    &[&[self.plan.schema()]],
                    &[],
                )?;

                let mut right_using_columns = HashSet::new();
                expr_to_columns(&right_key, &mut right_using_columns)?;
                let normalized_right_key = normalize_col_with_schemas_and_ambiguity_check(
                    right_key,
                    &[&[right.schema()]],
                    &[],
                )?;

                // find valid equijoin
                find_valid_equijoin_key_pair(
                        &normalized_left_key,
                        &normalized_right_key,
                        self.plan.schema(),
                        right.schema(),
                    )?.ok_or_else(||
                        plan_datafusion_err!(
                            "can't create join plan, join key should belong to one input, error key: ({normalized_left_key},{normalized_right_key})"
                        ))
            })
            .collect::<Result<Vec<_>>>()?;

        let join = Join::try_new(
            self.plan,
            Arc::new(right),
            join_key_pairs,
            filter,
            join_type,
            JoinConstraint::On,
            NullEquality::NullEqualsNothing,
            false, // null_aware
        )?;

        Ok(Self::new(LogicalPlan::Join(join)))
    }
1501
1502    /// Unnest the given column.
1503    pub fn unnest_column(self, column: impl Into<Column>) -> Result<Self> {
1504        unnest(Arc::unwrap_or_clone(self.plan), vec![column.into()]).map(Self::new)
1505    }
1506
1507    /// Unnest the given column given [`UnnestOptions`]
1508    pub fn unnest_column_with_options(
1509        self,
1510        column: impl Into<Column>,
1511        options: UnnestOptions,
1512    ) -> Result<Self> {
1513        unnest_with_options(
1514            Arc::unwrap_or_clone(self.plan),
1515            vec![column.into()],
1516            options,
1517        )
1518        .map(Self::new)
1519    }
1520
1521    /// Unnest the given columns with the given [`UnnestOptions`]
1522    pub fn unnest_columns_with_options(
1523        self,
1524        columns: Vec<Column>,
1525        options: UnnestOptions,
1526    ) -> Result<Self> {
1527        unnest_with_options(Arc::unwrap_or_clone(self.plan), columns, options)
1528            .map(Self::new)
1529    }
1530}
1531
1532impl From<LogicalPlan> for LogicalPlanBuilder {
1533    fn from(plan: LogicalPlan) -> Self {
1534        LogicalPlanBuilder::new(plan)
1535    }
1536}
1537
1538impl From<Arc<LogicalPlan>> for LogicalPlanBuilder {
1539    fn from(plan: Arc<LogicalPlan>) -> Self {
1540        LogicalPlanBuilder::new_from_arc(plan)
1541    }
1542}
1543
/// Container used when building fields for a `VALUES` node.
///
/// Fields are appended one at a time and receive positional names
/// (`column1`, `column2`, ...) following the PostgreSQL convention.
#[derive(Default)]
struct ValuesFields {
    // Fields accumulated so far, in column order.
    inner: Vec<Field>,
}
1549
1550impl ValuesFields {
1551    pub fn new() -> Self {
1552        Self::default()
1553    }
1554
1555    pub fn push(&mut self, data_type: DataType, nullable: bool) {
1556        self.push_with_metadata(data_type, nullable, None);
1557    }
1558
1559    pub fn push_with_metadata(
1560        &mut self,
1561        data_type: DataType,
1562        nullable: bool,
1563        metadata: Option<FieldMetadata>,
1564    ) {
1565        // Naming follows the convention described here:
1566        // https://www.postgresql.org/docs/current/queries-values.html
1567        let name = format!("column{}", self.inner.len() + 1);
1568        let mut field = Field::new(name, data_type, nullable);
1569        if let Some(metadata) = metadata {
1570            field.set_metadata(metadata.to_hashmap());
1571        }
1572        self.inner.push(field);
1573    }
1574
1575    pub fn into_fields(self) -> Fields {
1576        self.inner.into()
1577    }
1578}
1579
1580/// Returns aliases to make field names unique.
1581///
1582/// Returns a vector of optional aliases, one per input field. `None` means keep the original name,
1583/// `Some(alias)` means rename to the alias to ensure uniqueness.
1584///
1585/// Used when creating [`SubqueryAlias`] or similar operations that strip table qualifiers but need
1586/// to maintain unique column names.
1587///
1588/// # Example
1589/// Input fields: `[a, a, b, b, a, a:1]` ([`DFSchema`] valid when duplicate fields have different qualifiers)
1590/// Returns: `[None, Some("a:1"), None, Some("b:1"), Some("a:2"), Some("a:1:1")]`
1591pub fn unique_field_aliases(fields: &Fields) -> Vec<Option<String>> {
1592    // Some field names might already come to this function with the count (number of times it appeared)
1593    // as a suffix e.g. id:1, so there's still a chance of name collisions, for example,
1594    // if these three fields passed to this function: "col:1", "col" and "col", the function
1595    // would rename them to -> col:1, col, col:1 causing a posterior error when building the DFSchema.
1596    // That's why we need the `seen` set, so the fields are always unique.
1597
1598    // Tracks a mapping between a field name and the number of appearances of that field.
1599    let mut name_map = HashMap::<&str, usize>::new();
1600    // Tracks all the fields and aliases that were previously seen.
1601    let mut seen = HashSet::<Cow<String>>::new();
1602
1603    fields
1604        .iter()
1605        .map(|field| {
1606            let original_name = field.name();
1607            let mut name = Cow::Borrowed(original_name);
1608
1609            let count = name_map.entry(original_name).or_insert(0);
1610
1611            // Loop until we find a name that hasn't been used.
1612            while seen.contains(&name) {
1613                *count += 1;
1614                name = Cow::Owned(format!("{original_name}:{count}"));
1615            }
1616
1617            seen.insert(name.clone());
1618
1619            match name {
1620                Cow::Borrowed(_) => None,
1621                Cow::Owned(alias) => Some(alias),
1622            }
1623        })
1624        .collect()
1625}
1626
1627fn mark_field(schema: &DFSchema) -> (Option<TableReference>, Arc<Field>) {
1628    let mut table_references = schema
1629        .iter()
1630        .filter_map(|(qualifier, _)| qualifier)
1631        .collect::<Vec<_>>();
1632    table_references.dedup();
1633    let table_reference = if table_references.len() == 1 {
1634        table_references.pop().cloned()
1635    } else {
1636        None
1637    };
1638
1639    (
1640        table_reference,
1641        Arc::new(Field::new("mark", DataType::Boolean, false)),
1642    )
1643}
1644
/// Creates a schema for a join operation.
/// The fields from the left side are first
///
/// Nullability is widened depending on the join type: any side that may fail
/// to produce a match (the right side of a LEFT join, the left side of a
/// RIGHT join, both sides of a FULL join) has all of its fields marked
/// nullable. Semi/anti joins keep only one input's fields; mark joins append
/// a non-nullable `Boolean` field named "mark" (see `mark_field`).
///
/// # Errors
/// Returns an error if the combined fields cannot form a valid [`DFSchema`].
pub fn build_join_schema(
    left: &DFSchema,
    right: &DFSchema,
    join_type: &JoinType,
) -> Result<DFSchema> {
    // Clones every field with `nullable = true`, keeping its qualifier.
    fn nullify_fields<'a>(
        fields: impl Iterator<Item = (Option<&'a TableReference>, &'a Arc<Field>)>,
    ) -> Vec<(Option<TableReference>, Arc<Field>)> {
        fields
            .map(|(q, f)| {
                // TODO: find a good way to do that
                let field = f.as_ref().clone().with_nullable(true);
                (q.cloned(), Arc::new(field))
            })
            .collect()
    }

    let right_fields = right.iter();
    let left_fields = left.iter();

    let qualified_fields: Vec<(Option<TableReference>, Arc<Field>)> = match join_type {
        JoinType::Inner => {
            // left then right
            let left_fields = left_fields
                .map(|(q, f)| (q.cloned(), Arc::clone(f)))
                .collect::<Vec<_>>();
            let right_fields = right_fields
                .map(|(q, f)| (q.cloned(), Arc::clone(f)))
                .collect::<Vec<_>>();
            left_fields.into_iter().chain(right_fields).collect()
        }
        JoinType::Left => {
            // left then right, right set to nullable in case of not matched scenario
            let left_fields = left_fields
                .map(|(q, f)| (q.cloned(), Arc::clone(f)))
                .collect::<Vec<_>>();
            left_fields
                .into_iter()
                .chain(nullify_fields(right_fields))
                .collect()
        }
        JoinType::Right => {
            // left then right, left set to nullable in case of not matched scenario
            let right_fields = right_fields
                .map(|(q, f)| (q.cloned(), Arc::clone(f)))
                .collect::<Vec<_>>();
            nullify_fields(left_fields)
                .into_iter()
                .chain(right_fields)
                .collect()
        }
        JoinType::Full => {
            // left then right, all set to nullable in case of not matched scenario
            nullify_fields(left_fields)
                .into_iter()
                .chain(nullify_fields(right_fields))
                .collect()
        }
        JoinType::LeftSemi | JoinType::LeftAnti => {
            // Only use the left side for the schema
            left_fields
                .map(|(q, f)| (q.cloned(), Arc::clone(f)))
                .collect()
        }
        // Left fields plus a boolean "mark" column qualified by the right side.
        JoinType::LeftMark => left_fields
            .map(|(q, f)| (q.cloned(), Arc::clone(f)))
            .chain(once(mark_field(right)))
            .collect(),
        JoinType::RightSemi | JoinType::RightAnti => {
            // Only use the right side for the schema
            right_fields
                .map(|(q, f)| (q.cloned(), Arc::clone(f)))
                .collect()
        }
        // Right fields plus a boolean "mark" column qualified by the left side.
        JoinType::RightMark => right_fields
            .map(|(q, f)| (q.cloned(), Arc::clone(f)))
            .chain(once(mark_field(left)))
            .collect(),
    };
    // Join the functional dependencies of both inputs; right-side indices are
    // offset by the number of left fields.
    let func_dependencies = left.functional_dependencies().join(
        right.functional_dependencies(),
        join_type,
        left.fields().len(),
    );

    // Order the metadata merge so that for Right* joins the right schema's
    // entries come second; NOTE(review): later entries appear to overwrite
    // earlier ones on duplicate keys when collected — confirm `metadata()`
    // yields a map type.
    let (schema1, schema2) = match join_type {
        JoinType::Right
        | JoinType::RightSemi
        | JoinType::RightAnti
        | JoinType::RightMark => (left, right),
        _ => (right, left),
    };

    let metadata = schema1
        .metadata()
        .clone()
        .into_iter()
        .chain(schema2.metadata().clone())
        .collect();

    let dfschema = DFSchema::new_with_metadata(qualified_fields, metadata)?;
    dfschema.with_functional_dependencies(func_dependencies)
}
1750
1751/// (Re)qualify the sides of a join if needed, i.e. if the columns from one side would otherwise
1752/// conflict with the columns from the other.
1753/// This is especially useful for queries that come as Substrait, since Substrait doesn't currently allow specifying
1754/// aliases, neither for columns nor for tables.  DataFusion requires columns to be uniquely identifiable, in some
1755/// places (see e.g. DFSchema::check_names).
1756/// The function returns:
1757/// - The requalified or original left logical plan
1758/// - The requalified or original right logical plan
1759/// - If a requalification was needed or not
1760pub fn requalify_sides_if_needed(
1761    left: LogicalPlanBuilder,
1762    right: LogicalPlanBuilder,
1763) -> Result<(LogicalPlanBuilder, LogicalPlanBuilder, bool)> {
1764    let left_cols = left.schema().columns();
1765    let right_cols = right.schema().columns();
1766
1767    // Requalify if merging the schemas would cause an error during join.
1768    // This can happen in several cases:
1769    // 1. Duplicate qualified fields: both sides have same relation.name
1770    // 2. Duplicate unqualified fields: both sides have same unqualified name
1771    // 3. Ambiguous reference: one side qualified, other unqualified, same name
1772    //
1773    // Implementation note: This uses a simple O(n*m) nested loop rather than
1774    // a HashMap-based O(n+m) approach. The nested loop is preferred because:
1775    // - Schemas are typically small (in TPCH benchmark, max is 16 columns),
1776    //   so n*m is negligible
1777    // - Early return on first conflict makes common case very fast
1778    // - Code is simpler and easier to reason about
1779    // - Called only during plan construction, not in execution hot path
1780    for l in &left_cols {
1781        for r in &right_cols {
1782            if l.name != r.name {
1783                continue;
1784            }
1785
1786            // Same name - check if this would cause a conflict
1787            match (&l.relation, &r.relation) {
1788                // Both qualified with same relation - duplicate qualified field
1789                (Some(l_rel), Some(r_rel)) if l_rel == r_rel => {
1790                    return Ok((
1791                        left.alias(TableReference::bare("left"))?,
1792                        right.alias(TableReference::bare("right"))?,
1793                        true,
1794                    ));
1795                }
1796                // Both unqualified - duplicate unqualified field
1797                (None, None) => {
1798                    return Ok((
1799                        left.alias(TableReference::bare("left"))?,
1800                        right.alias(TableReference::bare("right"))?,
1801                        true,
1802                    ));
1803                }
1804                // One qualified, one not - ambiguous reference
1805                (Some(_), None) | (None, Some(_)) => {
1806                    return Ok((
1807                        left.alias(TableReference::bare("left"))?,
1808                        right.alias(TableReference::bare("right"))?,
1809                        true,
1810                    ));
1811                }
1812                // Different qualifiers - OK, no conflict
1813                _ => {}
1814            }
1815        }
1816    }
1817
1818    // No conflicts found
1819    Ok((left, right, false))
1820}
1821/// Add additional "synthetic" group by expressions based on functional
1822/// dependencies.
1823///
1824/// For example, if we are grouping on `[c1]`, and we know from
1825/// functional dependencies that column `c1` determines `c2`, this function
1826/// adds `c2` to the group by list.
1827///
1828/// This allows MySQL style selects like
1829/// `SELECT col FROM t WHERE pk = 5` if col is unique
1830pub fn add_group_by_exprs_from_dependencies(
1831    mut group_expr: Vec<Expr>,
1832    schema: &DFSchemaRef,
1833) -> Result<Vec<Expr>> {
1834    // Names of the fields produced by the GROUP BY exprs for example, `GROUP BY
1835    // c1 + 1` produces an output field named `"c1 + 1"`
1836    let mut group_by_field_names = group_expr
1837        .iter()
1838        .map(|e| e.schema_name().to_string())
1839        .collect::<Vec<_>>();
1840
1841    if let Some(target_indices) =
1842        get_target_functional_dependencies(schema, &group_by_field_names)
1843    {
1844        for idx in target_indices {
1845            let expr = Expr::Column(Column::from(schema.qualified_field(idx)));
1846            let expr_name = expr.schema_name().to_string();
1847            if !group_by_field_names.contains(&expr_name) {
1848                group_by_field_names.push(expr_name);
1849                group_expr.push(expr);
1850            }
1851        }
1852    }
1853    Ok(group_expr)
1854}
1855
1856/// Errors if one or more expressions have equal names.
1857pub fn validate_unique_names<'a>(
1858    node_name: &str,
1859    expressions: impl IntoIterator<Item = &'a Expr>,
1860) -> Result<()> {
1861    let mut unique_names = HashMap::new();
1862
1863    expressions.into_iter().enumerate().try_for_each(|(position, expr)| {
1864        let name = expr.schema_name().to_string();
1865        match unique_names.get(&name) {
1866            None => {
1867                unique_names.insert(name, (position, expr));
1868                Ok(())
1869            },
1870            Some((existing_position, existing_expr)) => {
1871                plan_err!("{node_name} require unique expression names \
1872                             but the expression \"{existing_expr}\" at position {existing_position} and \"{expr}\" \
1873                             at position {position} have the same name. Consider aliasing (\"AS\") one of them."
1874                            )
1875            }
1876        }
1877    })
1878}
1879
1880/// Union two [`LogicalPlan`]s.
1881///
1882/// Constructs the UNION plan, but does not perform type-coercion. Therefore the
1883/// subtree expressions will not be properly typed until the optimizer pass.
1884///
1885/// If a properly typed UNION plan is needed, refer to [`TypeCoercionRewriter::coerce_union`]
1886/// or alternatively, merge the union input schema using [`coerce_union_schema`] and
1887/// apply the expression rewrite with [`coerce_plan_expr_for_schema`].
1888///
1889/// [`TypeCoercionRewriter::coerce_union`]: https://docs.rs/datafusion-optimizer/latest/datafusion_optimizer/analyzer/type_coercion/struct.TypeCoercionRewriter.html#method.coerce_union
1890/// [`coerce_union_schema`]: https://docs.rs/datafusion-optimizer/latest/datafusion_optimizer/analyzer/type_coercion/fn.coerce_union_schema.html
1891pub fn union(left_plan: LogicalPlan, right_plan: LogicalPlan) -> Result<LogicalPlan> {
1892    Ok(LogicalPlan::Union(Union::try_new_with_loose_types(vec![
1893        Arc::new(left_plan),
1894        Arc::new(right_plan),
1895    ])?))
1896}
1897
1898/// Like [`union`], but combine rows from different tables by name, rather than
1899/// by position.
1900pub fn union_by_name(
1901    left_plan: LogicalPlan,
1902    right_plan: LogicalPlan,
1903) -> Result<LogicalPlan> {
1904    Ok(LogicalPlan::Union(Union::try_new_by_name(vec![
1905        Arc::new(left_plan),
1906        Arc::new(right_plan),
1907    ])?))
1908}
1909
1910/// Create Projection
1911/// # Errors
1912/// This function errors under any of the following conditions:
1913/// * Two or more expressions have the same name
1914/// * An invalid expression is used (e.g. a `sort` expression)
1915pub fn project(
1916    plan: LogicalPlan,
1917    expr: impl IntoIterator<Item = impl Into<SelectExpr>>,
1918) -> Result<LogicalPlan> {
1919    project_with_validation(plan, expr.into_iter().map(|e| (e, true)))
1920}
1921
1922/// Create Projection. Similar to project except that the expressions
1923/// passed in have a flag to indicate if that expression requires
1924/// validation (normalize & columnize) (true) or not (false)
1925/// # Errors
1926/// This function errors under any of the following conditions:
1927/// * Two or more expressions have the same name
1928/// * An invalid expression is used (e.g. a `sort` expression)
1929fn project_with_validation(
1930    plan: LogicalPlan,
1931    expr: impl IntoIterator<Item = (impl Into<SelectExpr>, bool)>,
1932) -> Result<LogicalPlan> {
1933    let mut projected_expr = vec![];
1934    for (e, validate) in expr {
1935        let e = e.into();
1936        match e {
1937            SelectExpr::Wildcard(opt) => {
1938                let expanded = expand_wildcard(plan.schema(), &plan, Some(&opt))?;
1939
1940                // If there is a REPLACE statement, replace that column with the given
1941                // replace expression. Column name remains the same.
1942                let expanded = if let Some(replace) = opt.replace {
1943                    replace_columns(expanded, &replace)?
1944                } else {
1945                    expanded
1946                };
1947
1948                for e in expanded {
1949                    if validate {
1950                        projected_expr
1951                            .push(columnize_expr(normalize_col(e, &plan)?, &plan)?)
1952                    } else {
1953                        projected_expr.push(e)
1954                    }
1955                }
1956            }
1957            SelectExpr::QualifiedWildcard(table_ref, opt) => {
1958                let expanded =
1959                    expand_qualified_wildcard(&table_ref, plan.schema(), Some(&opt))?;
1960
1961                // If there is a REPLACE statement, replace that column with the given
1962                // replace expression. Column name remains the same.
1963                let expanded = if let Some(replace) = opt.replace {
1964                    replace_columns(expanded, &replace)?
1965                } else {
1966                    expanded
1967                };
1968
1969                for e in expanded {
1970                    if validate {
1971                        projected_expr
1972                            .push(columnize_expr(normalize_col(e, &plan)?, &plan)?)
1973                    } else {
1974                        projected_expr.push(e)
1975                    }
1976                }
1977            }
1978            SelectExpr::Expression(e) => {
1979                if validate {
1980                    projected_expr.push(columnize_expr(normalize_col(e, &plan)?, &plan)?)
1981                } else {
1982                    projected_expr.push(e)
1983                }
1984            }
1985        }
1986    }
1987    validate_unique_names("Projections", projected_expr.iter())?;
1988
1989    Projection::try_new(projected_expr, Arc::new(plan)).map(LogicalPlan::Projection)
1990}
1991
1992/// If there is a REPLACE statement in the projected expression in the form of
1993/// "REPLACE (some_column_within_an_expr AS some_column)", this function replaces
1994/// that column with the given replace expression. Column name remains the same.
1995/// Multiple REPLACEs are also possible with comma separations.
1996fn replace_columns(
1997    mut exprs: Vec<Expr>,
1998    replace: &PlannedReplaceSelectItem,
1999) -> Result<Vec<Expr>> {
2000    for expr in exprs.iter_mut() {
2001        if let Expr::Column(Column { name, .. }) = expr
2002            && let Some((_, new_expr)) = replace
2003                .items()
2004                .iter()
2005                .zip(replace.expressions().iter())
2006                .find(|(item, _)| item.column_name.value == *name)
2007        {
2008            *expr = new_expr.clone().alias(name.clone())
2009        }
2010    }
2011    Ok(exprs)
2012}
2013
2014/// Create a SubqueryAlias to wrap a LogicalPlan.
2015pub fn subquery_alias(
2016    plan: LogicalPlan,
2017    alias: impl Into<TableReference>,
2018) -> Result<LogicalPlan> {
2019    SubqueryAlias::try_new(Arc::new(plan), alias).map(LogicalPlan::SubqueryAlias)
2020}
2021
2022/// Create a LogicalPlanBuilder representing a scan of a table with the provided name and schema.
2023/// This is mostly used for testing and documentation.
2024pub fn table_scan(
2025    name: Option<impl Into<TableReference>>,
2026    table_schema: &Schema,
2027    projection: Option<Vec<usize>>,
2028) -> Result<LogicalPlanBuilder> {
2029    table_scan_with_filters(name, table_schema, projection, vec![])
2030}
2031
2032/// Create a LogicalPlanBuilder representing a scan of a table with the provided name and schema,
2033/// and inlined filters.
2034/// This is mostly used for testing and documentation.
2035pub fn table_scan_with_filters(
2036    name: Option<impl Into<TableReference>>,
2037    table_schema: &Schema,
2038    projection: Option<Vec<usize>>,
2039    filters: Vec<Expr>,
2040) -> Result<LogicalPlanBuilder> {
2041    let table_source = table_source(table_schema);
2042    let name = name
2043        .map(|n| n.into())
2044        .unwrap_or_else(|| TableReference::bare(UNNAMED_TABLE));
2045    LogicalPlanBuilder::scan_with_filters(name, table_source, projection, filters)
2046}
2047
2048/// Create a LogicalPlanBuilder representing a scan of a table with the provided name and schema,
2049/// filters, and inlined fetch.
2050/// This is mostly used for testing and documentation.
2051pub fn table_scan_with_filter_and_fetch(
2052    name: Option<impl Into<TableReference>>,
2053    table_schema: &Schema,
2054    projection: Option<Vec<usize>>,
2055    filters: Vec<Expr>,
2056    fetch: Option<usize>,
2057) -> Result<LogicalPlanBuilder> {
2058    let table_source = table_source(table_schema);
2059    let name = name
2060        .map(|n| n.into())
2061        .unwrap_or_else(|| TableReference::bare(UNNAMED_TABLE));
2062    LogicalPlanBuilder::scan_with_filters_fetch(
2063        name,
2064        table_source,
2065        projection,
2066        filters,
2067        fetch,
2068    )
2069}
2070
2071pub fn table_source(table_schema: &Schema) -> Arc<dyn TableSource> {
2072    // TODO should we take SchemaRef and avoid cloning?
2073    let table_schema = Arc::new(table_schema.clone());
2074    Arc::new(LogicalTableSource {
2075        table_schema,
2076        constraints: Default::default(),
2077    })
2078}
2079
2080pub fn table_source_with_constraints(
2081    table_schema: &Schema,
2082    constraints: Constraints,
2083) -> Arc<dyn TableSource> {
2084    // TODO should we take SchemaRef and avoid cloning?
2085    let table_schema = Arc::new(table_schema.clone());
2086    Arc::new(LogicalTableSource {
2087        table_schema,
2088        constraints,
2089    })
2090}
2091
2092/// Wrap projection for a plan, if the join keys contains normal expression.
2093pub fn wrap_projection_for_join_if_necessary(
2094    join_keys: &[Expr],
2095    input: LogicalPlan,
2096) -> Result<(LogicalPlan, Vec<Column>, bool)> {
2097    let input_schema = input.schema();
2098    let alias_join_keys: Vec<Expr> = join_keys
2099        .iter()
2100        .map(|key| {
2101            // The display_name() of cast expression will ignore the cast info, and show the inner expression name.
2102            // If we do not add alias, it will throw same field name error in the schema when adding projection.
2103            // For example:
2104            //    input scan : [a, b, c],
2105            //    join keys: [cast(a as int)]
2106            //
2107            //  then a and cast(a as int) will use the same field name - `a` in projection schema.
2108            //  https://github.com/apache/datafusion/issues/4478
2109            if matches!(key, Expr::Cast(_)) || matches!(key, Expr::TryCast(_)) {
2110                let alias = format!("{key}");
2111                key.clone().alias(alias)
2112            } else {
2113                key.clone()
2114            }
2115        })
2116        .collect::<Vec<_>>();
2117
2118    let need_project = join_keys.iter().any(|key| !matches!(key, Expr::Column(_)));
2119    let plan = if need_project {
2120        // Include all columns from the input and extend them with the join keys
2121        let mut projection = input_schema
2122            .columns()
2123            .into_iter()
2124            .map(Expr::Column)
2125            .collect::<Vec<_>>();
2126        let join_key_items = alias_join_keys
2127            .iter()
2128            .flat_map(|expr| expr.try_as_col().is_none().then_some(expr))
2129            .cloned()
2130            .collect::<HashSet<Expr>>();
2131        projection.extend(join_key_items);
2132
2133        LogicalPlanBuilder::from(input)
2134            .project(projection.into_iter().map(SelectExpr::from))?
2135            .build()?
2136    } else {
2137        input
2138    };
2139
2140    let join_on = alias_join_keys
2141        .into_iter()
2142        .map(|key| {
2143            if let Some(col) = key.try_as_col() {
2144                Ok(col.clone())
2145            } else {
2146                let name = key.schema_name().to_string();
2147                Ok(Column::from_name(name))
2148            }
2149        })
2150        .collect::<Result<Vec<_>>>()?;
2151
2152    Ok((plan, join_on, need_project))
2153}
2154
/// Basic TableSource implementation intended for use in tests and documentation. It is expected
/// that users will provide their own TableSource implementations or use DataFusion's
/// DefaultTableSource.
pub struct LogicalTableSource {
    // Schema exposed by this table source.
    table_schema: SchemaRef,
    // Declared table constraints; empty by default (see `new`).
    constraints: Constraints,
}
2162
impl LogicalTableSource {
    /// Create a new LogicalTableSource with no constraints.
    pub fn new(table_schema: SchemaRef) -> Self {
        Self {
            table_schema,
            constraints: Constraints::default(),
        }
    }

    /// Set the table constraints, replacing any previously configured ones.
    pub fn with_constraints(mut self, constraints: Constraints) -> Self {
        self.constraints = constraints;
        self
    }
}
2177
impl TableSource for LogicalTableSource {
    fn as_any(&self) -> &dyn Any {
        self
    }

    fn schema(&self) -> SchemaRef {
        Arc::clone(&self.table_schema)
    }

    fn constraints(&self) -> Option<&Constraints> {
        Some(&self.constraints)
    }

    // Reports every filter as `Exact`, i.e. fully handled by the source.
    // NOTE(review): acceptable for a test/documentation source; presumably
    // real sources would inspect `filters` — confirm against callers.
    fn supports_filters_pushdown(
        &self,
        filters: &[&Expr],
    ) -> Result<Vec<TableProviderFilterPushDown>> {
        Ok(vec![TableProviderFilterPushDown::Exact; filters.len()])
    }
}
2198
2199/// Create a [`LogicalPlan::Unnest`] plan
2200pub fn unnest(input: LogicalPlan, columns: Vec<Column>) -> Result<LogicalPlan> {
2201    unnest_with_options(input, columns, UnnestOptions::default())
2202}
2203
2204pub fn get_struct_unnested_columns(
2205    col_name: &String,
2206    inner_fields: &Fields,
2207) -> Vec<Column> {
2208    inner_fields
2209        .iter()
2210        .map(|f| Column::from_name(format!("{}.{}", col_name, f.name())))
2211        .collect()
2212}
2213
2214/// Create a [`LogicalPlan::Unnest`] plan with options
2215/// This function receive a list of columns to be unnested
2216/// because multiple unnest can be performed on the same column (e.g unnest with different depth)
2217/// The new schema will contains post-unnest fields replacing the original field
2218///
2219/// For example:
2220/// Input schema as
2221/// ```text
2222/// +---------------------+-------------------+
2223/// | col1                | col2              |
2224/// +---------------------+-------------------+
2225/// | Struct(INT64,INT32) | List(List(Int64)) |
2226/// +---------------------+-------------------+
2227/// ```
2228///
2229///
2230///
2231/// Then unnesting columns with:
2232/// - (col1,Struct)
2233/// - (col2,List(\[depth=1,depth=2\]))
2234///
2235/// will generate a new schema as
2236/// ```text
2237/// +---------+---------+---------------------+---------------------+
2238/// | col1.c0 | col1.c1 | unnest_col2_depth_1 | unnest_col2_depth_2 |
2239/// +---------+---------+---------------------+---------------------+
2240/// | Int64   | Int32   | List(Int64)         |  Int64              |
2241/// +---------+---------+---------------------+---------------------+
2242/// ```
2243pub fn unnest_with_options(
2244    input: LogicalPlan,
2245    columns_to_unnest: Vec<Column>,
2246    options: UnnestOptions,
2247) -> Result<LogicalPlan> {
2248    Ok(LogicalPlan::Unnest(Unnest::try_new(
2249        Arc::new(input),
2250        columns_to_unnest,
2251        options,
2252    )?))
2253}
2254
2255#[cfg(test)]
2256mod tests {
2257    use std::vec;
2258
2259    use super::*;
2260    use crate::lit_with_metadata;
2261    use crate::logical_plan::StringifiedPlan;
2262    use crate::{col, expr, expr_fn::exists, in_subquery, lit, scalar_subquery};
2263
2264    use crate::test::function_stub::sum;
2265    use datafusion_common::{
2266        Constraint, DataFusionError, RecursionUnnestOption, SchemaError,
2267    };
2268    use insta::assert_snapshot;
2269
    #[test]
    fn plan_builder_simple() -> Result<()> {
        // scan -> filter -> project; verify the rendered plan shape.
        let plan =
            table_scan(Some("employee_csv"), &employee_schema(), Some(vec![0, 3]))?
                .filter(col("state").eq(lit("CO")))?
                .project(vec![col("id")])?
                .build()?;

        assert_snapshot!(plan, @r#"
        Projection: employee_csv.id
          Filter: employee_csv.state = Utf8("CO")
            TableScan: employee_csv projection=[id, state]
        "#);

        Ok(())
    }
2286
    #[test]
    fn plan_builder_schema() {
        // Scanned fields are qualified with the (normalized) table name.
        let schema = employee_schema();
        let projection = None;
        let plan =
            LogicalPlanBuilder::scan("employee_csv", table_source(&schema), projection)
                .unwrap();
        assert_snapshot!(plan.schema().as_ref(), @"fields:[employee_csv.id, employee_csv.first_name, employee_csv.last_name, employee_csv.state, employee_csv.salary], metadata:{}");

        // Note scan of "EMPLOYEE_CSV" is treated as a SQL identifier
        // (and thus normalized to "employee_csv") as well
        let projection = None;
        let plan =
            LogicalPlanBuilder::scan("EMPLOYEE_CSV", table_source(&schema), projection)
                .unwrap();
        assert_snapshot!(plan.schema().as_ref(), @"fields:[employee_csv.id, employee_csv.first_name, employee_csv.last_name, employee_csv.state, employee_csv.salary], metadata:{}");
    }
2304
    #[test]
    fn plan_builder_empty_name() {
        // An empty table name is rejected at scan time with a planning error.
        let schema = employee_schema();
        let projection = None;
        let err =
            LogicalPlanBuilder::scan("", table_source(&schema), projection).unwrap_err();
        assert_snapshot!(
            err.strip_backtrace(),
            @"Error during planning: table_name cannot be empty"
        );
    }
2316
    #[test]
    fn plan_builder_sort() -> Result<()> {
        // Sort with explicit ASC NULLS FIRST and DESC NULLS LAST orderings.
        let plan =
            table_scan(Some("employee_csv"), &employee_schema(), Some(vec![3, 4]))?
                .sort(vec![
                    expr::Sort::new(col("state"), true, true),
                    expr::Sort::new(col("salary"), false, false),
                ])?
                .build()?;

        assert_snapshot!(plan, @r"
        Sort: employee_csv.state ASC NULLS FIRST, employee_csv.salary DESC NULLS LAST
          TableScan: employee_csv projection=[state, salary]
        ");

        Ok(())
    }
2334
    #[test]
    fn plan_builder_union() -> Result<()> {
        // Chained unions nest left-deep; each `union` adds one level.
        let plan =
            table_scan(Some("employee_csv"), &employee_schema(), Some(vec![3, 4]))?;

        let plan = plan
            .clone()
            .union(plan.clone().build()?)?
            .union(plan.clone().build()?)?
            .union(plan.build()?)?
            .build()?;

        assert_snapshot!(plan, @r"
        Union
          Union
            Union
              TableScan: employee_csv projection=[state, salary]
              TableScan: employee_csv projection=[state, salary]
            TableScan: employee_csv projection=[state, salary]
          TableScan: employee_csv projection=[state, salary]
        ");

        Ok(())
    }
2359
    #[test]
    fn plan_builder_union_distinct() -> Result<()> {
        // `union_distinct` wraps each Union level in a Distinct node.
        let plan =
            table_scan(Some("employee_csv"), &employee_schema(), Some(vec![3, 4]))?;

        let plan = plan
            .clone()
            .union_distinct(plan.clone().build()?)?
            .union_distinct(plan.clone().build()?)?
            .union_distinct(plan.build()?)?
            .build()?;

        assert_snapshot!(plan, @r"
        Distinct:
          Union
            Distinct:
              Union
                Distinct:
                  Union
                    TableScan: employee_csv projection=[state, salary]
                    TableScan: employee_csv projection=[state, salary]
                TableScan: employee_csv projection=[state, salary]
            TableScan: employee_csv projection=[state, salary]
        ");

        Ok(())
    }
2387
    #[test]
    fn plan_builder_simple_distinct() -> Result<()> {
        // `distinct()` places a Distinct node on top of the projection.
        let plan =
            table_scan(Some("employee_csv"), &employee_schema(), Some(vec![0, 3]))?
                .filter(col("state").eq(lit("CO")))?
                .project(vec![col("id")])?
                .distinct()?
                .build()?;

        assert_snapshot!(plan, @r#"
        Distinct:
          Projection: employee_csv.id
            Filter: employee_csv.state = Utf8("CO")
              TableScan: employee_csv projection=[id, state]
        "#);

        Ok(())
    }
2406
    #[test]
    fn exists_subquery() -> Result<()> {
        // EXISTS predicate: the correlated subquery appears under the Filter.
        let foo = test_table_scan_with_name("foo")?;
        let bar = test_table_scan_with_name("bar")?;

        let subquery = LogicalPlanBuilder::from(foo)
            .project(vec![col("a")])?
            .filter(col("a").eq(col("bar.a")))?
            .build()?;

        let outer_query = LogicalPlanBuilder::from(bar)
            .project(vec![col("a")])?
            .filter(exists(Arc::new(subquery)))?
            .build()?;

        assert_snapshot!(outer_query, @r"
        Filter: EXISTS (<subquery>)
          Subquery:
            Filter: foo.a = bar.a
              Projection: foo.a
                TableScan: foo
          Projection: bar.a
            TableScan: bar
        ");

        Ok(())
    }
2434
2435    #[test]
2436    fn filter_in_subquery() -> Result<()> {
2437        let foo = test_table_scan_with_name("foo")?;
2438        let bar = test_table_scan_with_name("bar")?;
2439
2440        let subquery = LogicalPlanBuilder::from(foo)
2441            .project(vec![col("a")])?
2442            .filter(col("a").eq(col("bar.a")))?
2443            .build()?;
2444
2445        // SELECT a FROM bar WHERE a IN (SELECT a FROM foo WHERE a = bar.a)
2446        let outer_query = LogicalPlanBuilder::from(bar)
2447            .project(vec![col("a")])?
2448            .filter(in_subquery(col("a"), Arc::new(subquery)))?
2449            .build()?;
2450
2451        assert_snapshot!(outer_query, @r"
2452        Filter: bar.a IN (<subquery>)
2453          Subquery:
2454            Filter: foo.a = bar.a
2455              Projection: foo.a
2456                TableScan: foo
2457          Projection: bar.a
2458            TableScan: bar
2459        ");
2460
2461        Ok(())
2462    }
2463
    #[test]
    fn select_scalar_subquery() -> Result<()> {
        let foo = test_table_scan_with_name("foo")?;
        let bar = test_table_scan_with_name("bar")?;

        // Correlated scalar subquery: projects foo.b, filtered on the outer
        // column bar.a.
        let subquery = LogicalPlanBuilder::from(foo)
            .project(vec![col("b")])?
            .filter(col("a").eq(col("bar.a")))?
            .build()?;

        // SELECT (SELECT b FROM foo WHERE a = bar.a) FROM bar
        let outer_query = LogicalPlanBuilder::from(bar)
            .project(vec![scalar_subquery(Arc::new(subquery))])?
            .build()?;

        assert_snapshot!(outer_query, @r"
        Projection: (<subquery>)
          Subquery:
            Filter: foo.a = bar.a
              Projection: foo.b
                TableScan: foo
          TableScan: bar
        ");

        Ok(())
    }
2490
2491    #[test]
2492    fn projection_non_unique_names() -> Result<()> {
2493        let plan = table_scan(
2494            Some("employee_csv"),
2495            &employee_schema(),
2496            // project id and first_name by column index
2497            Some(vec![0, 1]),
2498        )?
2499        // two columns with the same name => error
2500        .project(vec![col("id"), col("first_name").alias("id")]);
2501
2502        match plan {
2503            Err(DataFusionError::SchemaError(err, _)) => {
2504                if let SchemaError::AmbiguousReference { field } = *err {
2505                    let Column {
2506                        relation,
2507                        name,
2508                        spans: _,
2509                    } = *field;
2510                    let Some(TableReference::Bare { table }) = relation else {
2511                        return plan_err!(
2512                            "wrong relation: {relation:?}, expected table name"
2513                        );
2514                    };
2515                    assert_eq!(*"employee_csv", *table);
2516                    assert_eq!("id", &name);
2517                    Ok(())
2518                } else {
2519                    plan_err!("Plan should have returned an DataFusionError::SchemaError")
2520                }
2521            }
2522            _ => plan_err!("Plan should have returned an DataFusionError::SchemaError"),
2523        }
2524    }
2525
2526    fn employee_schema() -> Schema {
2527        Schema::new(vec![
2528            Field::new("id", DataType::Int32, false),
2529            Field::new("first_name", DataType::Utf8, false),
2530            Field::new("last_name", DataType::Utf8, false),
2531            Field::new("state", DataType::Utf8, false),
2532            Field::new("salary", DataType::Int32, false),
2533        ])
2534    }
2535
2536    #[test]
2537    fn stringified_plan() {
2538        let stringified_plan =
2539            StringifiedPlan::new(PlanType::InitialLogicalPlan, "...the plan...");
2540        assert!(stringified_plan.should_display(true));
2541        assert!(!stringified_plan.should_display(false)); // not in non verbose mode
2542
2543        let stringified_plan =
2544            StringifiedPlan::new(PlanType::FinalLogicalPlan, "...the plan...");
2545        assert!(stringified_plan.should_display(true));
2546        assert!(stringified_plan.should_display(false)); // display in non verbose mode too
2547
2548        let stringified_plan =
2549            StringifiedPlan::new(PlanType::InitialPhysicalPlan, "...the plan...");
2550        assert!(stringified_plan.should_display(true));
2551        assert!(!stringified_plan.should_display(false)); // not in non verbose mode
2552
2553        let stringified_plan =
2554            StringifiedPlan::new(PlanType::FinalPhysicalPlan, "...the plan...");
2555        assert!(stringified_plan.should_display(true));
2556        assert!(stringified_plan.should_display(false)); // display in non verbose mode
2557
2558        let stringified_plan = StringifiedPlan::new(
2559            PlanType::OptimizedLogicalPlan {
2560                optimizer_name: "random opt pass".into(),
2561            },
2562            "...the plan...",
2563        );
2564        assert!(stringified_plan.should_display(true));
2565        assert!(!stringified_plan.should_display(false));
2566    }
2567
2568    fn test_table_scan_with_name(name: &str) -> Result<LogicalPlan> {
2569        let schema = Schema::new(vec![
2570            Field::new("a", DataType::UInt32, false),
2571            Field::new("b", DataType::UInt32, false),
2572            Field::new("c", DataType::UInt32, false),
2573        ]);
2574        table_scan(Some(name), &schema, None)?.build()
2575    }
2576
2577    #[test]
2578    fn plan_builder_intersect_different_num_columns_error() -> Result<()> {
2579        let plan1 =
2580            table_scan(TableReference::none(), &employee_schema(), Some(vec![3]))?;
2581        let plan2 =
2582            table_scan(TableReference::none(), &employee_schema(), Some(vec![3, 4]))?;
2583
2584        let err_msg1 =
2585            LogicalPlanBuilder::intersect(plan1.build()?, plan2.build()?, true)
2586                .unwrap_err();
2587
2588        assert_snapshot!(err_msg1.strip_backtrace(), @"Error during planning: INTERSECT/EXCEPT query must have the same number of columns. Left is 1 and right is 2.");
2589
2590        Ok(())
2591    }
2592
2593    #[test]
2594    fn plan_builder_unnest() -> Result<()> {
2595        // Cannot unnest on a scalar column
2596        let err = nested_table_scan("test_table")?
2597            .unnest_column("scalar")
2598            .unwrap_err();
2599
2600        let DataFusionError::Internal(desc) = err else {
2601            return plan_err!("Plan should have returned an DataFusionError::Internal");
2602        };
2603
2604        let desc = (*desc
2605            .split(DataFusionError::BACK_TRACE_SEP)
2606            .collect::<Vec<&str>>()
2607            .first()
2608            .unwrap_or(&""))
2609        .to_string();
2610
2611        assert_snapshot!(desc, @"trying to unnest on invalid data type UInt32");
2612
2613        // Unnesting the strings list.
2614        let plan = nested_table_scan("test_table")?
2615            .unnest_column("strings")?
2616            .build()?;
2617
2618        assert_snapshot!(plan, @r"
2619        Unnest: lists[test_table.strings|depth=1] structs[]
2620          TableScan: test_table
2621        ");
2622
2623        // Check unnested field is a scalar
2624        let field = plan.schema().field_with_name(None, "strings").unwrap();
2625        assert_eq!(&DataType::Utf8, field.data_type());
2626
2627        // Unnesting the singular struct column result into 2 new columns for each subfield
2628        let plan = nested_table_scan("test_table")?
2629            .unnest_column("struct_singular")?
2630            .build()?;
2631
2632        assert_snapshot!(plan, @r"
2633        Unnest: lists[] structs[test_table.struct_singular]
2634          TableScan: test_table
2635        ");
2636
2637        for field_name in &["a", "b"] {
2638            // Check unnested struct field is a scalar
2639            let field = plan
2640                .schema()
2641                .field_with_name(None, &format!("struct_singular.{field_name}"))
2642                .unwrap();
2643            assert_eq!(&DataType::UInt32, field.data_type());
2644        }
2645
2646        // Unnesting multiple fields in separate plans
2647        let plan = nested_table_scan("test_table")?
2648            .unnest_column("strings")?
2649            .unnest_column("structs")?
2650            .unnest_column("struct_singular")?
2651            .build()?;
2652
2653        assert_snapshot!(plan, @r"
2654        Unnest: lists[] structs[test_table.struct_singular]
2655          Unnest: lists[test_table.structs|depth=1] structs[]
2656            Unnest: lists[test_table.strings|depth=1] structs[]
2657              TableScan: test_table
2658        ");
2659
2660        // Check unnested struct list field should be a struct.
2661        let field = plan.schema().field_with_name(None, "structs").unwrap();
2662        assert!(matches!(field.data_type(), DataType::Struct(_)));
2663
2664        // Unnesting multiple fields at the same time, using infer syntax
2665        let cols = vec!["strings", "structs", "struct_singular"]
2666            .into_iter()
2667            .map(|c| c.into())
2668            .collect();
2669
2670        let plan = nested_table_scan("test_table")?
2671            .unnest_columns_with_options(cols, UnnestOptions::default())?
2672            .build()?;
2673
2674        assert_snapshot!(plan, @r"
2675        Unnest: lists[test_table.strings|depth=1, test_table.structs|depth=1] structs[test_table.struct_singular]
2676          TableScan: test_table
2677        ");
2678
2679        // Unnesting missing column should fail.
2680        let plan = nested_table_scan("test_table")?.unnest_column("missing");
2681        assert!(plan.is_err());
2682
2683        // Simultaneously unnesting a list (with different depth) and a struct column
2684        let plan = nested_table_scan("test_table")?
2685            .unnest_columns_with_options(
2686                vec!["stringss".into(), "struct_singular".into()],
2687                UnnestOptions::default()
2688                    .with_recursions(RecursionUnnestOption {
2689                        input_column: "stringss".into(),
2690                        output_column: "stringss_depth_1".into(),
2691                        depth: 1,
2692                    })
2693                    .with_recursions(RecursionUnnestOption {
2694                        input_column: "stringss".into(),
2695                        output_column: "stringss_depth_2".into(),
2696                        depth: 2,
2697                    }),
2698            )?
2699            .build()?;
2700
2701        assert_snapshot!(plan, @r"
2702        Unnest: lists[test_table.stringss|depth=1, test_table.stringss|depth=2] structs[test_table.struct_singular]
2703          TableScan: test_table
2704        ");
2705
2706        // Check output columns has correct type
2707        let field = plan
2708            .schema()
2709            .field_with_name(None, "stringss_depth_1")
2710            .unwrap();
2711        assert_eq!(
2712            &DataType::new_list(DataType::Utf8, false),
2713            field.data_type()
2714        );
2715        let field = plan
2716            .schema()
2717            .field_with_name(None, "stringss_depth_2")
2718            .unwrap();
2719        assert_eq!(&DataType::Utf8, field.data_type());
2720        // unnesting struct is still correct
2721        for field_name in &["a", "b"] {
2722            let field = plan
2723                .schema()
2724                .field_with_name(None, &format!("struct_singular.{field_name}"))
2725                .unwrap();
2726            assert_eq!(&DataType::UInt32, field.data_type());
2727        }
2728
2729        Ok(())
2730    }
2731
2732    fn nested_table_scan(table_name: &str) -> Result<LogicalPlanBuilder> {
2733        // Create a schema with a scalar field, a list of strings, a list of structs
2734        // and a singular struct
2735        let struct_field_in_list = Field::new_struct(
2736            "item",
2737            vec![
2738                Field::new("a", DataType::UInt32, false),
2739                Field::new("b", DataType::UInt32, false),
2740            ],
2741            false,
2742        );
2743        let string_field = Field::new_list_field(DataType::Utf8, false);
2744        let strings_field = Field::new_list("item", string_field.clone(), false);
2745        let schema = Schema::new(vec![
2746            Field::new("scalar", DataType::UInt32, false),
2747            Field::new_list("strings", string_field, false),
2748            Field::new_list("structs", struct_field_in_list, false),
2749            Field::new(
2750                "struct_singular",
2751                DataType::Struct(Fields::from(vec![
2752                    Field::new("a", DataType::UInt32, false),
2753                    Field::new("b", DataType::UInt32, false),
2754                ])),
2755                false,
2756            ),
2757            Field::new_list("stringss", strings_field, false),
2758        ]);
2759
2760        table_scan(Some(table_name), &schema, None)
2761    }
2762
2763    #[test]
2764    fn test_union_after_join() -> Result<()> {
2765        let values = vec![vec![lit(1)]];
2766
2767        let left = LogicalPlanBuilder::values(values.clone())?
2768            .alias("left")?
2769            .build()?;
2770        let right = LogicalPlanBuilder::values(values)?
2771            .alias("right")?
2772            .build()?;
2773
2774        let join = LogicalPlanBuilder::from(left).cross_join(right)?.build()?;
2775
2776        let plan = LogicalPlanBuilder::from(join.clone())
2777            .union(join)?
2778            .build()?;
2779
2780        assert_snapshot!(plan, @r"
2781        Union
2782          Cross Join:
2783            SubqueryAlias: left
2784              Values: (Int32(1))
2785            SubqueryAlias: right
2786              Values: (Int32(1))
2787          Cross Join:
2788            SubqueryAlias: left
2789              Values: (Int32(1))
2790            SubqueryAlias: right
2791              Values: (Int32(1))
2792        ");
2793
2794        Ok(())
2795    }
2796
2797    #[test]
2798    fn plan_builder_from_logical_plan() -> Result<()> {
2799        let plan =
2800            table_scan(Some("employee_csv"), &employee_schema(), Some(vec![3, 4]))?
2801                .sort(vec![
2802                    expr::Sort::new(col("state"), true, true),
2803                    expr::Sort::new(col("salary"), false, false),
2804                ])?
2805                .build()?;
2806
2807        let plan_expected = format!("{plan}");
2808        let plan_builder: LogicalPlanBuilder = Arc::new(plan).into();
2809        assert_eq!(plan_expected, format!("{}", plan_builder.plan));
2810
2811        Ok(())
2812    }
2813
2814    #[test]
2815    fn plan_builder_aggregate_without_implicit_group_by_exprs() -> Result<()> {
2816        let constraints =
2817            Constraints::new_unverified(vec![Constraint::PrimaryKey(vec![0])]);
2818        let table_source = table_source_with_constraints(&employee_schema(), constraints);
2819
2820        let plan =
2821            LogicalPlanBuilder::scan("employee_csv", table_source, Some(vec![0, 3, 4]))?
2822                .aggregate(vec![col("id")], vec![sum(col("salary"))])?
2823                .build()?;
2824
2825        assert_snapshot!(plan, @r"
2826        Aggregate: groupBy=[[employee_csv.id]], aggr=[[sum(employee_csv.salary)]]
2827          TableScan: employee_csv projection=[id, state, salary]
2828        ");
2829
2830        Ok(())
2831    }
2832
2833    #[test]
2834    fn plan_builder_aggregate_with_implicit_group_by_exprs() -> Result<()> {
2835        let constraints =
2836            Constraints::new_unverified(vec![Constraint::PrimaryKey(vec![0])]);
2837        let table_source = table_source_with_constraints(&employee_schema(), constraints);
2838
2839        let options =
2840            LogicalPlanBuilderOptions::new().with_add_implicit_group_by_exprs(true);
2841        let plan =
2842            LogicalPlanBuilder::scan("employee_csv", table_source, Some(vec![0, 3, 4]))?
2843                .with_options(options)
2844                .aggregate(vec![col("id")], vec![sum(col("salary"))])?
2845                .build()?;
2846
2847        assert_snapshot!(plan, @r"
2848        Aggregate: groupBy=[[employee_csv.id, employee_csv.state, employee_csv.salary]], aggr=[[sum(employee_csv.salary)]]
2849          TableScan: employee_csv projection=[id, state, salary]
2850        ");
2851
2852        Ok(())
2853    }
2854
2855    #[test]
2856    fn test_join_metadata() -> Result<()> {
2857        let left_schema = DFSchema::new_with_metadata(
2858            vec![(None, Arc::new(Field::new("a", DataType::Int32, false)))],
2859            HashMap::from([("key".to_string(), "left".to_string())]),
2860        )?;
2861        let right_schema = DFSchema::new_with_metadata(
2862            vec![(None, Arc::new(Field::new("b", DataType::Int32, false)))],
2863            HashMap::from([("key".to_string(), "right".to_string())]),
2864        )?;
2865
2866        let join_schema =
2867            build_join_schema(&left_schema, &right_schema, &JoinType::Left)?;
2868        assert_eq!(
2869            join_schema.metadata(),
2870            &HashMap::from([("key".to_string(), "left".to_string())])
2871        );
2872        let join_schema =
2873            build_join_schema(&left_schema, &right_schema, &JoinType::Right)?;
2874        assert_eq!(
2875            join_schema.metadata(),
2876            &HashMap::from([("key".to_string(), "right".to_string())])
2877        );
2878
2879        Ok(())
2880    }
2881
2882    #[test]
2883    fn test_values_metadata() -> Result<()> {
2884        let metadata: HashMap<String, String> =
2885            [("ARROW:extension:metadata".to_string(), "test".to_string())]
2886                .into_iter()
2887                .collect();
2888        let metadata = FieldMetadata::from(metadata);
2889        let values = LogicalPlanBuilder::values(vec![
2890            vec![lit_with_metadata(1, Some(metadata.clone()))],
2891            vec![lit_with_metadata(2, Some(metadata.clone()))],
2892        ])?
2893        .build()?;
2894        assert_eq!(*values.schema().field(0).metadata(), metadata.to_hashmap());
2895
2896        // Do not allow VALUES with different metadata mixed together
2897        let metadata2: HashMap<String, String> =
2898            [("ARROW:extension:metadata".to_string(), "test2".to_string())]
2899                .into_iter()
2900                .collect();
2901        let metadata2 = FieldMetadata::from(metadata2);
2902        assert!(
2903            LogicalPlanBuilder::values(vec![
2904                vec![lit_with_metadata(1, Some(metadata.clone()))],
2905                vec![lit_with_metadata(2, Some(metadata2.clone()))],
2906            ])
2907            .is_err()
2908        );
2909
2910        Ok(())
2911    }
2912
2913    #[test]
2914    fn test_unique_field_aliases() {
2915        let t1_field_1 = Field::new("a", DataType::Int32, false);
2916        let t2_field_1 = Field::new("a", DataType::Int32, false);
2917        let t2_field_3 = Field::new("a", DataType::Int32, false);
2918        let t2_field_4 = Field::new("a:1", DataType::Int32, false);
2919        let t1_field_2 = Field::new("b", DataType::Int32, false);
2920        let t2_field_2 = Field::new("b", DataType::Int32, false);
2921
2922        let fields = vec![
2923            t1_field_1, t2_field_1, t1_field_2, t2_field_2, t2_field_3, t2_field_4,
2924        ];
2925        let fields = Fields::from(fields);
2926
2927        let remove_redundant = unique_field_aliases(&fields);
2928
2929        // Input [a, a, b, b, a, a:1] becomes [None, a:1, None, b:1, a:2, a:1:1]
2930        // First occurrence of each field name keeps original name (None), duplicates get
2931        // incremental suffixes (:1, :2, etc.).
2932        // Crucially in this case the 2nd occurrence of `a` gets rewritten to `a:1` which later
2933        // conflicts with the last column which is _actually_ called `a:1` so we need to rename it
2934        // as well to `a:1:1`.
2935        assert_eq!(
2936            remove_redundant,
2937            vec![
2938                None,
2939                Some("a:1".to_string()),
2940                None,
2941                Some("b:1".to_string()),
2942                Some("a:2".to_string()),
2943                Some("a:1:1".to_string()),
2944            ]
2945        );
2946    }
2947}