datafusion_sql/unparser/
plan.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use super::{
19    ast::{
20        BuilderError, DerivedRelationBuilder, QueryBuilder, RelationBuilder,
21        SelectBuilder, TableRelationBuilder, TableWithJoinsBuilder,
22    },
23    rewrite::{
24        inject_column_aliases_into_subquery, normalize_union_schema,
25        rewrite_plan_for_sort_on_non_projected_fields,
26        subquery_alias_inner_query_and_columns, TableAliasRewriter,
27    },
28    utils::{
29        find_agg_node_within_select, find_unnest_node_within_select,
30        find_window_nodes_within_select, try_transform_to_simple_table_scan_with_filters,
31        unproject_sort_expr, unproject_unnest_expr, unproject_window_exprs,
32    },
33    Unparser,
34};
35use crate::unparser::ast::UnnestRelationBuilder;
36use crate::unparser::extension_unparser::{
37    UnparseToStatementResult, UnparseWithinStatementResult,
38};
39use crate::unparser::utils::{find_unnest_node_until_relation, unproject_agg_exprs};
40use crate::utils::UNNEST_PLACEHOLDER;
41use datafusion_common::{
42    internal_err, not_impl_err,
43    tree_node::{TransformedResult, TreeNode},
44    Column, DataFusionError, Result, ScalarValue, TableReference,
45};
46use datafusion_expr::expr::OUTER_REFERENCE_COLUMN_PREFIX;
47use datafusion_expr::{
48    expr::Alias, BinaryExpr, Distinct, Expr, JoinConstraint, JoinType, LogicalPlan,
49    LogicalPlanBuilder, Operator, Projection, SortExpr, TableScan, Unnest,
50    UserDefinedLogicalNode,
51};
52use sqlparser::ast::{self, Ident, OrderByKind, SetExpr, TableAliasColumnDef};
53use std::{sync::Arc, vec};
54
55/// Convert a DataFusion [`LogicalPlan`] to [`ast::Statement`]
56///
57/// This function is the opposite of [`SqlToRel::sql_statement_to_plan`] and can
58/// be used to, among other things, to convert `LogicalPlan`s to SQL strings.
59///
60/// # Errors
61///
62/// This function returns an error if the plan cannot be converted to SQL.
63///
64/// # See Also
65///
66/// * [`expr_to_sql`] for converting [`Expr`], a single expression to SQL
67///
68/// # Example
69/// ```
70/// use arrow::datatypes::{DataType, Field, Schema};
71/// use datafusion_expr::{col, logical_plan::table_scan};
72/// use datafusion_sql::unparser::plan_to_sql;
73/// let schema = Schema::new(vec![
74///     Field::new("id", DataType::Utf8, false),
75///     Field::new("value", DataType::Utf8, false),
76/// ]);
77/// // Scan 'table' and select columns 'id' and 'value'
78/// let plan = table_scan(Some("table"), &schema, None)
79///     .unwrap()
80///     .project(vec![col("id"), col("value")])
81///     .unwrap()
82///     .build()
83///     .unwrap();
84/// let sql = plan_to_sql(&plan).unwrap(); // convert to AST
85/// // use the Display impl to convert to SQL text
86/// assert_eq!(sql.to_string(), "SELECT \"table\".id, \"table\".\"value\" FROM \"table\"")
87/// ```
88///
89/// [`SqlToRel::sql_statement_to_plan`]: crate::planner::SqlToRel::sql_statement_to_plan
90/// [`expr_to_sql`]: crate::unparser::expr_to_sql
91pub fn plan_to_sql(plan: &LogicalPlan) -> Result<ast::Statement> {
92    let unparser = Unparser::default();
93    unparser.plan_to_sql(plan)
94}
95
96impl Unparser<'_> {
97    pub fn plan_to_sql(&self, plan: &LogicalPlan) -> Result<ast::Statement> {
98        let plan = normalize_union_schema(plan)?;
99
100        match plan {
101            LogicalPlan::Projection(_)
102            | LogicalPlan::Filter(_)
103            | LogicalPlan::Window(_)
104            | LogicalPlan::Aggregate(_)
105            | LogicalPlan::Sort(_)
106            | LogicalPlan::Join(_)
107            | LogicalPlan::Repartition(_)
108            | LogicalPlan::Union(_)
109            | LogicalPlan::TableScan(_)
110            | LogicalPlan::EmptyRelation(_)
111            | LogicalPlan::Subquery(_)
112            | LogicalPlan::SubqueryAlias(_)
113            | LogicalPlan::Limit(_)
114            | LogicalPlan::Statement(_)
115            | LogicalPlan::Values(_)
116            | LogicalPlan::Distinct(_) => self.select_to_sql_statement(&plan),
117            LogicalPlan::Dml(_) => self.dml_to_sql(&plan),
118            LogicalPlan::Extension(extension) => {
119                self.extension_to_statement(extension.node.as_ref())
120            }
121            LogicalPlan::Explain(_)
122            | LogicalPlan::Analyze(_)
123            | LogicalPlan::Ddl(_)
124            | LogicalPlan::Copy(_)
125            | LogicalPlan::DescribeTable(_)
126            | LogicalPlan::RecursiveQuery(_)
127            | LogicalPlan::Unnest(_) => not_impl_err!("Unsupported plan: {plan:?}"),
128        }
129    }
130
131    /// Try to unparse a [UserDefinedLogicalNode] to a SQL statement.
132    /// If multiple unparsers are registered for the same [UserDefinedLogicalNode],
133    /// the first unparsing result will be returned.
134    fn extension_to_statement(
135        &self,
136        node: &dyn UserDefinedLogicalNode,
137    ) -> Result<ast::Statement> {
138        let mut statement = None;
139        for unparser in &self.extension_unparsers {
140            match unparser.unparse_to_statement(node, self)? {
141                UnparseToStatementResult::Modified(stmt) => {
142                    statement = Some(stmt);
143                    break;
144                }
145                UnparseToStatementResult::Unmodified => {}
146            }
147        }
148        if let Some(statement) = statement {
149            Ok(statement)
150        } else {
151            not_impl_err!("Unsupported extension node: {node:?}")
152        }
153    }
154
155    /// Try to unparse a [UserDefinedLogicalNode] to a SQL statement.
156    /// If multiple unparsers are registered for the same [UserDefinedLogicalNode],
157    /// the first unparser supporting the node will be used.
158    fn extension_to_sql(
159        &self,
160        node: &dyn UserDefinedLogicalNode,
161        query: &mut Option<&mut QueryBuilder>,
162        select: &mut Option<&mut SelectBuilder>,
163        relation: &mut Option<&mut RelationBuilder>,
164    ) -> Result<()> {
165        for unparser in &self.extension_unparsers {
166            match unparser.unparse(node, self, query, select, relation)? {
167                UnparseWithinStatementResult::Modified => return Ok(()),
168                UnparseWithinStatementResult::Unmodified => {}
169            }
170        }
171        not_impl_err!("Unsupported extension node: {node:?}")
172    }
173
174    fn select_to_sql_statement(&self, plan: &LogicalPlan) -> Result<ast::Statement> {
175        let mut query_builder = Some(QueryBuilder::default());
176
177        let body = self.select_to_sql_expr(plan, &mut query_builder)?;
178
179        let query = query_builder.unwrap().body(Box::new(body)).build()?;
180
181        Ok(ast::Statement::Query(Box::new(query)))
182    }
183
184    fn select_to_sql_expr(
185        &self,
186        plan: &LogicalPlan,
187        query: &mut Option<QueryBuilder>,
188    ) -> Result<SetExpr> {
189        let mut select_builder = SelectBuilder::default();
190        select_builder.push_from(TableWithJoinsBuilder::default());
191        let mut relation_builder = RelationBuilder::default();
192        self.select_to_sql_recursively(
193            plan,
194            query,
195            &mut select_builder,
196            &mut relation_builder,
197        )?;
198
199        // If we were able to construct a full body (i.e. UNION ALL), return it
200        if let Some(body) = query.as_mut().and_then(|q| q.take_body()) {
201            return Ok(*body);
202        }
203
204        // If no projection is set, add a wildcard projection to the select
205        // which will be translated to `SELECT *` in the SQL statement
206        if !select_builder.already_projected() {
207            select_builder.projection(vec![ast::SelectItem::Wildcard(
208                ast::WildcardAdditionalOptions::default(),
209            )]);
210        }
211
212        let mut twj = select_builder.pop_from().unwrap();
213        twj.relation(relation_builder);
214        select_builder.push_from(twj);
215
216        Ok(SetExpr::Select(Box::new(select_builder.build()?)))
217    }
218
219    /// Reconstructs a SELECT SQL statement from a logical plan by unprojecting column expressions
220    /// found in a [Projection] node. This requires scanning the plan tree for relevant Aggregate
221    /// and Window nodes and matching column expressions to the appropriate agg or window expressions.
222    fn reconstruct_select_statement(
223        &self,
224        plan: &LogicalPlan,
225        p: &Projection,
226        select: &mut SelectBuilder,
227    ) -> Result<()> {
228        let mut exprs = p.expr.clone();
229
230        // If an Unnest node is found within the select, find and unproject the unnest column
231        if let Some(unnest) = find_unnest_node_within_select(plan) {
232            exprs = exprs
233                .into_iter()
234                .map(|e| unproject_unnest_expr(e, unnest))
235                .collect::<Result<Vec<_>>>()?;
236        };
237
238        match (
239            find_agg_node_within_select(plan, true),
240            find_window_nodes_within_select(plan, None, true),
241        ) {
242            (Some(agg), window) => {
243                let window_option = window.as_deref();
244                let items = exprs
245                    .into_iter()
246                    .map(|proj_expr| {
247                        let unproj = unproject_agg_exprs(proj_expr, agg, window_option)?;
248                        self.select_item_to_sql(&unproj)
249                    })
250                    .collect::<Result<Vec<_>>>()?;
251
252                select.projection(items);
253                select.group_by(ast::GroupByExpr::Expressions(
254                    agg.group_expr
255                        .iter()
256                        .map(|expr| self.expr_to_sql(expr))
257                        .collect::<Result<Vec<_>>>()?,
258                    vec![],
259                ));
260            }
261            (None, Some(window)) => {
262                let items = exprs
263                    .into_iter()
264                    .map(|proj_expr| {
265                        let unproj = unproject_window_exprs(proj_expr, &window)?;
266                        self.select_item_to_sql(&unproj)
267                    })
268                    .collect::<Result<Vec<_>>>()?;
269
270                select.projection(items);
271            }
272            _ => {
273                let items = exprs
274                    .iter()
275                    .map(|e| self.select_item_to_sql(e))
276                    .collect::<Result<Vec<_>>>()?;
277                select.projection(items);
278            }
279        }
280        Ok(())
281    }
282
283    fn derive(
284        &self,
285        plan: &LogicalPlan,
286        relation: &mut RelationBuilder,
287        alias: Option<ast::TableAlias>,
288        lateral: bool,
289    ) -> Result<()> {
290        let mut derived_builder = DerivedRelationBuilder::default();
291        derived_builder.lateral(lateral).alias(alias).subquery({
292            let inner_statement = self.plan_to_sql(plan)?;
293            if let ast::Statement::Query(inner_query) = inner_statement {
294                inner_query
295            } else {
296                return internal_err!(
297                    "Subquery must be a Query, but found {inner_statement:?}"
298                );
299            }
300        });
301        relation.derived(derived_builder);
302
303        Ok(())
304    }
305
306    fn derive_with_dialect_alias(
307        &self,
308        alias: &str,
309        plan: &LogicalPlan,
310        relation: &mut RelationBuilder,
311        lateral: bool,
312        columns: Vec<Ident>,
313    ) -> Result<()> {
314        if self.dialect.requires_derived_table_alias() || !columns.is_empty() {
315            self.derive(
316                plan,
317                relation,
318                Some(self.new_table_alias(alias.to_string(), columns)),
319                lateral,
320            )
321        } else {
322            self.derive(plan, relation, None, lateral)
323        }
324    }
325
326    #[cfg_attr(feature = "recursive_protection", recursive::recursive)]
327    fn select_to_sql_recursively(
328        &self,
329        plan: &LogicalPlan,
330        query: &mut Option<QueryBuilder>,
331        select: &mut SelectBuilder,
332        relation: &mut RelationBuilder,
333    ) -> Result<()> {
334        match plan {
335            LogicalPlan::TableScan(scan) => {
336                if let Some(unparsed_table_scan) = Self::unparse_table_scan_pushdown(
337                    plan,
338                    None,
339                    select.already_projected(),
340                )? {
341                    return self.select_to_sql_recursively(
342                        &unparsed_table_scan,
343                        query,
344                        select,
345                        relation,
346                    );
347                }
348                let mut builder = TableRelationBuilder::default();
349                let mut table_parts = vec![];
350                if let Some(catalog_name) = scan.table_name.catalog() {
351                    table_parts
352                        .push(self.new_ident_quoted_if_needs(catalog_name.to_string()));
353                }
354                if let Some(schema_name) = scan.table_name.schema() {
355                    table_parts
356                        .push(self.new_ident_quoted_if_needs(schema_name.to_string()));
357                }
358                table_parts.push(
359                    self.new_ident_quoted_if_needs(scan.table_name.table().to_string()),
360                );
361                builder.name(ast::ObjectName::from(table_parts));
362                relation.table(builder);
363
364                Ok(())
365            }
366            LogicalPlan::Projection(p) => {
367                if let Some(new_plan) = rewrite_plan_for_sort_on_non_projected_fields(p) {
368                    return self
369                        .select_to_sql_recursively(&new_plan, query, select, relation);
370                }
371
372                // Projection can be top-level plan for unnest relation
373                // The projection generated by the `RecursiveUnnestRewriter` from a UNNEST relation will have
374                // only one expression, which is the placeholder column generated by the rewriter.
375                let unnest_input_type = if p.expr.len() == 1 {
376                    Self::check_unnest_placeholder_with_outer_ref(&p.expr[0])
377                } else {
378                    None
379                };
380                if self.dialect.unnest_as_table_factor() && unnest_input_type.is_some() {
381                    if let LogicalPlan::Unnest(unnest) = &p.input.as_ref() {
382                        if let Some(unnest_relation) =
383                            self.try_unnest_to_table_factor_sql(unnest)?
384                        {
385                            relation.unnest(unnest_relation);
386                            return self.select_to_sql_recursively(
387                                p.input.as_ref(),
388                                query,
389                                select,
390                                relation,
391                            );
392                        }
393                    }
394                }
395
396                // If it's a unnest projection, we should provide the table column alias
397                // to provide a column name for the unnest relation.
398                let columns = if unnest_input_type.is_some() {
399                    p.expr
400                        .iter()
401                        .map(|e| {
402                            self.new_ident_quoted_if_needs(e.schema_name().to_string())
403                        })
404                        .collect()
405                } else {
406                    vec![]
407                };
408                // Projection can be top-level plan for derived table
409                if select.already_projected() {
410                    return self.derive_with_dialect_alias(
411                        "derived_projection",
412                        plan,
413                        relation,
414                        unnest_input_type
415                            .filter(|t| matches!(t, UnnestInputType::OuterReference))
416                            .is_some(),
417                        columns,
418                    );
419                }
420                self.reconstruct_select_statement(plan, p, select)?;
421                self.select_to_sql_recursively(p.input.as_ref(), query, select, relation)
422            }
423            LogicalPlan::Filter(filter) => {
424                if let Some(agg) =
425                    find_agg_node_within_select(plan, select.already_projected())
426                {
427                    let unprojected =
428                        unproject_agg_exprs(filter.predicate.clone(), agg, None)?;
429                    let filter_expr = self.expr_to_sql(&unprojected)?;
430                    select.having(Some(filter_expr));
431                } else {
432                    let filter_expr = self.expr_to_sql(&filter.predicate)?;
433                    select.selection(Some(filter_expr));
434                }
435
436                self.select_to_sql_recursively(
437                    filter.input.as_ref(),
438                    query,
439                    select,
440                    relation,
441                )
442            }
443            LogicalPlan::Limit(limit) => {
444                // Limit can be top-level plan for derived table
445                if select.already_projected() {
446                    return self.derive_with_dialect_alias(
447                        "derived_limit",
448                        plan,
449                        relation,
450                        false,
451                        vec![],
452                    );
453                }
454                if let Some(fetch) = &limit.fetch {
455                    let Some(query) = query.as_mut() else {
456                        return internal_err!(
457                            "Limit operator only valid in a statement context."
458                        );
459                    };
460                    query.limit(Some(self.expr_to_sql(fetch)?));
461                }
462
463                if let Some(skip) = &limit.skip {
464                    let Some(query) = query.as_mut() else {
465                        return internal_err!(
466                            "Offset operator only valid in a statement context."
467                        );
468                    };
469
470                    query.offset(Some(ast::Offset {
471                        rows: ast::OffsetRows::None,
472                        value: self.expr_to_sql(skip)?,
473                    }));
474                }
475
476                self.select_to_sql_recursively(
477                    limit.input.as_ref(),
478                    query,
479                    select,
480                    relation,
481                )
482            }
483            LogicalPlan::Sort(sort) => {
484                // Sort can be top-level plan for derived table
485                if select.already_projected() {
486                    return self.derive_with_dialect_alias(
487                        "derived_sort",
488                        plan,
489                        relation,
490                        false,
491                        vec![],
492                    );
493                }
494                let Some(query_ref) = query else {
495                    return internal_err!(
496                        "Sort operator only valid in a statement context."
497                    );
498                };
499
500                if let Some(fetch) = sort.fetch {
501                    query_ref.limit(Some(ast::Expr::value(ast::Value::Number(
502                        fetch.to_string(),
503                        false,
504                    ))));
505                };
506
507                let agg = find_agg_node_within_select(plan, select.already_projected());
508                // unproject sort expressions
509                let sort_exprs: Vec<SortExpr> = sort
510                    .expr
511                    .iter()
512                    .map(|sort_expr| {
513                        unproject_sort_expr(sort_expr.clone(), agg, sort.input.as_ref())
514                    })
515                    .collect::<Result<Vec<_>>>()?;
516
517                query_ref.order_by(self.sorts_to_sql(&sort_exprs)?);
518
519                self.select_to_sql_recursively(
520                    sort.input.as_ref(),
521                    query,
522                    select,
523                    relation,
524                )
525            }
526            LogicalPlan::Aggregate(agg) => {
527                // Aggregation can be already handled in the projection case
528                if !select.already_projected() {
529                    // The query returns aggregate and group expressions. If that weren't the case,
530                    // the aggregate would have been placed inside a projection, making the check above^ false
531                    let exprs: Vec<_> = agg
532                        .aggr_expr
533                        .iter()
534                        .chain(agg.group_expr.iter())
535                        .map(|expr| self.select_item_to_sql(expr))
536                        .collect::<Result<Vec<_>>>()?;
537                    select.projection(exprs);
538
539                    select.group_by(ast::GroupByExpr::Expressions(
540                        agg.group_expr
541                            .iter()
542                            .map(|expr| self.expr_to_sql(expr))
543                            .collect::<Result<Vec<_>>>()?,
544                        vec![],
545                    ));
546                }
547
548                self.select_to_sql_recursively(
549                    agg.input.as_ref(),
550                    query,
551                    select,
552                    relation,
553                )
554            }
555            LogicalPlan::Distinct(distinct) => {
556                // Distinct can be top-level plan for derived table
557                if select.already_projected() {
558                    return self.derive_with_dialect_alias(
559                        "derived_distinct",
560                        plan,
561                        relation,
562                        false,
563                        vec![],
564                    );
565                }
566
567                // If this distinct is the parent of a Union and we're in a query context,
568                // then we need to unparse as a `UNION` rather than a `UNION ALL`.
569                if let Distinct::All(input) = distinct {
570                    if matches!(input.as_ref(), LogicalPlan::Union(_)) {
571                        if let Some(query_mut) = query.as_mut() {
572                            query_mut.distinct_union();
573                            return self.select_to_sql_recursively(
574                                input.as_ref(),
575                                query,
576                                select,
577                                relation,
578                            );
579                        }
580                    }
581                }
582
583                let (select_distinct, input) = match distinct {
584                    Distinct::All(input) => (ast::Distinct::Distinct, input.as_ref()),
585                    Distinct::On(on) => {
586                        let exprs = on
587                            .on_expr
588                            .iter()
589                            .map(|e| self.expr_to_sql(e))
590                            .collect::<Result<Vec<_>>>()?;
591                        let items = on
592                            .select_expr
593                            .iter()
594                            .map(|e| self.select_item_to_sql(e))
595                            .collect::<Result<Vec<_>>>()?;
596                        if let Some(sort_expr) = &on.sort_expr {
597                            if let Some(query_ref) = query {
598                                query_ref.order_by(self.sorts_to_sql(sort_expr)?);
599                            } else {
600                                return internal_err!(
601                                    "Sort operator only valid in a statement context."
602                                );
603                            }
604                        }
605                        select.projection(items);
606                        (ast::Distinct::On(exprs), on.input.as_ref())
607                    }
608                };
609                select.distinct(Some(select_distinct));
610                self.select_to_sql_recursively(input, query, select, relation)
611            }
612            LogicalPlan::Join(join) => {
613                let mut table_scan_filters = vec![];
614                let (left_plan, right_plan) = match join.join_type {
615                    JoinType::RightSemi | JoinType::RightAnti => {
616                        (&join.right, &join.left)
617                    }
618                    _ => (&join.left, &join.right),
619                };
620                // If there's an outer projection plan, it will already set up the projection.
621                // In that case, we don't need to worry about setting up the projection here.
622                // The outer projection plan will handle projecting the correct columns.
623                let already_projected = select.already_projected();
624
625                let left_plan =
626                    match try_transform_to_simple_table_scan_with_filters(left_plan)? {
627                        Some((plan, filters)) => {
628                            table_scan_filters.extend(filters);
629                            Arc::new(plan)
630                        }
631                        None => Arc::clone(left_plan),
632                    };
633
634                self.select_to_sql_recursively(
635                    left_plan.as_ref(),
636                    query,
637                    select,
638                    relation,
639                )?;
640
641                let left_projection: Option<Vec<ast::SelectItem>> = if !already_projected
642                {
643                    Some(select.pop_projections())
644                } else {
645                    None
646                };
647
648                let right_plan =
649                    match try_transform_to_simple_table_scan_with_filters(right_plan)? {
650                        Some((plan, filters)) => {
651                            table_scan_filters.extend(filters);
652                            Arc::new(plan)
653                        }
654                        None => Arc::clone(right_plan),
655                    };
656
657                let mut right_relation = RelationBuilder::default();
658
659                self.select_to_sql_recursively(
660                    right_plan.as_ref(),
661                    query,
662                    select,
663                    &mut right_relation,
664                )?;
665
666                let join_filters = if table_scan_filters.is_empty() {
667                    join.filter.clone()
668                } else {
669                    // Combine `table_scan_filters` into a single filter using `AND`
670                    let Some(combined_filters) =
671                        table_scan_filters.into_iter().reduce(|acc, filter| {
672                            Expr::BinaryExpr(BinaryExpr {
673                                left: Box::new(acc),
674                                op: Operator::And,
675                                right: Box::new(filter),
676                            })
677                        })
678                    else {
679                        return internal_err!("Failed to combine TableScan filters");
680                    };
681
682                    // Combine `join.filter` with `combined_filters` using `AND`
683                    match &join.filter {
684                        Some(filter) => Some(Expr::BinaryExpr(BinaryExpr {
685                            left: Box::new(filter.clone()),
686                            op: Operator::And,
687                            right: Box::new(combined_filters),
688                        })),
689                        None => Some(combined_filters),
690                    }
691                };
692
693                let join_constraint = self.join_constraint_to_sql(
694                    join.join_constraint,
695                    &join.on,
696                    join_filters.as_ref(),
697                )?;
698
699                let right_projection: Option<Vec<ast::SelectItem>> = if !already_projected
700                {
701                    Some(select.pop_projections())
702                } else {
703                    None
704                };
705
706                match join.join_type {
707                    JoinType::LeftSemi
708                    | JoinType::LeftAnti
709                    | JoinType::LeftMark
710                    | JoinType::RightSemi
711                    | JoinType::RightAnti
712                    | JoinType::RightMark => {
713                        let mut query_builder = QueryBuilder::default();
714                        let mut from = TableWithJoinsBuilder::default();
715                        let mut exists_select: SelectBuilder = SelectBuilder::default();
716                        from.relation(right_relation);
717                        exists_select.push_from(from);
718                        if let Some(filter) = &join.filter {
719                            exists_select.selection(Some(self.expr_to_sql(filter)?));
720                        }
721                        for (left, right) in &join.on {
722                            exists_select.selection(Some(
723                                self.expr_to_sql(&left.clone().eq(right.clone()))?,
724                            ));
725                        }
726                        exists_select.projection(vec![ast::SelectItem::UnnamedExpr(
727                            ast::Expr::value(ast::Value::Number("1".to_string(), false)),
728                        )]);
729                        query_builder.body(Box::new(SetExpr::Select(Box::new(
730                            exists_select.build()?,
731                        ))));
732
733                        let negated = match join.join_type {
734                            JoinType::LeftSemi
735                            | JoinType::RightSemi
736                            | JoinType::LeftMark
737                            | JoinType::RightMark => false,
738                            JoinType::LeftAnti | JoinType::RightAnti => true,
739                            _ => unreachable!(),
740                        };
741                        let exists_expr = ast::Expr::Exists {
742                            subquery: Box::new(query_builder.build()?),
743                            negated,
744                        };
745
746                        match join.join_type {
747                            JoinType::LeftMark | JoinType::RightMark => {
748                                let source_schema =
749                                    if join.join_type == JoinType::LeftMark {
750                                        right_plan.schema()
751                                    } else {
752                                        left_plan.schema()
753                                    };
754                                let (table_ref, _) = source_schema.qualified_field(0);
755                                let column = self.col_to_sql(&Column::new(
756                                    table_ref.cloned(),
757                                    "mark",
758                                ))?;
759                                select.replace_mark(&column, &exists_expr);
760                            }
761                            _ => {
762                                select.selection(Some(exists_expr));
763                            }
764                        }
765                        if let Some(projection) = left_projection {
766                            select.projection(projection);
767                        }
768                    }
769                    JoinType::Inner
770                    | JoinType::Left
771                    | JoinType::Right
772                    | JoinType::Full => {
773                        let Ok(Some(relation)) = right_relation.build() else {
774                            return internal_err!("Failed to build right relation");
775                        };
776                        let ast_join = ast::Join {
777                            relation,
778                            global: false,
779                            join_operator: self
780                                .join_operator_to_sql(join.join_type, join_constraint)?,
781                        };
782                        let mut from = select.pop_from().unwrap();
783                        from.push_join(ast_join);
784                        select.push_from(from);
785                        if !already_projected {
786                            let Some(left_projection) = left_projection else {
787                                return internal_err!("Left projection is missing");
788                            };
789
790                            let Some(right_projection) = right_projection else {
791                                return internal_err!("Right projection is missing");
792                            };
793
794                            let projection = left_projection
795                                .into_iter()
796                                .chain(right_projection)
797                                .collect();
798                            select.projection(projection);
799                        }
800                    }
801                };
802
803                Ok(())
804            }
805            LogicalPlan::SubqueryAlias(plan_alias) => {
806                let (plan, mut columns) =
807                    subquery_alias_inner_query_and_columns(plan_alias);
808                let unparsed_table_scan = Self::unparse_table_scan_pushdown(
809                    plan,
810                    Some(plan_alias.alias.clone()),
811                    select.already_projected(),
812                )?;
813                // if the child plan is a TableScan with pushdown operations, we don't need to
814                // create an additional subquery for it
815                if !select.already_projected() && unparsed_table_scan.is_none() {
816                    select.projection(vec![ast::SelectItem::Wildcard(
817                        ast::WildcardAdditionalOptions::default(),
818                    )]);
819                }
820                let plan = unparsed_table_scan.unwrap_or_else(|| plan.clone());
821                if !columns.is_empty()
822                    && !self.dialect.supports_column_alias_in_table_alias()
823                {
824                    // Instead of specifying column aliases as part of the outer table, inject them directly into the inner projection
825                    let rewritten_plan =
826                        match inject_column_aliases_into_subquery(plan, columns) {
827                            Ok(p) => p,
828                            Err(e) => {
829                                return internal_err!(
830                                    "Failed to transform SubqueryAlias plan: {e}"
831                                )
832                            }
833                        };
834
835                    columns = vec![];
836
837                    self.select_to_sql_recursively(
838                        &rewritten_plan,
839                        query,
840                        select,
841                        relation,
842                    )?;
843                } else {
844                    self.select_to_sql_recursively(&plan, query, select, relation)?;
845                }
846
847                relation.alias(Some(
848                    self.new_table_alias(plan_alias.alias.table().to_string(), columns),
849                ));
850
851                Ok(())
852            }
853            LogicalPlan::Union(union) => {
854                // Covers cases where the UNION is a subquery and the projection is at the top level
855                if select.already_projected() {
856                    return self.derive_with_dialect_alias(
857                        "derived_union",
858                        plan,
859                        relation,
860                        false,
861                        vec![],
862                    );
863                }
864
865                let input_exprs: Vec<SetExpr> = union
866                    .inputs
867                    .iter()
868                    .map(|input| self.select_to_sql_expr(input, query))
869                    .collect::<Result<Vec<_>>>()?;
870
871                if input_exprs.len() < 2 {
872                    return internal_err!("UNION operator requires at least 2 inputs");
873                }
874
875                let set_quantifier =
876                    if query.as_ref().is_some_and(|q| q.is_distinct_union()) {
877                        // Setting the SetQuantifier to None will unparse as a `UNION`
878                        // rather than a `UNION ALL`.
879                        ast::SetQuantifier::None
880                    } else {
881                        ast::SetQuantifier::All
882                    };
883
884                // Build the union expression tree bottom-up by reversing the order
885                // note that we are also swapping left and right inputs because of the rev
886                let union_expr = input_exprs
887                    .into_iter()
888                    .rev()
889                    .reduce(|a, b| SetExpr::SetOperation {
890                        op: ast::SetOperator::Union,
891                        set_quantifier,
892                        left: Box::new(b),
893                        right: Box::new(a),
894                    })
895                    .unwrap();
896
897                let Some(query) = query.as_mut() else {
898                    return internal_err!(
899                        "UNION ALL operator only valid in a statement context"
900                    );
901                };
902                query.body(Box::new(union_expr));
903
904                Ok(())
905            }
906            LogicalPlan::Window(window) => {
907                // Window nodes are handled simultaneously with Projection nodes
908                self.select_to_sql_recursively(
909                    window.input.as_ref(),
910                    query,
911                    select,
912                    relation,
913                )
914            }
915            LogicalPlan::EmptyRelation(_) => {
916                // An EmptyRelation could be behind an UNNEST node. If the dialect supports UNNEST as a table factor,
917                // a TableRelationBuilder will be created for the UNNEST node first.
918                if !relation.has_relation() {
919                    relation.empty();
920                }
921                Ok(())
922            }
923            LogicalPlan::Extension(extension) => {
924                if let Some(query) = query.as_mut() {
925                    self.extension_to_sql(
926                        extension.node.as_ref(),
927                        &mut Some(query),
928                        &mut Some(select),
929                        &mut Some(relation),
930                    )
931                } else {
932                    self.extension_to_sql(
933                        extension.node.as_ref(),
934                        &mut None,
935                        &mut Some(select),
936                        &mut Some(relation),
937                    )
938                }
939            }
940            LogicalPlan::Unnest(unnest) => {
941                if !unnest.struct_type_columns.is_empty() {
942                    return internal_err!(
943                        "Struct type columns are not currently supported in UNNEST: {:?}",
944                        unnest.struct_type_columns
945                    );
946                }
947
948                // In the case of UNNEST, the Unnest node is followed by a duplicate Projection node that we should skip.
949                // Otherwise, there will be a duplicate SELECT clause.
950                // | Projection: table.col1, UNNEST(table.col2)
951                // |   Unnest: UNNEST(table.col2)
952                // |     Projection: table.col1, table.col2 AS UNNEST(table.col2)
953                // |       Filter: table.col3 = Int64(3)
954                // |         TableScan: table projection=None
955                if let LogicalPlan::Projection(p) = unnest.input.as_ref() {
956                    // continue with projection input
957                    self.select_to_sql_recursively(&p.input, query, select, relation)
958                } else {
959                    internal_err!("Unnest input is not a Projection: {unnest:?}")
960                }
961            }
962            LogicalPlan::Subquery(subquery)
963                if find_unnest_node_until_relation(subquery.subquery.as_ref())
964                    .is_some() =>
965            {
966                if self.dialect.unnest_as_table_factor() {
967                    self.select_to_sql_recursively(
968                        subquery.subquery.as_ref(),
969                        query,
970                        select,
971                        relation,
972                    )
973                } else {
974                    self.derive_with_dialect_alias(
975                        "derived_unnest",
976                        subquery.subquery.as_ref(),
977                        relation,
978                        true,
979                        vec![],
980                    )
981                }
982            }
983            _ => {
984                not_impl_err!("Unsupported operator: {plan:?}")
985            }
986        }
987    }
988
989    /// Try to find the placeholder column name generated by `RecursiveUnnestRewriter`.
990    ///
991    /// - If the column is a placeholder column match the pattern `Expr::Alias(Expr::Column("__unnest_placeholder(...)"))`,
992    ///   it means it is a scalar column, return [UnnestInputType::Scalar].
993    /// - If the column is a placeholder column match the pattern `Expr::Alias(Expr::Column("__unnest_placeholder(outer_ref(...)))")`,
994    ///   it means it is an outer reference column, return [UnnestInputType::OuterReference].
995    /// - If the column is not a placeholder column, return [None].
996    ///
997    /// `outer_ref` is the display result of [Expr::OuterReferenceColumn]
998    fn check_unnest_placeholder_with_outer_ref(expr: &Expr) -> Option<UnnestInputType> {
999        if let Expr::Alias(Alias { expr, .. }) = expr {
1000            if let Expr::Column(Column { name, .. }) = expr.as_ref() {
1001                if let Some(prefix) = name.strip_prefix(UNNEST_PLACEHOLDER) {
1002                    if prefix.starts_with(&format!("({OUTER_REFERENCE_COLUMN_PREFIX}(")) {
1003                        return Some(UnnestInputType::OuterReference);
1004                    }
1005                    return Some(UnnestInputType::Scalar);
1006                }
1007            }
1008        }
1009        None
1010    }
1011
1012    fn try_unnest_to_table_factor_sql(
1013        &self,
1014        unnest: &Unnest,
1015    ) -> Result<Option<UnnestRelationBuilder>> {
1016        let mut unnest_relation = UnnestRelationBuilder::default();
1017        let LogicalPlan::Projection(projection) = unnest.input.as_ref() else {
1018            return Ok(None);
1019        };
1020
1021        if !matches!(projection.input.as_ref(), LogicalPlan::EmptyRelation(_)) {
1022            // It may be possible that UNNEST is used as a source for the query.
1023            // However, at this point, we don't yet know if it is just a single expression
1024            // from another source or if it's from UNNEST.
1025            //
1026            // Unnest(Projection(EmptyRelation)) denotes a case with `UNNEST([...])`,
1027            // which is normally safe to unnest as a table factor.
1028            // However, in the future, more comprehensive checks can be added here.
1029            return Ok(None);
1030        };
1031
1032        let exprs = projection
1033            .expr
1034            .iter()
1035            .map(|e| self.expr_to_sql(e))
1036            .collect::<Result<Vec<_>>>()?;
1037        unnest_relation.array_exprs(exprs);
1038
1039        Ok(Some(unnest_relation))
1040    }
1041
1042    fn is_scan_with_pushdown(scan: &TableScan) -> bool {
1043        scan.projection.is_some() || !scan.filters.is_empty() || scan.fetch.is_some()
1044    }
1045
1046    /// Try to unparse a table scan with pushdown operations into a new subquery plan.
1047    /// If the table scan is without any pushdown operations, return None.
1048    fn unparse_table_scan_pushdown(
1049        plan: &LogicalPlan,
1050        alias: Option<TableReference>,
1051        already_projected: bool,
1052    ) -> Result<Option<LogicalPlan>> {
1053        match plan {
1054            LogicalPlan::TableScan(table_scan) => {
1055                if !Self::is_scan_with_pushdown(table_scan) {
1056                    return Ok(None);
1057                }
1058                let table_schema = table_scan.source.schema();
1059                let mut filter_alias_rewriter =
1060                    alias.as_ref().map(|alias_name| TableAliasRewriter {
1061                        table_schema: &table_schema,
1062                        alias_name: alias_name.clone(),
1063                    });
1064
1065                let mut builder = LogicalPlanBuilder::scan(
1066                    table_scan.table_name.clone(),
1067                    Arc::clone(&table_scan.source),
1068                    None,
1069                )?;
1070                // We will rebase the column references to the new alias if it exists.
1071                // If the projection or filters are empty, we will append alias to the table scan.
1072                //
1073                // Example:
1074                //   select t1.c1 from t1 where t1.c1 > 1 -> select a.c1 from t1 as a where a.c1 > 1
1075                if let Some(ref alias) = alias {
1076                    if table_scan.projection.is_some() || !table_scan.filters.is_empty() {
1077                        builder = builder.alias(alias.clone())?;
1078                    }
1079                }
1080
1081                // Avoid creating a duplicate Projection node, which would result in an additional subquery if a projection already exists.
1082                // For example, if the `optimize_projection` rule is applied, there will be a Projection node, and duplicate projection
1083                // information included in the TableScan node.
1084                if !already_projected {
1085                    if let Some(project_vec) = &table_scan.projection {
1086                        if project_vec.is_empty() {
1087                            builder = builder.project(vec![Expr::Literal(
1088                                ScalarValue::Int64(Some(1)),
1089                                None,
1090                            )])?;
1091                        } else {
1092                            let project_columns = project_vec
1093                                .iter()
1094                                .cloned()
1095                                .map(|i| {
1096                                    let schema = table_scan.source.schema();
1097                                    let field = schema.field(i);
1098                                    if alias.is_some() {
1099                                        Column::new(alias.clone(), field.name().clone())
1100                                    } else {
1101                                        Column::new(
1102                                            Some(table_scan.table_name.clone()),
1103                                            field.name().clone(),
1104                                        )
1105                                    }
1106                                })
1107                                .collect::<Vec<_>>();
1108                            builder = builder.project(project_columns)?;
1109                        };
1110                    }
1111                }
1112
1113                let filter_expr: Result<Option<Expr>> = table_scan
1114                    .filters
1115                    .iter()
1116                    .cloned()
1117                    .map(|expr| {
1118                        if let Some(ref mut rewriter) = filter_alias_rewriter {
1119                            expr.rewrite(rewriter).data()
1120                        } else {
1121                            Ok(expr)
1122                        }
1123                    })
1124                    .reduce(|acc, expr_result| {
1125                        acc.and_then(|acc_expr| {
1126                            expr_result.map(|expr| acc_expr.and(expr))
1127                        })
1128                    })
1129                    .transpose();
1130
1131                if let Some(filter) = filter_expr? {
1132                    builder = builder.filter(filter)?;
1133                }
1134
1135                if let Some(fetch) = table_scan.fetch {
1136                    builder = builder.limit(0, Some(fetch))?;
1137                }
1138
1139                // If the table scan has an alias but no projection or filters, it means no column references are rebased.
1140                // So we will append the alias to this subquery.
1141                // Example:
1142                //   select * from t1 limit 10 -> (select * from t1 limit 10) as a
1143                if let Some(alias) = alias {
1144                    if table_scan.projection.is_none() && table_scan.filters.is_empty() {
1145                        builder = builder.alias(alias)?;
1146                    }
1147                }
1148
1149                Ok(Some(builder.build()?))
1150            }
1151            LogicalPlan::SubqueryAlias(subquery_alias) => {
1152                let ret = Self::unparse_table_scan_pushdown(
1153                    &subquery_alias.input,
1154                    Some(subquery_alias.alias.clone()),
1155                    already_projected,
1156                )?;
1157                if let Some(alias) = alias {
1158                    if let Some(plan) = ret {
1159                        let plan = LogicalPlanBuilder::new(plan).alias(alias)?.build()?;
1160                        return Ok(Some(plan));
1161                    }
1162                }
1163                Ok(ret)
1164            }
1165            // SubqueryAlias could be rewritten to a plan with a projection as the top node by [rewrite::subquery_alias_inner_query_and_columns].
1166            // The inner table scan could be a scan with pushdown operations.
1167            LogicalPlan::Projection(projection) => {
1168                if let Some(plan) = Self::unparse_table_scan_pushdown(
1169                    &projection.input,
1170                    alias.clone(),
1171                    already_projected,
1172                )? {
1173                    let exprs = if alias.is_some() {
1174                        let mut alias_rewriter =
1175                            alias.as_ref().map(|alias_name| TableAliasRewriter {
1176                                table_schema: plan.schema().as_arrow(),
1177                                alias_name: alias_name.clone(),
1178                            });
1179                        projection
1180                            .expr
1181                            .iter()
1182                            .cloned()
1183                            .map(|expr| {
1184                                if let Some(ref mut rewriter) = alias_rewriter {
1185                                    expr.rewrite(rewriter).data()
1186                                } else {
1187                                    Ok(expr)
1188                                }
1189                            })
1190                            .collect::<Result<Vec<_>>>()?
1191                    } else {
1192                        projection.expr.clone()
1193                    };
1194                    Ok(Some(
1195                        LogicalPlanBuilder::from(plan).project(exprs)?.build()?,
1196                    ))
1197                } else {
1198                    Ok(None)
1199                }
1200            }
1201            _ => Ok(None),
1202        }
1203    }
1204
1205    fn select_item_to_sql(&self, expr: &Expr) -> Result<ast::SelectItem> {
1206        match expr {
1207            Expr::Alias(Alias { expr, name, .. }) => {
1208                let inner = self.expr_to_sql(expr)?;
1209
1210                // Determine the alias name to use
1211                let col_name = if let Some(rewritten_name) =
1212                    self.dialect.col_alias_overrides(name)?
1213                {
1214                    rewritten_name.to_string()
1215                } else {
1216                    name.to_string()
1217                };
1218
1219                Ok(ast::SelectItem::ExprWithAlias {
1220                    expr: inner,
1221                    alias: self.new_ident_quoted_if_needs(col_name),
1222                })
1223            }
1224            _ => {
1225                let inner = self.expr_to_sql(expr)?;
1226
1227                Ok(ast::SelectItem::UnnamedExpr(inner))
1228            }
1229        }
1230    }
1231
1232    fn sorts_to_sql(&self, sort_exprs: &[SortExpr]) -> Result<OrderByKind> {
1233        Ok(OrderByKind::Expressions(
1234            sort_exprs
1235                .iter()
1236                .map(|sort_expr| self.sort_to_sql(sort_expr))
1237                .collect::<Result<Vec<_>>>()?,
1238        ))
1239    }
1240
1241    fn join_operator_to_sql(
1242        &self,
1243        join_type: JoinType,
1244        constraint: ast::JoinConstraint,
1245    ) -> Result<ast::JoinOperator> {
1246        Ok(match join_type {
1247            JoinType::Inner => match &constraint {
1248                ast::JoinConstraint::On(_)
1249                | ast::JoinConstraint::Using(_)
1250                | ast::JoinConstraint::Natural => ast::JoinOperator::Inner(constraint),
1251                ast::JoinConstraint::None => {
1252                    // Inner joins with no conditions or filters are not valid SQL in most systems,
1253                    // return a CROSS JOIN instead
1254                    ast::JoinOperator::CrossJoin
1255                }
1256            },
1257            JoinType::Left => ast::JoinOperator::LeftOuter(constraint),
1258            JoinType::Right => ast::JoinOperator::RightOuter(constraint),
1259            JoinType::Full => ast::JoinOperator::FullOuter(constraint),
1260            JoinType::LeftAnti => ast::JoinOperator::LeftAnti(constraint),
1261            JoinType::LeftSemi => ast::JoinOperator::LeftSemi(constraint),
1262            JoinType::RightAnti => ast::JoinOperator::RightAnti(constraint),
1263            JoinType::RightSemi => ast::JoinOperator::RightSemi(constraint),
1264            JoinType::LeftMark | JoinType::RightMark => {
1265                unimplemented!("Unparsing of Mark join type")
1266            }
1267        })
1268    }
1269
1270    /// Convert the components of a USING clause to the USING AST. Returns
1271    /// 'None' if the conditions are not compatible with a USING expression,
1272    /// e.g. non-column expressions or non-matching names.
1273    fn join_using_to_sql(
1274        &self,
1275        join_conditions: &[(Expr, Expr)],
1276    ) -> Option<ast::JoinConstraint> {
1277        let mut object_names = Vec::with_capacity(join_conditions.len());
1278        for (left, right) in join_conditions {
1279            match (left, right) {
1280                (
1281                    Expr::Column(Column {
1282                        relation: _,
1283                        name: left_name,
1284                        spans: _,
1285                    }),
1286                    Expr::Column(Column {
1287                        relation: _,
1288                        name: right_name,
1289                        spans: _,
1290                    }),
1291                ) if left_name == right_name => {
1292                    // For example, if the join condition `t1.id = t2.id`
1293                    // this is represented as two columns like `[t1.id, t2.id]`
1294                    // This code forms `id` (without relation name)
1295                    let ident = self.new_ident_quoted_if_needs(left_name.to_string());
1296                    object_names.push(ast::ObjectName::from(vec![ident]));
1297                }
1298                // USING is only valid with matching column names; arbitrary expressions
1299                // are not allowed
1300                _ => return None,
1301            }
1302        }
1303        Some(ast::JoinConstraint::Using(object_names))
1304    }
1305
1306    /// Convert a join constraint and associated conditions and filter to a SQL AST node
1307    fn join_constraint_to_sql(
1308        &self,
1309        constraint: JoinConstraint,
1310        conditions: &[(Expr, Expr)],
1311        filter: Option<&Expr>,
1312    ) -> Result<ast::JoinConstraint> {
1313        match (constraint, conditions, filter) {
1314            // No constraints
1315            (JoinConstraint::On | JoinConstraint::Using, [], None) => {
1316                Ok(ast::JoinConstraint::None)
1317            }
1318
1319            (JoinConstraint::Using, conditions, None) => {
1320                match self.join_using_to_sql(conditions) {
1321                    Some(using) => Ok(using),
1322                    // As above, this should not be reachable from parsed SQL,
1323                    // but a user could create this; we "downgrade" to ON.
1324                    None => self.join_conditions_to_sql_on(conditions, None),
1325                }
1326            }
1327
1328            // Two cases here:
1329            // 1. Straightforward ON case, with possible equi-join conditions
1330            //    and additional filters
1331            // 2. USING with additional filters; we "downgrade" to ON, because
1332            //    you can't use USING with arbitrary filters. (This should not
1333            //    be accessible from parsed SQL, but may have been a
1334            //    custom-built JOIN by a user.)
1335            (JoinConstraint::On | JoinConstraint::Using, conditions, filter) => {
1336                self.join_conditions_to_sql_on(conditions, filter)
1337            }
1338        }
1339    }
1340
1341    // Convert a list of equi0join conditions and an optional filter to a SQL ON
1342    // AST node, with the equi-join conditions and the filter merged into a
1343    // single conditional expression
1344    fn join_conditions_to_sql_on(
1345        &self,
1346        join_conditions: &[(Expr, Expr)],
1347        filter: Option<&Expr>,
1348    ) -> Result<ast::JoinConstraint> {
1349        let mut condition = None;
1350        // AND the join conditions together to create the overall condition
1351        for (left, right) in join_conditions {
1352            // Parse left and right
1353            let l = self.expr_to_sql(left)?;
1354            let r = self.expr_to_sql(right)?;
1355            let e = self.binary_op_to_sql(l, r, ast::BinaryOperator::Eq);
1356            condition = match condition {
1357                Some(expr) => Some(self.and_op_to_sql(expr, e)),
1358                None => Some(e),
1359            };
1360        }
1361
1362        // Then AND the non-equijoin filter condition as well
1363        condition = match (condition, filter) {
1364            (Some(expr), Some(filter)) => {
1365                Some(self.and_op_to_sql(expr, self.expr_to_sql(filter)?))
1366            }
1367            (Some(expr), None) => Some(expr),
1368            (None, Some(filter)) => Some(self.expr_to_sql(filter)?),
1369            (None, None) => None,
1370        };
1371
1372        let constraint = match condition {
1373            Some(filter) => ast::JoinConstraint::On(filter),
1374            None => ast::JoinConstraint::None,
1375        };
1376
1377        Ok(constraint)
1378    }
1379
1380    fn and_op_to_sql(&self, lhs: ast::Expr, rhs: ast::Expr) -> ast::Expr {
1381        self.binary_op_to_sql(lhs, rhs, ast::BinaryOperator::And)
1382    }
1383
1384    fn new_table_alias(&self, alias: String, columns: Vec<Ident>) -> ast::TableAlias {
1385        let columns = columns
1386            .into_iter()
1387            .map(|ident| TableAliasColumnDef {
1388                name: ident,
1389                data_type: None,
1390            })
1391            .collect();
1392        ast::TableAlias {
1393            name: self.new_ident_quoted_if_needs(alias),
1394            columns,
1395        }
1396    }
1397
1398    fn dml_to_sql(&self, plan: &LogicalPlan) -> Result<ast::Statement> {
1399        not_impl_err!("Unsupported plan: {plan:?}")
1400    }
1401}
1402
1403impl From<BuilderError> for DataFusionError {
1404    fn from(e: BuilderError) -> Self {
1405        DataFusionError::External(Box::new(e))
1406    }
1407}
1408
/// The kind of input fed to the UNNEST table factor.
#[derive(Debug)]
enum UnnestInputType {
    /// The input is a column reference. It will be presented like
    /// `outer_ref(column_name)`.
    OuterReference,
    /// The input is a scalar value. It will be presented like a scalar array
    /// or struct.
    Scalar,
}