datafusion_sql/unparser/
plan.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use super::{
19    ast::{
20        BuilderError, DerivedRelationBuilder, QueryBuilder, RelationBuilder,
21        SelectBuilder, TableRelationBuilder, TableWithJoinsBuilder,
22    },
23    rewrite::{
24        inject_column_aliases_into_subquery, normalize_union_schema,
25        rewrite_plan_for_sort_on_non_projected_fields,
26        subquery_alias_inner_query_and_columns, TableAliasRewriter,
27    },
28    utils::{
29        find_agg_node_within_select, find_unnest_node_within_select,
30        find_window_nodes_within_select, try_transform_to_simple_table_scan_with_filters,
31        unproject_sort_expr, unproject_unnest_expr, unproject_window_exprs,
32    },
33    Unparser,
34};
35use crate::unparser::ast::UnnestRelationBuilder;
36use crate::unparser::extension_unparser::{
37    UnparseToStatementResult, UnparseWithinStatementResult,
38};
39use crate::unparser::utils::{find_unnest_node_until_relation, unproject_agg_exprs};
40use crate::utils::UNNEST_PLACEHOLDER;
41use datafusion_common::{
42    internal_err, not_impl_err,
43    tree_node::{TransformedResult, TreeNode},
44    Column, DataFusionError, Result, ScalarValue, TableReference,
45};
46use datafusion_expr::expr::OUTER_REFERENCE_COLUMN_PREFIX;
47use datafusion_expr::{
48    expr::Alias, BinaryExpr, Distinct, Expr, JoinConstraint, JoinType, LogicalPlan,
49    LogicalPlanBuilder, Operator, Projection, SortExpr, TableScan, Unnest,
50    UserDefinedLogicalNode,
51};
52use sqlparser::ast::{self, Ident, OrderByKind, SetExpr, TableAliasColumnDef};
53use std::{sync::Arc, vec};
54
55/// Convert a DataFusion [`LogicalPlan`] to [`ast::Statement`]
56///
57/// This function is the opposite of [`SqlToRel::sql_statement_to_plan`] and can
58/// be used to, among other things, to convert `LogicalPlan`s to SQL strings.
59///
60/// # Errors
61///
62/// This function returns an error if the plan cannot be converted to SQL.
63///
64/// # See Also
65///
66/// * [`expr_to_sql`] for converting [`Expr`], a single expression to SQL
67///
68/// # Example
69/// ```
70/// use arrow::datatypes::{DataType, Field, Schema};
71/// use datafusion_expr::{col, logical_plan::table_scan};
72/// use datafusion_sql::unparser::plan_to_sql;
73/// let schema = Schema::new(vec![
74///     Field::new("id", DataType::Utf8, false),
75///     Field::new("value", DataType::Utf8, false),
76/// ]);
77/// // Scan 'table' and select columns 'id' and 'value'
78/// let plan = table_scan(Some("table"), &schema, None)
79///     .unwrap()
80///     .project(vec![col("id"), col("value")])
81///     .unwrap()
82///     .build()
83///     .unwrap();
84/// let sql = plan_to_sql(&plan).unwrap(); // convert to AST
85/// // use the Display impl to convert to SQL text
86/// assert_eq!(sql.to_string(), "SELECT \"table\".id, \"table\".\"value\" FROM \"table\"")
87/// ```
88///
89/// [`SqlToRel::sql_statement_to_plan`]: crate::planner::SqlToRel::sql_statement_to_plan
90/// [`expr_to_sql`]: crate::unparser::expr_to_sql
91pub fn plan_to_sql(plan: &LogicalPlan) -> Result<ast::Statement> {
92    let unparser = Unparser::default();
93    unparser.plan_to_sql(plan)
94}
95
96impl Unparser<'_> {
97    pub fn plan_to_sql(&self, plan: &LogicalPlan) -> Result<ast::Statement> {
98        let plan = normalize_union_schema(plan)?;
99
100        match plan {
101            LogicalPlan::Projection(_)
102            | LogicalPlan::Filter(_)
103            | LogicalPlan::Window(_)
104            | LogicalPlan::Aggregate(_)
105            | LogicalPlan::Sort(_)
106            | LogicalPlan::Join(_)
107            | LogicalPlan::Repartition(_)
108            | LogicalPlan::Union(_)
109            | LogicalPlan::TableScan(_)
110            | LogicalPlan::EmptyRelation(_)
111            | LogicalPlan::Subquery(_)
112            | LogicalPlan::SubqueryAlias(_)
113            | LogicalPlan::Limit(_)
114            | LogicalPlan::Statement(_)
115            | LogicalPlan::Values(_)
116            | LogicalPlan::Distinct(_) => self.select_to_sql_statement(&plan),
117            LogicalPlan::Dml(_) => self.dml_to_sql(&plan),
118            LogicalPlan::Extension(extension) => {
119                self.extension_to_statement(extension.node.as_ref())
120            }
121            LogicalPlan::Explain(_)
122            | LogicalPlan::Analyze(_)
123            | LogicalPlan::Ddl(_)
124            | LogicalPlan::Copy(_)
125            | LogicalPlan::DescribeTable(_)
126            | LogicalPlan::RecursiveQuery(_)
127            | LogicalPlan::Unnest(_) => not_impl_err!("Unsupported plan: {plan:?}"),
128        }
129    }
130
131    /// Try to unparse a [UserDefinedLogicalNode] to a SQL statement.
132    /// If multiple unparsers are registered for the same [UserDefinedLogicalNode],
133    /// the first unparsing result will be returned.
134    fn extension_to_statement(
135        &self,
136        node: &dyn UserDefinedLogicalNode,
137    ) -> Result<ast::Statement> {
138        let mut statement = None;
139        for unparser in &self.extension_unparsers {
140            match unparser.unparse_to_statement(node, self)? {
141                UnparseToStatementResult::Modified(stmt) => {
142                    statement = Some(stmt);
143                    break;
144                }
145                UnparseToStatementResult::Unmodified => {}
146            }
147        }
148        if let Some(statement) = statement {
149            Ok(statement)
150        } else {
151            not_impl_err!("Unsupported extension node: {node:?}")
152        }
153    }
154
155    /// Try to unparse a [UserDefinedLogicalNode] to a SQL statement.
156    /// If multiple unparsers are registered for the same [UserDefinedLogicalNode],
157    /// the first unparser supporting the node will be used.
158    fn extension_to_sql(
159        &self,
160        node: &dyn UserDefinedLogicalNode,
161        query: &mut Option<&mut QueryBuilder>,
162        select: &mut Option<&mut SelectBuilder>,
163        relation: &mut Option<&mut RelationBuilder>,
164    ) -> Result<()> {
165        for unparser in &self.extension_unparsers {
166            match unparser.unparse(node, self, query, select, relation)? {
167                UnparseWithinStatementResult::Modified => return Ok(()),
168                UnparseWithinStatementResult::Unmodified => {}
169            }
170        }
171        not_impl_err!("Unsupported extension node: {node:?}")
172    }
173
174    fn select_to_sql_statement(&self, plan: &LogicalPlan) -> Result<ast::Statement> {
175        let mut query_builder = Some(QueryBuilder::default());
176
177        let body = self.select_to_sql_expr(plan, &mut query_builder)?;
178
179        let query = query_builder.unwrap().body(Box::new(body)).build()?;
180
181        Ok(ast::Statement::Query(Box::new(query)))
182    }
183
184    fn select_to_sql_expr(
185        &self,
186        plan: &LogicalPlan,
187        query: &mut Option<QueryBuilder>,
188    ) -> Result<SetExpr> {
189        let mut select_builder = SelectBuilder::default();
190        select_builder.push_from(TableWithJoinsBuilder::default());
191        let mut relation_builder = RelationBuilder::default();
192        self.select_to_sql_recursively(
193            plan,
194            query,
195            &mut select_builder,
196            &mut relation_builder,
197        )?;
198
199        // If we were able to construct a full body (i.e. UNION ALL), return it
200        if let Some(body) = query.as_mut().and_then(|q| q.take_body()) {
201            return Ok(*body);
202        }
203
204        // If no projection is set, add a wildcard projection to the select
205        // which will be translated to `SELECT *` in the SQL statement
206        if !select_builder.already_projected() {
207            select_builder.projection(vec![ast::SelectItem::Wildcard(
208                ast::WildcardAdditionalOptions::default(),
209            )]);
210        }
211
212        let mut twj = select_builder.pop_from().unwrap();
213        twj.relation(relation_builder);
214        select_builder.push_from(twj);
215
216        Ok(SetExpr::Select(Box::new(select_builder.build()?)))
217    }
218
219    /// Reconstructs a SELECT SQL statement from a logical plan by unprojecting column expressions
220    /// found in a [Projection] node. This requires scanning the plan tree for relevant Aggregate
221    /// and Window nodes and matching column expressions to the appropriate agg or window expressions.
222    fn reconstruct_select_statement(
223        &self,
224        plan: &LogicalPlan,
225        p: &Projection,
226        select: &mut SelectBuilder,
227    ) -> Result<()> {
228        let mut exprs = p.expr.clone();
229
230        // If an Unnest node is found within the select, find and unproject the unnest column
231        if let Some(unnest) = find_unnest_node_within_select(plan) {
232            exprs = exprs
233                .into_iter()
234                .map(|e| unproject_unnest_expr(e, unnest))
235                .collect::<Result<Vec<_>>>()?;
236        };
237
238        match (
239            find_agg_node_within_select(plan, true),
240            find_window_nodes_within_select(plan, None, true),
241        ) {
242            (Some(agg), window) => {
243                let window_option = window.as_deref();
244                let items = exprs
245                    .into_iter()
246                    .map(|proj_expr| {
247                        let unproj = unproject_agg_exprs(proj_expr, agg, window_option)?;
248                        self.select_item_to_sql(&unproj)
249                    })
250                    .collect::<Result<Vec<_>>>()?;
251
252                select.projection(items);
253                select.group_by(ast::GroupByExpr::Expressions(
254                    agg.group_expr
255                        .iter()
256                        .map(|expr| self.expr_to_sql(expr))
257                        .collect::<Result<Vec<_>>>()?,
258                    vec![],
259                ));
260            }
261            (None, Some(window)) => {
262                let items = exprs
263                    .into_iter()
264                    .map(|proj_expr| {
265                        let unproj = unproject_window_exprs(proj_expr, &window)?;
266                        self.select_item_to_sql(&unproj)
267                    })
268                    .collect::<Result<Vec<_>>>()?;
269
270                select.projection(items);
271            }
272            _ => {
273                let items = exprs
274                    .iter()
275                    .map(|e| self.select_item_to_sql(e))
276                    .collect::<Result<Vec<_>>>()?;
277                select.projection(items);
278            }
279        }
280        Ok(())
281    }
282
283    fn derive(
284        &self,
285        plan: &LogicalPlan,
286        relation: &mut RelationBuilder,
287        alias: Option<ast::TableAlias>,
288        lateral: bool,
289    ) -> Result<()> {
290        let mut derived_builder = DerivedRelationBuilder::default();
291        derived_builder.lateral(lateral).alias(alias).subquery({
292            let inner_statement = self.plan_to_sql(plan)?;
293            if let ast::Statement::Query(inner_query) = inner_statement {
294                inner_query
295            } else {
296                return internal_err!(
297                    "Subquery must be a Query, but found {inner_statement:?}"
298                );
299            }
300        });
301        relation.derived(derived_builder);
302
303        Ok(())
304    }
305
306    fn derive_with_dialect_alias(
307        &self,
308        alias: &str,
309        plan: &LogicalPlan,
310        relation: &mut RelationBuilder,
311        lateral: bool,
312        columns: Vec<Ident>,
313    ) -> Result<()> {
314        if self.dialect.requires_derived_table_alias() || !columns.is_empty() {
315            self.derive(
316                plan,
317                relation,
318                Some(self.new_table_alias(alias.to_string(), columns)),
319                lateral,
320            )
321        } else {
322            self.derive(plan, relation, None, lateral)
323        }
324    }
325
326    #[cfg_attr(feature = "recursive_protection", recursive::recursive)]
327    fn select_to_sql_recursively(
328        &self,
329        plan: &LogicalPlan,
330        query: &mut Option<QueryBuilder>,
331        select: &mut SelectBuilder,
332        relation: &mut RelationBuilder,
333    ) -> Result<()> {
334        match plan {
335            LogicalPlan::TableScan(scan) => {
336                if let Some(unparsed_table_scan) = Self::unparse_table_scan_pushdown(
337                    plan,
338                    None,
339                    select.already_projected(),
340                )? {
341                    return self.select_to_sql_recursively(
342                        &unparsed_table_scan,
343                        query,
344                        select,
345                        relation,
346                    );
347                }
348                let mut builder = TableRelationBuilder::default();
349                let mut table_parts = vec![];
350                if let Some(catalog_name) = scan.table_name.catalog() {
351                    table_parts
352                        .push(self.new_ident_quoted_if_needs(catalog_name.to_string()));
353                }
354                if let Some(schema_name) = scan.table_name.schema() {
355                    table_parts
356                        .push(self.new_ident_quoted_if_needs(schema_name.to_string()));
357                }
358                table_parts.push(
359                    self.new_ident_quoted_if_needs(scan.table_name.table().to_string()),
360                );
361                builder.name(ast::ObjectName::from(table_parts));
362                relation.table(builder);
363
364                Ok(())
365            }
366            LogicalPlan::Projection(p) => {
367                if let Some(new_plan) = rewrite_plan_for_sort_on_non_projected_fields(p) {
368                    return self
369                        .select_to_sql_recursively(&new_plan, query, select, relation);
370                }
371
372                // Projection can be top-level plan for unnest relation
373                // The projection generated by the `RecursiveUnnestRewriter` from a UNNEST relation will have
374                // only one expression, which is the placeholder column generated by the rewriter.
375                let unnest_input_type = if p.expr.len() == 1 {
376                    Self::check_unnest_placeholder_with_outer_ref(&p.expr[0])
377                } else {
378                    None
379                };
380                if self.dialect.unnest_as_table_factor() && unnest_input_type.is_some() {
381                    if let LogicalPlan::Unnest(unnest) = &p.input.as_ref() {
382                        if let Some(unnest_relation) =
383                            self.try_unnest_to_table_factor_sql(unnest)?
384                        {
385                            relation.unnest(unnest_relation);
386                            return self.select_to_sql_recursively(
387                                p.input.as_ref(),
388                                query,
389                                select,
390                                relation,
391                            );
392                        }
393                    }
394                }
395
396                // If it's a unnest projection, we should provide the table column alias
397                // to provide a column name for the unnest relation.
398                let columns = if unnest_input_type.is_some() {
399                    p.expr
400                        .iter()
401                        .map(|e| {
402                            self.new_ident_quoted_if_needs(e.schema_name().to_string())
403                        })
404                        .collect()
405                } else {
406                    vec![]
407                };
408                // Projection can be top-level plan for derived table
409                if select.already_projected() {
410                    return self.derive_with_dialect_alias(
411                        "derived_projection",
412                        plan,
413                        relation,
414                        unnest_input_type
415                            .filter(|t| matches!(t, UnnestInputType::OuterReference))
416                            .is_some(),
417                        columns,
418                    );
419                }
420                self.reconstruct_select_statement(plan, p, select)?;
421                self.select_to_sql_recursively(p.input.as_ref(), query, select, relation)
422            }
423            LogicalPlan::Filter(filter) => {
424                if let Some(agg) =
425                    find_agg_node_within_select(plan, select.already_projected())
426                {
427                    let unprojected =
428                        unproject_agg_exprs(filter.predicate.clone(), agg, None)?;
429                    let filter_expr = self.expr_to_sql(&unprojected)?;
430                    select.having(Some(filter_expr));
431                } else {
432                    let filter_expr = self.expr_to_sql(&filter.predicate)?;
433                    select.selection(Some(filter_expr));
434                }
435
436                self.select_to_sql_recursively(
437                    filter.input.as_ref(),
438                    query,
439                    select,
440                    relation,
441                )
442            }
443            LogicalPlan::Limit(limit) => {
444                // Limit can be top-level plan for derived table
445                if select.already_projected() {
446                    return self.derive_with_dialect_alias(
447                        "derived_limit",
448                        plan,
449                        relation,
450                        false,
451                        vec![],
452                    );
453                }
454                if let Some(fetch) = &limit.fetch {
455                    let Some(query) = query.as_mut() else {
456                        return internal_err!(
457                            "Limit operator only valid in a statement context."
458                        );
459                    };
460                    query.limit(Some(self.expr_to_sql(fetch)?));
461                }
462
463                if let Some(skip) = &limit.skip {
464                    let Some(query) = query.as_mut() else {
465                        return internal_err!(
466                            "Offset operator only valid in a statement context."
467                        );
468                    };
469                    query.offset(Some(ast::Offset {
470                        rows: ast::OffsetRows::None,
471                        value: self.expr_to_sql(skip)?,
472                    }));
473                }
474
475                self.select_to_sql_recursively(
476                    limit.input.as_ref(),
477                    query,
478                    select,
479                    relation,
480                )
481            }
482            LogicalPlan::Sort(sort) => {
483                // Sort can be top-level plan for derived table
484                if select.already_projected() {
485                    return self.derive_with_dialect_alias(
486                        "derived_sort",
487                        plan,
488                        relation,
489                        false,
490                        vec![],
491                    );
492                }
493                let Some(query_ref) = query else {
494                    return internal_err!(
495                        "Sort operator only valid in a statement context."
496                    );
497                };
498
499                if let Some(fetch) = sort.fetch {
500                    query_ref.limit(Some(ast::Expr::value(ast::Value::Number(
501                        fetch.to_string(),
502                        false,
503                    ))));
504                };
505
506                let agg = find_agg_node_within_select(plan, select.already_projected());
507                // unproject sort expressions
508                let sort_exprs: Vec<SortExpr> = sort
509                    .expr
510                    .iter()
511                    .map(|sort_expr| {
512                        unproject_sort_expr(sort_expr.clone(), agg, sort.input.as_ref())
513                    })
514                    .collect::<Result<Vec<_>>>()?;
515
516                query_ref.order_by(self.sorts_to_sql(&sort_exprs)?);
517
518                self.select_to_sql_recursively(
519                    sort.input.as_ref(),
520                    query,
521                    select,
522                    relation,
523                )
524            }
525            LogicalPlan::Aggregate(agg) => {
526                // Aggregation can be already handled in the projection case
527                if !select.already_projected() {
528                    // The query returns aggregate and group expressions. If that weren't the case,
529                    // the aggregate would have been placed inside a projection, making the check above^ false
530                    let exprs: Vec<_> = agg
531                        .aggr_expr
532                        .iter()
533                        .chain(agg.group_expr.iter())
534                        .map(|expr| self.select_item_to_sql(expr))
535                        .collect::<Result<Vec<_>>>()?;
536                    select.projection(exprs);
537
538                    select.group_by(ast::GroupByExpr::Expressions(
539                        agg.group_expr
540                            .iter()
541                            .map(|expr| self.expr_to_sql(expr))
542                            .collect::<Result<Vec<_>>>()?,
543                        vec![],
544                    ));
545                }
546
547                self.select_to_sql_recursively(
548                    agg.input.as_ref(),
549                    query,
550                    select,
551                    relation,
552                )
553            }
554            LogicalPlan::Distinct(distinct) => {
555                // Distinct can be top-level plan for derived table
556                if select.already_projected() {
557                    return self.derive_with_dialect_alias(
558                        "derived_distinct",
559                        plan,
560                        relation,
561                        false,
562                        vec![],
563                    );
564                }
565
566                // If this distinct is the parent of a Union and we're in a query context,
567                // then we need to unparse as a `UNION` rather than a `UNION ALL`.
568                if let Distinct::All(input) = distinct {
569                    if matches!(input.as_ref(), LogicalPlan::Union(_)) {
570                        if let Some(query_mut) = query.as_mut() {
571                            query_mut.distinct_union();
572                            return self.select_to_sql_recursively(
573                                input.as_ref(),
574                                query,
575                                select,
576                                relation,
577                            );
578                        }
579                    }
580                }
581
582                let (select_distinct, input) = match distinct {
583                    Distinct::All(input) => (ast::Distinct::Distinct, input.as_ref()),
584                    Distinct::On(on) => {
585                        let exprs = on
586                            .on_expr
587                            .iter()
588                            .map(|e| self.expr_to_sql(e))
589                            .collect::<Result<Vec<_>>>()?;
590                        let items = on
591                            .select_expr
592                            .iter()
593                            .map(|e| self.select_item_to_sql(e))
594                            .collect::<Result<Vec<_>>>()?;
595                        if let Some(sort_expr) = &on.sort_expr {
596                            if let Some(query_ref) = query {
597                                query_ref.order_by(self.sorts_to_sql(sort_expr)?);
598                            } else {
599                                return internal_err!(
600                                    "Sort operator only valid in a statement context."
601                                );
602                            }
603                        }
604                        select.projection(items);
605                        (ast::Distinct::On(exprs), on.input.as_ref())
606                    }
607                };
608                select.distinct(Some(select_distinct));
609                self.select_to_sql_recursively(input, query, select, relation)
610            }
611            LogicalPlan::Join(join) => {
612                let mut table_scan_filters = vec![];
613                let (left_plan, right_plan) = match join.join_type {
614                    JoinType::RightSemi | JoinType::RightAnti => {
615                        (&join.right, &join.left)
616                    }
617                    _ => (&join.left, &join.right),
618                };
619                // If there's an outer projection plan, it will already set up the projection.
620                // In that case, we don't need to worry about setting up the projection here.
621                // The outer projection plan will handle projecting the correct columns.
622                let already_projected = select.already_projected();
623
624                let left_plan =
625                    match try_transform_to_simple_table_scan_with_filters(left_plan)? {
626                        Some((plan, filters)) => {
627                            table_scan_filters.extend(filters);
628                            Arc::new(plan)
629                        }
630                        None => Arc::clone(left_plan),
631                    };
632
633                self.select_to_sql_recursively(
634                    left_plan.as_ref(),
635                    query,
636                    select,
637                    relation,
638                )?;
639
640                let left_projection: Option<Vec<ast::SelectItem>> = if !already_projected
641                {
642                    Some(select.pop_projections())
643                } else {
644                    None
645                };
646
647                let right_plan =
648                    match try_transform_to_simple_table_scan_with_filters(right_plan)? {
649                        Some((plan, filters)) => {
650                            table_scan_filters.extend(filters);
651                            Arc::new(plan)
652                        }
653                        None => Arc::clone(right_plan),
654                    };
655
656                let mut right_relation = RelationBuilder::default();
657
658                self.select_to_sql_recursively(
659                    right_plan.as_ref(),
660                    query,
661                    select,
662                    &mut right_relation,
663                )?;
664
665                let join_filters = if table_scan_filters.is_empty() {
666                    join.filter.clone()
667                } else {
668                    // Combine `table_scan_filters` into a single filter using `AND`
669                    let Some(combined_filters) =
670                        table_scan_filters.into_iter().reduce(|acc, filter| {
671                            Expr::BinaryExpr(BinaryExpr {
672                                left: Box::new(acc),
673                                op: Operator::And,
674                                right: Box::new(filter),
675                            })
676                        })
677                    else {
678                        return internal_err!("Failed to combine TableScan filters");
679                    };
680
681                    // Combine `join.filter` with `combined_filters` using `AND`
682                    match &join.filter {
683                        Some(filter) => Some(Expr::BinaryExpr(BinaryExpr {
684                            left: Box::new(filter.clone()),
685                            op: Operator::And,
686                            right: Box::new(combined_filters),
687                        })),
688                        None => Some(combined_filters),
689                    }
690                };
691
692                let join_constraint = self.join_constraint_to_sql(
693                    join.join_constraint,
694                    &join.on,
695                    join_filters.as_ref(),
696                )?;
697
698                self.select_to_sql_recursively(
699                    right_plan.as_ref(),
700                    query,
701                    select,
702                    &mut right_relation,
703                )?;
704
705                let right_projection: Option<Vec<ast::SelectItem>> = if !already_projected
706                {
707                    Some(select.pop_projections())
708                } else {
709                    None
710                };
711
712                match join.join_type {
713                    JoinType::LeftSemi
714                    | JoinType::LeftAnti
715                    | JoinType::LeftMark
716                    | JoinType::RightSemi
717                    | JoinType::RightAnti
718                    | JoinType::RightMark => {
719                        let mut query_builder = QueryBuilder::default();
720                        let mut from = TableWithJoinsBuilder::default();
721                        let mut exists_select: SelectBuilder = SelectBuilder::default();
722                        from.relation(right_relation);
723                        exists_select.push_from(from);
724                        if let Some(filter) = &join.filter {
725                            exists_select.selection(Some(self.expr_to_sql(filter)?));
726                        }
727                        for (left, right) in &join.on {
728                            exists_select.selection(Some(
729                                self.expr_to_sql(&left.clone().eq(right.clone()))?,
730                            ));
731                        }
732                        exists_select.projection(vec![ast::SelectItem::UnnamedExpr(
733                            ast::Expr::value(ast::Value::Number("1".to_string(), false)),
734                        )]);
735                        query_builder.body(Box::new(SetExpr::Select(Box::new(
736                            exists_select.build()?,
737                        ))));
738
739                        let negated = match join.join_type {
740                            JoinType::LeftSemi
741                            | JoinType::RightSemi
742                            | JoinType::LeftMark
743                            | JoinType::RightMark => false,
744                            JoinType::LeftAnti | JoinType::RightAnti => true,
745                            _ => unreachable!(),
746                        };
747                        let exists_expr = ast::Expr::Exists {
748                            subquery: Box::new(query_builder.build()?),
749                            negated,
750                        };
751
752                        match join.join_type {
753                            JoinType::LeftMark | JoinType::RightMark => {
754                                let source_schema =
755                                    if join.join_type == JoinType::LeftMark {
756                                        right_plan.schema()
757                                    } else {
758                                        left_plan.schema()
759                                    };
760                                let (table_ref, _) = source_schema.qualified_field(0);
761                                let column = self.col_to_sql(&Column::new(
762                                    table_ref.cloned(),
763                                    "mark",
764                                ))?;
765                                select.replace_mark(&column, &exists_expr);
766                            }
767                            _ => {
768                                select.selection(Some(exists_expr));
769                            }
770                        }
771                        if let Some(projection) = left_projection {
772                            select.projection(projection);
773                        }
774                    }
775                    JoinType::Inner
776                    | JoinType::Left
777                    | JoinType::Right
778                    | JoinType::Full => {
779                        let Ok(Some(relation)) = right_relation.build() else {
780                            return internal_err!("Failed to build right relation");
781                        };
782                        let ast_join = ast::Join {
783                            relation,
784                            global: false,
785                            join_operator: self
786                                .join_operator_to_sql(join.join_type, join_constraint)?,
787                        };
788                        let mut from = select.pop_from().unwrap();
789                        from.push_join(ast_join);
790                        select.push_from(from);
791                        if !already_projected {
792                            let Some(left_projection) = left_projection else {
793                                return internal_err!("Left projection is missing");
794                            };
795
796                            let Some(right_projection) = right_projection else {
797                                return internal_err!("Right projection is missing");
798                            };
799
800                            let projection = left_projection
801                                .into_iter()
802                                .chain(right_projection.into_iter())
803                                .collect();
804                            select.projection(projection);
805                        }
806                    }
807                };
808
809                Ok(())
810            }
811            LogicalPlan::SubqueryAlias(plan_alias) => {
812                let (plan, mut columns) =
813                    subquery_alias_inner_query_and_columns(plan_alias);
814                let unparsed_table_scan = Self::unparse_table_scan_pushdown(
815                    plan,
816                    Some(plan_alias.alias.clone()),
817                    select.already_projected(),
818                )?;
819                // if the child plan is a TableScan with pushdown operations, we don't need to
820                // create an additional subquery for it
821                if !select.already_projected() && unparsed_table_scan.is_none() {
822                    select.projection(vec![ast::SelectItem::Wildcard(
823                        ast::WildcardAdditionalOptions::default(),
824                    )]);
825                }
826                let plan = unparsed_table_scan.unwrap_or_else(|| plan.clone());
827                if !columns.is_empty()
828                    && !self.dialect.supports_column_alias_in_table_alias()
829                {
830                    // Instead of specifying column aliases as part of the outer table, inject them directly into the inner projection
831                    let rewritten_plan =
832                        match inject_column_aliases_into_subquery(plan, columns) {
833                            Ok(p) => p,
834                            Err(e) => {
835                                return internal_err!(
836                                    "Failed to transform SubqueryAlias plan: {e}"
837                                )
838                            }
839                        };
840
841                    columns = vec![];
842
843                    self.select_to_sql_recursively(
844                        &rewritten_plan,
845                        query,
846                        select,
847                        relation,
848                    )?;
849                } else {
850                    self.select_to_sql_recursively(&plan, query, select, relation)?;
851                }
852
853                relation.alias(Some(
854                    self.new_table_alias(plan_alias.alias.table().to_string(), columns),
855                ));
856
857                Ok(())
858            }
859            LogicalPlan::Union(union) => {
860                // Covers cases where the UNION is a subquery and the projection is at the top level
861                if select.already_projected() {
862                    return self.derive_with_dialect_alias(
863                        "derived_union",
864                        plan,
865                        relation,
866                        false,
867                        vec![],
868                    );
869                }
870
871                let input_exprs: Vec<SetExpr> = union
872                    .inputs
873                    .iter()
874                    .map(|input| self.select_to_sql_expr(input, query))
875                    .collect::<Result<Vec<_>>>()?;
876
877                if input_exprs.len() < 2 {
878                    return internal_err!("UNION operator requires at least 2 inputs");
879                }
880
881                let set_quantifier =
882                    if query.as_ref().is_some_and(|q| q.is_distinct_union()) {
883                        // Setting the SetQuantifier to None will unparse as a `UNION`
884                        // rather than a `UNION ALL`.
885                        ast::SetQuantifier::None
886                    } else {
887                        ast::SetQuantifier::All
888                    };
889
890                // Build the union expression tree bottom-up by reversing the order
891                // note that we are also swapping left and right inputs because of the rev
892                let union_expr = input_exprs
893                    .into_iter()
894                    .rev()
895                    .reduce(|a, b| SetExpr::SetOperation {
896                        op: ast::SetOperator::Union,
897                        set_quantifier,
898                        left: Box::new(b),
899                        right: Box::new(a),
900                    })
901                    .unwrap();
902
903                let Some(query) = query.as_mut() else {
904                    return internal_err!(
905                        "UNION ALL operator only valid in a statement context"
906                    );
907                };
908                query.body(Box::new(union_expr));
909
910                Ok(())
911            }
912            LogicalPlan::Window(window) => {
913                // Window nodes are handled simultaneously with Projection nodes
914                self.select_to_sql_recursively(
915                    window.input.as_ref(),
916                    query,
917                    select,
918                    relation,
919                )
920            }
921            LogicalPlan::EmptyRelation(_) => {
922                // An EmptyRelation could be behind an UNNEST node. If the dialect supports UNNEST as a table factor,
923                // a TableRelationBuilder will be created for the UNNEST node first.
924                if !relation.has_relation() {
925                    relation.empty();
926                }
927                Ok(())
928            }
929            LogicalPlan::Extension(extension) => {
930                if let Some(query) = query.as_mut() {
931                    self.extension_to_sql(
932                        extension.node.as_ref(),
933                        &mut Some(query),
934                        &mut Some(select),
935                        &mut Some(relation),
936                    )
937                } else {
938                    self.extension_to_sql(
939                        extension.node.as_ref(),
940                        &mut None,
941                        &mut Some(select),
942                        &mut Some(relation),
943                    )
944                }
945            }
946            LogicalPlan::Unnest(unnest) => {
947                if !unnest.struct_type_columns.is_empty() {
948                    return internal_err!(
949                        "Struct type columns are not currently supported in UNNEST: {:?}",
950                        unnest.struct_type_columns
951                    );
952                }
953
954                // In the case of UNNEST, the Unnest node is followed by a duplicate Projection node that we should skip.
955                // Otherwise, there will be a duplicate SELECT clause.
956                // | Projection: table.col1, UNNEST(table.col2)
957                // |   Unnest: UNNEST(table.col2)
958                // |     Projection: table.col1, table.col2 AS UNNEST(table.col2)
959                // |       Filter: table.col3 = Int64(3)
960                // |         TableScan: table projection=None
961                if let LogicalPlan::Projection(p) = unnest.input.as_ref() {
962                    // continue with projection input
963                    self.select_to_sql_recursively(&p.input, query, select, relation)
964                } else {
965                    internal_err!("Unnest input is not a Projection: {unnest:?}")
966                }
967            }
968            LogicalPlan::Subquery(subquery)
969                if find_unnest_node_until_relation(subquery.subquery.as_ref())
970                    .is_some() =>
971            {
972                if self.dialect.unnest_as_table_factor() {
973                    self.select_to_sql_recursively(
974                        subquery.subquery.as_ref(),
975                        query,
976                        select,
977                        relation,
978                    )
979                } else {
980                    self.derive_with_dialect_alias(
981                        "derived_unnest",
982                        subquery.subquery.as_ref(),
983                        relation,
984                        true,
985                        vec![],
986                    )
987                }
988            }
989            _ => {
990                not_impl_err!("Unsupported operator: {plan:?}")
991            }
992        }
993    }
994
995    /// Try to find the placeholder column name generated by `RecursiveUnnestRewriter`.
996    ///
997    /// - If the column is a placeholder column match the pattern `Expr::Alias(Expr::Column("__unnest_placeholder(...)"))`,
998    ///   it means it is a scalar column, return [UnnestInputType::Scalar].
999    /// - If the column is a placeholder column match the pattern `Expr::Alias(Expr::Column("__unnest_placeholder(outer_ref(...)))")`,
1000    ///   it means it is an outer reference column, return [UnnestInputType::OuterReference].
1001    /// - If the column is not a placeholder column, return [None].
1002    ///
1003    /// `outer_ref` is the display result of [Expr::OuterReferenceColumn]
1004    fn check_unnest_placeholder_with_outer_ref(expr: &Expr) -> Option<UnnestInputType> {
1005        if let Expr::Alias(Alias { expr, .. }) = expr {
1006            if let Expr::Column(Column { name, .. }) = expr.as_ref() {
1007                if let Some(prefix) = name.strip_prefix(UNNEST_PLACEHOLDER) {
1008                    if prefix.starts_with(&format!("({OUTER_REFERENCE_COLUMN_PREFIX}(")) {
1009                        return Some(UnnestInputType::OuterReference);
1010                    }
1011                    return Some(UnnestInputType::Scalar);
1012                }
1013            }
1014        }
1015        None
1016    }
1017
1018    fn try_unnest_to_table_factor_sql(
1019        &self,
1020        unnest: &Unnest,
1021    ) -> Result<Option<UnnestRelationBuilder>> {
1022        let mut unnest_relation = UnnestRelationBuilder::default();
1023        let LogicalPlan::Projection(projection) = unnest.input.as_ref() else {
1024            return Ok(None);
1025        };
1026
1027        if !matches!(projection.input.as_ref(), LogicalPlan::EmptyRelation(_)) {
1028            // It may be possible that UNNEST is used as a source for the query.
1029            // However, at this point, we don't yet know if it is just a single expression
1030            // from another source or if it's from UNNEST.
1031            //
1032            // Unnest(Projection(EmptyRelation)) denotes a case with `UNNEST([...])`,
1033            // which is normally safe to unnest as a table factor.
1034            // However, in the future, more comprehensive checks can be added here.
1035            return Ok(None);
1036        };
1037
1038        let exprs = projection
1039            .expr
1040            .iter()
1041            .map(|e| self.expr_to_sql(e))
1042            .collect::<Result<Vec<_>>>()?;
1043        unnest_relation.array_exprs(exprs);
1044
1045        Ok(Some(unnest_relation))
1046    }
1047
1048    fn is_scan_with_pushdown(scan: &TableScan) -> bool {
1049        scan.projection.is_some() || !scan.filters.is_empty() || scan.fetch.is_some()
1050    }
1051
1052    /// Try to unparse a table scan with pushdown operations into a new subquery plan.
1053    /// If the table scan is without any pushdown operations, return None.
1054    fn unparse_table_scan_pushdown(
1055        plan: &LogicalPlan,
1056        alias: Option<TableReference>,
1057        already_projected: bool,
1058    ) -> Result<Option<LogicalPlan>> {
1059        match plan {
1060            LogicalPlan::TableScan(table_scan) => {
1061                if !Self::is_scan_with_pushdown(table_scan) {
1062                    return Ok(None);
1063                }
1064                let table_schema = table_scan.source.schema();
1065                let mut filter_alias_rewriter =
1066                    alias.as_ref().map(|alias_name| TableAliasRewriter {
1067                        table_schema: &table_schema,
1068                        alias_name: alias_name.clone(),
1069                    });
1070
1071                let mut builder = LogicalPlanBuilder::scan(
1072                    table_scan.table_name.clone(),
1073                    Arc::clone(&table_scan.source),
1074                    None,
1075                )?;
1076                // We will rebase the column references to the new alias if it exists.
1077                // If the projection or filters are empty, we will append alias to the table scan.
1078                //
1079                // Example:
1080                //   select t1.c1 from t1 where t1.c1 > 1 -> select a.c1 from t1 as a where a.c1 > 1
1081                if let Some(ref alias) = alias {
1082                    if table_scan.projection.is_some() || !table_scan.filters.is_empty() {
1083                        builder = builder.alias(alias.clone())?;
1084                    }
1085                }
1086
1087                // Avoid creating a duplicate Projection node, which would result in an additional subquery if a projection already exists.
1088                // For example, if the `optimize_projection` rule is applied, there will be a Projection node, and duplicate projection
1089                // information included in the TableScan node.
1090                if !already_projected {
1091                    if let Some(project_vec) = &table_scan.projection {
1092                        if project_vec.is_empty() {
1093                            builder = builder.project(vec![Expr::Literal(
1094                                ScalarValue::Int64(Some(1)),
1095                                None,
1096                            )])?;
1097                        } else {
1098                            let project_columns = project_vec
1099                                .iter()
1100                                .cloned()
1101                                .map(|i| {
1102                                    let schema = table_scan.source.schema();
1103                                    let field = schema.field(i);
1104                                    if alias.is_some() {
1105                                        Column::new(alias.clone(), field.name().clone())
1106                                    } else {
1107                                        Column::new(
1108                                            Some(table_scan.table_name.clone()),
1109                                            field.name().clone(),
1110                                        )
1111                                    }
1112                                })
1113                                .collect::<Vec<_>>();
1114                            builder = builder.project(project_columns)?;
1115                        };
1116                    }
1117                }
1118
1119                let filter_expr: Result<Option<Expr>> = table_scan
1120                    .filters
1121                    .iter()
1122                    .cloned()
1123                    .map(|expr| {
1124                        if let Some(ref mut rewriter) = filter_alias_rewriter {
1125                            expr.rewrite(rewriter).data()
1126                        } else {
1127                            Ok(expr)
1128                        }
1129                    })
1130                    .reduce(|acc, expr_result| {
1131                        acc.and_then(|acc_expr| {
1132                            expr_result.map(|expr| acc_expr.and(expr))
1133                        })
1134                    })
1135                    .transpose();
1136
1137                if let Some(filter) = filter_expr? {
1138                    builder = builder.filter(filter)?;
1139                }
1140
1141                if let Some(fetch) = table_scan.fetch {
1142                    builder = builder.limit(0, Some(fetch))?;
1143                }
1144
1145                // If the table scan has an alias but no projection or filters, it means no column references are rebased.
1146                // So we will append the alias to this subquery.
1147                // Example:
1148                //   select * from t1 limit 10 -> (select * from t1 limit 10) as a
1149                if let Some(alias) = alias {
1150                    if table_scan.projection.is_none() && table_scan.filters.is_empty() {
1151                        builder = builder.alias(alias)?;
1152                    }
1153                }
1154
1155                Ok(Some(builder.build()?))
1156            }
1157            LogicalPlan::SubqueryAlias(subquery_alias) => {
1158                let ret = Self::unparse_table_scan_pushdown(
1159                    &subquery_alias.input,
1160                    Some(subquery_alias.alias.clone()),
1161                    already_projected,
1162                )?;
1163                if let Some(alias) = alias {
1164                    if let Some(plan) = ret {
1165                        let plan = LogicalPlanBuilder::new(plan).alias(alias)?.build()?;
1166                        return Ok(Some(plan));
1167                    }
1168                }
1169                Ok(ret)
1170            }
1171            // SubqueryAlias could be rewritten to a plan with a projection as the top node by [rewrite::subquery_alias_inner_query_and_columns].
1172            // The inner table scan could be a scan with pushdown operations.
1173            LogicalPlan::Projection(projection) => {
1174                if let Some(plan) = Self::unparse_table_scan_pushdown(
1175                    &projection.input,
1176                    alias.clone(),
1177                    already_projected,
1178                )? {
1179                    let exprs = if alias.is_some() {
1180                        let mut alias_rewriter =
1181                            alias.as_ref().map(|alias_name| TableAliasRewriter {
1182                                table_schema: plan.schema().as_arrow(),
1183                                alias_name: alias_name.clone(),
1184                            });
1185                        projection
1186                            .expr
1187                            .iter()
1188                            .cloned()
1189                            .map(|expr| {
1190                                if let Some(ref mut rewriter) = alias_rewriter {
1191                                    expr.rewrite(rewriter).data()
1192                                } else {
1193                                    Ok(expr)
1194                                }
1195                            })
1196                            .collect::<Result<Vec<_>>>()?
1197                    } else {
1198                        projection.expr.clone()
1199                    };
1200                    Ok(Some(
1201                        LogicalPlanBuilder::from(plan).project(exprs)?.build()?,
1202                    ))
1203                } else {
1204                    Ok(None)
1205                }
1206            }
1207            _ => Ok(None),
1208        }
1209    }
1210
1211    fn select_item_to_sql(&self, expr: &Expr) -> Result<ast::SelectItem> {
1212        match expr {
1213            Expr::Alias(Alias { expr, name, .. }) => {
1214                let inner = self.expr_to_sql(expr)?;
1215
1216                // Determine the alias name to use
1217                let col_name = if let Some(rewritten_name) =
1218                    self.dialect.col_alias_overrides(name)?
1219                {
1220                    rewritten_name.to_string()
1221                } else {
1222                    name.to_string()
1223                };
1224
1225                Ok(ast::SelectItem::ExprWithAlias {
1226                    expr: inner,
1227                    alias: self.new_ident_quoted_if_needs(col_name),
1228                })
1229            }
1230            _ => {
1231                let inner = self.expr_to_sql(expr)?;
1232
1233                Ok(ast::SelectItem::UnnamedExpr(inner))
1234            }
1235        }
1236    }
1237
1238    fn sorts_to_sql(&self, sort_exprs: &[SortExpr]) -> Result<OrderByKind> {
1239        Ok(OrderByKind::Expressions(
1240            sort_exprs
1241                .iter()
1242                .map(|sort_expr| self.sort_to_sql(sort_expr))
1243                .collect::<Result<Vec<_>>>()?,
1244        ))
1245    }
1246
1247    fn join_operator_to_sql(
1248        &self,
1249        join_type: JoinType,
1250        constraint: ast::JoinConstraint,
1251    ) -> Result<ast::JoinOperator> {
1252        Ok(match join_type {
1253            JoinType::Inner => match &constraint {
1254                ast::JoinConstraint::On(_)
1255                | ast::JoinConstraint::Using(_)
1256                | ast::JoinConstraint::Natural => ast::JoinOperator::Inner(constraint),
1257                ast::JoinConstraint::None => {
1258                    // Inner joins with no conditions or filters are not valid SQL in most systems,
1259                    // return a CROSS JOIN instead
1260                    ast::JoinOperator::CrossJoin
1261                }
1262            },
1263            JoinType::Left => ast::JoinOperator::LeftOuter(constraint),
1264            JoinType::Right => ast::JoinOperator::RightOuter(constraint),
1265            JoinType::Full => ast::JoinOperator::FullOuter(constraint),
1266            JoinType::LeftAnti => ast::JoinOperator::LeftAnti(constraint),
1267            JoinType::LeftSemi => ast::JoinOperator::LeftSemi(constraint),
1268            JoinType::RightAnti => ast::JoinOperator::RightAnti(constraint),
1269            JoinType::RightSemi => ast::JoinOperator::RightSemi(constraint),
1270            JoinType::LeftMark | JoinType::RightMark => {
1271                unimplemented!("Unparsing of Mark join type")
1272            }
1273        })
1274    }
1275
1276    /// Convert the components of a USING clause to the USING AST. Returns
1277    /// 'None' if the conditions are not compatible with a USING expression,
1278    /// e.g. non-column expressions or non-matching names.
1279    fn join_using_to_sql(
1280        &self,
1281        join_conditions: &[(Expr, Expr)],
1282    ) -> Option<ast::JoinConstraint> {
1283        let mut object_names = Vec::with_capacity(join_conditions.len());
1284        for (left, right) in join_conditions {
1285            match (left, right) {
1286                (
1287                    Expr::Column(Column {
1288                        relation: _,
1289                        name: left_name,
1290                        spans: _,
1291                    }),
1292                    Expr::Column(Column {
1293                        relation: _,
1294                        name: right_name,
1295                        spans: _,
1296                    }),
1297                ) if left_name == right_name => {
1298                    // For example, if the join condition `t1.id = t2.id`
1299                    // this is represented as two columns like `[t1.id, t2.id]`
1300                    // This code forms `id` (without relation name)
1301                    let ident = self.new_ident_quoted_if_needs(left_name.to_string());
1302                    object_names.push(ast::ObjectName::from(vec![ident]));
1303                }
1304                // USING is only valid with matching column names; arbitrary expressions
1305                // are not allowed
1306                _ => return None,
1307            }
1308        }
1309        Some(ast::JoinConstraint::Using(object_names))
1310    }
1311
1312    /// Convert a join constraint and associated conditions and filter to a SQL AST node
1313    fn join_constraint_to_sql(
1314        &self,
1315        constraint: JoinConstraint,
1316        conditions: &[(Expr, Expr)],
1317        filter: Option<&Expr>,
1318    ) -> Result<ast::JoinConstraint> {
1319        match (constraint, conditions, filter) {
1320            // No constraints
1321            (JoinConstraint::On | JoinConstraint::Using, [], None) => {
1322                Ok(ast::JoinConstraint::None)
1323            }
1324
1325            (JoinConstraint::Using, conditions, None) => {
1326                match self.join_using_to_sql(conditions) {
1327                    Some(using) => Ok(using),
1328                    // As above, this should not be reachable from parsed SQL,
1329                    // but a user could create this; we "downgrade" to ON.
1330                    None => self.join_conditions_to_sql_on(conditions, None),
1331                }
1332            }
1333
1334            // Two cases here:
1335            // 1. Straightforward ON case, with possible equi-join conditions
1336            //    and additional filters
1337            // 2. USING with additional filters; we "downgrade" to ON, because
1338            //    you can't use USING with arbitrary filters. (This should not
1339            //    be accessible from parsed SQL, but may have been a
1340            //    custom-built JOIN by a user.)
1341            (JoinConstraint::On | JoinConstraint::Using, conditions, filter) => {
1342                self.join_conditions_to_sql_on(conditions, filter)
1343            }
1344        }
1345    }
1346
1347    // Convert a list of equi0join conditions and an optional filter to a SQL ON
1348    // AST node, with the equi-join conditions and the filter merged into a
1349    // single conditional expression
1350    fn join_conditions_to_sql_on(
1351        &self,
1352        join_conditions: &[(Expr, Expr)],
1353        filter: Option<&Expr>,
1354    ) -> Result<ast::JoinConstraint> {
1355        let mut condition = None;
1356        // AND the join conditions together to create the overall condition
1357        for (left, right) in join_conditions {
1358            // Parse left and right
1359            let l = self.expr_to_sql(left)?;
1360            let r = self.expr_to_sql(right)?;
1361            let e = self.binary_op_to_sql(l, r, ast::BinaryOperator::Eq);
1362            condition = match condition {
1363                Some(expr) => Some(self.and_op_to_sql(expr, e)),
1364                None => Some(e),
1365            };
1366        }
1367
1368        // Then AND the non-equijoin filter condition as well
1369        condition = match (condition, filter) {
1370            (Some(expr), Some(filter)) => {
1371                Some(self.and_op_to_sql(expr, self.expr_to_sql(filter)?))
1372            }
1373            (Some(expr), None) => Some(expr),
1374            (None, Some(filter)) => Some(self.expr_to_sql(filter)?),
1375            (None, None) => None,
1376        };
1377
1378        let constraint = match condition {
1379            Some(filter) => ast::JoinConstraint::On(filter),
1380            None => ast::JoinConstraint::None,
1381        };
1382
1383        Ok(constraint)
1384    }
1385
1386    fn and_op_to_sql(&self, lhs: ast::Expr, rhs: ast::Expr) -> ast::Expr {
1387        self.binary_op_to_sql(lhs, rhs, ast::BinaryOperator::And)
1388    }
1389
1390    fn new_table_alias(&self, alias: String, columns: Vec<Ident>) -> ast::TableAlias {
1391        let columns = columns
1392            .into_iter()
1393            .map(|ident| TableAliasColumnDef {
1394                name: ident,
1395                data_type: None,
1396            })
1397            .collect();
1398        ast::TableAlias {
1399            name: self.new_ident_quoted_if_needs(alias),
1400            columns,
1401        }
1402    }
1403
1404    fn dml_to_sql(&self, plan: &LogicalPlan) -> Result<ast::Statement> {
1405        not_impl_err!("Unsupported plan: {plan:?}")
1406    }
1407}
1408
1409impl From<BuilderError> for DataFusionError {
1410    fn from(e: BuilderError) -> Self {
1411        DataFusionError::External(Box::new(e))
1412    }
1413}
1414
/// The type of the input to the UNNEST table factor.
///
/// The enum has no fields, so it is cheap to copy and can be compared directly.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum UnnestInputType {
    /// The input is a column reference. It will be presented like `outer_ref(column_name)`.
    OuterReference,
    /// The input is a scalar value. It will be presented like a scalar array or struct.
    Scalar,
}