datafusion_sql/unparser/
plan.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use super::{
19    ast::{
20        BuilderError, DerivedRelationBuilder, QueryBuilder, RelationBuilder,
21        SelectBuilder, TableRelationBuilder, TableWithJoinsBuilder,
22    },
23    rewrite::{
24        inject_column_aliases_into_subquery, normalize_union_schema,
25        rewrite_plan_for_sort_on_non_projected_fields,
26        subquery_alias_inner_query_and_columns, TableAliasRewriter,
27    },
28    utils::{
29        find_agg_node_within_select, find_unnest_node_within_select,
30        find_window_nodes_within_select, try_transform_to_simple_table_scan_with_filters,
31        unproject_sort_expr, unproject_unnest_expr, unproject_window_exprs,
32    },
33    Unparser,
34};
35use crate::unparser::extension_unparser::{
36    UnparseToStatementResult, UnparseWithinStatementResult,
37};
38use crate::unparser::utils::{find_unnest_node_until_relation, unproject_agg_exprs};
39use crate::unparser::{ast::UnnestRelationBuilder, rewrite::rewrite_qualify};
40use crate::utils::UNNEST_PLACEHOLDER;
41use datafusion_common::{
42    internal_err, not_impl_err,
43    tree_node::{TransformedResult, TreeNode},
44    Column, DataFusionError, Result, ScalarValue, TableReference,
45};
46use datafusion_expr::expr::OUTER_REFERENCE_COLUMN_PREFIX;
47use datafusion_expr::{
48    expr::Alias, BinaryExpr, Distinct, Expr, JoinConstraint, JoinType, LogicalPlan,
49    LogicalPlanBuilder, Operator, Projection, SortExpr, TableScan, Unnest,
50    UserDefinedLogicalNode,
51};
52use sqlparser::ast::{self, Ident, OrderByKind, SetExpr, TableAliasColumnDef};
53use std::{sync::Arc, vec};
54
55/// Convert a DataFusion [`LogicalPlan`] to [`ast::Statement`]
56///
57/// This function is the opposite of [`SqlToRel::sql_statement_to_plan`] and can
58/// be used to, among other things, to convert `LogicalPlan`s to SQL strings.
59///
60/// # Errors
61///
62/// This function returns an error if the plan cannot be converted to SQL.
63///
64/// # See Also
65///
66/// * [`expr_to_sql`] for converting [`Expr`], a single expression to SQL
67///
68/// # Example
69/// ```
70/// use arrow::datatypes::{DataType, Field, Schema};
71/// use datafusion_expr::{col, logical_plan::table_scan};
72/// use datafusion_sql::unparser::plan_to_sql;
73/// let schema = Schema::new(vec![
74///     Field::new("id", DataType::Utf8, false),
75///     Field::new("value", DataType::Utf8, false),
76/// ]);
77/// // Scan 'table' and select columns 'id' and 'value'
78/// let plan = table_scan(Some("table"), &schema, None)
79///     .unwrap()
80///     .project(vec![col("id"), col("value")])
81///     .unwrap()
82///     .build()
83///     .unwrap();
84/// // convert to AST
85/// let sql = plan_to_sql(&plan).unwrap();
86/// // use the Display impl to convert to SQL text
87/// assert_eq!(
88///     sql.to_string(),
89///     "SELECT \"table\".id, \"table\".\"value\" FROM \"table\""
90/// )
91/// ```
92///
93/// [`SqlToRel::sql_statement_to_plan`]: crate::planner::SqlToRel::sql_statement_to_plan
94/// [`expr_to_sql`]: crate::unparser::expr_to_sql
95pub fn plan_to_sql(plan: &LogicalPlan) -> Result<ast::Statement> {
96    let unparser = Unparser::default();
97    unparser.plan_to_sql(plan)
98}
99
100impl Unparser<'_> {
101    pub fn plan_to_sql(&self, plan: &LogicalPlan) -> Result<ast::Statement> {
102        let mut plan = normalize_union_schema(plan)?;
103        if !self.dialect.supports_qualify() {
104            plan = rewrite_qualify(plan)?;
105        }
106
107        match plan {
108            LogicalPlan::Projection(_)
109            | LogicalPlan::Filter(_)
110            | LogicalPlan::Window(_)
111            | LogicalPlan::Aggregate(_)
112            | LogicalPlan::Sort(_)
113            | LogicalPlan::Join(_)
114            | LogicalPlan::Repartition(_)
115            | LogicalPlan::Union(_)
116            | LogicalPlan::TableScan(_)
117            | LogicalPlan::EmptyRelation(_)
118            | LogicalPlan::Subquery(_)
119            | LogicalPlan::SubqueryAlias(_)
120            | LogicalPlan::Limit(_)
121            | LogicalPlan::Statement(_)
122            | LogicalPlan::Values(_)
123            | LogicalPlan::Distinct(_) => self.select_to_sql_statement(&plan),
124            LogicalPlan::Dml(_) => self.dml_to_sql(&plan),
125            LogicalPlan::Extension(extension) => {
126                self.extension_to_statement(extension.node.as_ref())
127            }
128            LogicalPlan::Explain(_)
129            | LogicalPlan::Analyze(_)
130            | LogicalPlan::Ddl(_)
131            | LogicalPlan::Copy(_)
132            | LogicalPlan::DescribeTable(_)
133            | LogicalPlan::RecursiveQuery(_)
134            | LogicalPlan::Unnest(_) => not_impl_err!("Unsupported plan: {plan:?}"),
135        }
136    }
137
138    /// Try to unparse a [UserDefinedLogicalNode] to a SQL statement.
139    /// If multiple unparsers are registered for the same [UserDefinedLogicalNode],
140    /// the first unparsing result will be returned.
141    fn extension_to_statement(
142        &self,
143        node: &dyn UserDefinedLogicalNode,
144    ) -> Result<ast::Statement> {
145        let mut statement = None;
146        for unparser in &self.extension_unparsers {
147            match unparser.unparse_to_statement(node, self)? {
148                UnparseToStatementResult::Modified(stmt) => {
149                    statement = Some(stmt);
150                    break;
151                }
152                UnparseToStatementResult::Unmodified => {}
153            }
154        }
155        if let Some(statement) = statement {
156            Ok(statement)
157        } else {
158            not_impl_err!("Unsupported extension node: {node:?}")
159        }
160    }
161
162    /// Try to unparse a [UserDefinedLogicalNode] to a SQL statement.
163    /// If multiple unparsers are registered for the same [UserDefinedLogicalNode],
164    /// the first unparser supporting the node will be used.
165    fn extension_to_sql(
166        &self,
167        node: &dyn UserDefinedLogicalNode,
168        query: &mut Option<&mut QueryBuilder>,
169        select: &mut Option<&mut SelectBuilder>,
170        relation: &mut Option<&mut RelationBuilder>,
171    ) -> Result<()> {
172        for unparser in &self.extension_unparsers {
173            match unparser.unparse(node, self, query, select, relation)? {
174                UnparseWithinStatementResult::Modified => return Ok(()),
175                UnparseWithinStatementResult::Unmodified => {}
176            }
177        }
178        not_impl_err!("Unsupported extension node: {node:?}")
179    }
180
181    fn select_to_sql_statement(&self, plan: &LogicalPlan) -> Result<ast::Statement> {
182        let mut query_builder = Some(QueryBuilder::default());
183
184        let body = self.select_to_sql_expr(plan, &mut query_builder)?;
185
186        let query = query_builder.unwrap().body(Box::new(body)).build()?;
187
188        Ok(ast::Statement::Query(Box::new(query)))
189    }
190
191    fn select_to_sql_expr(
192        &self,
193        plan: &LogicalPlan,
194        query: &mut Option<QueryBuilder>,
195    ) -> Result<SetExpr> {
196        let mut select_builder = SelectBuilder::default();
197        select_builder.push_from(TableWithJoinsBuilder::default());
198        let mut relation_builder = RelationBuilder::default();
199        self.select_to_sql_recursively(
200            plan,
201            query,
202            &mut select_builder,
203            &mut relation_builder,
204        )?;
205
206        // If we were able to construct a full body (i.e. UNION ALL), return it
207        if let Some(body) = query.as_mut().and_then(|q| q.take_body()) {
208            return Ok(*body);
209        }
210
211        // If no projection is set, add a wildcard projection to the select
212        // which will be translated to `SELECT *` in the SQL statement
213        if !select_builder.already_projected() {
214            select_builder.projection(vec![ast::SelectItem::Wildcard(
215                ast::WildcardAdditionalOptions::default(),
216            )]);
217        }
218
219        let mut twj = select_builder.pop_from().unwrap();
220        twj.relation(relation_builder);
221        select_builder.push_from(twj);
222
223        Ok(SetExpr::Select(Box::new(select_builder.build()?)))
224    }
225
226    /// Reconstructs a SELECT SQL statement from a logical plan by unprojecting column expressions
227    /// found in a [Projection] node. This requires scanning the plan tree for relevant Aggregate
228    /// and Window nodes and matching column expressions to the appropriate agg or window expressions.
229    fn reconstruct_select_statement(
230        &self,
231        plan: &LogicalPlan,
232        p: &Projection,
233        select: &mut SelectBuilder,
234    ) -> Result<()> {
235        let mut exprs = p.expr.clone();
236
237        // If an Unnest node is found within the select, find and unproject the unnest column
238        if let Some(unnest) = find_unnest_node_within_select(plan) {
239            exprs = exprs
240                .into_iter()
241                .map(|e| unproject_unnest_expr(e, unnest))
242                .collect::<Result<Vec<_>>>()?;
243        };
244
245        match (
246            find_agg_node_within_select(plan, true),
247            find_window_nodes_within_select(plan, None, true),
248        ) {
249            (Some(agg), window) => {
250                let window_option = window.as_deref();
251                let items = exprs
252                    .into_iter()
253                    .map(|proj_expr| {
254                        let unproj = unproject_agg_exprs(proj_expr, agg, window_option)?;
255                        self.select_item_to_sql(&unproj)
256                    })
257                    .collect::<Result<Vec<_>>>()?;
258
259                select.projection(items);
260                select.group_by(ast::GroupByExpr::Expressions(
261                    agg.group_expr
262                        .iter()
263                        .map(|expr| self.expr_to_sql(expr))
264                        .collect::<Result<Vec<_>>>()?,
265                    vec![],
266                ));
267            }
268            (None, Some(window)) => {
269                let items = exprs
270                    .into_iter()
271                    .map(|proj_expr| {
272                        let unproj = unproject_window_exprs(proj_expr, &window)?;
273                        self.select_item_to_sql(&unproj)
274                    })
275                    .collect::<Result<Vec<_>>>()?;
276
277                select.projection(items);
278            }
279            _ => {
280                let items = exprs
281                    .iter()
282                    .map(|e| self.select_item_to_sql(e))
283                    .collect::<Result<Vec<_>>>()?;
284                select.projection(items);
285            }
286        }
287        Ok(())
288    }
289
290    fn derive(
291        &self,
292        plan: &LogicalPlan,
293        relation: &mut RelationBuilder,
294        alias: Option<ast::TableAlias>,
295        lateral: bool,
296    ) -> Result<()> {
297        let mut derived_builder = DerivedRelationBuilder::default();
298        derived_builder.lateral(lateral).alias(alias).subquery({
299            let inner_statement = self.plan_to_sql(plan)?;
300            if let ast::Statement::Query(inner_query) = inner_statement {
301                inner_query
302            } else {
303                return internal_err!(
304                    "Subquery must be a Query, but found {inner_statement:?}"
305                );
306            }
307        });
308        relation.derived(derived_builder);
309
310        Ok(())
311    }
312
313    fn derive_with_dialect_alias(
314        &self,
315        alias: &str,
316        plan: &LogicalPlan,
317        relation: &mut RelationBuilder,
318        lateral: bool,
319        columns: Vec<Ident>,
320    ) -> Result<()> {
321        if self.dialect.requires_derived_table_alias() || !columns.is_empty() {
322            self.derive(
323                plan,
324                relation,
325                Some(self.new_table_alias(alias.to_string(), columns)),
326                lateral,
327            )
328        } else {
329            self.derive(plan, relation, None, lateral)
330        }
331    }
332
333    #[cfg_attr(feature = "recursive_protection", recursive::recursive)]
334    fn select_to_sql_recursively(
335        &self,
336        plan: &LogicalPlan,
337        query: &mut Option<QueryBuilder>,
338        select: &mut SelectBuilder,
339        relation: &mut RelationBuilder,
340    ) -> Result<()> {
341        match plan {
342            LogicalPlan::TableScan(scan) => {
343                if let Some(unparsed_table_scan) = Self::unparse_table_scan_pushdown(
344                    plan,
345                    None,
346                    select.already_projected(),
347                )? {
348                    return self.select_to_sql_recursively(
349                        &unparsed_table_scan,
350                        query,
351                        select,
352                        relation,
353                    );
354                }
355                let mut builder = TableRelationBuilder::default();
356                let mut table_parts = vec![];
357                if let Some(catalog_name) = scan.table_name.catalog() {
358                    table_parts
359                        .push(self.new_ident_quoted_if_needs(catalog_name.to_string()));
360                }
361                if let Some(schema_name) = scan.table_name.schema() {
362                    table_parts
363                        .push(self.new_ident_quoted_if_needs(schema_name.to_string()));
364                }
365                table_parts.push(
366                    self.new_ident_quoted_if_needs(scan.table_name.table().to_string()),
367                );
368                builder.name(ast::ObjectName::from(table_parts));
369                relation.table(builder);
370
371                Ok(())
372            }
373            LogicalPlan::Projection(p) => {
374                if let Some(new_plan) = rewrite_plan_for_sort_on_non_projected_fields(p) {
375                    return self
376                        .select_to_sql_recursively(&new_plan, query, select, relation);
377                }
378
379                // Projection can be top-level plan for unnest relation
380                // The projection generated by the `RecursiveUnnestRewriter` from a UNNEST relation will have
381                // only one expression, which is the placeholder column generated by the rewriter.
382                let unnest_input_type = if p.expr.len() == 1 {
383                    Self::check_unnest_placeholder_with_outer_ref(&p.expr[0])
384                } else {
385                    None
386                };
387                if self.dialect.unnest_as_table_factor() && unnest_input_type.is_some() {
388                    if let LogicalPlan::Unnest(unnest) = &p.input.as_ref() {
389                        if let Some(unnest_relation) =
390                            self.try_unnest_to_table_factor_sql(unnest)?
391                        {
392                            relation.unnest(unnest_relation);
393                            return self.select_to_sql_recursively(
394                                p.input.as_ref(),
395                                query,
396                                select,
397                                relation,
398                            );
399                        }
400                    }
401                }
402
403                // If it's a unnest projection, we should provide the table column alias
404                // to provide a column name for the unnest relation.
405                let columns = if unnest_input_type.is_some() {
406                    p.expr
407                        .iter()
408                        .map(|e| {
409                            self.new_ident_quoted_if_needs(e.schema_name().to_string())
410                        })
411                        .collect()
412                } else {
413                    vec![]
414                };
415                // Projection can be top-level plan for derived table
416                if select.already_projected() {
417                    return self.derive_with_dialect_alias(
418                        "derived_projection",
419                        plan,
420                        relation,
421                        unnest_input_type
422                            .filter(|t| matches!(t, UnnestInputType::OuterReference))
423                            .is_some(),
424                        columns,
425                    );
426                }
427                self.reconstruct_select_statement(plan, p, select)?;
428                self.select_to_sql_recursively(p.input.as_ref(), query, select, relation)
429            }
430            LogicalPlan::Filter(filter) => {
431                if let Some(agg) =
432                    find_agg_node_within_select(plan, select.already_projected())
433                {
434                    let unprojected =
435                        unproject_agg_exprs(filter.predicate.clone(), agg, None)?;
436                    let filter_expr = self.expr_to_sql(&unprojected)?;
437                    select.having(Some(filter_expr));
438                } else if let (Some(window), true) = (
439                    find_window_nodes_within_select(
440                        plan,
441                        None,
442                        select.already_projected(),
443                    ),
444                    self.dialect.supports_qualify(),
445                ) {
446                    let unprojected =
447                        unproject_window_exprs(filter.predicate.clone(), &window)?;
448                    let filter_expr = self.expr_to_sql(&unprojected)?;
449                    select.qualify(Some(filter_expr));
450                } else {
451                    let filter_expr = self.expr_to_sql(&filter.predicate)?;
452                    select.selection(Some(filter_expr));
453                }
454
455                self.select_to_sql_recursively(
456                    filter.input.as_ref(),
457                    query,
458                    select,
459                    relation,
460                )
461            }
462            LogicalPlan::Limit(limit) => {
463                // Limit can be top-level plan for derived table
464                if select.already_projected() {
465                    return self.derive_with_dialect_alias(
466                        "derived_limit",
467                        plan,
468                        relation,
469                        false,
470                        vec![],
471                    );
472                }
473                if let Some(fetch) = &limit.fetch {
474                    let Some(query) = query.as_mut() else {
475                        return internal_err!(
476                            "Limit operator only valid in a statement context."
477                        );
478                    };
479                    query.limit(Some(self.expr_to_sql(fetch)?));
480                }
481
482                if let Some(skip) = &limit.skip {
483                    let Some(query) = query.as_mut() else {
484                        return internal_err!(
485                            "Offset operator only valid in a statement context."
486                        );
487                    };
488
489                    query.offset(Some(ast::Offset {
490                        rows: ast::OffsetRows::None,
491                        value: self.expr_to_sql(skip)?,
492                    }));
493                }
494
495                self.select_to_sql_recursively(
496                    limit.input.as_ref(),
497                    query,
498                    select,
499                    relation,
500                )
501            }
502            LogicalPlan::Sort(sort) => {
503                // Sort can be top-level plan for derived table
504                if select.already_projected() {
505                    return self.derive_with_dialect_alias(
506                        "derived_sort",
507                        plan,
508                        relation,
509                        false,
510                        vec![],
511                    );
512                }
513                let Some(query_ref) = query else {
514                    return internal_err!(
515                        "Sort operator only valid in a statement context."
516                    );
517                };
518
519                if let Some(fetch) = sort.fetch {
520                    query_ref.limit(Some(ast::Expr::value(ast::Value::Number(
521                        fetch.to_string(),
522                        false,
523                    ))));
524                };
525
526                let agg = find_agg_node_within_select(plan, select.already_projected());
527                // unproject sort expressions
528                let sort_exprs: Vec<SortExpr> = sort
529                    .expr
530                    .iter()
531                    .map(|sort_expr| {
532                        unproject_sort_expr(sort_expr.clone(), agg, sort.input.as_ref())
533                    })
534                    .collect::<Result<Vec<_>>>()?;
535
536                query_ref.order_by(self.sorts_to_sql(&sort_exprs)?);
537
538                self.select_to_sql_recursively(
539                    sort.input.as_ref(),
540                    query,
541                    select,
542                    relation,
543                )
544            }
545            LogicalPlan::Aggregate(agg) => {
546                // Aggregation can be already handled in the projection case
547                if !select.already_projected() {
548                    // The query returns aggregate and group expressions. If that weren't the case,
549                    // the aggregate would have been placed inside a projection, making the check above^ false
550                    let exprs: Vec<_> = agg
551                        .aggr_expr
552                        .iter()
553                        .chain(agg.group_expr.iter())
554                        .map(|expr| self.select_item_to_sql(expr))
555                        .collect::<Result<Vec<_>>>()?;
556                    select.projection(exprs);
557
558                    select.group_by(ast::GroupByExpr::Expressions(
559                        agg.group_expr
560                            .iter()
561                            .map(|expr| self.expr_to_sql(expr))
562                            .collect::<Result<Vec<_>>>()?,
563                        vec![],
564                    ));
565                }
566
567                self.select_to_sql_recursively(
568                    agg.input.as_ref(),
569                    query,
570                    select,
571                    relation,
572                )
573            }
574            LogicalPlan::Distinct(distinct) => {
575                // Distinct can be top-level plan for derived table
576                if select.already_projected() {
577                    return self.derive_with_dialect_alias(
578                        "derived_distinct",
579                        plan,
580                        relation,
581                        false,
582                        vec![],
583                    );
584                }
585
586                // If this distinct is the parent of a Union and we're in a query context,
587                // then we need to unparse as a `UNION` rather than a `UNION ALL`.
588                if let Distinct::All(input) = distinct {
589                    if matches!(input.as_ref(), LogicalPlan::Union(_)) {
590                        if let Some(query_mut) = query.as_mut() {
591                            query_mut.distinct_union();
592                            return self.select_to_sql_recursively(
593                                input.as_ref(),
594                                query,
595                                select,
596                                relation,
597                            );
598                        }
599                    }
600                }
601
602                let (select_distinct, input) = match distinct {
603                    Distinct::All(input) => (ast::Distinct::Distinct, input.as_ref()),
604                    Distinct::On(on) => {
605                        let exprs = on
606                            .on_expr
607                            .iter()
608                            .map(|e| self.expr_to_sql(e))
609                            .collect::<Result<Vec<_>>>()?;
610                        let items = on
611                            .select_expr
612                            .iter()
613                            .map(|e| self.select_item_to_sql(e))
614                            .collect::<Result<Vec<_>>>()?;
615                        if let Some(sort_expr) = &on.sort_expr {
616                            if let Some(query_ref) = query {
617                                query_ref.order_by(self.sorts_to_sql(sort_expr)?);
618                            } else {
619                                return internal_err!(
620                                    "Sort operator only valid in a statement context."
621                                );
622                            }
623                        }
624                        select.projection(items);
625                        (ast::Distinct::On(exprs), on.input.as_ref())
626                    }
627                };
628                select.distinct(Some(select_distinct));
629                self.select_to_sql_recursively(input, query, select, relation)
630            }
631            LogicalPlan::Join(join) => {
632                let mut table_scan_filters = vec![];
633                let (left_plan, right_plan) = match join.join_type {
634                    JoinType::RightSemi | JoinType::RightAnti => {
635                        (&join.right, &join.left)
636                    }
637                    _ => (&join.left, &join.right),
638                };
639                // If there's an outer projection plan, it will already set up the projection.
640                // In that case, we don't need to worry about setting up the projection here.
641                // The outer projection plan will handle projecting the correct columns.
642                let already_projected = select.already_projected();
643
644                let left_plan =
645                    match try_transform_to_simple_table_scan_with_filters(left_plan)? {
646                        Some((plan, filters)) => {
647                            table_scan_filters.extend(filters);
648                            Arc::new(plan)
649                        }
650                        None => Arc::clone(left_plan),
651                    };
652
653                self.select_to_sql_recursively(
654                    left_plan.as_ref(),
655                    query,
656                    select,
657                    relation,
658                )?;
659
660                let left_projection: Option<Vec<ast::SelectItem>> = if !already_projected
661                {
662                    Some(select.pop_projections())
663                } else {
664                    None
665                };
666
667                let right_plan =
668                    match try_transform_to_simple_table_scan_with_filters(right_plan)? {
669                        Some((plan, filters)) => {
670                            table_scan_filters.extend(filters);
671                            Arc::new(plan)
672                        }
673                        None => Arc::clone(right_plan),
674                    };
675
676                let mut right_relation = RelationBuilder::default();
677
678                self.select_to_sql_recursively(
679                    right_plan.as_ref(),
680                    query,
681                    select,
682                    &mut right_relation,
683                )?;
684
685                let join_filters = if table_scan_filters.is_empty() {
686                    join.filter.clone()
687                } else {
688                    // Combine `table_scan_filters` into a single filter using `AND`
689                    let Some(combined_filters) =
690                        table_scan_filters.into_iter().reduce(|acc, filter| {
691                            Expr::BinaryExpr(BinaryExpr {
692                                left: Box::new(acc),
693                                op: Operator::And,
694                                right: Box::new(filter),
695                            })
696                        })
697                    else {
698                        return internal_err!("Failed to combine TableScan filters");
699                    };
700
701                    // Combine `join.filter` with `combined_filters` using `AND`
702                    match &join.filter {
703                        Some(filter) => Some(Expr::BinaryExpr(BinaryExpr {
704                            left: Box::new(filter.clone()),
705                            op: Operator::And,
706                            right: Box::new(combined_filters),
707                        })),
708                        None => Some(combined_filters),
709                    }
710                };
711
712                let join_constraint = self.join_constraint_to_sql(
713                    join.join_constraint,
714                    &join.on,
715                    join_filters.as_ref(),
716                )?;
717
718                let right_projection: Option<Vec<ast::SelectItem>> = if !already_projected
719                {
720                    Some(select.pop_projections())
721                } else {
722                    None
723                };
724
725                match join.join_type {
726                    JoinType::LeftSemi
727                    | JoinType::LeftAnti
728                    | JoinType::LeftMark
729                    | JoinType::RightSemi
730                    | JoinType::RightAnti
731                    | JoinType::RightMark => {
732                        let mut query_builder = QueryBuilder::default();
733                        let mut from = TableWithJoinsBuilder::default();
734                        let mut exists_select: SelectBuilder = SelectBuilder::default();
735                        from.relation(right_relation);
736                        exists_select.push_from(from);
737                        if let Some(filter) = &join.filter {
738                            exists_select.selection(Some(self.expr_to_sql(filter)?));
739                        }
740                        for (left, right) in &join.on {
741                            exists_select.selection(Some(
742                                self.expr_to_sql(&left.clone().eq(right.clone()))?,
743                            ));
744                        }
745                        exists_select.projection(vec![ast::SelectItem::UnnamedExpr(
746                            ast::Expr::value(ast::Value::Number("1".to_string(), false)),
747                        )]);
748                        query_builder.body(Box::new(SetExpr::Select(Box::new(
749                            exists_select.build()?,
750                        ))));
751
752                        let negated = match join.join_type {
753                            JoinType::LeftSemi
754                            | JoinType::RightSemi
755                            | JoinType::LeftMark
756                            | JoinType::RightMark => false,
757                            JoinType::LeftAnti | JoinType::RightAnti => true,
758                            _ => unreachable!(),
759                        };
760                        let exists_expr = ast::Expr::Exists {
761                            subquery: Box::new(query_builder.build()?),
762                            negated,
763                        };
764
765                        match join.join_type {
766                            JoinType::LeftMark | JoinType::RightMark => {
767                                let source_schema =
768                                    if join.join_type == JoinType::LeftMark {
769                                        right_plan.schema()
770                                    } else {
771                                        left_plan.schema()
772                                    };
773                                let (table_ref, _) = source_schema.qualified_field(0);
774                                let column = self.col_to_sql(&Column::new(
775                                    table_ref.cloned(),
776                                    "mark",
777                                ))?;
778                                select.replace_mark(&column, &exists_expr);
779                            }
780                            _ => {
781                                select.selection(Some(exists_expr));
782                            }
783                        }
784                        if let Some(projection) = left_projection {
785                            select.projection(projection);
786                        }
787                    }
788                    JoinType::Inner
789                    | JoinType::Left
790                    | JoinType::Right
791                    | JoinType::Full => {
792                        let Ok(Some(relation)) = right_relation.build() else {
793                            return internal_err!("Failed to build right relation");
794                        };
795                        let ast_join = ast::Join {
796                            relation,
797                            global: false,
798                            join_operator: self
799                                .join_operator_to_sql(join.join_type, join_constraint)?,
800                        };
801                        let mut from = select.pop_from().unwrap();
802                        from.push_join(ast_join);
803                        select.push_from(from);
804                        if !already_projected {
805                            let Some(left_projection) = left_projection else {
806                                return internal_err!("Left projection is missing");
807                            };
808
809                            let Some(right_projection) = right_projection else {
810                                return internal_err!("Right projection is missing");
811                            };
812
813                            let projection = left_projection
814                                .into_iter()
815                                .chain(right_projection)
816                                .collect();
817                            select.projection(projection);
818                        }
819                    }
820                };
821
822                Ok(())
823            }
824            LogicalPlan::SubqueryAlias(plan_alias) => {
825                let (plan, mut columns) =
826                    subquery_alias_inner_query_and_columns(plan_alias);
827                let unparsed_table_scan = Self::unparse_table_scan_pushdown(
828                    plan,
829                    Some(plan_alias.alias.clone()),
830                    select.already_projected(),
831                )?;
832                // if the child plan is a TableScan with pushdown operations, we don't need to
833                // create an additional subquery for it
834                if !select.already_projected() && unparsed_table_scan.is_none() {
835                    select.projection(vec![ast::SelectItem::Wildcard(
836                        ast::WildcardAdditionalOptions::default(),
837                    )]);
838                }
839                let plan = unparsed_table_scan.unwrap_or_else(|| plan.clone());
840                if !columns.is_empty()
841                    && !self.dialect.supports_column_alias_in_table_alias()
842                {
843                    // Instead of specifying column aliases as part of the outer table, inject them directly into the inner projection
844                    let rewritten_plan =
845                        match inject_column_aliases_into_subquery(plan, columns) {
846                            Ok(p) => p,
847                            Err(e) => {
848                                return internal_err!(
849                                    "Failed to transform SubqueryAlias plan: {e}"
850                                )
851                            }
852                        };
853
854                    columns = vec![];
855
856                    self.select_to_sql_recursively(
857                        &rewritten_plan,
858                        query,
859                        select,
860                        relation,
861                    )?;
862                } else {
863                    self.select_to_sql_recursively(&plan, query, select, relation)?;
864                }
865
866                relation.alias(Some(
867                    self.new_table_alias(plan_alias.alias.table().to_string(), columns),
868                ));
869
870                Ok(())
871            }
872            LogicalPlan::Union(union) => {
873                // Covers cases where the UNION is a subquery and the projection is at the top level
874                if select.already_projected() {
875                    return self.derive_with_dialect_alias(
876                        "derived_union",
877                        plan,
878                        relation,
879                        false,
880                        vec![],
881                    );
882                }
883
884                let input_exprs: Vec<SetExpr> = union
885                    .inputs
886                    .iter()
887                    .map(|input| self.select_to_sql_expr(input, query))
888                    .collect::<Result<Vec<_>>>()?;
889
890                if input_exprs.len() < 2 {
891                    return internal_err!("UNION operator requires at least 2 inputs");
892                }
893
894                let set_quantifier =
895                    if query.as_ref().is_some_and(|q| q.is_distinct_union()) {
896                        // Setting the SetQuantifier to None will unparse as a `UNION`
897                        // rather than a `UNION ALL`.
898                        ast::SetQuantifier::None
899                    } else {
900                        ast::SetQuantifier::All
901                    };
902
903                // Build the union expression tree bottom-up by reversing the order
904                // note that we are also swapping left and right inputs because of the rev
905                let union_expr = input_exprs
906                    .into_iter()
907                    .rev()
908                    .reduce(|a, b| SetExpr::SetOperation {
909                        op: ast::SetOperator::Union,
910                        set_quantifier,
911                        left: Box::new(b),
912                        right: Box::new(a),
913                    })
914                    .unwrap();
915
916                let Some(query) = query.as_mut() else {
917                    return internal_err!(
918                        "UNION ALL operator only valid in a statement context"
919                    );
920                };
921                query.body(Box::new(union_expr));
922
923                Ok(())
924            }
925            LogicalPlan::Window(window) => {
926                // Window nodes are handled simultaneously with Projection nodes
927                self.select_to_sql_recursively(
928                    window.input.as_ref(),
929                    query,
930                    select,
931                    relation,
932                )
933            }
934            LogicalPlan::EmptyRelation(_) => {
935                // An EmptyRelation could be behind an UNNEST node. If the dialect supports UNNEST as a table factor,
936                // a TableRelationBuilder will be created for the UNNEST node first.
937                if !relation.has_relation() {
938                    relation.empty();
939                }
940                Ok(())
941            }
942            LogicalPlan::Extension(extension) => {
943                if let Some(query) = query.as_mut() {
944                    self.extension_to_sql(
945                        extension.node.as_ref(),
946                        &mut Some(query),
947                        &mut Some(select),
948                        &mut Some(relation),
949                    )
950                } else {
951                    self.extension_to_sql(
952                        extension.node.as_ref(),
953                        &mut None,
954                        &mut Some(select),
955                        &mut Some(relation),
956                    )
957                }
958            }
959            LogicalPlan::Unnest(unnest) => {
960                if !unnest.struct_type_columns.is_empty() {
961                    return internal_err!(
962                        "Struct type columns are not currently supported in UNNEST: {:?}",
963                        unnest.struct_type_columns
964                    );
965                }
966
967                // In the case of UNNEST, the Unnest node is followed by a duplicate Projection node that we should skip.
968                // Otherwise, there will be a duplicate SELECT clause.
969                // | Projection: table.col1, UNNEST(table.col2)
970                // |   Unnest: UNNEST(table.col2)
971                // |     Projection: table.col1, table.col2 AS UNNEST(table.col2)
972                // |       Filter: table.col3 = Int64(3)
973                // |         TableScan: table projection=None
974                if let LogicalPlan::Projection(p) = unnest.input.as_ref() {
975                    // continue with projection input
976                    self.select_to_sql_recursively(&p.input, query, select, relation)
977                } else {
978                    internal_err!("Unnest input is not a Projection: {unnest:?}")
979                }
980            }
981            LogicalPlan::Subquery(subquery)
982                if find_unnest_node_until_relation(subquery.subquery.as_ref())
983                    .is_some() =>
984            {
985                if self.dialect.unnest_as_table_factor() {
986                    self.select_to_sql_recursively(
987                        subquery.subquery.as_ref(),
988                        query,
989                        select,
990                        relation,
991                    )
992                } else {
993                    self.derive_with_dialect_alias(
994                        "derived_unnest",
995                        subquery.subquery.as_ref(),
996                        relation,
997                        true,
998                        vec![],
999                    )
1000                }
1001            }
1002            _ => {
1003                not_impl_err!("Unsupported operator: {plan:?}")
1004            }
1005        }
1006    }
1007
1008    /// Try to find the placeholder column name generated by `RecursiveUnnestRewriter`.
1009    ///
1010    /// - If the column is a placeholder column match the pattern `Expr::Alias(Expr::Column("__unnest_placeholder(...)"))`,
1011    ///   it means it is a scalar column, return [UnnestInputType::Scalar].
1012    /// - If the column is a placeholder column match the pattern `Expr::Alias(Expr::Column("__unnest_placeholder(outer_ref(...)))")`,
1013    ///   it means it is an outer reference column, return [UnnestInputType::OuterReference].
1014    /// - If the column is not a placeholder column, return [None].
1015    ///
1016    /// `outer_ref` is the display result of [Expr::OuterReferenceColumn]
1017    fn check_unnest_placeholder_with_outer_ref(expr: &Expr) -> Option<UnnestInputType> {
1018        if let Expr::Alias(Alias { expr, .. }) = expr {
1019            if let Expr::Column(Column { name, .. }) = expr.as_ref() {
1020                if let Some(prefix) = name.strip_prefix(UNNEST_PLACEHOLDER) {
1021                    if prefix.starts_with(&format!("({OUTER_REFERENCE_COLUMN_PREFIX}(")) {
1022                        return Some(UnnestInputType::OuterReference);
1023                    }
1024                    return Some(UnnestInputType::Scalar);
1025                }
1026            }
1027        }
1028        None
1029    }
1030
1031    fn try_unnest_to_table_factor_sql(
1032        &self,
1033        unnest: &Unnest,
1034    ) -> Result<Option<UnnestRelationBuilder>> {
1035        let mut unnest_relation = UnnestRelationBuilder::default();
1036        let LogicalPlan::Projection(projection) = unnest.input.as_ref() else {
1037            return Ok(None);
1038        };
1039
1040        if !matches!(projection.input.as_ref(), LogicalPlan::EmptyRelation(_)) {
1041            // It may be possible that UNNEST is used as a source for the query.
1042            // However, at this point, we don't yet know if it is just a single expression
1043            // from another source or if it's from UNNEST.
1044            //
1045            // Unnest(Projection(EmptyRelation)) denotes a case with `UNNEST([...])`,
1046            // which is normally safe to unnest as a table factor.
1047            // However, in the future, more comprehensive checks can be added here.
1048            return Ok(None);
1049        };
1050
1051        let exprs = projection
1052            .expr
1053            .iter()
1054            .map(|e| self.expr_to_sql(e))
1055            .collect::<Result<Vec<_>>>()?;
1056        unnest_relation.array_exprs(exprs);
1057
1058        Ok(Some(unnest_relation))
1059    }
1060
1061    fn is_scan_with_pushdown(scan: &TableScan) -> bool {
1062        scan.projection.is_some() || !scan.filters.is_empty() || scan.fetch.is_some()
1063    }
1064
1065    /// Try to unparse a table scan with pushdown operations into a new subquery plan.
1066    /// If the table scan is without any pushdown operations, return None.
1067    fn unparse_table_scan_pushdown(
1068        plan: &LogicalPlan,
1069        alias: Option<TableReference>,
1070        already_projected: bool,
1071    ) -> Result<Option<LogicalPlan>> {
1072        match plan {
1073            LogicalPlan::TableScan(table_scan) => {
1074                if !Self::is_scan_with_pushdown(table_scan) {
1075                    return Ok(None);
1076                }
1077                let table_schema = table_scan.source.schema();
1078                let mut filter_alias_rewriter =
1079                    alias.as_ref().map(|alias_name| TableAliasRewriter {
1080                        table_schema: &table_schema,
1081                        alias_name: alias_name.clone(),
1082                    });
1083
1084                let mut builder = LogicalPlanBuilder::scan(
1085                    table_scan.table_name.clone(),
1086                    Arc::clone(&table_scan.source),
1087                    None,
1088                )?;
1089                // We will rebase the column references to the new alias if it exists.
1090                // If the projection or filters are empty, we will append alias to the table scan.
1091                //
1092                // Example:
1093                //   select t1.c1 from t1 where t1.c1 > 1 -> select a.c1 from t1 as a where a.c1 > 1
1094                if let Some(ref alias) = alias {
1095                    if table_scan.projection.is_some() || !table_scan.filters.is_empty() {
1096                        builder = builder.alias(alias.clone())?;
1097                    }
1098                }
1099
1100                // Avoid creating a duplicate Projection node, which would result in an additional subquery if a projection already exists.
1101                // For example, if the `optimize_projection` rule is applied, there will be a Projection node, and duplicate projection
1102                // information included in the TableScan node.
1103                if !already_projected {
1104                    if let Some(project_vec) = &table_scan.projection {
1105                        if project_vec.is_empty() {
1106                            builder = builder.project(vec![Expr::Literal(
1107                                ScalarValue::Int64(Some(1)),
1108                                None,
1109                            )])?;
1110                        } else {
1111                            let project_columns = project_vec
1112                                .iter()
1113                                .cloned()
1114                                .map(|i| {
1115                                    let schema = table_scan.source.schema();
1116                                    let field = schema.field(i);
1117                                    if alias.is_some() {
1118                                        Column::new(alias.clone(), field.name().clone())
1119                                    } else {
1120                                        Column::new(
1121                                            Some(table_scan.table_name.clone()),
1122                                            field.name().clone(),
1123                                        )
1124                                    }
1125                                })
1126                                .collect::<Vec<_>>();
1127                            builder = builder.project(project_columns)?;
1128                        };
1129                    }
1130                }
1131
1132                let filter_expr: Result<Option<Expr>> = table_scan
1133                    .filters
1134                    .iter()
1135                    .cloned()
1136                    .map(|expr| {
1137                        if let Some(ref mut rewriter) = filter_alias_rewriter {
1138                            expr.rewrite(rewriter).data()
1139                        } else {
1140                            Ok(expr)
1141                        }
1142                    })
1143                    .reduce(|acc, expr_result| {
1144                        acc.and_then(|acc_expr| {
1145                            expr_result.map(|expr| acc_expr.and(expr))
1146                        })
1147                    })
1148                    .transpose();
1149
1150                if let Some(filter) = filter_expr? {
1151                    builder = builder.filter(filter)?;
1152                }
1153
1154                if let Some(fetch) = table_scan.fetch {
1155                    builder = builder.limit(0, Some(fetch))?;
1156                }
1157
1158                // If the table scan has an alias but no projection or filters, it means no column references are rebased.
1159                // So we will append the alias to this subquery.
1160                // Example:
1161                //   select * from t1 limit 10 -> (select * from t1 limit 10) as a
1162                if let Some(alias) = alias {
1163                    if table_scan.projection.is_none() && table_scan.filters.is_empty() {
1164                        builder = builder.alias(alias)?;
1165                    }
1166                }
1167
1168                Ok(Some(builder.build()?))
1169            }
1170            LogicalPlan::SubqueryAlias(subquery_alias) => {
1171                let ret = Self::unparse_table_scan_pushdown(
1172                    &subquery_alias.input,
1173                    Some(subquery_alias.alias.clone()),
1174                    already_projected,
1175                )?;
1176                if let Some(alias) = alias {
1177                    if let Some(plan) = ret {
1178                        let plan = LogicalPlanBuilder::new(plan).alias(alias)?.build()?;
1179                        return Ok(Some(plan));
1180                    }
1181                }
1182                Ok(ret)
1183            }
1184            // SubqueryAlias could be rewritten to a plan with a projection as the top node by [rewrite::subquery_alias_inner_query_and_columns].
1185            // The inner table scan could be a scan with pushdown operations.
1186            LogicalPlan::Projection(projection) => {
1187                if let Some(plan) = Self::unparse_table_scan_pushdown(
1188                    &projection.input,
1189                    alias.clone(),
1190                    already_projected,
1191                )? {
1192                    let exprs = if alias.is_some() {
1193                        let mut alias_rewriter =
1194                            alias.as_ref().map(|alias_name| TableAliasRewriter {
1195                                table_schema: plan.schema().as_arrow(),
1196                                alias_name: alias_name.clone(),
1197                            });
1198                        projection
1199                            .expr
1200                            .iter()
1201                            .cloned()
1202                            .map(|expr| {
1203                                if let Some(ref mut rewriter) = alias_rewriter {
1204                                    expr.rewrite(rewriter).data()
1205                                } else {
1206                                    Ok(expr)
1207                                }
1208                            })
1209                            .collect::<Result<Vec<_>>>()?
1210                    } else {
1211                        projection.expr.clone()
1212                    };
1213                    Ok(Some(
1214                        LogicalPlanBuilder::from(plan).project(exprs)?.build()?,
1215                    ))
1216                } else {
1217                    Ok(None)
1218                }
1219            }
1220            _ => Ok(None),
1221        }
1222    }
1223
1224    fn select_item_to_sql(&self, expr: &Expr) -> Result<ast::SelectItem> {
1225        match expr {
1226            Expr::Alias(Alias { expr, name, .. }) => {
1227                let inner = self.expr_to_sql(expr)?;
1228
1229                // Determine the alias name to use
1230                let col_name = if let Some(rewritten_name) =
1231                    self.dialect.col_alias_overrides(name)?
1232                {
1233                    rewritten_name.to_string()
1234                } else {
1235                    name.to_string()
1236                };
1237
1238                Ok(ast::SelectItem::ExprWithAlias {
1239                    expr: inner,
1240                    alias: self.new_ident_quoted_if_needs(col_name),
1241                })
1242            }
1243            _ => {
1244                let inner = self.expr_to_sql(expr)?;
1245
1246                Ok(ast::SelectItem::UnnamedExpr(inner))
1247            }
1248        }
1249    }
1250
1251    fn sorts_to_sql(&self, sort_exprs: &[SortExpr]) -> Result<OrderByKind> {
1252        Ok(OrderByKind::Expressions(
1253            sort_exprs
1254                .iter()
1255                .map(|sort_expr| self.sort_to_sql(sort_expr))
1256                .collect::<Result<Vec<_>>>()?,
1257        ))
1258    }
1259
1260    fn join_operator_to_sql(
1261        &self,
1262        join_type: JoinType,
1263        constraint: ast::JoinConstraint,
1264    ) -> Result<ast::JoinOperator> {
1265        Ok(match join_type {
1266            JoinType::Inner => match &constraint {
1267                ast::JoinConstraint::On(_)
1268                | ast::JoinConstraint::Using(_)
1269                | ast::JoinConstraint::Natural => ast::JoinOperator::Inner(constraint),
1270                ast::JoinConstraint::None => {
1271                    // Inner joins with no conditions or filters are not valid SQL in most systems,
1272                    // return a CROSS JOIN instead
1273                    ast::JoinOperator::CrossJoin(constraint)
1274                }
1275            },
1276            JoinType::Left => ast::JoinOperator::LeftOuter(constraint),
1277            JoinType::Right => ast::JoinOperator::RightOuter(constraint),
1278            JoinType::Full => ast::JoinOperator::FullOuter(constraint),
1279            JoinType::LeftAnti => ast::JoinOperator::LeftAnti(constraint),
1280            JoinType::LeftSemi => ast::JoinOperator::LeftSemi(constraint),
1281            JoinType::RightAnti => ast::JoinOperator::RightAnti(constraint),
1282            JoinType::RightSemi => ast::JoinOperator::RightSemi(constraint),
1283            JoinType::LeftMark | JoinType::RightMark => {
1284                unimplemented!("Unparsing of Mark join type")
1285            }
1286        })
1287    }
1288
1289    /// Convert the components of a USING clause to the USING AST. Returns
1290    /// 'None' if the conditions are not compatible with a USING expression,
1291    /// e.g. non-column expressions or non-matching names.
1292    fn join_using_to_sql(
1293        &self,
1294        join_conditions: &[(Expr, Expr)],
1295    ) -> Option<ast::JoinConstraint> {
1296        let mut object_names = Vec::with_capacity(join_conditions.len());
1297        for (left, right) in join_conditions {
1298            match (left, right) {
1299                (
1300                    Expr::Column(Column {
1301                        relation: _,
1302                        name: left_name,
1303                        spans: _,
1304                    }),
1305                    Expr::Column(Column {
1306                        relation: _,
1307                        name: right_name,
1308                        spans: _,
1309                    }),
1310                ) if left_name == right_name => {
1311                    // For example, if the join condition `t1.id = t2.id`
1312                    // this is represented as two columns like `[t1.id, t2.id]`
1313                    // This code forms `id` (without relation name)
1314                    let ident = self.new_ident_quoted_if_needs(left_name.to_string());
1315                    object_names.push(ast::ObjectName::from(vec![ident]));
1316                }
1317                // USING is only valid with matching column names; arbitrary expressions
1318                // are not allowed
1319                _ => return None,
1320            }
1321        }
1322        Some(ast::JoinConstraint::Using(object_names))
1323    }
1324
1325    /// Convert a join constraint and associated conditions and filter to a SQL AST node
1326    fn join_constraint_to_sql(
1327        &self,
1328        constraint: JoinConstraint,
1329        conditions: &[(Expr, Expr)],
1330        filter: Option<&Expr>,
1331    ) -> Result<ast::JoinConstraint> {
1332        match (constraint, conditions, filter) {
1333            // No constraints
1334            (JoinConstraint::On | JoinConstraint::Using, [], None) => {
1335                Ok(ast::JoinConstraint::None)
1336            }
1337
1338            (JoinConstraint::Using, conditions, None) => {
1339                match self.join_using_to_sql(conditions) {
1340                    Some(using) => Ok(using),
1341                    // As above, this should not be reachable from parsed SQL,
1342                    // but a user could create this; we "downgrade" to ON.
1343                    None => self.join_conditions_to_sql_on(conditions, None),
1344                }
1345            }
1346
1347            // Two cases here:
1348            // 1. Straightforward ON case, with possible equi-join conditions
1349            //    and additional filters
1350            // 2. USING with additional filters; we "downgrade" to ON, because
1351            //    you can't use USING with arbitrary filters. (This should not
1352            //    be accessible from parsed SQL, but may have been a
1353            //    custom-built JOIN by a user.)
1354            (JoinConstraint::On | JoinConstraint::Using, conditions, filter) => {
1355                self.join_conditions_to_sql_on(conditions, filter)
1356            }
1357        }
1358    }
1359
1360    // Convert a list of equi0join conditions and an optional filter to a SQL ON
1361    // AST node, with the equi-join conditions and the filter merged into a
1362    // single conditional expression
1363    fn join_conditions_to_sql_on(
1364        &self,
1365        join_conditions: &[(Expr, Expr)],
1366        filter: Option<&Expr>,
1367    ) -> Result<ast::JoinConstraint> {
1368        let mut condition = None;
1369        // AND the join conditions together to create the overall condition
1370        for (left, right) in join_conditions {
1371            // Parse left and right
1372            let l = self.expr_to_sql(left)?;
1373            let r = self.expr_to_sql(right)?;
1374            let e = self.binary_op_to_sql(l, r, ast::BinaryOperator::Eq);
1375            condition = match condition {
1376                Some(expr) => Some(self.and_op_to_sql(expr, e)),
1377                None => Some(e),
1378            };
1379        }
1380
1381        // Then AND the non-equijoin filter condition as well
1382        condition = match (condition, filter) {
1383            (Some(expr), Some(filter)) => {
1384                Some(self.and_op_to_sql(expr, self.expr_to_sql(filter)?))
1385            }
1386            (Some(expr), None) => Some(expr),
1387            (None, Some(filter)) => Some(self.expr_to_sql(filter)?),
1388            (None, None) => None,
1389        };
1390
1391        let constraint = match condition {
1392            Some(filter) => ast::JoinConstraint::On(filter),
1393            None => ast::JoinConstraint::None,
1394        };
1395
1396        Ok(constraint)
1397    }
1398
1399    fn and_op_to_sql(&self, lhs: ast::Expr, rhs: ast::Expr) -> ast::Expr {
1400        self.binary_op_to_sql(lhs, rhs, ast::BinaryOperator::And)
1401    }
1402
1403    fn new_table_alias(&self, alias: String, columns: Vec<Ident>) -> ast::TableAlias {
1404        let columns = columns
1405            .into_iter()
1406            .map(|ident| TableAliasColumnDef {
1407                name: ident,
1408                data_type: None,
1409            })
1410            .collect();
1411        ast::TableAlias {
1412            name: self.new_ident_quoted_if_needs(alias),
1413            columns,
1414        }
1415    }
1416
    /// Placeholder for unparsing DML plans (going by the method name).
    ///
    /// Nothing is supported yet: this always returns a "not implemented" error
    /// whose message contains the debug representation of `plan`.
    fn dml_to_sql(&self, plan: &LogicalPlan) -> Result<ast::Statement> {
        not_impl_err!("Unsupported plan: {plan:?}")
    }
1420}
1421
1422impl From<BuilderError> for DataFusionError {
1423    fn from(e: BuilderError) -> Self {
1424        DataFusionError::External(Box::new(e))
1425    }
1426}
1427
/// The type of the input to the UNNEST table factor.
///
/// This is the classification returned by
/// `Unparser::check_unnest_placeholder_with_outer_ref` for placeholder columns.
#[derive(Debug)]
enum UnnestInputType {
    /// The input is a column reference. It will be presented like `outer_ref(column_name)`.
    OuterReference,
    /// The input is a scalar value. It will be presented like a scalar array or struct.
    Scalar,
}