// datafusion_expr/logical_plan/builder.rs

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

//! This module provides a builder for creating LogicalPlans

use std::any::Any;
use std::borrow::Cow;
use std::cmp::Ordering;
use std::collections::{HashMap, HashSet};
use std::iter::once;
use std::sync::Arc;

use crate::dml::CopyTo;
use crate::expr::{Alias, PlannedReplaceSelectItem, Sort as SortExpr};
use crate::expr_rewriter::{
    coerce_plan_expr_for_schema, normalize_col,
    normalize_col_with_schemas_and_ambiguity_check, normalize_cols, normalize_sorts,
    rewrite_sort_cols_by_aggs,
};
use crate::logical_plan::{
    Aggregate, Analyze, Distinct, DistinctOn, EmptyRelation, Explain, Filter, Join,
    JoinConstraint, JoinType, Limit, LogicalPlan, Partitioning, PlanType, Prepare,
    Projection, Repartition, Sort, SubqueryAlias, TableScan, Union, Unnest, Values,
    Window,
};
use crate::select_expr::SelectExpr;
use crate::utils::{
    can_hash, columnize_expr, compare_sort_expr, expand_qualified_wildcard,
    expand_wildcard, expr_to_columns, find_valid_equijoin_key_pair,
    group_window_expr_by_sort_keys,
};
use crate::{
    and, binary_expr, lit, DmlStatement, ExplainOption, Expr, ExprSchemable, Operator,
    RecursiveQuery, Statement, TableProviderFilterPushDown, TableSource, WriteOp,
};

use super::dml::InsertOp;
use arrow::compute::can_cast_types;
use arrow::datatypes::{DataType, Field, FieldRef, Fields, Schema, SchemaRef};
use datafusion_common::display::ToStringifiedPlan;
use datafusion_common::file_options::file_type::FileType;
use datafusion_common::metadata::FieldMetadata;
use datafusion_common::{
    exec_err, get_target_functional_dependencies, internal_datafusion_err, not_impl_err,
    plan_datafusion_err, plan_err, Column, Constraints, DFSchema, DFSchemaRef,
    NullEquality, Result, ScalarValue, TableReference, ToDFSchema, UnnestOptions,
};
use datafusion_expr_common::type_coercion::binary::type_union_resolution;

use indexmap::IndexSet;

/// Default table name for unnamed table
pub const UNNAMED_TABLE: &str = "?table?";

/// Options for [`LogicalPlanBuilder`]
#[derive(Default, Debug, Clone)]
pub struct LogicalPlanBuilderOptions {
    /// Flag indicating whether the plan builder should add
    /// functionally dependent expressions as additional aggregation groupings.
    add_implicit_group_by_exprs: bool,
}

impl LogicalPlanBuilderOptions {
    pub fn new() -> Self {
        Default::default()
    }

    /// Should the builder add functionally dependent expressions as additional aggregation groupings.
    pub fn with_add_implicit_group_by_exprs(mut self, add: bool) -> Self {
        self.add_implicit_group_by_exprs = add;
        self
    }
}

/// Builder for logical plans
///
/// # Example building a simple plan
/// ```
/// # use datafusion_expr::{lit, col, LogicalPlanBuilder, logical_plan::table_scan};
/// # use datafusion_common::Result;
/// # use arrow::datatypes::{Schema, DataType, Field};
/// #
/// # fn main() -> Result<()> {
/// #
/// # fn employee_schema() -> Schema {
/// #    Schema::new(vec![
/// #           Field::new("id", DataType::Int32, false),
/// #           Field::new("first_name", DataType::Utf8, false),
/// #           Field::new("last_name", DataType::Utf8, false),
/// #           Field::new("state", DataType::Utf8, false),
/// #           Field::new("salary", DataType::Int32, false),
/// #       ])
/// #   }
/// #
/// // Create a plan similar to
/// // SELECT last_name
/// // FROM employees
/// // WHERE salary < 1000
/// let plan = table_scan(Some("employee"), &employee_schema(), None)?
///  // Keep only rows where salary < 1000
///  .filter(col("salary").lt(lit(1000)))?
///  // only show "last_name" in the final results
///  .project(vec![col("last_name")])?
///  .build()?;
///
/// // Convert from plan back to builder
/// let builder = LogicalPlanBuilder::from(plan);
///
/// # Ok(())
/// # }
/// ```
#[derive(Debug, Clone)]
pub struct LogicalPlanBuilder {
    plan: Arc<LogicalPlan>,
    options: LogicalPlanBuilderOptions,
}

impl LogicalPlanBuilder {
    /// Create a builder from an existing plan
    pub fn new(plan: LogicalPlan) -> Self {
        Self {
            plan: Arc::new(plan),
            options: LogicalPlanBuilderOptions::default(),
        }
    }

    /// Create a builder from an existing plan wrapped in an `Arc`
    pub fn new_from_arc(plan: Arc<LogicalPlan>) -> Self {
        Self {
            plan,
            options: LogicalPlanBuilderOptions::default(),
        }
    }

    pub fn with_options(mut self, options: LogicalPlanBuilderOptions) -> Self {
        self.options = options;
        self
    }

    /// Return the output schema of the plan built so far
    pub fn schema(&self) -> &DFSchemaRef {
        self.plan.schema()
    }

    /// Return the LogicalPlan built so far
    pub fn plan(&self) -> &LogicalPlan {
        &self.plan
    }

    /// Create an empty relation.
    ///
    /// `produce_one_row` set to true means this empty node needs to produce a placeholder row.
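    ///
    /// # Example (a minimal sketch)
    /// ```
    /// # use datafusion_expr::LogicalPlanBuilder;
    /// // A relation with no columns that produces a single placeholder row
    /// let plan = LogicalPlanBuilder::empty(true).build().unwrap();
    /// ```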
    pub fn empty(produce_one_row: bool) -> Self {
        Self::new(LogicalPlan::EmptyRelation(EmptyRelation {
            produce_one_row,
            schema: DFSchemaRef::new(DFSchema::empty()),
        }))
    }

    /// Convert a regular plan into a recursive query.
    /// `is_distinct` indicates whether the recursive term should be de-duplicated (`UNION`) after each iteration or not (`UNION ALL`).
    pub fn to_recursive_query(
        self,
        name: String,
        recursive_term: LogicalPlan,
        is_distinct: bool,
    ) -> Result<Self> {
        // TODO: we need to do a bunch of validation here. Maybe more.
        if is_distinct {
            return not_impl_err!(
                "Recursive queries with a distinct 'UNION' (in which the previous iteration's results will be de-duplicated) are not supported"
            );
        }
        // Ensure that the static term and the recursive term have the same number of fields
        let static_fields_len = self.plan.schema().fields().len();
        let recursive_fields_len = recursive_term.schema().fields().len();
        if static_fields_len != recursive_fields_len {
            return plan_err!(
                "Non-recursive term and recursive term must have the same number of columns ({} != {})",
                static_fields_len, recursive_fields_len
            );
        }
        // Ensure that the recursive term has the same field types as the static term
        let coerced_recursive_term =
            coerce_plan_expr_for_schema(recursive_term, self.plan.schema())?;
        Ok(Self::from(LogicalPlan::RecursiveQuery(RecursiveQuery {
            name,
            static_term: self.plan,
            recursive_term: Arc::new(coerced_recursive_term),
            is_distinct,
        })))
    }

    /// Create a values list based relation, with the schema inferred from the
    /// data, consuming `values`. See the
    /// [Postgres VALUES](https://www.postgresql.org/docs/current/queries-values.html)
    /// documentation for more details.
    ///
    /// By default, it assigns the names column1, column2, etc. to the columns of a VALUES table.
    /// The column names are not specified by the SQL standard and different database systems do it differently,
    /// so it's usually better to override the default names with a table alias list.
    ///
    /// If the values include params/binders such as $1, $2, $3, etc, then the `param_data_types` should be provided.
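    ///
    /// # Example (a minimal sketch)
    /// ```
    /// # use datafusion_expr::{lit, LogicalPlanBuilder};
    /// # use datafusion_common::Result;
    /// # fn main() -> Result<()> {
    /// // VALUES (1, 'a'), (2, 'b')
    /// let plan = LogicalPlanBuilder::values(vec![
    ///     vec![lit(1), lit("a")],
    ///     vec![lit(2), lit("b")],
    /// ])?
    /// .build()?;
    /// # Ok(())
    /// # }
    /// ```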
    pub fn values(values: Vec<Vec<Expr>>) -> Result<Self> {
        if values.is_empty() {
            return plan_err!("Values list cannot be empty");
        }
        let n_cols = values[0].len();
        if n_cols == 0 {
            return plan_err!("Values list cannot be zero length");
        }
        for (i, row) in values.iter().enumerate() {
            if row.len() != n_cols {
                return plan_err!(
                    "Inconsistent data length across values list: got {} values in row {} but expected {}",
                    row.len(),
                    i,
                    n_cols
                );
            }
        }

        // Infer from data itself
        Self::infer_data(values)
    }

    /// Create a values list based relation, with the schema inferred from the data itself,
    /// or from the table schema if provided, consuming `values`. See the
    /// [Postgres VALUES](https://www.postgresql.org/docs/current/queries-values.html)
    /// documentation for more details.
    ///
    /// By default, it assigns the names column1, column2, etc. to the columns of a VALUES table.
    /// The column names are not specified by the SQL standard and different database systems do it differently,
    /// so it's usually better to override the default names with a table alias list.
    ///
    /// If the values include params/binders such as $1, $2, $3, etc, then the `param_data_types` should be provided.
    pub fn values_with_schema(
        values: Vec<Vec<Expr>>,
        schema: &DFSchemaRef,
    ) -> Result<Self> {
        if values.is_empty() {
            return plan_err!("Values list cannot be empty");
        }
        let n_cols = schema.fields().len();
        if n_cols == 0 {
            return plan_err!("Values list cannot be zero length");
        }
        for (i, row) in values.iter().enumerate() {
            if row.len() != n_cols {
                return plan_err!(
                    "Inconsistent data length across values list: got {} values in row {} but expected {}",
                    row.len(),
                    i,
                    n_cols
                );
            }
        }

        // Check the types of the values against the schema
        Self::infer_values_from_schema(values, schema)
    }

    fn infer_values_from_schema(
        values: Vec<Vec<Expr>>,
        schema: &DFSchema,
    ) -> Result<Self> {
        let n_cols = values[0].len();
        let mut fields = ValuesFields::new();
        for j in 0..n_cols {
            let field_type = schema.field(j).data_type();
            let field_nullable = schema.field(j).is_nullable();
            for row in values.iter() {
                let value = &row[j];
                let data_type = value.get_type(schema)?;

                if !data_type.equals_datatype(field_type)
                    && !can_cast_types(&data_type, field_type)
                {
                    return exec_err!(
                        "type mismatch: cannot cast value of type {} to field type {}",
                        data_type,
                        field_type
                    );
                }
            }
            fields.push(field_type.to_owned(), field_nullable);
        }

        Self::infer_inner(values, fields, schema)
    }

    fn infer_data(values: Vec<Vec<Expr>>) -> Result<Self> {
        let n_cols = values[0].len();
        let schema = DFSchema::empty();
        let mut fields = ValuesFields::new();

        for j in 0..n_cols {
            let mut common_type: Option<DataType> = None;
            let mut common_metadata: Option<FieldMetadata> = None;
            for (i, row) in values.iter().enumerate() {
                let value = &row[j];
                let metadata = value.metadata(&schema)?;
                if let Some(ref cm) = common_metadata {
                    if &metadata != cm {
                        return plan_err!("Inconsistent metadata across values list at row {i} column {j}. Was {:?} but found {:?}", cm, metadata);
                    }
                } else {
                    common_metadata = Some(metadata.clone());
                }
                let data_type = value.get_type(&schema)?;
                if data_type == DataType::Null {
                    continue;
                }

                if let Some(prev_type) = common_type {
                    // get the common type of the column's values
                    let data_types = vec![prev_type.clone(), data_type.clone()];
                    let Some(new_type) = type_union_resolution(&data_types) else {
                        return plan_err!("Inconsistent data type across values list at row {i} column {j}. Was {prev_type} but found {data_type}");
                    };
                    common_type = Some(new_type);
                } else {
                    common_type = Some(data_type);
                }
            }
            // If common_type was never set (and no error occurred), every value in
            // the column was NULL, since the loop above skips NULL values; default
            // to DataType::Null in that case.
            fields.push_with_metadata(
                common_type.unwrap_or(DataType::Null),
                true,
                common_metadata,
            );
        }

        Self::infer_inner(values, fields, &schema)
    }

    fn infer_inner(
        mut values: Vec<Vec<Expr>>,
        fields: ValuesFields,
        schema: &DFSchema,
    ) -> Result<Self> {
        let fields = fields.into_fields();
        // Wrap a value in a cast if its data type differs from the field's common type.
        for row in &mut values {
            for (j, field_type) in fields.iter().map(|f| f.data_type()).enumerate() {
                if let Expr::Literal(ScalarValue::Null, metadata) = &row[j] {
                    row[j] = Expr::Literal(
                        ScalarValue::try_from(field_type)?,
                        metadata.clone(),
                    );
                } else {
                    row[j] = std::mem::take(&mut row[j]).cast_to(field_type, schema)?;
                }
            }
        }

        let dfschema = DFSchema::from_unqualified_fields(fields, HashMap::new())?;
        let schema = DFSchemaRef::new(dfschema);

        Ok(Self::new(LogicalPlan::Values(Values { schema, values })))
    }

    /// Convert a table provider into a builder with a TableScan
    ///
    /// Note that if you pass a string as `table_name`, it is treated
    /// as a SQL identifier, as described on [`TableReference`] and
    /// thus is normalized
    ///
    /// # Example:
    /// ```
    /// # use datafusion_expr::{lit, col, LogicalPlanBuilder,
    /// #  logical_plan::builder::LogicalTableSource, logical_plan::table_scan
    /// # };
    /// # use std::sync::Arc;
    /// # use arrow::datatypes::{Schema, DataType, Field};
    /// # use datafusion_common::TableReference;
    /// #
    /// # let employee_schema = Arc::new(Schema::new(vec![
    /// #           Field::new("id", DataType::Int32, false),
    /// # ])) as _;
    /// # let table_source = Arc::new(LogicalTableSource::new(employee_schema));
    /// // Scan table_source with the name "mytable" (after normalization)
    /// # let table = table_source.clone();
    /// let scan = LogicalPlanBuilder::scan("MyTable", table, None);
    ///
    /// // Scan table_source with the name "MyTable" by enclosing in quotes
    /// # let table = table_source.clone();
    /// let scan = LogicalPlanBuilder::scan(r#""MyTable""#, table, None);
    ///
    /// // Scan table_source with the name "MyTable" by forming the table reference
    /// # let table = table_source.clone();
    /// let table_reference = TableReference::bare("MyTable");
    /// let scan = LogicalPlanBuilder::scan(table_reference, table, None);
    /// ```
    pub fn scan(
        table_name: impl Into<TableReference>,
        table_source: Arc<dyn TableSource>,
        projection: Option<Vec<usize>>,
    ) -> Result<Self> {
        Self::scan_with_filters(table_name, table_source, projection, vec![])
    }

    /// Create a [CopyTo] for copying the contents of `input` to the specified file(s)
    pub fn copy_to(
        input: LogicalPlan,
        output_url: String,
        file_type: Arc<dyn FileType>,
        options: HashMap<String, String>,
        partition_by: Vec<String>,
    ) -> Result<Self> {
        Ok(Self::new(LogicalPlan::Copy(CopyTo::new(
            Arc::new(input),
            output_url,
            partition_by,
            file_type,
            options,
        ))))
    }

    /// Create a [`DmlStatement`] for inserting the contents of this builder into the named table.
    ///
    /// Note: use a [`DefaultTableSource`] to insert into a [`TableProvider`]
    ///
    /// [`DefaultTableSource`]: https://docs.rs/datafusion/latest/datafusion/datasource/default_table_source/struct.DefaultTableSource.html
    /// [`TableProvider`]: https://docs.rs/datafusion/latest/datafusion/catalog/trait.TableProvider.html
    ///
    /// # Example:
    /// ```
    /// # use datafusion_expr::{lit, LogicalPlanBuilder,
    /// #  logical_plan::builder::LogicalTableSource,
    /// # };
    /// # use std::sync::Arc;
    /// # use arrow::datatypes::{Schema, DataType, Field};
    /// # use datafusion_expr::dml::InsertOp;
    /// #
    /// # fn test() -> datafusion_common::Result<()> {
    /// # let employee_schema = Arc::new(Schema::new(vec![
    /// #     Field::new("id", DataType::Int32, false),
    /// # ])) as _;
    /// # let table_source = Arc::new(LogicalTableSource::new(employee_schema));
    /// // VALUES (1), (2)
    /// let input = LogicalPlanBuilder::values(vec![vec![lit(1)], vec![lit(2)]])?.build()?;
    /// // INSERT INTO MyTable VALUES (1), (2)
    /// let insert_plan = LogicalPlanBuilder::insert_into(
    ///     input,
    ///     "MyTable",
    ///     table_source,
    ///     InsertOp::Append,
    /// )?;
    /// # Ok(())
    /// # }
    /// ```
    pub fn insert_into(
        input: LogicalPlan,
        table_name: impl Into<TableReference>,
        target: Arc<dyn TableSource>,
        insert_op: InsertOp,
    ) -> Result<Self> {
        Ok(Self::new(LogicalPlan::Dml(DmlStatement::new(
            table_name.into(),
            target,
            WriteOp::Insert(insert_op),
            Arc::new(input),
        ))))
    }

    /// Convert a table provider into a builder with a TableScan
    pub fn scan_with_filters(
        table_name: impl Into<TableReference>,
        table_source: Arc<dyn TableSource>,
        projection: Option<Vec<usize>>,
        filters: Vec<Expr>,
    ) -> Result<Self> {
        Self::scan_with_filters_inner(table_name, table_source, projection, filters, None)
    }

    /// Convert a table provider into a builder with a TableScan, with filters and fetch
    pub fn scan_with_filters_fetch(
        table_name: impl Into<TableReference>,
        table_source: Arc<dyn TableSource>,
        projection: Option<Vec<usize>>,
        filters: Vec<Expr>,
        fetch: Option<usize>,
    ) -> Result<Self> {
        Self::scan_with_filters_inner(
            table_name,
            table_source,
            projection,
            filters,
            fetch,
        )
    }

    fn scan_with_filters_inner(
        table_name: impl Into<TableReference>,
        table_source: Arc<dyn TableSource>,
        projection: Option<Vec<usize>>,
        filters: Vec<Expr>,
        fetch: Option<usize>,
    ) -> Result<Self> {
        let table_scan =
            TableScan::try_new(table_name, table_source, projection, filters, fetch)?;

        // Inline TableScan
        if table_scan.filters.is_empty() {
            if let Some(p) = table_scan.source.get_logical_plan() {
                let sub_plan = p.into_owned();

                if let Some(proj) = table_scan.projection {
                    let projection_exprs = proj
                        .into_iter()
                        .map(|i| {
                            Expr::Column(Column::from(
                                sub_plan.schema().qualified_field(i),
                            ))
                        })
                        .collect::<Vec<_>>();
                    return Self::new(sub_plan)
                        .project(projection_exprs)?
                        .alias(table_scan.table_name);
                }

                // Ensures that the reference to the inlined table remains the
                // same, meaning we don't have to change any of the parent nodes
                // that reference this table.
                return Self::new(sub_plan).alias(table_scan.table_name);
            }
        }

        Ok(Self::new(LogicalPlan::TableScan(table_scan)))
    }

    /// Wrap a plan in a window
    pub fn window_plan(
        input: LogicalPlan,
        window_exprs: impl IntoIterator<Item = Expr>,
    ) -> Result<LogicalPlan> {
        let mut plan = input;
        let mut groups = group_window_expr_by_sort_keys(window_exprs)?;
        // To align with PostgreSQL's behavior, sort the groups using the same rule as
        // PostgreSQL: first compare the sort keys themselves, and if one window's sort
        // keys are a prefix of another's, put the window with more sort keys first, so
        // that more deeply sorted plans get nested further down as children.
        // The sort_by() implementation here is a stable sort.
        // Note that by this rule, if there's an empty OVER clause, it'll be at the top level.
        groups.sort_by(|(key_a, _), (key_b, _)| {
            for ((first, _), (second, _)) in key_a.iter().zip(key_b.iter()) {
                let key_ordering = compare_sort_expr(first, second, plan.schema());
                match key_ordering {
                    Ordering::Less => {
                        return Ordering::Less;
                    }
                    Ordering::Greater => {
                        return Ordering::Greater;
                    }
                    Ordering::Equal => {}
                }
            }
            key_b.len().cmp(&key_a.len())
        });
        for (_, exprs) in groups {
            let window_exprs = exprs.into_iter().collect::<Vec<_>>();
            // Partitioning and sorting are done at the physical level; see the
            // EnforceDistribution and EnforceSorting rules.
            plan = LogicalPlanBuilder::from(plan)
                .window(window_exprs)?
                .build()?;
        }
        Ok(plan)
    }

    /// Apply a projection without alias.
    pub fn project(
        self,
        expr: impl IntoIterator<Item = impl Into<SelectExpr>>,
    ) -> Result<Self> {
        project(Arc::unwrap_or_clone(self.plan), expr).map(Self::new)
    }

    /// Apply a projection without alias, with optional validation per expression
    /// (`true` to validate, `false` to skip validation)
    pub fn project_with_validation(
        self,
        expr: Vec<(impl Into<SelectExpr>, bool)>,
    ) -> Result<Self> {
        project_with_validation(Arc::unwrap_or_clone(self.plan), expr).map(Self::new)
    }

    /// Select the given column indices
    pub fn select(self, indices: impl IntoIterator<Item = usize>) -> Result<Self> {
        let exprs: Vec<_> = indices
            .into_iter()
            .map(|x| Expr::Column(Column::from(self.plan.schema().qualified_field(x))))
            .collect();
        self.project(exprs)
    }

    /// Apply a filter
    pub fn filter(self, expr: impl Into<Expr>) -> Result<Self> {
        let expr = normalize_col(expr.into(), &self.plan)?;
        Filter::try_new(expr, self.plan)
            .map(LogicalPlan::Filter)
            .map(Self::new)
    }

    /// Apply a filter which is used for a HAVING clause
    pub fn having(self, expr: impl Into<Expr>) -> Result<Self> {
        let expr = normalize_col(expr.into(), &self.plan)?;
        Filter::try_new(expr, self.plan)
            .map(LogicalPlan::Filter)
            .map(Self::from)
    }

    /// Make a builder for a prepare logical plan from the builder's plan
    pub fn prepare(self, name: String, fields: Vec<FieldRef>) -> Result<Self> {
        Ok(Self::new(LogicalPlan::Statement(Statement::Prepare(
            Prepare {
                name,
                fields,
                input: self.plan,
            },
        ))))
    }

    /// Limit the number of rows returned
    ///
    /// `skip` - Number of rows to skip before fetching any rows.
    ///
    /// `fetch` - Maximum number of rows to fetch, after skipping `skip` rows,
    ///          if specified.
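    ///
    /// # Example (a minimal sketch)
    /// ```
    /// # use datafusion_expr::{lit, LogicalPlanBuilder};
    /// # use datafusion_common::Result;
    /// # fn main() -> Result<()> {
    /// // Skip the first 2 rows, then return at most 10: `... OFFSET 2 LIMIT 10`
    /// let plan = LogicalPlanBuilder::values(vec![vec![lit(1)], vec![lit(2)], vec![lit(3)]])?
    ///     .limit(2, Some(10))?
    ///     .build()?;
    /// # Ok(())
    /// # }
    /// ```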
    pub fn limit(self, skip: usize, fetch: Option<usize>) -> Result<Self> {
        let skip_expr = if skip == 0 {
            None
        } else {
            Some(lit(skip as i64))
        };
        let fetch_expr = fetch.map(|f| lit(f as i64));
        self.limit_by_expr(skip_expr, fetch_expr)
    }

    /// Limit the number of rows returned
    ///
    /// Similar to `limit` but uses expressions for `skip` and `fetch`
    pub fn limit_by_expr(self, skip: Option<Expr>, fetch: Option<Expr>) -> Result<Self> {
        Ok(Self::new(LogicalPlan::Limit(Limit {
            skip: skip.map(Box::new),
            fetch: fetch.map(Box::new),
            input: self.plan,
        })))
    }

    /// Apply an alias
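    ///
    /// A minimal sketch, wrapping a `VALUES` relation in the (illustrative) alias `t`:
    /// ```
    /// # use datafusion_expr::{lit, LogicalPlanBuilder};
    /// # use datafusion_common::Result;
    /// # fn main() -> Result<()> {
    /// // (VALUES (1)) AS t
    /// let plan = LogicalPlanBuilder::values(vec![vec![lit(1)]])?
    ///     .alias("t")?
    ///     .build()?;
    /// # Ok(())
    /// # }
    /// ```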
    pub fn alias(self, alias: impl Into<TableReference>) -> Result<Self> {
        subquery_alias(Arc::unwrap_or_clone(self.plan), alias).map(Self::new)
    }

    /// Add missing sort columns to all downstream projections
    ///
    /// Thus, if you have a LogicalPlan that selects A and B, and have
    /// requested a sort by C (which is not in the select list), this code
    /// will add C recursively to all input projections.
    ///
    /// Adding a new column is not correct if there is a `Distinct`
    /// node, which produces only distinct values of its
    /// inputs. Adding a new column to its input will result in
    /// potentially different results than with the original column.
    ///
    /// For example, if the plan is `Distinct(A, B)` and the input looks like
    ///
    /// a | b | c
    /// --+---+---
    /// 1 | 2 | 3
    /// 1 | 2 | 4
    ///
    /// then `Distinct(A, B)` --> (1, 2)
    ///
    /// but `Distinct(A, B, C)` --> (1, 2, 3), (1, 2, 4)
    /// (which will appear as (1, 2), (1, 2) if a and b are projected)
    ///
    /// See <https://github.com/apache/datafusion/issues/5065> for more details
    fn add_missing_columns(
        curr_plan: LogicalPlan,
        missing_cols: &IndexSet<Column>,
        is_distinct: bool,
    ) -> Result<LogicalPlan> {
        match curr_plan {
            LogicalPlan::Projection(Projection {
                input,
                mut expr,
                schema: _,
            }) if missing_cols.iter().all(|c| input.schema().has_column(c)) => {
                let mut missing_exprs = missing_cols
                    .iter()
                    .map(|c| normalize_col(Expr::Column(c.clone()), &input))
                    .collect::<Result<Vec<_>>>()?;

                // Do not add duplicate columns; some of the missing_cols may
                // already be present, but without the new projected alias.
                missing_exprs.retain(|e| !expr.contains(e));
                if is_distinct {
                    Self::ambiguous_distinct_check(&missing_exprs, missing_cols, &expr)?;
                }
                expr.extend(missing_exprs);
                project(Arc::unwrap_or_clone(input), expr)
            }
            _ => {
                let is_distinct =
                    is_distinct || matches!(curr_plan, LogicalPlan::Distinct(_));
                let new_inputs = curr_plan
                    .inputs()
                    .into_iter()
                    .map(|input_plan| {
                        Self::add_missing_columns(
                            (*input_plan).clone(),
                            missing_cols,
                            is_distinct,
                        )
                    })
                    .collect::<Result<Vec<_>>>()?;
                curr_plan.with_new_exprs(curr_plan.expressions(), new_inputs)
            }
        }
    }

    fn ambiguous_distinct_check(
        missing_exprs: &[Expr],
        missing_cols: &IndexSet<Column>,
        projection_exprs: &[Expr],
    ) -> Result<()> {
        if missing_exprs.is_empty() {
            return Ok(());
        }

        // If the missing columns are all only aliases for things in
        // the existing select list, it is ok
        //
        // This handles the special case for
        // SELECT col as <alias> ORDER BY <alias>
        //
        // As described in https://github.com/apache/datafusion/issues/5293
        let all_aliases = missing_exprs.iter().all(|e| {
            projection_exprs.iter().any(|proj_expr| {
                if let Expr::Alias(Alias { expr, .. }) = proj_expr {
                    e == expr.as_ref()
                } else {
                    false
                }
            })
        });
        if all_aliases {
            return Ok(());
        }

        let missing_col_names = missing_cols
            .iter()
            .map(|col| col.flat_name())
            .collect::<Vec<_>>()
            .join(", ");

        plan_err!("For SELECT DISTINCT, ORDER BY expressions {missing_col_names} must appear in select list")
    }

    /// Apply a sort by provided expressions with default direction
    pub fn sort_by(
        self,
        expr: impl IntoIterator<Item = impl Into<Expr>> + Clone,
    ) -> Result<Self> {
        self.sort(
            expr.into_iter()
                .map(|e| e.into().sort(true, false))
                .collect::<Vec<SortExpr>>(),
        )
    }

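    /// Apply a sort over the provided sort expressions, without a fetch limit
    ///
    /// A minimal sketch, assuming a table `t` with an Int32 column `a`
    /// (the names are illustrative):
    /// ```
    /// # use datafusion_expr::{col, logical_plan::table_scan};
    /// # use datafusion_common::Result;
    /// # use arrow::datatypes::{Schema, DataType, Field};
    /// # fn main() -> Result<()> {
    /// # let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]);
    /// // SELECT a FROM t ORDER BY a DESC NULLS FIRST
    /// let plan = table_scan(Some("t"), &schema, None)?
    ///     .sort(vec![col("a").sort(false, true)])?
    ///     .build()?;
    /// # Ok(())
    /// # }
    /// ```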
    pub fn sort(
        self,
        sorts: impl IntoIterator<Item = impl Into<SortExpr>> + Clone,
    ) -> Result<Self> {
        self.sort_with_limit(sorts, None)
    }

    /// Apply a sort, with an optional fetch limit
    pub fn sort_with_limit(
        self,
        sorts: impl IntoIterator<Item = impl Into<SortExpr>> + Clone,
        fetch: Option<usize>,
    ) -> Result<Self> {
        let sorts = rewrite_sort_cols_by_aggs(sorts, &self.plan)?;

        let schema = self.plan.schema();

        // Collect sort columns that are missing in the input plan's schema
        let mut missing_cols: IndexSet<Column> = IndexSet::new();
        sorts.iter().try_for_each::<_, Result<()>>(|sort| {
            let columns = sort.expr.column_refs();

            missing_cols.extend(
                columns
                    .into_iter()
                    .filter(|c| !schema.has_column(c))
                    .cloned(),
            );

            Ok(())
        })?;

        if missing_cols.is_empty() {
            return Ok(Self::new(LogicalPlan::Sort(Sort {
                expr: normalize_sorts(sorts, &self.plan)?,
                input: self.plan,
                fetch,
            })));
        }

        // Add a final projection over the original columns to remove the
        // pushed-down sort columns from the output
        let new_expr = schema.columns().into_iter().map(Expr::Column).collect();

        let is_distinct = false;
        let plan = Self::add_missing_columns(
            Arc::unwrap_or_clone(self.plan),
            &missing_cols,
            is_distinct,
        )?;

        let sort_plan = LogicalPlan::Sort(Sort {
            expr: normalize_sorts(sorts, &plan)?,
            input: Arc::new(plan),
            fetch,
        });

        Projection::try_new(new_expr, Arc::new(sort_plan))
            .map(LogicalPlan::Projection)
            .map(Self::new)
    }

    /// Apply a union, preserving duplicate rows
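    ///
    /// A minimal sketch, unioning two single-row `VALUES` relations:
    /// ```
    /// # use datafusion_expr::{lit, LogicalPlanBuilder};
    /// # use datafusion_common::Result;
    /// # fn main() -> Result<()> {
    /// let right = LogicalPlanBuilder::values(vec![vec![lit(2)]])?.build()?;
    /// // VALUES (1) UNION ALL VALUES (2)
    /// let plan = LogicalPlanBuilder::values(vec![vec![lit(1)]])?
    ///     .union(right)?
    ///     .build()?;
    /// # Ok(())
    /// # }
    /// ```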
    pub fn union(self, plan: LogicalPlan) -> Result<Self> {
        union(Arc::unwrap_or_clone(self.plan), plan).map(Self::new)
    }

    /// Apply a union by name, preserving duplicate rows
    pub fn union_by_name(self, plan: LogicalPlan) -> Result<Self> {
        union_by_name(Arc::unwrap_or_clone(self.plan), plan).map(Self::new)
    }

    /// Apply a union by name, removing duplicate rows
    pub fn union_by_name_distinct(self, plan: LogicalPlan) -> Result<Self> {
        let left_plan: LogicalPlan = Arc::unwrap_or_clone(self.plan);
        let right_plan: LogicalPlan = plan;

        Ok(Self::new(LogicalPlan::Distinct(Distinct::All(Arc::new(
            union_by_name(left_plan, right_plan)?,
        )))))
    }

    /// Apply a union, removing duplicate rows
    pub fn union_distinct(self, plan: LogicalPlan) -> Result<Self> {
        let left_plan: LogicalPlan = Arc::unwrap_or_clone(self.plan);
        let right_plan: LogicalPlan = plan;

        Ok(Self::new(LogicalPlan::Distinct(Distinct::All(Arc::new(
            union(left_plan, right_plan)?,
        )))))
    }

    /// Apply deduplication: only distinct (different) values are returned
    pub fn distinct(self) -> Result<Self> {
        Ok(Self::new(LogicalPlan::Distinct(Distinct::All(self.plan))))
    }

    /// Project first values of the specified expression list according to the provided
    /// sorting expressions grouped by the `DISTINCT ON` clause expressions.
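    ///
    /// A minimal sketch, assuming a table `t` with Int32 columns `a` and `b`
    /// (the names are illustrative):
    /// ```
    /// # use datafusion_expr::{col, logical_plan::table_scan};
    /// # use datafusion_common::Result;
    /// # use arrow::datatypes::{Schema, DataType, Field};
    /// # fn main() -> Result<()> {
    /// # let schema = Schema::new(vec![
    /// #     Field::new("a", DataType::Int32, false),
    /// #     Field::new("b", DataType::Int32, false),
    /// # ]);
    /// // SELECT DISTINCT ON (a) a, b FROM t
    /// let plan = table_scan(Some("t"), &schema, None)?
    ///     .distinct_on(vec![col("a")], vec![col("a"), col("b")], None)?
    ///     .build()?;
    /// # Ok(())
    /// # }
    /// ```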
    pub fn distinct_on(
        self,
        on_expr: Vec<Expr>,
        select_expr: Vec<Expr>,
        sort_expr: Option<Vec<SortExpr>>,
    ) -> Result<Self> {
        Ok(Self::new(LogicalPlan::Distinct(Distinct::On(
            DistinctOn::try_new(on_expr, select_expr, sort_expr, self.plan)?,
        ))))
    }

    /// Apply a join to `right` using explicitly specified columns and an
    /// optional filter expression.
    ///
    /// See [`join_on`](Self::join_on) for a more concise way to specify the
    /// join condition. Since DataFusion will automatically identify and
    /// optimize equality predicates there is no performance difference between
    /// this function and `join_on`
    ///
    /// `left_cols` and `right_cols` are used to form "equijoin" predicates (see
    /// example below), which are then combined with the optional `filter`
    /// expression.
    ///
    /// Note that in case of an outer join, the `filter` is applied only to matched rows.
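    ///
    /// # Example (a minimal sketch; the table names and column `a` are illustrative)
    /// ```
    /// # use datafusion_expr::logical_plan::{table_scan, JoinType};
    /// # use datafusion_common::Result;
    /// # use arrow::datatypes::{Schema, DataType, Field};
    /// # fn main() -> Result<()> {
    /// # let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]);
    /// let right = table_scan(Some("right"), &schema, None)?.build()?;
    /// // left INNER JOIN right ON left.a = right.a
    /// let plan = table_scan(Some("left"), &schema, None)?
    ///     .join(right, JoinType::Inner, (vec!["a"], vec!["a"]), None)?
    ///     .build()?;
    /// # Ok(())
    /// # }
    /// ```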
    pub fn join(
        self,
        right: LogicalPlan,
        join_type: JoinType,
        join_keys: (Vec<impl Into<Column>>, Vec<impl Into<Column>>),
        filter: Option<Expr>,
    ) -> Result<Self> {
        self.join_detailed(
            right,
            join_type,
            join_keys,
            filter,
            NullEquality::NullEqualsNothing,
        )
    }

    /// Apply a join using the specified expressions.
    ///
    /// Note that DataFusion automatically optimizes joins, including
    /// identifying and optimizing equality predicates.
    ///
    /// # Example
    ///
    /// ```
    /// # use datafusion_expr::{Expr, col, LogicalPlanBuilder,
    /// #  logical_plan::builder::LogicalTableSource, logical_plan::JoinType,};
    /// # use std::sync::Arc;
    /// # use arrow::datatypes::{Schema, DataType, Field};
    /// # use datafusion_common::Result;
    /// # fn main() -> Result<()> {
    /// let example_schema = Arc::new(Schema::new(vec![
    ///     Field::new("a", DataType::Int32, false),
    ///     Field::new("b", DataType::Int32, false),
    ///     Field::new("c", DataType::Int32, false),
    /// ]));
    /// let table_source = Arc::new(LogicalTableSource::new(example_schema));
    /// let left_table = table_source.clone();
    /// let right_table = table_source.clone();
    ///
    /// let right_plan = LogicalPlanBuilder::scan("right", right_table, None)?.build()?;
    ///
    /// // Form the expression `(left.a = right.a)` AND `(left.b != right.b)`
    /// let exprs = vec![
    ///     col("left.a").eq(col("right.a")),
    ///     col("left.b").not_eq(col("right.b")),
    /// ];
    ///
    /// // Perform the equivalent of `left INNER JOIN right ON (a = a2 AND b != b2)`,
    /// // finding all pairs of rows from `left` and `right`
    /// // where `a = a2` and `b != b2`.
    /// let plan = LogicalPlanBuilder::scan("left", left_table, None)?
    ///     .join_on(right_plan, JoinType::Inner, exprs)?
    ///     .build()?;
    /// # Ok(())
    /// # }
    /// ```
    pub fn join_on(
        self,
        right: LogicalPlan,
        join_type: JoinType,
        on_exprs: impl IntoIterator<Item = Expr>,
    ) -> Result<Self> {
        let filter = on_exprs.into_iter().reduce(Expr::and);

        self.join_detailed(
            right,
            join_type,
            (Vec::<Column>::new(), Vec::<Column>::new()),
            filter,
            NullEquality::NullEqualsNothing,
        )
    }

    pub(crate) fn normalize(plan: &LogicalPlan, column: Column) -> Result<Column> {
        if column.relation.is_some() {
            // column is already normalized
            return Ok(column);
        }

        let schema = plan.schema();
        let fallback_schemas = plan.fallback_normalize_schemas();
        let using_columns = plan.using_columns()?;
        column.normalize_with_schemas_and_ambiguity_check(
            &[&[schema], &fallback_schemas],
            &using_columns,
        )
    }

    /// Apply a join with an ON constraint and the specified null equality.
    ///
    /// The behavior is the same as [`join`](Self::join) except that it allows
    /// specifying the null equality behavior.
    ///
    /// The `null_equality` dictates how `null` values are joined.
    pub fn join_detailed(
        self,
        right: LogicalPlan,
        join_type: JoinType,
        join_keys: (Vec<impl Into<Column>>, Vec<impl Into<Column>>),
        filter: Option<Expr>,
        null_equality: NullEquality,
    ) -> Result<Self> {
        if join_keys.0.len() != join_keys.1.len() {
            return plan_err!("left_keys and right_keys were not the same length");
        }

        let filter = if let Some(expr) = filter {
            let filter = normalize_col_with_schemas_and_ambiguity_check(
                expr,
                &[&[self.schema(), right.schema()]],
                &[],
            )?;
            Some(filter)
        } else {
            None
        };

        let (left_keys, right_keys): (Vec<Result<Column>>, Vec<Result<Column>>) =
            join_keys
                .0
                .into_iter()
                .zip(join_keys.1)
                .map(|(l, r)| {
                    let l = l.into();
                    let r = r.into();

                    match (&l.relation, &r.relation) {
                        (Some(lr), Some(rr)) => {
                            let l_is_left =
                                self.plan.schema().field_with_qualified_name(lr, &l.name);
                            let l_is_right =
                                right.schema().field_with_qualified_name(lr, &l.name);
                            let r_is_left =
                                self.plan.schema().field_with_qualified_name(rr, &r.name);
                            let r_is_right =
                                right.schema().field_with_qualified_name(rr, &r.name);

                            match (l_is_left, l_is_right, r_is_left, r_is_right) {
                                (_, Ok(_), Ok(_), _) => (Ok(r), Ok(l)),
                                (Ok(_), _, _, Ok(_)) => (Ok(l), Ok(r)),
                                _ => (
                                    Self::normalize(&self.plan, l),
                                    Self::normalize(&right, r),
                                ),
                            }
                        }
                        (Some(lr), None) => {
                            let l_is_left =
                                self.plan.schema().field_with_qualified_name(lr, &l.name);
                            let l_is_right =
                                right.schema().field_with_qualified_name(lr, &l.name);

                            match (l_is_left, l_is_right) {
                                (Ok(_), _) => (Ok(l), Self::normalize(&right, r)),
                                (_, Ok(_)) => (Self::normalize(&self.plan, r), Ok(l)),
                                _ => (
                                    Self::normalize(&self.plan, l),
                                    Self::normalize(&right, r),
                                ),
                            }
                        }
                        (None, Some(rr)) => {
                            let r_is_left =
                                self.plan.schema().field_with_qualified_name(rr, &r.name);
                            let r_is_right =
                                right.schema().field_with_qualified_name(rr, &r.name);

                            match (r_is_left, r_is_right) {
                                (Ok(_), _) => (Ok(r), Self::normalize(&right, l)),
                                (_, Ok(_)) => (Self::normalize(&self.plan, l), Ok(r)),
                                _ => (
                                    Self::normalize(&self.plan, l),
                                    Self::normalize(&right, r),
                                ),
                            }
                        }
                        (None, None) => {
                            let mut swap = false;
                            let left_key = Self::normalize(&self.plan, l.clone())
                                .or_else(|_| {
                                    swap = true;
                                    Self::normalize(&right, l)
                                });
                            if swap {
                                (Self::normalize(&self.plan, r), left_key)
                            } else {
                                (left_key, Self::normalize(&right, r))
                            }
                        }
                    }
                })
                .unzip();

        let left_keys = left_keys.into_iter().collect::<Result<Vec<Column>>>()?;
        let right_keys = right_keys.into_iter().collect::<Result<Vec<Column>>>()?;

        let on: Vec<_> = left_keys
            .into_iter()
            .zip(right_keys)
            .map(|(l, r)| (Expr::Column(l), Expr::Column(r)))
            .collect();
        let join_schema =
            build_join_schema(self.plan.schema(), right.schema(), &join_type)?;

        // An inner join without a join condition is a cross join; for any other
        // join type an empty condition is an error
        if join_type != JoinType::Inner && on.is_empty() && filter.is_none() {
            return plan_err!("join condition should not be empty");
        }

        Ok(Self::new(LogicalPlan::Join(Join {
            left: self.plan,
            right: Arc::new(right),
            on,
            filter,
            join_type,
            join_constraint: JoinConstraint::On,
            schema: DFSchemaRef::new(join_schema),
            null_equality,
        })))
    }

    /// Apply a join with a USING constraint, which duplicates all join columns in the output schema.
    pub fn join_using(
        self,
        right: LogicalPlan,
        join_type: JoinType,
        using_keys: Vec<Column>,
    ) -> Result<Self> {
        let left_keys: Vec<Column> = using_keys
            .clone()
            .into_iter()
            .map(|c| Self::normalize(&self.plan, c))
            .collect::<Result<_>>()?;
        let right_keys: Vec<Column> = using_keys
            .into_iter()
            .map(|c| Self::normalize(&right, c))
            .collect::<Result<_>>()?;

        let on: Vec<(_, _)> = left_keys.into_iter().zip(right_keys).collect();
        let mut join_on: Vec<(Expr, Expr)> = vec![];
        let mut filters: Option<Expr> = None;
        for (l, r) in &on {
            if self.plan.schema().has_column(l)
                && right.schema().has_column(r)
                && can_hash(
                    datafusion_common::ExprSchema::field_from_column(
                        self.plan.schema(),
                        l,
                    )?
                    .data_type(),
                )
            {
                join_on.push((Expr::Column(l.clone()), Expr::Column(r.clone())));
            } else if self.plan.schema().has_column(l)
                && right.schema().has_column(r)
                && can_hash(
                    datafusion_common::ExprSchema::field_from_column(
                        self.plan.schema(),
                        r,
                    )?
                    .data_type(),
                )
            {
                join_on.push((Expr::Column(r.clone()), Expr::Column(l.clone())));
            } else {
                let expr = binary_expr(
                    Expr::Column(l.clone()),
                    Operator::Eq,
                    Expr::Column(r.clone()),
                );
                match filters {
                    None => filters = Some(expr),
                    Some(filter_expr) => filters = Some(and(expr, filter_expr)),
                }
            }
        }

        if join_on.is_empty() {
            let join = Self::from(self.plan).cross_join(right)?;
            join.filter(filters.ok_or_else(|| {
                internal_datafusion_err!("filters should not be None here")
            })?)
        } else {
            let join = Join::try_new(
                self.plan,
                Arc::new(right),
                join_on,
                filters,
                join_type,
                JoinConstraint::Using,
                NullEquality::NullEqualsNothing,
            )?;

            Ok(Self::new(LogicalPlan::Join(join)))
        }
    }

    /// Apply a cross join
    pub fn cross_join(self, right: LogicalPlan) -> Result<Self> {
        let join = Join::try_new(
            self.plan,
            Arc::new(right),
            vec![],
            None,
            JoinType::Inner,
            JoinConstraint::On,
            NullEquality::NullEqualsNothing,
        )?;

        Ok(Self::new(LogicalPlan::Join(join)))
    }

    /// Repartition
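    ///
    /// A minimal sketch using round-robin partitioning into 4 partitions:
    /// ```
    /// # use datafusion_expr::{lit, LogicalPlanBuilder, logical_plan::Partitioning};
    /// # use datafusion_common::Result;
    /// # fn main() -> Result<()> {
    /// let plan = LogicalPlanBuilder::values(vec![vec![lit(1)]])?
    ///     .repartition(Partitioning::RoundRobinBatch(4))?
    ///     .build()?;
    /// # Ok(())
    /// # }
    /// ```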
    pub fn repartition(self, partitioning_scheme: Partitioning) -> Result<Self> {
        Ok(Self::new(LogicalPlan::Repartition(Repartition {
            input: self.plan,
            partitioning_scheme,
        })))
    }

    /// Apply window functions to extend the schema
    pub fn window(
        self,
        window_expr: impl IntoIterator<Item = impl Into<Expr>>,
    ) -> Result<Self> {
        let window_expr = normalize_cols(window_expr, &self.plan)?;
        validate_unique_names("Windows", &window_expr)?;
        Ok(Self::new(LogicalPlan::Window(Window::try_new(
            window_expr,
            self.plan,
        )?)))
    }

    /// Apply an aggregate: grouping on the `group_expr` expressions
    /// and calculating `aggr_expr` aggregates for each distinct
    /// value of the `group_expr`.
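    ///
    /// A minimal sketch, assuming a table `t` with an Int32 column `a`
    /// (grouping only, with no aggregate expressions; the names are illustrative):
    /// ```
    /// # use datafusion_expr::{col, Expr, logical_plan::table_scan};
    /// # use datafusion_common::Result;
    /// # use arrow::datatypes::{Schema, DataType, Field};
    /// # fn main() -> Result<()> {
    /// # let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]);
    /// // SELECT a FROM t GROUP BY a
    /// let plan = table_scan(Some("t"), &schema, None)?
    ///     .aggregate(vec![col("a")], Vec::<Expr>::new())?
    ///     .build()?;
    /// # Ok(())
    /// # }
    /// ```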
    pub fn aggregate(
        self,
        group_expr: impl IntoIterator<Item = impl Into<Expr>>,
        aggr_expr: impl IntoIterator<Item = impl Into<Expr>>,
    ) -> Result<Self> {
        let group_expr = normalize_cols(group_expr, &self.plan)?;
        let aggr_expr = normalize_cols(aggr_expr, &self.plan)?;

        let group_expr = if self.options.add_implicit_group_by_exprs {
            add_group_by_exprs_from_dependencies(group_expr, self.plan.schema())?
        } else {
            group_expr
        };

        Aggregate::try_new(self.plan, group_expr, aggr_expr)
            .map(LogicalPlan::Aggregate)
            .map(Self::new)
    }

    /// Create an [`Explain`] plan for explaining this plan
    ///
    /// If `analyze` is true, runs the actual plan and produces
    /// information about metrics gathered during execution.
    ///
    /// If `verbose` is true, prints out additional details.
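    ///
    /// A minimal sketch producing a verbose, non-analyze explain plan:
    /// ```
    /// # use datafusion_expr::{lit, LogicalPlanBuilder};
    /// # use datafusion_common::Result;
    /// # fn main() -> Result<()> {
    /// // EXPLAIN VERBOSE VALUES (1)
    /// let plan = LogicalPlanBuilder::values(vec![vec![lit(1)]])?
    ///     .explain(true, false)?
    ///     .build()?;
    /// # Ok(())
    /// # }
    /// ```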
    pub fn explain(self, verbose: bool, analyze: bool) -> Result<Self> {
        // Keep the default format (Indent)
        self.explain_option_format(
            ExplainOption::default()
                .with_verbose(verbose)
                .with_analyze(analyze),
        )
    }

    /// Create an [`Explain`] plan for explaining this plan.
    /// The `explain_option` is used to specify the format and verbosity of the explanation.
    /// For details, see [`ExplainOption`].
    pub fn explain_option_format(self, explain_option: ExplainOption) -> Result<Self> {
        let schema = LogicalPlan::explain_schema();
        let schema = schema.to_dfschema_ref()?;

        if explain_option.analyze {
            Ok(Self::new(LogicalPlan::Analyze(Analyze {
                verbose: explain_option.verbose,
                input: self.plan,
                schema,
            })))
        } else {
            let stringified_plans =
                vec![self.plan.to_stringified(PlanType::InitialLogicalPlan)];

            Ok(Self::new(LogicalPlan::Explain(Explain {
                verbose: explain_option.verbose,
                plan: self.plan,
                explain_format: explain_option.format,
                stringified_plans,
                schema,
                logical_optimization_succeeded: false,
            })))
        }
    }

    /// Process intersect set operator
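    ///
    /// A minimal sketch over two single-row `VALUES` relations:
    /// ```
    /// # use datafusion_expr::{lit, LogicalPlanBuilder};
    /// # use datafusion_common::Result;
    /// # fn main() -> Result<()> {
    /// let left = LogicalPlanBuilder::values(vec![vec![lit(1)]])?.build()?;
    /// let right = LogicalPlanBuilder::values(vec![vec![lit(1)]])?.build()?;
    /// // left INTERSECT right
    /// let plan = LogicalPlanBuilder::intersect(left, right, false)?;
    /// # Ok(())
    /// # }
    /// ```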
    pub fn intersect(
        left_plan: LogicalPlan,
        right_plan: LogicalPlan,
        is_all: bool,
    ) -> Result<LogicalPlan> {
        LogicalPlanBuilder::intersect_or_except(
            left_plan,
            right_plan,
            JoinType::LeftSemi,
            is_all,
        )
    }

    /// Process except set operator
    pub fn except(
        left_plan: LogicalPlan,
        right_plan: LogicalPlan,
        is_all: bool,
    ) -> Result<LogicalPlan> {
        LogicalPlanBuilder::intersect_or_except(
            left_plan,
            right_plan,
            JoinType::LeftAnti,
            is_all,
        )
    }

    /// Process intersect or except
    fn intersect_or_except(
        left_plan: LogicalPlan,
        right_plan: LogicalPlan,
        join_type: JoinType,
        is_all: bool,
    ) -> Result<LogicalPlan> {
        let left_len = left_plan.schema().fields().len();
        let right_len = right_plan.schema().fields().len();

        if left_len != right_len {
            return plan_err!(
                "INTERSECT/EXCEPT query must have the same number of columns. Left is {left_len} and right is {right_len}."
            );
        }

        let join_keys = left_plan
            .schema()
            .fields()
            .iter()
            .zip(right_plan.schema().fields().iter())
            .map(|(left_field, right_field)| {
                (
                    (Column::from_name(left_field.name())),
                    (Column::from_name(right_field.name())),
                )
            })
            .unzip();
        if is_all {
            LogicalPlanBuilder::from(left_plan)
                .join_detailed(
                    right_plan,
                    join_type,
                    join_keys,
                    None,
                    NullEquality::NullEqualsNull,
                )?
                .build()
        } else {
            LogicalPlanBuilder::from(left_plan)
                .distinct()?
                .join_detailed(
                    right_plan,
                    join_type,
                    join_keys,
                    None,
                    NullEquality::NullEqualsNull,
                )?
                .build()
        }
    }

    /// Build the plan
    pub fn build(self) -> Result<LogicalPlan> {
        Ok(Arc::unwrap_or_clone(self.plan))
    }

    /// Apply a join with both explicit equijoin and non-equijoin predicates.
    ///
    /// Note this is a low level API that requires identifying specific
    /// predicate types. Most users should use [`join_on`](Self::join_on) that
    /// automatically identifies predicates appropriately.
    ///
    /// `equi_exprs` defines equijoin predicates, of the form `l = r` for each
    /// `(l, r)` tuple. `l`, the first element of the tuple, must only refer
    /// to columns from the left input. `r`, the second element of the tuple,
    /// must only refer to columns from the right input.
    ///
    /// `filter` contains any other filter expression to apply during the
    /// join. Note that `equi_exprs` predicates are evaluated more efficiently
    /// than the filter expressions, so they are preferred.
1408    pub fn join_with_expr_keys(
1409        self,
1410        right: LogicalPlan,
1411        join_type: JoinType,
1412        equi_exprs: (Vec<impl Into<Expr>>, Vec<impl Into<Expr>>),
1413        filter: Option<Expr>,
1414    ) -> Result<Self> {
1415        if equi_exprs.0.len() != equi_exprs.1.len() {
1416            return plan_err!("left_keys and right_keys were not the same length");
1417        }
1418
1419        let join_key_pairs = equi_exprs
1420            .0
1421            .into_iter()
1422            .zip(equi_exprs.1)
1423            .map(|(l, r)| {
1424                let left_key = l.into();
1425                let right_key = r.into();
1426                let mut left_using_columns = HashSet::new();
1427                expr_to_columns(&left_key, &mut left_using_columns)?;
1428                let normalized_left_key = normalize_col_with_schemas_and_ambiguity_check(
1429                    left_key,
1430                    &[&[self.plan.schema()]],
1431                    &[],
1432                )?;
1433
1434                let mut right_using_columns = HashSet::new();
1435                expr_to_columns(&right_key, &mut right_using_columns)?;
1436                let normalized_right_key = normalize_col_with_schemas_and_ambiguity_check(
1437                    right_key,
1438                    &[&[right.schema()]],
1439                    &[],
1440                )?;
1441
1442                // find valid equijoin
1443                find_valid_equijoin_key_pair(
1444                        &normalized_left_key,
1445                        &normalized_right_key,
1446                        self.plan.schema(),
1447                        right.schema(),
1448                    )?.ok_or_else(||
1449                        plan_datafusion_err!(
1450                            "can't create join plan, join key should belong to one input, error key: ({normalized_left_key},{normalized_right_key})"
1451                        ))
1452            })
1453            .collect::<Result<Vec<_>>>()?;
1454
1455        let join = Join::try_new(
1456            self.plan,
1457            Arc::new(right),
1458            join_key_pairs,
1459            filter,
1460            join_type,
1461            JoinConstraint::On,
1462            NullEquality::NullEqualsNothing,
1463        )?;
1464
1465        Ok(Self::new(LogicalPlan::Join(join)))
1466    }
1467
1468    /// Unnest the given column.
1469    pub fn unnest_column(self, column: impl Into<Column>) -> Result<Self> {
1470        unnest(Arc::unwrap_or_clone(self.plan), vec![column.into()]).map(Self::new)
1471    }
1472
1473    /// Unnest the given column with the given [`UnnestOptions`]
1474    pub fn unnest_column_with_options(
1475        self,
1476        column: impl Into<Column>,
1477        options: UnnestOptions,
1478    ) -> Result<Self> {
1479        unnest_with_options(
1480            Arc::unwrap_or_clone(self.plan),
1481            vec![column.into()],
1482            options,
1483        )
1484        .map(Self::new)
1485    }
1486
1487    /// Unnest the given columns with the given [`UnnestOptions`]
1488    pub fn unnest_columns_with_options(
1489        self,
1490        columns: Vec<Column>,
1491        options: UnnestOptions,
1492    ) -> Result<Self> {
1493        unnest_with_options(Arc::unwrap_or_clone(self.plan), columns, options)
1494            .map(Self::new)
1495    }
1496}
1497
1498impl From<LogicalPlan> for LogicalPlanBuilder {
1499    fn from(plan: LogicalPlan) -> Self {
1500        LogicalPlanBuilder::new(plan)
1501    }
1502}
1503
1504impl From<Arc<LogicalPlan>> for LogicalPlanBuilder {
1505    fn from(plan: Arc<LogicalPlan>) -> Self {
1506        LogicalPlanBuilder::new_from_arc(plan)
1507    }
1508}
1509
1510/// Container used when building fields for a `VALUES` node.
1511#[derive(Default)]
1512struct ValuesFields {
1513    inner: Vec<Field>,
1514}
1515
1516impl ValuesFields {
1517    pub fn new() -> Self {
1518        Self::default()
1519    }
1520
1521    pub fn push(&mut self, data_type: DataType, nullable: bool) {
1522        self.push_with_metadata(data_type, nullable, None);
1523    }
1524
1525    pub fn push_with_metadata(
1526        &mut self,
1527        data_type: DataType,
1528        nullable: bool,
1529        metadata: Option<FieldMetadata>,
1530    ) {
1531        // Naming follows the convention described here:
1532        // https://www.postgresql.org/docs/current/queries-values.html
1533        let name = format!("column{}", self.inner.len() + 1);
1534        let mut field = Field::new(name, data_type, nullable);
1535        if let Some(metadata) = metadata {
1536            field.set_metadata(metadata.to_hashmap());
1537        }
1538        self.inner.push(field);
1539    }
1540
1541    pub fn into_fields(self) -> Fields {
1542        self.inner.into()
1543    }
1544}
1545
1546/// Returns aliases to make field names unique.
1547///
1548/// Returns a vector of optional aliases, one per input field. `None` means keep the original name,
1549/// `Some(alias)` means rename to the alias to ensure uniqueness.
1550///
1551/// Used when creating [`SubqueryAlias`] or similar operations that strip table qualifiers but need
1552/// to maintain unique column names.
1553///
1554/// # Example
1555/// Input fields: `[a, a, b, b, a, a:1]` (a [`DFSchema`] is valid when duplicate fields have different qualifiers)
1556/// Returns: `[None, Some("a:1"), None, Some("b:1"), Some("a:2"), Some("a:1:1")]`
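///
/// A minimal runnable sketch of the simplest case:
/// ```
/// # use arrow::datatypes::{DataType, Field, Fields};
/// # use datafusion_expr::logical_plan::builder::unique_field_aliases;
/// let fields = Fields::from(vec![
///     Field::new("a", DataType::Int32, false),
///     Field::new("a", DataType::Int32, false),
/// ]);
/// // The first `a` keeps its name (`None`); the duplicate is aliased to `a:1`
/// assert_eq!(
///     unique_field_aliases(&fields),
///     vec![None, Some("a:1".to_string())]
/// );
/// ```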
1557pub fn unique_field_aliases(fields: &Fields) -> Vec<Option<String>> {
1558    // Some field names may already arrive here with a count suffix (e.g. `id:1`),
1559    // so counting alone can still collide. For example, given the three fields
1560    // "col:1", "col" and "col", counting would rename them to col:1, col, col:1,
1561    // causing a later error when building the DFSchema.
1562    // That's why we need the `seen` set, so the resulting names are always unique.
1563
1564    // Tracks a mapping between a field name and the number of appearances of that field.
1565    let mut name_map = HashMap::<&str, usize>::new();
1566    // Tracks all the fields and aliases that were previously seen.
1567    let mut seen = HashSet::<Cow<String>>::new();
1568
1569    fields
1570        .iter()
1571        .map(|field| {
1572            let original_name = field.name();
1573            let mut name = Cow::Borrowed(original_name);
1574
1575            let count = name_map.entry(original_name).or_insert(0);
1576
1577            // Loop until we find a name that hasn't been used.
1578            while seen.contains(&name) {
1579                *count += 1;
1580                name = Cow::Owned(format!("{original_name}:{count}"));
1581            }
1582
1583            seen.insert(name.clone());
1584
1585            match name {
1586                Cow::Borrowed(_) => None,
1587                Cow::Owned(alias) => Some(alias),
1588            }
1589        })
1590        .collect()
1591}
1592
1593fn mark_field(schema: &DFSchema) -> (Option<TableReference>, Arc<Field>) {
1594    let mut table_references = schema
1595        .iter()
1596        .filter_map(|(qualifier, _)| qualifier)
1597        .collect::<Vec<_>>();
1598    table_references.dedup();
1599    let table_reference = if table_references.len() == 1 {
1600        table_references.pop().cloned()
1601    } else {
1602        None
1603    };
1604
1605    (
1606        table_reference,
1607        Arc::new(Field::new("mark", DataType::Boolean, false)),
1608    )
1609}
1610
1611/// Creates a schema for a join operation.
1612/// The fields from the left side are listed first.
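///
/// # Example
///
/// A minimal sketch (the one-column schemas are illustrative):
/// ```
/// # use arrow::datatypes::{DataType, Field, Schema};
/// # use datafusion_common::{DFSchema, Result};
/// # use datafusion_expr::JoinType;
/// # use datafusion_expr::logical_plan::builder::build_join_schema;
/// # fn main() -> Result<()> {
/// let left = DFSchema::try_from(Schema::new(vec![Field::new("a", DataType::Int32, false)]))?;
/// let right = DFSchema::try_from(Schema::new(vec![Field::new("b", DataType::Int32, false)]))?;
/// // For a LEFT join the right side's fields become nullable
/// let joined = build_join_schema(&left, &right, &JoinType::Left)?;
/// assert!(joined.field_with_name(None, "b")?.is_nullable());
/// # Ok(())
/// # }
/// ```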
1613pub fn build_join_schema(
1614    left: &DFSchema,
1615    right: &DFSchema,
1616    join_type: &JoinType,
1617) -> Result<DFSchema> {
1618    fn nullify_fields<'a>(
1619        fields: impl Iterator<Item = (Option<&'a TableReference>, &'a Arc<Field>)>,
1620    ) -> Vec<(Option<TableReference>, Arc<Field>)> {
1621        fields
1622            .map(|(q, f)| {
1623                // TODO: find a good way to do that
1624                let field = f.as_ref().clone().with_nullable(true);
1625                (q.cloned(), Arc::new(field))
1626            })
1627            .collect()
1628    }
1629
1630    let right_fields = right.iter();
1631    let left_fields = left.iter();
1632
1633    let qualified_fields: Vec<(Option<TableReference>, Arc<Field>)> = match join_type {
1634        JoinType::Inner => {
1635            // left then right
1636            let left_fields = left_fields
1637                .map(|(q, f)| (q.cloned(), Arc::clone(f)))
1638                .collect::<Vec<_>>();
1639            let right_fields = right_fields
1640                .map(|(q, f)| (q.cloned(), Arc::clone(f)))
1641                .collect::<Vec<_>>();
1642            left_fields.into_iter().chain(right_fields).collect()
1643        }
1644        JoinType::Left => {
1645            // left then right, with the right side nullable for unmatched rows
1646            let left_fields = left_fields
1647                .map(|(q, f)| (q.cloned(), Arc::clone(f)))
1648                .collect::<Vec<_>>();
1649            left_fields
1650                .into_iter()
1651                .chain(nullify_fields(right_fields))
1652                .collect()
1653        }
1654        JoinType::Right => {
1655            // left then right, with the left side nullable for unmatched rows
1656            let right_fields = right_fields
1657                .map(|(q, f)| (q.cloned(), Arc::clone(f)))
1658                .collect::<Vec<_>>();
1659            nullify_fields(left_fields)
1660                .into_iter()
1661                .chain(right_fields)
1662                .collect()
1663        }
1664        JoinType::Full => {
1665            // left then right, with both sides nullable for unmatched rows
1666            nullify_fields(left_fields)
1667                .into_iter()
1668                .chain(nullify_fields(right_fields))
1669                .collect()
1670        }
1671        JoinType::LeftSemi | JoinType::LeftAnti => {
1672            // Only use the left side for the schema
1673            left_fields
1674                .map(|(q, f)| (q.cloned(), Arc::clone(f)))
1675                .collect()
1676        }
1677        JoinType::LeftMark => left_fields
1678            .map(|(q, f)| (q.cloned(), Arc::clone(f)))
1679            .chain(once(mark_field(right)))
1680            .collect(),
1681        JoinType::RightSemi | JoinType::RightAnti => {
1682            // Only use the right side for the schema
1683            right_fields
1684                .map(|(q, f)| (q.cloned(), Arc::clone(f)))
1685                .collect()
1686        }
1687        JoinType::RightMark => right_fields
1688            .map(|(q, f)| (q.cloned(), Arc::clone(f)))
1689            .chain(once(mark_field(left)))
1690            .collect(),
1691    };
1692    let func_dependencies = left.functional_dependencies().join(
1693        right.functional_dependencies(),
1694        join_type,
1695        left.fields().len(),
1696    );
1697
1698    let (schema1, schema2) = match join_type {
1699        JoinType::Right
1700        | JoinType::RightSemi
1701        | JoinType::RightAnti
1702        | JoinType::RightMark => (left, right),
1703        _ => (right, left),
1704    };
1705
1706    let metadata = schema1
1707        .metadata()
1708        .clone()
1709        .into_iter()
1710        .chain(schema2.metadata().clone())
1711        .collect();
1712
1713    let dfschema = DFSchema::new_with_metadata(qualified_fields, metadata)?;
1714    dfschema.with_functional_dependencies(func_dependencies)
1715}
1716
1717/// (Re)qualify the sides of a join if needed, i.e. if the columns from one side would otherwise
1718/// conflict with the columns from the other.
1719/// This is especially useful for queries that come from Substrait, since Substrait doesn't currently allow
1720/// specifying aliases for either columns or tables. DataFusion requires columns to be uniquely identifiable in
1721/// some places (see e.g. `DFSchema::check_names`).
1722/// The function returns:
1723/// - The requalified or original left logical plan
1724/// - The requalified or original right logical plan
1725/// - If a requalification was needed or not
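///
/// # Example
///
/// A minimal sketch (the table name `t` is illustrative): both inputs expose
/// `t.id`, so the sides are re-aliased to `left` and `right`.
/// ```
/// # use arrow::datatypes::{DataType, Field, Schema};
/// # use datafusion_common::Result;
/// # use datafusion_expr::logical_plan::builder::{requalify_sides_if_needed, table_scan};
/// # fn main() -> Result<()> {
/// let schema = Schema::new(vec![Field::new("id", DataType::Int32, false)]);
/// let left = table_scan(Some("t"), &schema, None)?;
/// let right = table_scan(Some("t"), &schema, None)?;
/// let (_left, _right, requalified) = requalify_sides_if_needed(left, right)?;
/// assert!(requalified);
/// # Ok(())
/// # }
/// ```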
1726pub fn requalify_sides_if_needed(
1727    left: LogicalPlanBuilder,
1728    right: LogicalPlanBuilder,
1729) -> Result<(LogicalPlanBuilder, LogicalPlanBuilder, bool)> {
1730    let left_cols = left.schema().columns();
1731    let right_cols = right.schema().columns();
1732    if left_cols.iter().any(|l| {
1733        right_cols.iter().any(|r| {
1734            l == r || (l.name == r.name && (l.relation.is_none() || r.relation.is_none()))
1735        })
1736    }) {
1737        // These names have no connection to the original plan, but they'll make the columns
1738        // (mostly) unique.
1739        Ok((
1740            left.alias(TableReference::bare("left"))?,
1741            right.alias(TableReference::bare("right"))?,
1742            true,
1743        ))
1744    } else {
1745        Ok((left, right, false))
1746    }
1747}
1748
1749/// Add additional "synthetic" group by expressions based on functional
1750/// dependencies.
1751///
1752/// For example, if we are grouping on `[c1]`, and we know from
1753/// functional dependencies that column `c1` determines `c2`, this function
1754/// adds `c2` to the group by list.
1755///
1756/// This allows MySQL style selects like
1757/// `SELECT col FROM t WHERE pk = 5` if `pk` functionally determines `col`
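///
/// # Example
///
/// A minimal sketch (the table name `t` and its columns are illustrative):
/// with a primary key on `id`, grouping on `t.id` pulls the dependent `name`
/// column into the group by list.
/// ```
/// # use arrow::datatypes::{DataType, Field, Schema};
/// # use datafusion_common::{Constraint, Constraints, Result};
/// # use datafusion_expr::{col, LogicalPlanBuilder};
/// # use datafusion_expr::logical_plan::builder::{
/// #     add_group_by_exprs_from_dependencies, table_source_with_constraints,
/// # };
/// # fn main() -> Result<()> {
/// let schema = Schema::new(vec![
///     Field::new("id", DataType::Int32, false),
///     Field::new("name", DataType::Utf8, false),
/// ]);
/// let constraints = Constraints::new_unverified(vec![Constraint::PrimaryKey(vec![0])]);
/// let source = table_source_with_constraints(&schema, constraints);
/// let plan = LogicalPlanBuilder::scan("t", source, None)?.build()?;
/// let group_by = add_group_by_exprs_from_dependencies(vec![col("t.id")], plan.schema())?;
/// // `t.name` is functionally determined by the primary key `t.id`
/// assert_eq!(group_by.len(), 2);
/// # Ok(())
/// # }
/// ```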
1758pub fn add_group_by_exprs_from_dependencies(
1759    mut group_expr: Vec<Expr>,
1760    schema: &DFSchemaRef,
1761) -> Result<Vec<Expr>> {
1762    // Names of the fields produced by the GROUP BY exprs; for example, `GROUP BY
1763    // c1 + 1` produces an output field named `"c1 + 1"`
1764    let mut group_by_field_names = group_expr
1765        .iter()
1766        .map(|e| e.schema_name().to_string())
1767        .collect::<Vec<_>>();
1768
1769    if let Some(target_indices) =
1770        get_target_functional_dependencies(schema, &group_by_field_names)
1771    {
1772        for idx in target_indices {
1773            let expr = Expr::Column(Column::from(schema.qualified_field(idx)));
1774            let expr_name = expr.schema_name().to_string();
1775            if !group_by_field_names.contains(&expr_name) {
1776                group_by_field_names.push(expr_name);
1777                group_expr.push(expr);
1778            }
1779        }
1780    }
1781    Ok(group_expr)
1782}
1783
1784/// Errors if two or more expressions have the same name.
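///
/// A minimal sketch:
/// ```
/// # use datafusion_expr::col;
/// # use datafusion_expr::logical_plan::builder::validate_unique_names;
/// let unique = vec![col("a"), col("b")];
/// assert!(validate_unique_names("Projections", unique.iter()).is_ok());
/// // Two expressions named `a` collide
/// let duplicated = vec![col("a"), col("a")];
/// assert!(validate_unique_names("Projections", duplicated.iter()).is_err());
/// ```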
1785pub fn validate_unique_names<'a>(
1786    node_name: &str,
1787    expressions: impl IntoIterator<Item = &'a Expr>,
1788) -> Result<()> {
1789    let mut unique_names = HashMap::new();
1790
1791    expressions.into_iter().enumerate().try_for_each(|(position, expr)| {
1792        let name = expr.schema_name().to_string();
1793        match unique_names.get(&name) {
1794            None => {
1795                unique_names.insert(name, (position, expr));
1796                Ok(())
1797            },
1798            Some((existing_position, existing_expr)) => {
1799                plan_err!("{node_name} require unique expression names \
1800                             but the expression \"{existing_expr}\" at position {existing_position} and \"{expr}\" \
1801                             at position {position} have the same name. Consider aliasing (\"AS\") one of them."
1802                            )
1803            }
1804        }
1805    })
1806}
1807
1808/// Union two [`LogicalPlan`]s.
1809///
1810/// Constructs the UNION plan, but does not perform type coercion. Therefore, the
1811/// subtree expressions will not be properly typed until the optimizer pass.
1812///
1813/// If a properly typed UNION plan is needed, refer to [`TypeCoercionRewriter::coerce_union`]
1814/// or alternatively, merge the union input schema using [`coerce_union_schema`] and
1815/// apply the expression rewrite with [`coerce_plan_expr_for_schema`].
1816///
1817/// [`TypeCoercionRewriter::coerce_union`]: https://docs.rs/datafusion-optimizer/latest/datafusion_optimizer/analyzer/type_coercion/struct.TypeCoercionRewriter.html#method.coerce_union
1818/// [`coerce_union_schema`]: https://docs.rs/datafusion-optimizer/latest/datafusion_optimizer/analyzer/type_coercion/fn.coerce_union_schema.html
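///
/// # Example
///
/// A minimal sketch (the `t1`/`t2` names and schema are illustrative):
/// ```
/// # use arrow::datatypes::{DataType, Field, Schema};
/// # use datafusion_common::Result;
/// # use datafusion_expr::logical_plan::builder::{table_scan, union};
/// # fn main() -> Result<()> {
/// let schema = Schema::new(vec![Field::new("id", DataType::Int32, false)]);
/// let left = table_scan(Some("t1"), &schema, None)?.build()?;
/// let right = table_scan(Some("t2"), &schema, None)?.build()?;
/// let plan = union(left, right)?;
/// # Ok(())
/// # }
/// ```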
1819pub fn union(left_plan: LogicalPlan, right_plan: LogicalPlan) -> Result<LogicalPlan> {
1820    Ok(LogicalPlan::Union(Union::try_new_with_loose_types(vec![
1821        Arc::new(left_plan),
1822        Arc::new(right_plan),
1823    ])?))
1824}
1825
1826/// Like [`union`], but combine rows from different tables by name, rather than
1827/// by position.
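///
/// # Example
///
/// A minimal sketch (names illustrative): the inputs list their columns in
/// different orders, and matching is by name rather than position.
/// ```
/// # use arrow::datatypes::{DataType, Field, Schema};
/// # use datafusion_common::Result;
/// # use datafusion_expr::logical_plan::builder::{table_scan, union_by_name};
/// # fn main() -> Result<()> {
/// let s1 = Schema::new(vec![
///     Field::new("a", DataType::Int32, false),
///     Field::new("b", DataType::Utf8, false),
/// ]);
/// let s2 = Schema::new(vec![
///     Field::new("b", DataType::Utf8, false),
///     Field::new("a", DataType::Int32, false),
/// ]);
/// let left = table_scan(Some("t1"), &s1, None)?.build()?;
/// let right = table_scan(Some("t2"), &s2, None)?.build()?;
/// let plan = union_by_name(left, right)?;
/// # Ok(())
/// # }
/// ```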
1828pub fn union_by_name(
1829    left_plan: LogicalPlan,
1830    right_plan: LogicalPlan,
1831) -> Result<LogicalPlan> {
1832    Ok(LogicalPlan::Union(Union::try_new_by_name(vec![
1833        Arc::new(left_plan),
1834        Arc::new(right_plan),
1835    ])?))
1836}
1837
1838/// Create Projection
1839/// # Errors
1840/// This function errors under any of the following conditions:
1841/// * Two or more expressions have the same name
1842/// * An invalid expression is used (e.g. a `sort` expression)
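///
/// # Example
///
/// A minimal sketch (names illustrative), equivalent to `SELECT id AS key FROM t`:
/// ```
/// # use arrow::datatypes::{DataType, Field, Schema};
/// # use datafusion_common::Result;
/// # use datafusion_expr::col;
/// # use datafusion_expr::logical_plan::builder::{project, table_scan};
/// # fn main() -> Result<()> {
/// let schema = Schema::new(vec![Field::new("id", DataType::Int32, false)]);
/// let plan = table_scan(Some("t"), &schema, None)?.build()?;
/// let plan = project(plan, vec![col("id").alias("key")])?;
/// # Ok(())
/// # }
/// ```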
1843pub fn project(
1844    plan: LogicalPlan,
1845    expr: impl IntoIterator<Item = impl Into<SelectExpr>>,
1846) -> Result<LogicalPlan> {
1847    project_with_validation(plan, expr.into_iter().map(|e| (e, true)))
1848}
1849
1850/// Create Projection. Similar to [`project`] except that each expression
1851/// passed in carries a flag indicating whether it requires
1852/// validation (normalize & columnize): `true` if so, `false` otherwise.
1853/// # Errors
1854/// This function errors under any of the following conditions:
1855/// * Two or more expressions have the same name
1856/// * An invalid expression is used (e.g. a `sort` expression)
1857fn project_with_validation(
1858    plan: LogicalPlan,
1859    expr: impl IntoIterator<Item = (impl Into<SelectExpr>, bool)>,
1860) -> Result<LogicalPlan> {
1861    let mut projected_expr = vec![];
1862    for (e, validate) in expr {
1863        let e = e.into();
1864        match e {
1865            SelectExpr::Wildcard(opt) => {
1866                let expanded = expand_wildcard(plan.schema(), &plan, Some(&opt))?;
1867
1868                // If there is a REPLACE statement, replace that column with the given
1869                // replace expression. Column name remains the same.
1870                let expanded = if let Some(replace) = opt.replace {
1871                    replace_columns(expanded, &replace)?
1872                } else {
1873                    expanded
1874                };
1875
1876                for e in expanded {
1877                    if validate {
1878                        projected_expr
1879                            .push(columnize_expr(normalize_col(e, &plan)?, &plan)?)
1880                    } else {
1881                        projected_expr.push(e)
1882                    }
1883                }
1884            }
1885            SelectExpr::QualifiedWildcard(table_ref, opt) => {
1886                let expanded =
1887                    expand_qualified_wildcard(&table_ref, plan.schema(), Some(&opt))?;
1888
1889                // If there is a REPLACE statement, replace that column with the given
1890                // replace expression. Column name remains the same.
1891                let expanded = if let Some(replace) = opt.replace {
1892                    replace_columns(expanded, &replace)?
1893                } else {
1894                    expanded
1895                };
1896
1897                for e in expanded {
1898                    if validate {
1899                        projected_expr
1900                            .push(columnize_expr(normalize_col(e, &plan)?, &plan)?)
1901                    } else {
1902                        projected_expr.push(e)
1903                    }
1904                }
1905            }
1906            SelectExpr::Expression(e) => {
1907                if validate {
1908                    projected_expr.push(columnize_expr(normalize_col(e, &plan)?, &plan)?)
1909                } else {
1910                    projected_expr.push(e)
1911                }
1912            }
1913        }
1914    }
1915    validate_unique_names("Projections", projected_expr.iter())?;
1916
1917    Projection::try_new(projected_expr, Arc::new(plan)).map(LogicalPlan::Projection)
1918}
1919
1920/// If there is a REPLACE statement in the projected expression in the form of
1921/// "REPLACE (some_column_within_an_expr AS some_column)", this function replaces
1922/// that column with the given replace expression. Column name remains the same.
1923/// Multiple REPLACEs are also possible with comma separations.
1924fn replace_columns(
1925    mut exprs: Vec<Expr>,
1926    replace: &PlannedReplaceSelectItem,
1927) -> Result<Vec<Expr>> {
1928    for expr in exprs.iter_mut() {
1929        if let Expr::Column(Column { name, .. }) = expr {
1930            if let Some((_, new_expr)) = replace
1931                .items()
1932                .iter()
1933                .zip(replace.expressions().iter())
1934                .find(|(item, _)| item.column_name.value == *name)
1935            {
1936                *expr = new_expr.clone().alias(name.clone())
1937            }
1938        }
1939    }
1940    Ok(exprs)
1941}
1942
1943/// Create a SubqueryAlias to wrap a LogicalPlan.
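///
/// # Example
///
/// A minimal sketch (names illustrative):
/// ```
/// # use arrow::datatypes::{DataType, Field, Schema};
/// # use datafusion_common::Result;
/// # use datafusion_expr::logical_plan::builder::{subquery_alias, table_scan};
/// # fn main() -> Result<()> {
/// let schema = Schema::new(vec![Field::new("id", DataType::Int32, false)]);
/// let plan = table_scan(Some("t"), &schema, None)?.build()?;
/// // Re-expose the scan under the alias `t2`
/// let plan = subquery_alias(plan, "t2")?;
/// # Ok(())
/// # }
/// ```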
1944pub fn subquery_alias(
1945    plan: LogicalPlan,
1946    alias: impl Into<TableReference>,
1947) -> Result<LogicalPlan> {
1948    SubqueryAlias::try_new(Arc::new(plan), alias).map(LogicalPlan::SubqueryAlias)
1949}
1950
1951/// Create a LogicalPlanBuilder representing a scan of a table with the provided name and schema.
1952/// This is mostly used for testing and documentation.
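///
/// # Example
///
/// A minimal sketch (names illustrative):
/// ```
/// # use arrow::datatypes::{DataType, Field, Schema};
/// # use datafusion_common::Result;
/// # use datafusion_expr::logical_plan::builder::table_scan;
/// # fn main() -> Result<()> {
/// let schema = Schema::new(vec![Field::new("id", DataType::Int32, false)]);
/// // Scan all columns of a table named `t`
/// let plan = table_scan(Some("t"), &schema, None)?.build()?;
/// # Ok(())
/// # }
/// ```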
1953pub fn table_scan(
1954    name: Option<impl Into<TableReference>>,
1955    table_schema: &Schema,
1956    projection: Option<Vec<usize>>,
1957) -> Result<LogicalPlanBuilder> {
1958    table_scan_with_filters(name, table_schema, projection, vec![])
1959}
1960
1961/// Create a LogicalPlanBuilder representing a scan of a table with the provided name and schema,
1962/// and inlined filters.
1963/// This is mostly used for testing and documentation.
1964pub fn table_scan_with_filters(
1965    name: Option<impl Into<TableReference>>,
1966    table_schema: &Schema,
1967    projection: Option<Vec<usize>>,
1968    filters: Vec<Expr>,
1969) -> Result<LogicalPlanBuilder> {
1970    let table_source = table_source(table_schema);
1971    let name = name
1972        .map(|n| n.into())
1973        .unwrap_or_else(|| TableReference::bare(UNNAMED_TABLE));
1974    LogicalPlanBuilder::scan_with_filters(name, table_source, projection, filters)
1975}
1976
1977/// Create a LogicalPlanBuilder representing a scan of a table with the provided name and schema,
1978/// filters, and inlined fetch.
1979/// This is mostly used for testing and documentation.
1980pub fn table_scan_with_filter_and_fetch(
1981    name: Option<impl Into<TableReference>>,
1982    table_schema: &Schema,
1983    projection: Option<Vec<usize>>,
1984    filters: Vec<Expr>,
1985    fetch: Option<usize>,
1986) -> Result<LogicalPlanBuilder> {
1987    let table_source = table_source(table_schema);
1988    let name = name
1989        .map(|n| n.into())
1990        .unwrap_or_else(|| TableReference::bare(UNNAMED_TABLE));
1991    LogicalPlanBuilder::scan_with_filters_fetch(
1992        name,
1993        table_source,
1994        projection,
1995        filters,
1996        fetch,
1997    )
1998}
1999
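/// Create a [`TableSource`] wrapping the given schema, with default constraints.
/// Like [`LogicalTableSource`], this is intended for tests and documentation.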
2000pub fn table_source(table_schema: &Schema) -> Arc<dyn TableSource> {
2001    // TODO should we take SchemaRef and avoid cloning?
2002    let table_schema = Arc::new(table_schema.clone());
2003    Arc::new(LogicalTableSource {
2004        table_schema,
2005        constraints: Default::default(),
2006    })
2007}
2008
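/// Create a [`TableSource`] wrapping the given schema and [`Constraints`].
/// Like [`table_source`], this is intended for tests and documentation.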
2009pub fn table_source_with_constraints(
2010    table_schema: &Schema,
2011    constraints: Constraints,
2012) -> Arc<dyn TableSource> {
2013    // TODO should we take SchemaRef and avoid cloning?
2014    let table_schema = Arc::new(table_schema.clone());
2015    Arc::new(LogicalTableSource {
2016        table_schema,
2017        constraints,
2018    })
2019}
2020
2021/// Wrap the input plan in a projection if any of the join keys is a non-column expression.
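///
/// # Example
///
/// A minimal sketch (names illustrative): an expression key such as `a + 1`
/// forces a wrapping projection, while plain column keys leave the plan
/// unchanged.
/// ```
/// # use arrow::datatypes::{DataType, Field, Schema};
/// # use datafusion_common::Result;
/// # use datafusion_expr::{col, lit};
/// # use datafusion_expr::logical_plan::builder::{
/// #     table_scan, wrap_projection_for_join_if_necessary,
/// # };
/// # fn main() -> Result<()> {
/// let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]);
/// let input = table_scan(Some("t"), &schema, None)?.build()?;
/// let keys = vec![col("a") + lit(1)];
/// let (_plan, _join_on, need_project) =
///     wrap_projection_for_join_if_necessary(&keys, input)?;
/// assert!(need_project);
/// # Ok(())
/// # }
/// ```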
2022pub fn wrap_projection_for_join_if_necessary(
2023    join_keys: &[Expr],
2024    input: LogicalPlan,
2025) -> Result<(LogicalPlan, Vec<Column>, bool)> {
2026    let input_schema = input.schema();
2027    let alias_join_keys: Vec<Expr> = join_keys
2028        .iter()
2029        .map(|key| {
2030            // The display_name() of a cast expression ignores the cast info and shows the inner expression's name.
2031            // If we do not add an alias, adding the projection will fail with a duplicate field name error in the schema.
2032            // For example:
2033            //    input scan : [a, b, c],
2034            //    join keys: [cast(a as int)]
2035            //
2036            //  then a and cast(a as int) will use the same field name - `a` in projection schema.
2037            //  https://github.com/apache/datafusion/issues/4478
2038            if matches!(key, Expr::Cast(_)) || matches!(key, Expr::TryCast(_)) {
2039                let alias = format!("{key}");
2040                key.clone().alias(alias)
2041            } else {
2042                key.clone()
2043            }
2044        })
2045        .collect::<Vec<_>>();
2046
2047    let need_project = join_keys.iter().any(|key| !matches!(key, Expr::Column(_)));
2048    let plan = if need_project {
2049        // Include all columns from the input and extend them with the join keys
2050        let mut projection = input_schema
2051            .columns()
2052            .into_iter()
2053            .map(Expr::Column)
2054            .collect::<Vec<_>>();
2055        let join_key_items = alias_join_keys
2056            .iter()
2057            .flat_map(|expr| expr.try_as_col().is_none().then_some(expr))
2058            .cloned()
2059            .collect::<HashSet<Expr>>();
2060        projection.extend(join_key_items);
2061
2062        LogicalPlanBuilder::from(input)
2063            .project(projection.into_iter().map(SelectExpr::from))?
2064            .build()?
2065    } else {
2066        input
2067    };
2068
2069    let join_on = alias_join_keys
2070        .into_iter()
2071        .map(|key| {
2072            if let Some(col) = key.try_as_col() {
2073                Ok(col.clone())
2074            } else {
2075                let name = key.schema_name().to_string();
2076                Ok(Column::from_name(name))
2077            }
2078        })
2079        .collect::<Result<Vec<_>>>()?;
2080
2081    Ok((plan, join_on, need_project))
2082}
2083
2084/// Basic TableSource implementation intended for use in tests and documentation. It is expected
2085/// that users will provide their own TableSource implementations or use DataFusion's
2086/// DefaultTableSource.
2087pub struct LogicalTableSource {
2088    table_schema: SchemaRef,
2089    constraints: Constraints,
2090}
2091
2092impl LogicalTableSource {
2093    /// Create a new LogicalTableSource
2094    pub fn new(table_schema: SchemaRef) -> Self {
2095        Self {
2096            table_schema,
2097            constraints: Constraints::default(),
2098        }
2099    }
2100
2101    pub fn with_constraints(mut self, constraints: Constraints) -> Self {
2102        self.constraints = constraints;
2103        self
2104    }
2105}
2106
2107impl TableSource for LogicalTableSource {
2108    fn as_any(&self) -> &dyn Any {
2109        self
2110    }
2111
2112    fn schema(&self) -> SchemaRef {
2113        Arc::clone(&self.table_schema)
2114    }
2115
2116    fn constraints(&self) -> Option<&Constraints> {
2117        Some(&self.constraints)
2118    }
2119
2120    fn supports_filters_pushdown(
2121        &self,
2122        filters: &[&Expr],
2123    ) -> Result<Vec<TableProviderFilterPushDown>> {
2124        Ok(vec![TableProviderFilterPushDown::Exact; filters.len()])
2125    }
2126}
2127
2128/// Create a [`LogicalPlan::Unnest`] plan
2129pub fn unnest(input: LogicalPlan, columns: Vec<Column>) -> Result<LogicalPlan> {
2130    unnest_with_options(input, columns, UnnestOptions::default())
2131}
2132
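/// Returns the post-unnest [`Column`]s for each field of a struct column:
/// one column per struct field, named `{col_name}.{field_name}`.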
2133pub fn get_struct_unnested_columns(
2134    col_name: &String,
2135    inner_fields: &Fields,
2136) -> Vec<Column> {
2137    inner_fields
2138        .iter()
2139        .map(|f| Column::from_name(format!("{}.{}", col_name, f.name())))
2140        .collect()
2141}
2142
2143/// Create a [`LogicalPlan::Unnest`] plan with options
2144/// This function receives a list of columns to be unnested
2145/// because multiple unnests can be performed on the same column (e.g. unnest at different depths).
2146/// The new schema will contain the post-unnest fields replacing the original field.
2147///
2148/// For example:
2149/// Input schema:
2150/// ```text
2151/// +---------------------+-------------------+
2152/// | col1                | col2              |
2153/// +---------------------+-------------------+
2154/// | Struct(INT64,INT32) | List(List(Int64)) |
2155/// +---------------------+-------------------+
2156/// ```
2157///
2160/// Then unnesting columns with:
2161/// - (col1,Struct)
2162/// - (col2,List(\[depth=1,depth=2\]))
2163///
2164/// will generate a new schema:
2165/// ```text
2166/// +---------+---------+---------------------+---------------------+
2167/// | col1.c0 | col1.c1 | unnest_col2_depth_1 | unnest_col2_depth_2 |
2168/// +---------+---------+---------------------+---------------------+
2169/// | Int64   | Int32   | List(Int64)         |  Int64              |
2170/// +---------+---------+---------------------+---------------------+
2171/// ```
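///
/// # Example
///
/// A minimal sketch unnesting a list column (names illustrative):
/// ```
/// # use arrow::datatypes::{DataType, Field, Schema};
/// # use datafusion_common::{Column, Result, UnnestOptions};
/// # use datafusion_expr::logical_plan::builder::{table_scan, unnest_with_options};
/// # fn main() -> Result<()> {
/// let schema = Schema::new(vec![Field::new(
///     "tags",
///     DataType::new_list(DataType::Utf8, true),
///     true,
/// )]);
/// let plan = table_scan(Some("t"), &schema, None)?.build()?;
/// // Each row is repeated once per element of its `tags` list
/// let plan =
///     unnest_with_options(plan, vec![Column::from_name("tags")], UnnestOptions::default())?;
/// # Ok(())
/// # }
/// ```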
2172pub fn unnest_with_options(
2173    input: LogicalPlan,
2174    columns_to_unnest: Vec<Column>,
2175    options: UnnestOptions,
2176) -> Result<LogicalPlan> {
2177    Ok(LogicalPlan::Unnest(Unnest::try_new(
2178        Arc::new(input),
2179        columns_to_unnest,
2180        options,
2181    )?))
2182}
2183
2184#[cfg(test)]
2185mod tests {
2186    use std::vec;
2187
2188    use super::*;
2189    use crate::lit_with_metadata;
2190    use crate::logical_plan::StringifiedPlan;
2191    use crate::{col, expr, expr_fn::exists, in_subquery, lit, scalar_subquery};
2192
2193    use crate::test::function_stub::sum;
2194    use datafusion_common::{
2195        Constraint, DataFusionError, RecursionUnnestOption, SchemaError,
2196    };
2197    use insta::assert_snapshot;
2198
2199    #[test]
2200    fn plan_builder_simple() -> Result<()> {
2201        let plan =
2202            table_scan(Some("employee_csv"), &employee_schema(), Some(vec![0, 3]))?
2203                .filter(col("state").eq(lit("CO")))?
2204                .project(vec![col("id")])?
2205                .build()?;
2206
2207        assert_snapshot!(plan, @r#"
2208        Projection: employee_csv.id
2209          Filter: employee_csv.state = Utf8("CO")
2210            TableScan: employee_csv projection=[id, state]
2211        "#);
2212
2213        Ok(())
2214    }
2215
2216    #[test]
2217    fn plan_builder_schema() {
2218        let schema = employee_schema();
2219        let projection = None;
2220        let plan =
2221            LogicalPlanBuilder::scan("employee_csv", table_source(&schema), projection)
2222                .unwrap();
2223        assert_snapshot!(plan.schema().as_ref(), @"fields:[employee_csv.id, employee_csv.first_name, employee_csv.last_name, employee_csv.state, employee_csv.salary], metadata:{}");
2224
2225        // Note scan of "EMPLOYEE_CSV" is treated as a SQL identifier
2226        // (and thus normalized to "employee_csv") as well
2227        let projection = None;
2228        let plan =
2229            LogicalPlanBuilder::scan("EMPLOYEE_CSV", table_source(&schema), projection)
2230                .unwrap();
2231        assert_snapshot!(plan.schema().as_ref(), @"fields:[employee_csv.id, employee_csv.first_name, employee_csv.last_name, employee_csv.state, employee_csv.salary], metadata:{}");
2232    }
2233
2234    #[test]
2235    fn plan_builder_empty_name() {
2236        let schema = employee_schema();
2237        let projection = None;
2238        let err =
2239            LogicalPlanBuilder::scan("", table_source(&schema), projection).unwrap_err();
2240        assert_snapshot!(
2241            err.strip_backtrace(),
2242            @"Error during planning: table_name cannot be empty"
2243        );
2244    }
2245
2246    #[test]
2247    fn plan_builder_sort() -> Result<()> {
2248        let plan =
2249            table_scan(Some("employee_csv"), &employee_schema(), Some(vec![3, 4]))?
2250                .sort(vec![
2251                    expr::Sort::new(col("state"), true, true),
2252                    expr::Sort::new(col("salary"), false, false),
2253                ])?
2254                .build()?;
2255
2256        assert_snapshot!(plan, @r"
2257        Sort: employee_csv.state ASC NULLS FIRST, employee_csv.salary DESC NULLS LAST
2258          TableScan: employee_csv projection=[state, salary]
2259        ");
2260
2261        Ok(())
2262    }
2263
2264    #[test]
2265    fn plan_builder_union() -> Result<()> {
2266        let plan =
2267            table_scan(Some("employee_csv"), &employee_schema(), Some(vec![3, 4]))?;
2268
2269        let plan = plan
2270            .clone()
2271            .union(plan.clone().build()?)?
2272            .union(plan.clone().build()?)?
2273            .union(plan.build()?)?
2274            .build()?;
2275
2276        assert_snapshot!(plan, @r"
2277        Union
2278          Union
2279            Union
2280              TableScan: employee_csv projection=[state, salary]
2281              TableScan: employee_csv projection=[state, salary]
2282            TableScan: employee_csv projection=[state, salary]
2283          TableScan: employee_csv projection=[state, salary]
2284        ");
2285
2286        Ok(())
2287    }
2288
2289    #[test]
2290    fn plan_builder_union_distinct() -> Result<()> {
2291        let plan =
2292            table_scan(Some("employee_csv"), &employee_schema(), Some(vec![3, 4]))?;
2293
2294        let plan = plan
2295            .clone()
2296            .union_distinct(plan.clone().build()?)?
2297            .union_distinct(plan.clone().build()?)?
2298            .union_distinct(plan.build()?)?
2299            .build()?;
2300
2301        assert_snapshot!(plan, @r"
2302        Distinct:
2303          Union
2304            Distinct:
2305              Union
2306                Distinct:
2307                  Union
2308                    TableScan: employee_csv projection=[state, salary]
2309                    TableScan: employee_csv projection=[state, salary]
2310                TableScan: employee_csv projection=[state, salary]
2311            TableScan: employee_csv projection=[state, salary]
2312        ");
2313
2314        Ok(())
2315    }
2316
2317    #[test]
2318    fn plan_builder_simple_distinct() -> Result<()> {
2319        let plan =
2320            table_scan(Some("employee_csv"), &employee_schema(), Some(vec![0, 3]))?
2321                .filter(col("state").eq(lit("CO")))?
2322                .project(vec![col("id")])?
2323                .distinct()?
2324                .build()?;
2325
2326        assert_snapshot!(plan, @r#"
2327        Distinct:
2328          Projection: employee_csv.id
2329            Filter: employee_csv.state = Utf8("CO")
2330              TableScan: employee_csv projection=[id, state]
2331        "#);
2332
2333        Ok(())
2334    }
2335
2336    #[test]
2337    fn exists_subquery() -> Result<()> {
2338        let foo = test_table_scan_with_name("foo")?;
2339        let bar = test_table_scan_with_name("bar")?;
2340
2341        let subquery = LogicalPlanBuilder::from(foo)
2342            .project(vec![col("a")])?
2343            .filter(col("a").eq(col("bar.a")))?
2344            .build()?;
2345
2346        let outer_query = LogicalPlanBuilder::from(bar)
2347            .project(vec![col("a")])?
2348            .filter(exists(Arc::new(subquery)))?
2349            .build()?;
2350
2351        assert_snapshot!(outer_query, @r"
2352        Filter: EXISTS (<subquery>)
2353          Subquery:
2354            Filter: foo.a = bar.a
2355              Projection: foo.a
2356                TableScan: foo
2357          Projection: bar.a
2358            TableScan: bar
2359        ");
2360
2361        Ok(())
2362    }
2363
2364    #[test]
2365    fn filter_in_subquery() -> Result<()> {
2366        let foo = test_table_scan_with_name("foo")?;
2367        let bar = test_table_scan_with_name("bar")?;
2368
2369        let subquery = LogicalPlanBuilder::from(foo)
2370            .project(vec![col("a")])?
2371            .filter(col("a").eq(col("bar.a")))?
2372            .build()?;
2373
2374        // SELECT a FROM bar WHERE a IN (SELECT a FROM foo WHERE a = bar.a)
2375        let outer_query = LogicalPlanBuilder::from(bar)
2376            .project(vec![col("a")])?
2377            .filter(in_subquery(col("a"), Arc::new(subquery)))?
2378            .build()?;
2379
2380        assert_snapshot!(outer_query, @r"
2381        Filter: bar.a IN (<subquery>)
2382          Subquery:
2383            Filter: foo.a = bar.a
2384              Projection: foo.a
2385                TableScan: foo
2386          Projection: bar.a
2387            TableScan: bar
2388        ");
2389
2390        Ok(())
2391    }
2392
2393    #[test]
2394    fn select_scalar_subquery() -> Result<()> {
2395        let foo = test_table_scan_with_name("foo")?;
2396        let bar = test_table_scan_with_name("bar")?;
2397
2398        let subquery = LogicalPlanBuilder::from(foo)
2399            .project(vec![col("b")])?
2400            .filter(col("a").eq(col("bar.a")))?
2401            .build()?;
2402
2403        // SELECT (SELECT b FROM foo WHERE a = bar.a) FROM bar
2404        let outer_query = LogicalPlanBuilder::from(bar)
2405            .project(vec![scalar_subquery(Arc::new(subquery))])?
2406            .build()?;
2407
2408        assert_snapshot!(outer_query, @r"
2409        Projection: (<subquery>)
2410          Subquery:
2411            Filter: foo.a = bar.a
2412              Projection: foo.b
2413                TableScan: foo
2414          TableScan: bar
2415        ");
2416
2417        Ok(())
2418    }
2419
2420    #[test]
2421    fn projection_non_unique_names() -> Result<()> {
2422        let plan = table_scan(
2423            Some("employee_csv"),
2424            &employee_schema(),
2425            // project id and first_name by column index
2426            Some(vec![0, 1]),
2427        )?
2428        // two columns with the same name => error
2429        .project(vec![col("id"), col("first_name").alias("id")]);
2430
2431        match plan {
2432            Err(DataFusionError::SchemaError(err, _)) => {
2433                if let SchemaError::AmbiguousReference { field } = *err {
2434                    let Column {
2435                        relation,
2436                        name,
2437                        spans: _,
2438                    } = *field;
2439                    let Some(TableReference::Bare { table }) = relation else {
2440                        return plan_err!(
2441                            "wrong relation: {relation:?}, expected table name"
2442                        );
2443                    };
2444                    assert_eq!(*"employee_csv", *table);
2445                    assert_eq!("id", &name);
2446                    Ok(())
2447                } else {
2448                    plan_err!("Plan should have returned an DataFusionError::SchemaError")
2449                }
2450            }
2451            _ => plan_err!("Plan should have returned an DataFusionError::SchemaError"),
2452        }
2453    }
2454
2455    fn employee_schema() -> Schema {
2456        Schema::new(vec![
2457            Field::new("id", DataType::Int32, false),
2458            Field::new("first_name", DataType::Utf8, false),
2459            Field::new("last_name", DataType::Utf8, false),
2460            Field::new("state", DataType::Utf8, false),
2461            Field::new("salary", DataType::Int32, false),
2462        ])
2463    }
2464
2465    #[test]
2466    fn stringified_plan() {
2467        let stringified_plan =
2468            StringifiedPlan::new(PlanType::InitialLogicalPlan, "...the plan...");
2469        assert!(stringified_plan.should_display(true));
2470        assert!(!stringified_plan.should_display(false)); // not in non verbose mode
2471
2472        let stringified_plan =
2473            StringifiedPlan::new(PlanType::FinalLogicalPlan, "...the plan...");
2474        assert!(stringified_plan.should_display(true));
2475        assert!(stringified_plan.should_display(false)); // display in non verbose mode too
2476
2477        let stringified_plan =
2478            StringifiedPlan::new(PlanType::InitialPhysicalPlan, "...the plan...");
2479        assert!(stringified_plan.should_display(true));
2480        assert!(!stringified_plan.should_display(false)); // not in non verbose mode
2481
2482        let stringified_plan =
2483            StringifiedPlan::new(PlanType::FinalPhysicalPlan, "...the plan...");
2484        assert!(stringified_plan.should_display(true));
2485        assert!(stringified_plan.should_display(false)); // display in non verbose mode
2486
2487        let stringified_plan = StringifiedPlan::new(
2488            PlanType::OptimizedLogicalPlan {
2489                optimizer_name: "random opt pass".into(),
2490            },
2491            "...the plan...",
2492        );
2493        assert!(stringified_plan.should_display(true));
2494        assert!(!stringified_plan.should_display(false));
2495    }
2496
2497    fn test_table_scan_with_name(name: &str) -> Result<LogicalPlan> {
2498        let schema = Schema::new(vec![
2499            Field::new("a", DataType::UInt32, false),
2500            Field::new("b", DataType::UInt32, false),
2501            Field::new("c", DataType::UInt32, false),
2502        ]);
2503        table_scan(Some(name), &schema, None)?.build()
2504    }
2505
2506    #[test]
2507    fn plan_builder_intersect_different_num_columns_error() -> Result<()> {
2508        let plan1 =
2509            table_scan(TableReference::none(), &employee_schema(), Some(vec![3]))?;
2510        let plan2 =
2511            table_scan(TableReference::none(), &employee_schema(), Some(vec![3, 4]))?;
2512
2513        let err_msg1 =
2514            LogicalPlanBuilder::intersect(plan1.build()?, plan2.build()?, true)
2515                .unwrap_err();
2516
2517        assert_snapshot!(err_msg1.strip_backtrace(), @"Error during planning: INTERSECT/EXCEPT query must have the same number of columns. Left is 1 and right is 2.");
2518
2519        Ok(())
2520    }
2521
2522    #[test]
2523    fn plan_builder_unnest() -> Result<()> {
2524        // Cannot unnest on a scalar column
2525        let err = nested_table_scan("test_table")?
2526            .unnest_column("scalar")
2527            .unwrap_err();
2528
2529        let DataFusionError::Internal(desc) = err else {
2530            return plan_err!("Plan should have returned an DataFusionError::Internal");
2531        };
2532
2533        let desc = (*desc
2534            .split(DataFusionError::BACK_TRACE_SEP)
2535            .collect::<Vec<&str>>()
2536            .first()
2537            .unwrap_or(&""))
2538        .to_string();
2539
2540        assert_snapshot!(desc, @"trying to unnest on invalid data type UInt32");
2541
2542        // Unnesting the strings list.
2543        let plan = nested_table_scan("test_table")?
2544            .unnest_column("strings")?
2545            .build()?;
2546
2547        assert_snapshot!(plan, @r"
2548        Unnest: lists[test_table.strings|depth=1] structs[]
2549          TableScan: test_table
2550        ");
2551
2552        // Check unnested field is a scalar
2553        let field = plan.schema().field_with_name(None, "strings").unwrap();
2554        assert_eq!(&DataType::Utf8, field.data_type());
2555
2556        // Unnesting the singular struct column results in 2 new columns, one for each subfield
2557        let plan = nested_table_scan("test_table")?
2558            .unnest_column("struct_singular")?
2559            .build()?;
2560
2561        assert_snapshot!(plan, @r"
2562        Unnest: lists[] structs[test_table.struct_singular]
2563          TableScan: test_table
2564        ");
2565
2566        for field_name in &["a", "b"] {
2567            // Check unnested struct field is a scalar
2568            let field = plan
2569                .schema()
2570                .field_with_name(None, &format!("struct_singular.{field_name}"))
2571                .unwrap();
2572            assert_eq!(&DataType::UInt32, field.data_type());
2573        }
2574
2575        // Unnesting multiple fields in separate plans
2576        let plan = nested_table_scan("test_table")?
2577            .unnest_column("strings")?
2578            .unnest_column("structs")?
2579            .unnest_column("struct_singular")?
2580            .build()?;
2581
2582        assert_snapshot!(plan, @r"
2583        Unnest: lists[] structs[test_table.struct_singular]
2584          Unnest: lists[test_table.structs|depth=1] structs[]
2585            Unnest: lists[test_table.strings|depth=1] structs[]
2586              TableScan: test_table
2587        ");
2588
2589        // Check that the unnested struct list field is a struct.
2590        let field = plan.schema().field_with_name(None, "structs").unwrap();
2591        assert!(matches!(field.data_type(), DataType::Struct(_)));
2592
2593        // Unnesting multiple fields at the same time, using infer syntax
2594        let cols = vec!["strings", "structs", "struct_singular"]
2595            .into_iter()
2596            .map(|c| c.into())
2597            .collect();
2598
2599        let plan = nested_table_scan("test_table")?
2600            .unnest_columns_with_options(cols, UnnestOptions::default())?
2601            .build()?;
2602
2603        assert_snapshot!(plan, @r"
2604        Unnest: lists[test_table.strings|depth=1, test_table.structs|depth=1] structs[test_table.struct_singular]
2605          TableScan: test_table
2606        ");
2607
2608        // Unnesting missing column should fail.
2609        let plan = nested_table_scan("test_table")?.unnest_column("missing");
2610        assert!(plan.is_err());
2611
2612        // Simultaneously unnesting a list (at multiple depths) and a struct column
2613        let plan = nested_table_scan("test_table")?
2614            .unnest_columns_with_options(
2615                vec!["stringss".into(), "struct_singular".into()],
2616                UnnestOptions::default()
2617                    .with_recursions(RecursionUnnestOption {
2618                        input_column: "stringss".into(),
2619                        output_column: "stringss_depth_1".into(),
2620                        depth: 1,
2621                    })
2622                    .with_recursions(RecursionUnnestOption {
2623                        input_column: "stringss".into(),
2624                        output_column: "stringss_depth_2".into(),
2625                        depth: 2,
2626                    }),
2627            )?
2628            .build()?;
2629
2630        assert_snapshot!(plan, @r"
2631        Unnest: lists[test_table.stringss|depth=1, test_table.stringss|depth=2] structs[test_table.struct_singular]
2632          TableScan: test_table
2633        ");
2634
2635        // Check that the output columns have the correct types
2636        let field = plan
2637            .schema()
2638            .field_with_name(None, "stringss_depth_1")
2639            .unwrap();
2640        assert_eq!(
2641            &DataType::new_list(DataType::Utf8, false),
2642            field.data_type()
2643        );
2644        let field = plan
2645            .schema()
2646            .field_with_name(None, "stringss_depth_2")
2647            .unwrap();
2648        assert_eq!(&DataType::Utf8, field.data_type());
2649        // unnesting the struct column is still correct
2650        for field_name in &["a", "b"] {
2651            let field = plan
2652                .schema()
2653                .field_with_name(None, &format!("struct_singular.{field_name}"))
2654                .unwrap();
2655            assert_eq!(&DataType::UInt32, field.data_type());
2656        }
2657
2658        Ok(())
2659    }
2660
2661    fn nested_table_scan(table_name: &str) -> Result<LogicalPlanBuilder> {
2662        // Create a schema with a scalar field, a list of strings, a list of structs,
2663        // a singular struct, and a doubly nested list of strings
2664        let struct_field_in_list = Field::new_struct(
2665            "item",
2666            vec![
2667                Field::new("a", DataType::UInt32, false),
2668                Field::new("b", DataType::UInt32, false),
2669            ],
2670            false,
2671        );
2672        let string_field = Field::new_list_field(DataType::Utf8, false);
2673        let strings_field = Field::new_list("item", string_field.clone(), false);
2674        let schema = Schema::new(vec![
2675            Field::new("scalar", DataType::UInt32, false),
2676            Field::new_list("strings", string_field, false),
2677            Field::new_list("structs", struct_field_in_list, false),
2678            Field::new(
2679                "struct_singular",
2680                DataType::Struct(Fields::from(vec![
2681                    Field::new("a", DataType::UInt32, false),
2682                    Field::new("b", DataType::UInt32, false),
2683                ])),
2684                false,
2685            ),
2686            Field::new_list("stringss", strings_field, false),
2687        ]);
2688
2689        table_scan(Some(table_name), &schema, None)
2690    }
2691
2692    #[test]
2693    fn test_union_after_join() -> Result<()> {
2694        let values = vec![vec![lit(1)]];
2695
2696        let left = LogicalPlanBuilder::values(values.clone())?
2697            .alias("left")?
2698            .build()?;
2699        let right = LogicalPlanBuilder::values(values)?
2700            .alias("right")?
2701            .build()?;
2702
2703        let join = LogicalPlanBuilder::from(left).cross_join(right)?.build()?;
2704
2705        let plan = LogicalPlanBuilder::from(join.clone())
2706            .union(join)?
2707            .build()?;
2708
2709        assert_snapshot!(plan, @r"
2710        Union
2711          Cross Join: 
2712            SubqueryAlias: left
2713              Values: (Int32(1))
2714            SubqueryAlias: right
2715              Values: (Int32(1))
2716          Cross Join: 
2717            SubqueryAlias: left
2718              Values: (Int32(1))
2719            SubqueryAlias: right
2720              Values: (Int32(1))
2721        ");
2722
2723        Ok(())
2724    }
2725
2726    #[test]
2727    fn plan_builder_from_logical_plan() -> Result<()> {
2728        let plan =
2729            table_scan(Some("employee_csv"), &employee_schema(), Some(vec![3, 4]))?
2730                .sort(vec![
2731                    expr::Sort::new(col("state"), true, true),
2732                    expr::Sort::new(col("salary"), false, false),
2733                ])?
2734                .build()?;
2735
2736        let plan_expected = format!("{plan}");
2737        let plan_builder: LogicalPlanBuilder = Arc::new(plan).into();
2738        assert_eq!(plan_expected, format!("{}", plan_builder.plan));
2739
2740        Ok(())
2741    }
2742
2743    #[test]
2744    fn plan_builder_aggregate_without_implicit_group_by_exprs() -> Result<()> {
2745        let constraints =
2746            Constraints::new_unverified(vec![Constraint::PrimaryKey(vec![0])]);
2747        let table_source = table_source_with_constraints(&employee_schema(), constraints);
2748
2749        let plan =
2750            LogicalPlanBuilder::scan("employee_csv", table_source, Some(vec![0, 3, 4]))?
2751                .aggregate(vec![col("id")], vec![sum(col("salary"))])?
2752                .build()?;
2753
2754        assert_snapshot!(plan, @r"
2755        Aggregate: groupBy=[[employee_csv.id]], aggr=[[sum(employee_csv.salary)]]
2756          TableScan: employee_csv projection=[id, state, salary]
2757        ");
2758
2759        Ok(())
2760    }
2761
2762    #[test]
2763    fn plan_builder_aggregate_with_implicit_group_by_exprs() -> Result<()> {
2764        let constraints =
2765            Constraints::new_unverified(vec![Constraint::PrimaryKey(vec![0])]);
2766        let table_source = table_source_with_constraints(&employee_schema(), constraints);
2767
2768        let options =
2769            LogicalPlanBuilderOptions::new().with_add_implicit_group_by_exprs(true);
2770        let plan =
2771            LogicalPlanBuilder::scan("employee_csv", table_source, Some(vec![0, 3, 4]))?
2772                .with_options(options)
2773                .aggregate(vec![col("id")], vec![sum(col("salary"))])?
2774                .build()?;
2775
2776        assert_snapshot!(plan, @r"
2777        Aggregate: groupBy=[[employee_csv.id, employee_csv.state, employee_csv.salary]], aggr=[[sum(employee_csv.salary)]]
2778          TableScan: employee_csv projection=[id, state, salary]
2779        ");
2780
2781        Ok(())
2782    }
2783
2784    #[test]
2785    fn test_join_metadata() -> Result<()> {
2786        let left_schema = DFSchema::new_with_metadata(
2787            vec![(None, Arc::new(Field::new("a", DataType::Int32, false)))],
2788            HashMap::from([("key".to_string(), "left".to_string())]),
2789        )?;
2790        let right_schema = DFSchema::new_with_metadata(
2791            vec![(None, Arc::new(Field::new("b", DataType::Int32, false)))],
2792            HashMap::from([("key".to_string(), "right".to_string())]),
2793        )?;
2794
2795        let join_schema =
2796            build_join_schema(&left_schema, &right_schema, &JoinType::Left)?;
2797        assert_eq!(
2798            join_schema.metadata(),
2799            &HashMap::from([("key".to_string(), "left".to_string())])
2800        );
2801        let join_schema =
2802            build_join_schema(&left_schema, &right_schema, &JoinType::Right)?;
2803        assert_eq!(
2804            join_schema.metadata(),
2805            &HashMap::from([("key".to_string(), "right".to_string())])
2806        );
2807
2808        Ok(())
2809    }
2810
2811    #[test]
2812    fn test_values_metadata() -> Result<()> {
2813        let metadata: HashMap<String, String> =
2814            [("ARROW:extension:metadata".to_string(), "test".to_string())]
2815                .into_iter()
2816                .collect();
2817        let metadata = FieldMetadata::from(metadata);
2818        let values = LogicalPlanBuilder::values(vec![
2819            vec![lit_with_metadata(1, Some(metadata.clone()))],
2820            vec![lit_with_metadata(2, Some(metadata.clone()))],
2821        ])?
2822        .build()?;
2823        assert_eq!(*values.schema().field(0).metadata(), metadata.to_hashmap());
2824
2825        // Do not allow VALUES with different metadata mixed together
2826        let metadata2: HashMap<String, String> =
2827            [("ARROW:extension:metadata".to_string(), "test2".to_string())]
2828                .into_iter()
2829                .collect();
2830        let metadata2 = FieldMetadata::from(metadata2);
2831        assert!(LogicalPlanBuilder::values(vec![
2832            vec![lit_with_metadata(1, Some(metadata.clone()))],
2833            vec![lit_with_metadata(2, Some(metadata2.clone()))],
2834        ])
2835        .is_err());
2836
2837        Ok(())
2838    }
2839
2840    #[test]
2841    fn test_unique_field_aliases() {
2842        let t1_field_1 = Field::new("a", DataType::Int32, false);
2843        let t2_field_1 = Field::new("a", DataType::Int32, false);
2844        let t2_field_3 = Field::new("a", DataType::Int32, false);
2845        let t2_field_4 = Field::new("a:1", DataType::Int32, false);
2846        let t1_field_2 = Field::new("b", DataType::Int32, false);
2847        let t2_field_2 = Field::new("b", DataType::Int32, false);
2848
2849        let fields = vec![
2850            t1_field_1, t2_field_1, t1_field_2, t2_field_2, t2_field_3, t2_field_4,
2851        ];
2852        let fields = Fields::from(fields);
2853
2854        let remove_redundant = unique_field_aliases(&fields);
2855
2856        // Input [a, a, b, b, a, a:1] becomes [None, a:1, None, b:1, a:2, a:1:1]
2857        // First occurrence of each field name keeps original name (None), duplicates get
2858        // incremental suffixes (:1, :2, etc.).
2859        // Crucially in this case the 2nd occurrence of `a` gets rewritten to `a:1` which later
2860        // conflicts with the last column which is _actually_ called `a:1` so we need to rename it
2861        // as well to `a:1:1`.
2862        assert_eq!(
2863            remove_redundant,
2864            vec![
2865                None,
2866                Some("a:1".to_string()),
2867                None,
2868                Some("b:1".to_string()),
2869                Some("a:2".to_string()),
2870                Some("a:1:1".to_string()),
2871            ]
2872        );
2873    }
2874}