datafusion_sql/unparser/
plan.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use super::{
19    Unparser,
20    ast::{
21        BuilderError, DerivedRelationBuilder, QueryBuilder, RelationBuilder,
22        SelectBuilder, TableRelationBuilder, TableWithJoinsBuilder,
23    },
24    rewrite::{
25        TableAliasRewriter, inject_column_aliases_into_subquery, normalize_union_schema,
26        rewrite_plan_for_sort_on_non_projected_fields,
27        subquery_alias_inner_query_and_columns,
28    },
29    utils::{
30        find_agg_node_within_select, find_unnest_node_within_select,
31        find_window_nodes_within_select, try_transform_to_simple_table_scan_with_filters,
32        unproject_sort_expr, unproject_unnest_expr, unproject_window_exprs,
33    },
34};
35use crate::unparser::extension_unparser::{
36    UnparseToStatementResult, UnparseWithinStatementResult,
37};
38use crate::unparser::utils::{find_unnest_node_until_relation, unproject_agg_exprs};
39use crate::unparser::{ast::UnnestRelationBuilder, rewrite::rewrite_qualify};
40use crate::utils::UNNEST_PLACEHOLDER;
41use datafusion_common::{
42    Column, DataFusionError, Result, ScalarValue, TableReference, assert_or_internal_err,
43    internal_err, not_impl_err,
44    tree_node::{TransformedResult, TreeNode},
45};
46use datafusion_expr::expr::OUTER_REFERENCE_COLUMN_PREFIX;
47use datafusion_expr::{
48    BinaryExpr, Distinct, Expr, JoinConstraint, JoinType, LogicalPlan,
49    LogicalPlanBuilder, Operator, Projection, SortExpr, TableScan, Unnest,
50    UserDefinedLogicalNode, expr::Alias,
51};
52use sqlparser::ast::{self, Ident, OrderByKind, SetExpr, TableAliasColumnDef};
53use std::{sync::Arc, vec};
54
55/// Convert a DataFusion [`LogicalPlan`] to [`ast::Statement`]
56///
57/// This function is the opposite of [`SqlToRel::sql_statement_to_plan`] and can
58/// be used to, among other things, to convert `LogicalPlan`s to SQL strings.
59///
60/// # Errors
61///
62/// This function returns an error if the plan cannot be converted to SQL.
63///
64/// # See Also
65///
66/// * [`expr_to_sql`] for converting [`Expr`], a single expression to SQL
67///
68/// # Example
69/// ```
70/// use arrow::datatypes::{DataType, Field, Schema};
71/// use datafusion_expr::{col, logical_plan::table_scan};
72/// use datafusion_sql::unparser::plan_to_sql;
73/// let schema = Schema::new(vec![
74///     Field::new("id", DataType::Utf8, false),
75///     Field::new("value", DataType::Utf8, false),
76/// ]);
77/// // Scan 'table' and select columns 'id' and 'value'
78/// let plan = table_scan(Some("table"), &schema, None)
79///     .unwrap()
80///     .project(vec![col("id"), col("value")])
81///     .unwrap()
82///     .build()
83///     .unwrap();
84/// // convert to AST
85/// let sql = plan_to_sql(&plan).unwrap();
86/// // use the Display impl to convert to SQL text
87/// assert_eq!(
88///     sql.to_string(),
89///     "SELECT \"table\".id, \"table\".\"value\" FROM \"table\""
90/// )
91/// ```
92///
93/// [`SqlToRel::sql_statement_to_plan`]: crate::planner::SqlToRel::sql_statement_to_plan
94/// [`expr_to_sql`]: crate::unparser::expr_to_sql
95pub fn plan_to_sql(plan: &LogicalPlan) -> Result<ast::Statement> {
96    let unparser = Unparser::default();
97    unparser.plan_to_sql(plan)
98}
99
100impl Unparser<'_> {
101    pub fn plan_to_sql(&self, plan: &LogicalPlan) -> Result<ast::Statement> {
102        let mut plan = normalize_union_schema(plan)?;
103        if !self.dialect.supports_qualify() {
104            plan = rewrite_qualify(plan)?;
105        }
106
107        match plan {
108            LogicalPlan::Projection(_)
109            | LogicalPlan::Filter(_)
110            | LogicalPlan::Window(_)
111            | LogicalPlan::Aggregate(_)
112            | LogicalPlan::Sort(_)
113            | LogicalPlan::Join(_)
114            | LogicalPlan::Repartition(_)
115            | LogicalPlan::Union(_)
116            | LogicalPlan::TableScan(_)
117            | LogicalPlan::EmptyRelation(_)
118            | LogicalPlan::Subquery(_)
119            | LogicalPlan::SubqueryAlias(_)
120            | LogicalPlan::Limit(_)
121            | LogicalPlan::Statement(_)
122            | LogicalPlan::Values(_)
123            | LogicalPlan::Distinct(_) => self.select_to_sql_statement(&plan),
124            LogicalPlan::Dml(_) => self.dml_to_sql(&plan),
125            LogicalPlan::Extension(extension) => {
126                self.extension_to_statement(extension.node.as_ref())
127            }
128            LogicalPlan::Explain(_)
129            | LogicalPlan::Analyze(_)
130            | LogicalPlan::Ddl(_)
131            | LogicalPlan::Copy(_)
132            | LogicalPlan::DescribeTable(_)
133            | LogicalPlan::RecursiveQuery(_)
134            | LogicalPlan::Unnest(_) => not_impl_err!("Unsupported plan: {plan:?}"),
135        }
136    }
137
138    /// Try to unparse a [UserDefinedLogicalNode] to a SQL statement.
139    /// If multiple unparsers are registered for the same [UserDefinedLogicalNode],
140    /// the first unparsing result will be returned.
141    fn extension_to_statement(
142        &self,
143        node: &dyn UserDefinedLogicalNode,
144    ) -> Result<ast::Statement> {
145        let mut statement = None;
146        for unparser in &self.extension_unparsers {
147            match unparser.unparse_to_statement(node, self)? {
148                UnparseToStatementResult::Modified(stmt) => {
149                    statement = Some(stmt);
150                    break;
151                }
152                UnparseToStatementResult::Unmodified => {}
153            }
154        }
155        if let Some(statement) = statement {
156            Ok(statement)
157        } else {
158            not_impl_err!("Unsupported extension node: {node:?}")
159        }
160    }
161
162    /// Try to unparse a [UserDefinedLogicalNode] to a SQL statement.
163    /// If multiple unparsers are registered for the same [UserDefinedLogicalNode],
164    /// the first unparser supporting the node will be used.
165    fn extension_to_sql(
166        &self,
167        node: &dyn UserDefinedLogicalNode,
168        query: &mut Option<&mut QueryBuilder>,
169        select: &mut Option<&mut SelectBuilder>,
170        relation: &mut Option<&mut RelationBuilder>,
171    ) -> Result<()> {
172        for unparser in &self.extension_unparsers {
173            match unparser.unparse(node, self, query, select, relation)? {
174                UnparseWithinStatementResult::Modified => return Ok(()),
175                UnparseWithinStatementResult::Unmodified => {}
176            }
177        }
178        not_impl_err!("Unsupported extension node: {node:?}")
179    }
180
181    fn select_to_sql_statement(&self, plan: &LogicalPlan) -> Result<ast::Statement> {
182        let mut query_builder = Some(QueryBuilder::default());
183
184        let body = self.select_to_sql_expr(plan, &mut query_builder)?;
185
186        let query = query_builder.unwrap().body(Box::new(body)).build()?;
187
188        Ok(ast::Statement::Query(Box::new(query)))
189    }
190
191    fn select_to_sql_expr(
192        &self,
193        plan: &LogicalPlan,
194        query: &mut Option<QueryBuilder>,
195    ) -> Result<SetExpr> {
196        let mut select_builder = SelectBuilder::default();
197        select_builder.push_from(TableWithJoinsBuilder::default());
198        let mut relation_builder = RelationBuilder::default();
199        self.select_to_sql_recursively(
200            plan,
201            query,
202            &mut select_builder,
203            &mut relation_builder,
204        )?;
205
206        // If we were able to construct a full body (i.e. UNION ALL), return it
207        if let Some(body) = query.as_mut().and_then(|q| q.take_body()) {
208            return Ok(*body);
209        }
210
211        // If no projection is set, add a wildcard projection to the select
212        // which will be translated to `SELECT *` in the SQL statement
213        if !select_builder.already_projected() {
214            select_builder.projection(vec![ast::SelectItem::Wildcard(
215                ast::WildcardAdditionalOptions::default(),
216            )]);
217        }
218
219        let mut twj = select_builder.pop_from().unwrap();
220        twj.relation(relation_builder);
221        select_builder.push_from(twj);
222
223        Ok(SetExpr::Select(Box::new(select_builder.build()?)))
224    }
225
226    /// Reconstructs a SELECT SQL statement from a logical plan by unprojecting column expressions
227    /// found in a [Projection] node. This requires scanning the plan tree for relevant Aggregate
228    /// and Window nodes and matching column expressions to the appropriate agg or window expressions.
229    fn reconstruct_select_statement(
230        &self,
231        plan: &LogicalPlan,
232        p: &Projection,
233        select: &mut SelectBuilder,
234    ) -> Result<()> {
235        let mut exprs = p.expr.clone();
236
237        // If an Unnest node is found within the select, find and unproject the unnest column
238        if let Some(unnest) = find_unnest_node_within_select(plan) {
239            exprs = exprs
240                .into_iter()
241                .map(|e| unproject_unnest_expr(e, unnest))
242                .collect::<Result<Vec<_>>>()?;
243        };
244
245        match (
246            find_agg_node_within_select(plan, true),
247            find_window_nodes_within_select(plan, None, true),
248        ) {
249            (Some(agg), window) => {
250                let window_option = window.as_deref();
251                let items = exprs
252                    .into_iter()
253                    .map(|proj_expr| {
254                        let unproj = unproject_agg_exprs(proj_expr, agg, window_option)?;
255                        self.select_item_to_sql(&unproj)
256                    })
257                    .collect::<Result<Vec<_>>>()?;
258
259                select.projection(items);
260                select.group_by(ast::GroupByExpr::Expressions(
261                    agg.group_expr
262                        .iter()
263                        .map(|expr| self.expr_to_sql(expr))
264                        .collect::<Result<Vec<_>>>()?,
265                    vec![],
266                ));
267            }
268            (None, Some(window)) => {
269                let items = exprs
270                    .into_iter()
271                    .map(|proj_expr| {
272                        let unproj = unproject_window_exprs(proj_expr, &window)?;
273                        self.select_item_to_sql(&unproj)
274                    })
275                    .collect::<Result<Vec<_>>>()?;
276
277                select.projection(items);
278            }
279            _ => {
280                let items = exprs
281                    .iter()
282                    .map(|e| self.select_item_to_sql(e))
283                    .collect::<Result<Vec<_>>>()?;
284                select.projection(items);
285            }
286        }
287        Ok(())
288    }
289
290    fn derive(
291        &self,
292        plan: &LogicalPlan,
293        relation: &mut RelationBuilder,
294        alias: Option<ast::TableAlias>,
295        lateral: bool,
296    ) -> Result<()> {
297        let mut derived_builder = DerivedRelationBuilder::default();
298        derived_builder.lateral(lateral).alias(alias).subquery({
299            let inner_statement = self.plan_to_sql(plan)?;
300            if let ast::Statement::Query(inner_query) = inner_statement {
301                inner_query
302            } else {
303                return internal_err!(
304                    "Subquery must be a Query, but found {inner_statement:?}"
305                );
306            }
307        });
308        relation.derived(derived_builder);
309
310        Ok(())
311    }
312
313    fn derive_with_dialect_alias(
314        &self,
315        alias: &str,
316        plan: &LogicalPlan,
317        relation: &mut RelationBuilder,
318        lateral: bool,
319        columns: Vec<Ident>,
320    ) -> Result<()> {
321        if self.dialect.requires_derived_table_alias() || !columns.is_empty() {
322            self.derive(
323                plan,
324                relation,
325                Some(self.new_table_alias(alias.to_string(), columns)),
326                lateral,
327            )
328        } else {
329            self.derive(plan, relation, None, lateral)
330        }
331    }
332
333    #[cfg_attr(feature = "recursive_protection", recursive::recursive)]
334    fn select_to_sql_recursively(
335        &self,
336        plan: &LogicalPlan,
337        query: &mut Option<QueryBuilder>,
338        select: &mut SelectBuilder,
339        relation: &mut RelationBuilder,
340    ) -> Result<()> {
341        match plan {
342            LogicalPlan::TableScan(scan) => {
343                if let Some(unparsed_table_scan) = self.unparse_table_scan_pushdown(
344                    plan,
345                    None,
346                    select.already_projected(),
347                )? {
348                    return self.select_to_sql_recursively(
349                        &unparsed_table_scan,
350                        query,
351                        select,
352                        relation,
353                    );
354                }
355                let mut builder = TableRelationBuilder::default();
356                let mut table_parts = vec![];
357                if let Some(catalog_name) = scan.table_name.catalog() {
358                    table_parts
359                        .push(self.new_ident_quoted_if_needs(catalog_name.to_string()));
360                }
361                if let Some(schema_name) = scan.table_name.schema() {
362                    table_parts
363                        .push(self.new_ident_quoted_if_needs(schema_name.to_string()));
364                }
365                table_parts.push(
366                    self.new_ident_quoted_if_needs(scan.table_name.table().to_string()),
367                );
368                builder.name(ast::ObjectName::from(table_parts));
369                relation.table(builder);
370
371                Ok(())
372            }
373            LogicalPlan::Projection(p) => {
374                if let Some(new_plan) = rewrite_plan_for_sort_on_non_projected_fields(p) {
375                    return self
376                        .select_to_sql_recursively(&new_plan, query, select, relation);
377                }
378
379                // Projection can be top-level plan for unnest relation
380                // The projection generated by the `RecursiveUnnestRewriter` from a UNNEST relation will have
381                // only one expression, which is the placeholder column generated by the rewriter.
382                let unnest_input_type = if p.expr.len() == 1 {
383                    Self::check_unnest_placeholder_with_outer_ref(&p.expr[0])
384                } else {
385                    None
386                };
387                if self.dialect.unnest_as_table_factor()
388                    && unnest_input_type.is_some()
389                    && let LogicalPlan::Unnest(unnest) = &p.input.as_ref()
390                    && let Some(unnest_relation) =
391                        self.try_unnest_to_table_factor_sql(unnest)?
392                {
393                    relation.unnest(unnest_relation);
394                    return self.select_to_sql_recursively(
395                        p.input.as_ref(),
396                        query,
397                        select,
398                        relation,
399                    );
400                }
401
402                // If it's a unnest projection, we should provide the table column alias
403                // to provide a column name for the unnest relation.
404                let columns = if unnest_input_type.is_some() {
405                    p.expr
406                        .iter()
407                        .map(|e| {
408                            self.new_ident_quoted_if_needs(e.schema_name().to_string())
409                        })
410                        .collect()
411                } else {
412                    vec![]
413                };
414                // Projection can be top-level plan for derived table
415                if select.already_projected() {
416                    return self.derive_with_dialect_alias(
417                        "derived_projection",
418                        plan,
419                        relation,
420                        unnest_input_type
421                            .filter(|t| matches!(t, UnnestInputType::OuterReference))
422                            .is_some(),
423                        columns,
424                    );
425                }
426                self.reconstruct_select_statement(plan, p, select)?;
427                self.select_to_sql_recursively(p.input.as_ref(), query, select, relation)
428            }
429            LogicalPlan::Filter(filter) => {
430                if let Some(agg) =
431                    find_agg_node_within_select(plan, select.already_projected())
432                {
433                    let unprojected =
434                        unproject_agg_exprs(filter.predicate.clone(), agg, None)?;
435                    let filter_expr = self.expr_to_sql(&unprojected)?;
436                    select.having(Some(filter_expr));
437                } else if let (Some(window), true) = (
438                    find_window_nodes_within_select(
439                        plan,
440                        None,
441                        select.already_projected(),
442                    ),
443                    self.dialect.supports_qualify(),
444                ) {
445                    let unprojected =
446                        unproject_window_exprs(filter.predicate.clone(), &window)?;
447                    let filter_expr = self.expr_to_sql(&unprojected)?;
448                    select.qualify(Some(filter_expr));
449                } else {
450                    let filter_expr = self.expr_to_sql(&filter.predicate)?;
451                    select.selection(Some(filter_expr));
452                }
453
454                self.select_to_sql_recursively(
455                    filter.input.as_ref(),
456                    query,
457                    select,
458                    relation,
459                )
460            }
461            LogicalPlan::Limit(limit) => {
462                // Limit can be top-level plan for derived table
463                if select.already_projected() {
464                    return self.derive_with_dialect_alias(
465                        "derived_limit",
466                        plan,
467                        relation,
468                        false,
469                        vec![],
470                    );
471                }
472                if let Some(fetch) = &limit.fetch {
473                    let Some(query) = query.as_mut() else {
474                        return internal_err!(
475                            "Limit operator only valid in a statement context."
476                        );
477                    };
478                    query.limit(Some(self.expr_to_sql(fetch)?));
479                }
480
481                if let Some(skip) = &limit.skip {
482                    let Some(query) = query.as_mut() else {
483                        return internal_err!(
484                            "Offset operator only valid in a statement context."
485                        );
486                    };
487
488                    query.offset(Some(ast::Offset {
489                        rows: ast::OffsetRows::None,
490                        value: self.expr_to_sql(skip)?,
491                    }));
492                }
493
494                self.select_to_sql_recursively(
495                    limit.input.as_ref(),
496                    query,
497                    select,
498                    relation,
499                )
500            }
501            LogicalPlan::Sort(sort) => {
502                let Some(query_ref) = query else {
503                    return internal_err!(
504                        "Sort operator only valid in a statement context."
505                    );
506                };
507
508                if let Some(fetch) = sort.fetch {
509                    query_ref.limit(Some(ast::Expr::value(ast::Value::Number(
510                        fetch.to_string(),
511                        false,
512                    ))));
513                };
514
515                let agg = find_agg_node_within_select(plan, select.already_projected());
516                // unproject sort expressions
517                let sort_exprs: Vec<SortExpr> = sort
518                    .expr
519                    .iter()
520                    .map(|sort_expr| {
521                        unproject_sort_expr(sort_expr.clone(), agg, sort.input.as_ref())
522                    })
523                    .collect::<Result<Vec<_>>>()?;
524
525                query_ref.order_by(self.sorts_to_sql(&sort_exprs)?);
526
527                self.select_to_sql_recursively(
528                    sort.input.as_ref(),
529                    query,
530                    select,
531                    relation,
532                )
533            }
534            LogicalPlan::Aggregate(agg) => {
535                // Aggregation can be already handled in the projection case
536                if !select.already_projected() {
537                    // The query returns aggregate and group expressions. If that weren't the case,
538                    // the aggregate would have been placed inside a projection, making the check above^ false
539                    let exprs: Vec<_> = agg
540                        .aggr_expr
541                        .iter()
542                        .chain(agg.group_expr.iter())
543                        .map(|expr| self.select_item_to_sql(expr))
544                        .collect::<Result<Vec<_>>>()?;
545                    select.projection(exprs);
546
547                    select.group_by(ast::GroupByExpr::Expressions(
548                        agg.group_expr
549                            .iter()
550                            .map(|expr| self.expr_to_sql(expr))
551                            .collect::<Result<Vec<_>>>()?,
552                        vec![],
553                    ));
554                }
555
556                self.select_to_sql_recursively(
557                    agg.input.as_ref(),
558                    query,
559                    select,
560                    relation,
561                )
562            }
563            LogicalPlan::Distinct(distinct) => {
564                // Distinct can be top-level plan for derived table
565                if select.already_projected() {
566                    return self.derive_with_dialect_alias(
567                        "derived_distinct",
568                        plan,
569                        relation,
570                        false,
571                        vec![],
572                    );
573                }
574
575                // If this distinct is the parent of a Union and we're in a query context,
576                // then we need to unparse as a `UNION` rather than a `UNION ALL`.
577                if let Distinct::All(input) = distinct
578                    && matches!(input.as_ref(), LogicalPlan::Union(_))
579                    && let Some(query_mut) = query.as_mut()
580                {
581                    query_mut.distinct_union();
582                    return self.select_to_sql_recursively(
583                        input.as_ref(),
584                        query,
585                        select,
586                        relation,
587                    );
588                }
589
590                let (select_distinct, input) = match distinct {
591                    Distinct::All(input) => (ast::Distinct::Distinct, input.as_ref()),
592                    Distinct::On(on) => {
593                        let exprs = on
594                            .on_expr
595                            .iter()
596                            .map(|e| self.expr_to_sql(e))
597                            .collect::<Result<Vec<_>>>()?;
598                        let items = on
599                            .select_expr
600                            .iter()
601                            .map(|e| self.select_item_to_sql(e))
602                            .collect::<Result<Vec<_>>>()?;
603                        if let Some(sort_expr) = &on.sort_expr {
604                            if let Some(query_ref) = query {
605                                query_ref.order_by(self.sorts_to_sql(sort_expr)?);
606                            } else {
607                                return internal_err!(
608                                    "Sort operator only valid in a statement context."
609                                );
610                            }
611                        }
612                        select.projection(items);
613                        (ast::Distinct::On(exprs), on.input.as_ref())
614                    }
615                };
616                select.distinct(Some(select_distinct));
617                self.select_to_sql_recursively(input, query, select, relation)
618            }
619            LogicalPlan::Join(join) => {
620                let mut table_scan_filters = vec![];
621                let (left_plan, right_plan) = match join.join_type {
622                    JoinType::RightSemi | JoinType::RightAnti => {
623                        (&join.right, &join.left)
624                    }
625                    _ => (&join.left, &join.right),
626                };
627                // If there's an outer projection plan, it will already set up the projection.
628                // In that case, we don't need to worry about setting up the projection here.
629                // The outer projection plan will handle projecting the correct columns.
630                let already_projected = select.already_projected();
631
632                let left_plan =
633                    match try_transform_to_simple_table_scan_with_filters(left_plan)? {
634                        Some((plan, filters)) => {
635                            table_scan_filters.extend(filters);
636                            Arc::new(plan)
637                        }
638                        None => Arc::clone(left_plan),
639                    };
640
641                self.select_to_sql_recursively(
642                    left_plan.as_ref(),
643                    query,
644                    select,
645                    relation,
646                )?;
647
648                let left_projection: Option<Vec<ast::SelectItem>> = if !already_projected
649                {
650                    Some(select.pop_projections())
651                } else {
652                    None
653                };
654
655                let right_plan =
656                    match try_transform_to_simple_table_scan_with_filters(right_plan)? {
657                        Some((plan, filters)) => {
658                            table_scan_filters.extend(filters);
659                            Arc::new(plan)
660                        }
661                        None => Arc::clone(right_plan),
662                    };
663
664                let mut right_relation = RelationBuilder::default();
665
666                self.select_to_sql_recursively(
667                    right_plan.as_ref(),
668                    query,
669                    select,
670                    &mut right_relation,
671                )?;
672
673                let join_filters = if table_scan_filters.is_empty() {
674                    join.filter.clone()
675                } else {
676                    // Combine `table_scan_filters` into a single filter using `AND`
677                    let Some(combined_filters) =
678                        table_scan_filters.into_iter().reduce(|acc, filter| {
679                            Expr::BinaryExpr(BinaryExpr {
680                                left: Box::new(acc),
681                                op: Operator::And,
682                                right: Box::new(filter),
683                            })
684                        })
685                    else {
686                        return internal_err!("Failed to combine TableScan filters");
687                    };
688
689                    // Combine `join.filter` with `combined_filters` using `AND`
690                    match &join.filter {
691                        Some(filter) => Some(Expr::BinaryExpr(BinaryExpr {
692                            left: Box::new(filter.clone()),
693                            op: Operator::And,
694                            right: Box::new(combined_filters),
695                        })),
696                        None => Some(combined_filters),
697                    }
698                };
699
700                let join_constraint = self.join_constraint_to_sql(
701                    join.join_constraint,
702                    &join.on,
703                    join_filters.as_ref(),
704                )?;
705
706                let right_projection: Option<Vec<ast::SelectItem>> = if !already_projected
707                {
708                    Some(select.pop_projections())
709                } else {
710                    None
711                };
712
713                match join.join_type {
714                    JoinType::LeftSemi
715                    | JoinType::LeftAnti
716                    | JoinType::LeftMark
717                    | JoinType::RightSemi
718                    | JoinType::RightAnti
719                    | JoinType::RightMark => {
720                        let mut query_builder = QueryBuilder::default();
721                        let mut from = TableWithJoinsBuilder::default();
722                        let mut exists_select: SelectBuilder = SelectBuilder::default();
723                        from.relation(right_relation);
724                        exists_select.push_from(from);
725                        if let Some(filter) = &join.filter {
726                            exists_select.selection(Some(self.expr_to_sql(filter)?));
727                        }
728                        for (left, right) in &join.on {
729                            exists_select.selection(Some(
730                                self.expr_to_sql(&left.clone().eq(right.clone()))?,
731                            ));
732                        }
733                        exists_select.projection(vec![ast::SelectItem::UnnamedExpr(
734                            ast::Expr::value(ast::Value::Number("1".to_string(), false)),
735                        )]);
736                        query_builder.body(Box::new(SetExpr::Select(Box::new(
737                            exists_select.build()?,
738                        ))));
739
740                        let negated = match join.join_type {
741                            JoinType::LeftSemi
742                            | JoinType::RightSemi
743                            | JoinType::LeftMark
744                            | JoinType::RightMark => false,
745                            JoinType::LeftAnti | JoinType::RightAnti => true,
746                            _ => unreachable!(),
747                        };
748                        let exists_expr = ast::Expr::Exists {
749                            subquery: Box::new(query_builder.build()?),
750                            negated,
751                        };
752
753                        match join.join_type {
754                            JoinType::LeftMark | JoinType::RightMark => {
755                                let source_schema =
756                                    if join.join_type == JoinType::LeftMark {
757                                        right_plan.schema()
758                                    } else {
759                                        left_plan.schema()
760                                    };
761                                let (table_ref, _) = source_schema.qualified_field(0);
762                                let column = self.col_to_sql(&Column::new(
763                                    table_ref.cloned(),
764                                    "mark",
765                                ))?;
766                                select.replace_mark(&column, &exists_expr);
767                            }
768                            _ => {
769                                select.selection(Some(exists_expr));
770                            }
771                        }
772                        if let Some(projection) = left_projection {
773                            select.projection(projection);
774                        }
775                    }
776                    JoinType::Inner
777                    | JoinType::Left
778                    | JoinType::Right
779                    | JoinType::Full => {
780                        let Ok(Some(relation)) = right_relation.build() else {
781                            return internal_err!("Failed to build right relation");
782                        };
783                        let ast_join = ast::Join {
784                            relation,
785                            global: false,
786                            join_operator: self
787                                .join_operator_to_sql(join.join_type, join_constraint)?,
788                        };
789                        let mut from = select.pop_from().unwrap();
790                        from.push_join(ast_join);
791                        select.push_from(from);
792                        if !already_projected {
793                            let Some(left_projection) = left_projection else {
794                                return internal_err!("Left projection is missing");
795                            };
796
797                            let Some(right_projection) = right_projection else {
798                                return internal_err!("Right projection is missing");
799                            };
800
801                            let projection = left_projection
802                                .into_iter()
803                                .chain(right_projection)
804                                .collect();
805                            select.projection(projection);
806                        }
807                    }
808                };
809
810                Ok(())
811            }
812            LogicalPlan::SubqueryAlias(plan_alias) => {
813                let (plan, mut columns) =
814                    subquery_alias_inner_query_and_columns(plan_alias);
815                let unparsed_table_scan = self.unparse_table_scan_pushdown(
816                    plan,
817                    Some(plan_alias.alias.clone()),
818                    select.already_projected(),
819                )?;
820                // if the child plan is a TableScan with pushdown operations, we don't need to
821                // create an additional subquery for it
822                if !select.already_projected() && unparsed_table_scan.is_none() {
823                    select.projection(vec![ast::SelectItem::Wildcard(
824                        ast::WildcardAdditionalOptions::default(),
825                    )]);
826                }
827                let plan = unparsed_table_scan.unwrap_or_else(|| plan.clone());
828                if !columns.is_empty()
829                    && !self.dialect.supports_column_alias_in_table_alias()
830                {
831                    // Instead of specifying column aliases as part of the outer table, inject them directly into the inner projection
832                    let rewritten_plan =
833                        match inject_column_aliases_into_subquery(plan, columns) {
834                            Ok(p) => p,
835                            Err(e) => {
836                                return internal_err!(
837                                    "Failed to transform SubqueryAlias plan: {e}"
838                                );
839                            }
840                        };
841
842                    columns = vec![];
843
844                    self.select_to_sql_recursively(
845                        &rewritten_plan,
846                        query,
847                        select,
848                        relation,
849                    )?;
850                } else {
851                    self.select_to_sql_recursively(&plan, query, select, relation)?;
852                }
853
854                relation.alias(Some(
855                    self.new_table_alias(plan_alias.alias.table().to_string(), columns),
856                ));
857
858                Ok(())
859            }
860            LogicalPlan::Union(union) => {
861                // Covers cases where the UNION is a subquery and the projection is at the top level
862                if select.already_projected() {
863                    return self.derive_with_dialect_alias(
864                        "derived_union",
865                        plan,
866                        relation,
867                        false,
868                        vec![],
869                    );
870                }
871
872                let input_exprs: Vec<SetExpr> = union
873                    .inputs
874                    .iter()
875                    .map(|input| self.select_to_sql_expr(input, query))
876                    .collect::<Result<Vec<_>>>()?;
877
878                assert_or_internal_err!(
879                    input_exprs.len() >= 2,
880                    "UNION operator requires at least 2 inputs"
881                );
882
883                let set_quantifier =
884                    if query.as_ref().is_some_and(|q| q.is_distinct_union()) {
885                        // Setting the SetQuantifier to None will unparse as a `UNION`
886                        // rather than a `UNION ALL`.
887                        ast::SetQuantifier::None
888                    } else {
889                        ast::SetQuantifier::All
890                    };
891
892                // Build the union expression tree bottom-up by reversing the order
893                // note that we are also swapping left and right inputs because of the rev
894                let union_expr = input_exprs
895                    .into_iter()
896                    .rev()
897                    .reduce(|a, b| SetExpr::SetOperation {
898                        op: ast::SetOperator::Union,
899                        set_quantifier,
900                        left: Box::new(b),
901                        right: Box::new(a),
902                    })
903                    .unwrap();
904
905                let Some(query) = query.as_mut() else {
906                    return internal_err!(
907                        "UNION ALL operator only valid in a statement context"
908                    );
909                };
910                query.body(Box::new(union_expr));
911
912                Ok(())
913            }
914            LogicalPlan::Window(window) => {
915                // Window nodes are handled simultaneously with Projection nodes
916                self.select_to_sql_recursively(
917                    window.input.as_ref(),
918                    query,
919                    select,
920                    relation,
921                )
922            }
923            LogicalPlan::EmptyRelation(_) => {
924                // An EmptyRelation could be behind an UNNEST node. If the dialect supports UNNEST as a table factor,
925                // a TableRelationBuilder will be created for the UNNEST node first.
926                if !relation.has_relation() {
927                    relation.empty();
928                }
929                Ok(())
930            }
931            LogicalPlan::Extension(extension) => {
932                if let Some(query) = query.as_mut() {
933                    self.extension_to_sql(
934                        extension.node.as_ref(),
935                        &mut Some(query),
936                        &mut Some(select),
937                        &mut Some(relation),
938                    )
939                } else {
940                    self.extension_to_sql(
941                        extension.node.as_ref(),
942                        &mut None,
943                        &mut Some(select),
944                        &mut Some(relation),
945                    )
946                }
947            }
948            LogicalPlan::Unnest(unnest) => {
949                assert_or_internal_err!(
950                    unnest.struct_type_columns.is_empty(),
951                    "Struct type columns are not currently supported in UNNEST: {:?}",
952                    unnest.struct_type_columns
953                );
954
955                // In the case of UNNEST, the Unnest node is followed by a duplicate Projection node that we should skip.
956                // Otherwise, there will be a duplicate SELECT clause.
957                // | Projection: table.col1, UNNEST(table.col2)
958                // |   Unnest: UNNEST(table.col2)
959                // |     Projection: table.col1, table.col2 AS UNNEST(table.col2)
960                // |       Filter: table.col3 = Int64(3)
961                // |         TableScan: table projection=None
962                if let LogicalPlan::Projection(p) = unnest.input.as_ref() {
963                    // continue with projection input
964                    self.select_to_sql_recursively(&p.input, query, select, relation)
965                } else {
966                    internal_err!("Unnest input is not a Projection: {unnest:?}")
967                }
968            }
969            LogicalPlan::Subquery(subquery)
970                if find_unnest_node_until_relation(subquery.subquery.as_ref())
971                    .is_some() =>
972            {
973                if self.dialect.unnest_as_table_factor() {
974                    self.select_to_sql_recursively(
975                        subquery.subquery.as_ref(),
976                        query,
977                        select,
978                        relation,
979                    )
980                } else {
981                    self.derive_with_dialect_alias(
982                        "derived_unnest",
983                        subquery.subquery.as_ref(),
984                        relation,
985                        true,
986                        vec![],
987                    )
988                }
989            }
990            _ => {
991                not_impl_err!("Unsupported operator: {plan:?}")
992            }
993        }
994    }
995
996    /// Try to find the placeholder column name generated by `RecursiveUnnestRewriter`.
997    ///
998    /// - If the column is a placeholder column match the pattern `Expr::Alias(Expr::Column("__unnest_placeholder(...)"))`,
999    ///   it means it is a scalar column, return [UnnestInputType::Scalar].
1000    /// - If the column is a placeholder column match the pattern `Expr::Alias(Expr::Column("__unnest_placeholder(outer_ref(...)))")`,
1001    ///   it means it is an outer reference column, return [UnnestInputType::OuterReference].
1002    /// - If the column is not a placeholder column, return [None].
1003    ///
1004    /// `outer_ref` is the display result of [Expr::OuterReferenceColumn]
1005    fn check_unnest_placeholder_with_outer_ref(expr: &Expr) -> Option<UnnestInputType> {
1006        if let Expr::Alias(Alias { expr, .. }) = expr
1007            && let Expr::Column(Column { name, .. }) = expr.as_ref()
1008            && let Some(prefix) = name.strip_prefix(UNNEST_PLACEHOLDER)
1009        {
1010            if prefix.starts_with(&format!("({OUTER_REFERENCE_COLUMN_PREFIX}(")) {
1011                return Some(UnnestInputType::OuterReference);
1012            }
1013            return Some(UnnestInputType::Scalar);
1014        }
1015        None
1016    }
1017
1018    fn try_unnest_to_table_factor_sql(
1019        &self,
1020        unnest: &Unnest,
1021    ) -> Result<Option<UnnestRelationBuilder>> {
1022        let mut unnest_relation = UnnestRelationBuilder::default();
1023        let LogicalPlan::Projection(projection) = unnest.input.as_ref() else {
1024            return Ok(None);
1025        };
1026
1027        if !matches!(projection.input.as_ref(), LogicalPlan::EmptyRelation(_)) {
1028            // It may be possible that UNNEST is used as a source for the query.
1029            // However, at this point, we don't yet know if it is just a single expression
1030            // from another source or if it's from UNNEST.
1031            //
1032            // Unnest(Projection(EmptyRelation)) denotes a case with `UNNEST([...])`,
1033            // which is normally safe to unnest as a table factor.
1034            // However, in the future, more comprehensive checks can be added here.
1035            return Ok(None);
1036        };
1037
1038        let exprs = projection
1039            .expr
1040            .iter()
1041            .map(|e| self.expr_to_sql(e))
1042            .collect::<Result<Vec<_>>>()?;
1043        unnest_relation.array_exprs(exprs);
1044
1045        Ok(Some(unnest_relation))
1046    }
1047
1048    fn is_scan_with_pushdown(scan: &TableScan) -> bool {
1049        scan.projection.is_some() || !scan.filters.is_empty() || scan.fetch.is_some()
1050    }
1051
1052    /// Try to unparse a table scan with pushdown operations into a new subquery plan.
1053    /// If the table scan is without any pushdown operations, return None.
1054    fn unparse_table_scan_pushdown(
1055        &self,
1056        plan: &LogicalPlan,
1057        alias: Option<TableReference>,
1058        already_projected: bool,
1059    ) -> Result<Option<LogicalPlan>> {
1060        match plan {
1061            LogicalPlan::TableScan(table_scan) => {
1062                if !Self::is_scan_with_pushdown(table_scan) {
1063                    return Ok(None);
1064                }
1065                let table_schema = table_scan.source.schema();
1066                let mut filter_alias_rewriter =
1067                    alias.as_ref().map(|alias_name| TableAliasRewriter {
1068                        table_schema: &table_schema,
1069                        alias_name: alias_name.clone(),
1070                    });
1071
1072                let mut builder = LogicalPlanBuilder::scan(
1073                    table_scan.table_name.clone(),
1074                    Arc::clone(&table_scan.source),
1075                    None,
1076                )?;
1077                // We will rebase the column references to the new alias if it exists.
1078                // If the projection or filters are empty, we will append alias to the table scan.
1079                //
1080                // Example:
1081                //   select t1.c1 from t1 where t1.c1 > 1 -> select a.c1 from t1 as a where a.c1 > 1
1082                if let Some(ref alias) = alias
1083                    && (table_scan.projection.is_some() || !table_scan.filters.is_empty())
1084                {
1085                    builder = builder.alias(alias.clone())?;
1086                }
1087
1088                // Avoid creating a duplicate Projection node, which would result in an additional subquery if a projection already exists.
1089                // For example, if the `optimize_projection` rule is applied, there will be a Projection node, and duplicate projection
1090                // information included in the TableScan node.
1091                if !already_projected && let Some(project_vec) = &table_scan.projection {
1092                    if project_vec.is_empty() {
1093                        builder = builder.project(self.empty_projection_fallback())?;
1094                    } else {
1095                        let project_columns = project_vec
1096                            .iter()
1097                            .cloned()
1098                            .map(|i| {
1099                                let schema = table_scan.source.schema();
1100                                let field = schema.field(i);
1101                                if alias.is_some() {
1102                                    Column::new(alias.clone(), field.name().clone())
1103                                } else {
1104                                    Column::new(
1105                                        Some(table_scan.table_name.clone()),
1106                                        field.name().clone(),
1107                                    )
1108                                }
1109                            })
1110                            .collect::<Vec<_>>();
1111                        builder = builder.project(project_columns)?;
1112                    };
1113                }
1114
1115                let filter_expr: Result<Option<Expr>> = table_scan
1116                    .filters
1117                    .iter()
1118                    .cloned()
1119                    .map(|expr| {
1120                        if let Some(ref mut rewriter) = filter_alias_rewriter {
1121                            expr.rewrite(rewriter).data()
1122                        } else {
1123                            Ok(expr)
1124                        }
1125                    })
1126                    .reduce(|acc, expr_result| {
1127                        acc.and_then(|acc_expr| {
1128                            expr_result.map(|expr| acc_expr.and(expr))
1129                        })
1130                    })
1131                    .transpose();
1132
1133                if let Some(filter) = filter_expr? {
1134                    builder = builder.filter(filter)?;
1135                }
1136
1137                if let Some(fetch) = table_scan.fetch {
1138                    builder = builder.limit(0, Some(fetch))?;
1139                }
1140
1141                // If the table scan has an alias but no projection or filters, it means no column references are rebased.
1142                // So we will append the alias to this subquery.
1143                // Example:
1144                //   select * from t1 limit 10 -> (select * from t1 limit 10) as a
1145                if let Some(alias) = alias
1146                    && table_scan.projection.is_none()
1147                    && table_scan.filters.is_empty()
1148                {
1149                    builder = builder.alias(alias)?;
1150                }
1151
1152                Ok(Some(builder.build()?))
1153            }
1154            LogicalPlan::SubqueryAlias(subquery_alias) => {
1155                let ret = self.unparse_table_scan_pushdown(
1156                    &subquery_alias.input,
1157                    Some(subquery_alias.alias.clone()),
1158                    already_projected,
1159                )?;
1160                if let Some(alias) = alias
1161                    && let Some(plan) = ret
1162                {
1163                    let plan = LogicalPlanBuilder::new(plan).alias(alias)?.build()?;
1164                    return Ok(Some(plan));
1165                }
1166                Ok(ret)
1167            }
1168            // SubqueryAlias could be rewritten to a plan with a projection as the top node by [rewrite::subquery_alias_inner_query_and_columns].
1169            // The inner table scan could be a scan with pushdown operations.
1170            LogicalPlan::Projection(projection) => {
1171                if let Some(plan) = self.unparse_table_scan_pushdown(
1172                    &projection.input,
1173                    alias.clone(),
1174                    already_projected,
1175                )? {
1176                    let exprs = if alias.is_some() {
1177                        let mut alias_rewriter =
1178                            alias.as_ref().map(|alias_name| TableAliasRewriter {
1179                                table_schema: plan.schema().as_arrow(),
1180                                alias_name: alias_name.clone(),
1181                            });
1182                        projection
1183                            .expr
1184                            .iter()
1185                            .cloned()
1186                            .map(|expr| {
1187                                if let Some(ref mut rewriter) = alias_rewriter {
1188                                    expr.rewrite(rewriter).data()
1189                                } else {
1190                                    Ok(expr)
1191                                }
1192                            })
1193                            .collect::<Result<Vec<_>>>()?
1194                    } else {
1195                        projection.expr.clone()
1196                    };
1197                    Ok(Some(
1198                        LogicalPlanBuilder::from(plan).project(exprs)?.build()?,
1199                    ))
1200                } else {
1201                    Ok(None)
1202                }
1203            }
1204            _ => Ok(None),
1205        }
1206    }
1207
1208    fn select_item_to_sql(&self, expr: &Expr) -> Result<ast::SelectItem> {
1209        match expr {
1210            Expr::Alias(Alias { expr, name, .. }) => {
1211                let inner = self.expr_to_sql(expr)?;
1212
1213                // Determine the alias name to use
1214                let col_name = if let Some(rewritten_name) =
1215                    self.dialect.col_alias_overrides(name)?
1216                {
1217                    rewritten_name.to_string()
1218                } else {
1219                    name.to_string()
1220                };
1221
1222                Ok(ast::SelectItem::ExprWithAlias {
1223                    expr: inner,
1224                    alias: self.new_ident_quoted_if_needs(col_name),
1225                })
1226            }
1227            _ => {
1228                let inner = self.expr_to_sql(expr)?;
1229
1230                Ok(ast::SelectItem::UnnamedExpr(inner))
1231            }
1232        }
1233    }
1234
1235    fn sorts_to_sql(&self, sort_exprs: &[SortExpr]) -> Result<OrderByKind> {
1236        Ok(OrderByKind::Expressions(
1237            sort_exprs
1238                .iter()
1239                .map(|sort_expr| self.sort_to_sql(sort_expr))
1240                .collect::<Result<Vec<_>>>()?,
1241        ))
1242    }
1243
1244    fn join_operator_to_sql(
1245        &self,
1246        join_type: JoinType,
1247        constraint: ast::JoinConstraint,
1248    ) -> Result<ast::JoinOperator> {
1249        Ok(match join_type {
1250            JoinType::Inner => match &constraint {
1251                ast::JoinConstraint::On(_)
1252                | ast::JoinConstraint::Using(_)
1253                | ast::JoinConstraint::Natural => ast::JoinOperator::Inner(constraint),
1254                ast::JoinConstraint::None => {
1255                    // Inner joins with no conditions or filters are not valid SQL in most systems,
1256                    // return a CROSS JOIN instead
1257                    ast::JoinOperator::CrossJoin(constraint)
1258                }
1259            },
1260            JoinType::Left => ast::JoinOperator::LeftOuter(constraint),
1261            JoinType::Right => ast::JoinOperator::RightOuter(constraint),
1262            JoinType::Full => ast::JoinOperator::FullOuter(constraint),
1263            JoinType::LeftAnti => ast::JoinOperator::LeftAnti(constraint),
1264            JoinType::LeftSemi => ast::JoinOperator::LeftSemi(constraint),
1265            JoinType::RightAnti => ast::JoinOperator::RightAnti(constraint),
1266            JoinType::RightSemi => ast::JoinOperator::RightSemi(constraint),
1267            JoinType::LeftMark | JoinType::RightMark => {
1268                unimplemented!("Unparsing of Mark join type")
1269            }
1270        })
1271    }
1272
1273    /// Convert the components of a USING clause to the USING AST. Returns
1274    /// 'None' if the conditions are not compatible with a USING expression,
1275    /// e.g. non-column expressions or non-matching names.
1276    fn join_using_to_sql(
1277        &self,
1278        join_conditions: &[(Expr, Expr)],
1279    ) -> Option<ast::JoinConstraint> {
1280        let mut object_names = Vec::with_capacity(join_conditions.len());
1281        for (left, right) in join_conditions {
1282            match (left, right) {
1283                (
1284                    Expr::Column(Column {
1285                        relation: _,
1286                        name: left_name,
1287                        spans: _,
1288                    }),
1289                    Expr::Column(Column {
1290                        relation: _,
1291                        name: right_name,
1292                        spans: _,
1293                    }),
1294                ) if left_name == right_name => {
1295                    // For example, if the join condition `t1.id = t2.id`
1296                    // this is represented as two columns like `[t1.id, t2.id]`
1297                    // This code forms `id` (without relation name)
1298                    let ident = self.new_ident_quoted_if_needs(left_name.to_string());
1299                    object_names.push(ast::ObjectName::from(vec![ident]));
1300                }
1301                // USING is only valid with matching column names; arbitrary expressions
1302                // are not allowed
1303                _ => return None,
1304            }
1305        }
1306        Some(ast::JoinConstraint::Using(object_names))
1307    }
1308
1309    /// Convert a join constraint and associated conditions and filter to a SQL AST node
1310    fn join_constraint_to_sql(
1311        &self,
1312        constraint: JoinConstraint,
1313        conditions: &[(Expr, Expr)],
1314        filter: Option<&Expr>,
1315    ) -> Result<ast::JoinConstraint> {
1316        match (constraint, conditions, filter) {
1317            // No constraints
1318            (JoinConstraint::On | JoinConstraint::Using, [], None) => {
1319                Ok(ast::JoinConstraint::None)
1320            }
1321
1322            (JoinConstraint::Using, conditions, None) => {
1323                match self.join_using_to_sql(conditions) {
1324                    Some(using) => Ok(using),
1325                    // As above, this should not be reachable from parsed SQL,
1326                    // but a user could create this; we "downgrade" to ON.
1327                    None => self.join_conditions_to_sql_on(conditions, None),
1328                }
1329            }
1330
1331            // Two cases here:
1332            // 1. Straightforward ON case, with possible equi-join conditions
1333            //    and additional filters
1334            // 2. USING with additional filters; we "downgrade" to ON, because
1335            //    you can't use USING with arbitrary filters. (This should not
1336            //    be accessible from parsed SQL, but may have been a
1337            //    custom-built JOIN by a user.)
1338            (JoinConstraint::On | JoinConstraint::Using, conditions, filter) => {
1339                self.join_conditions_to_sql_on(conditions, filter)
1340            }
1341        }
1342    }
1343
1344    // Convert a list of equi0join conditions and an optional filter to a SQL ON
1345    // AST node, with the equi-join conditions and the filter merged into a
1346    // single conditional expression
1347    fn join_conditions_to_sql_on(
1348        &self,
1349        join_conditions: &[(Expr, Expr)],
1350        filter: Option<&Expr>,
1351    ) -> Result<ast::JoinConstraint> {
1352        let mut condition = None;
1353        // AND the join conditions together to create the overall condition
1354        for (left, right) in join_conditions {
1355            // Parse left and right
1356            let l = self.expr_to_sql(left)?;
1357            let r = self.expr_to_sql(right)?;
1358            let e = self.binary_op_to_sql(l, r, ast::BinaryOperator::Eq);
1359            condition = match condition {
1360                Some(expr) => Some(self.and_op_to_sql(expr, e)),
1361                None => Some(e),
1362            };
1363        }
1364
1365        // Then AND the non-equijoin filter condition as well
1366        condition = match (condition, filter) {
1367            (Some(expr), Some(filter)) => {
1368                Some(self.and_op_to_sql(expr, self.expr_to_sql(filter)?))
1369            }
1370            (Some(expr), None) => Some(expr),
1371            (None, Some(filter)) => Some(self.expr_to_sql(filter)?),
1372            (None, None) => None,
1373        };
1374
1375        let constraint = match condition {
1376            Some(filter) => ast::JoinConstraint::On(filter),
1377            None => ast::JoinConstraint::None,
1378        };
1379
1380        Ok(constraint)
1381    }
1382
1383    fn and_op_to_sql(&self, lhs: ast::Expr, rhs: ast::Expr) -> ast::Expr {
1384        self.binary_op_to_sql(lhs, rhs, ast::BinaryOperator::And)
1385    }
1386
1387    fn new_table_alias(&self, alias: String, columns: Vec<Ident>) -> ast::TableAlias {
1388        let columns = columns
1389            .into_iter()
1390            .map(|ident| TableAliasColumnDef {
1391                name: ident,
1392                data_type: None,
1393            })
1394            .collect();
1395        ast::TableAlias {
1396            name: self.new_ident_quoted_if_needs(alias),
1397            columns,
1398        }
1399    }
1400
1401    fn dml_to_sql(&self, plan: &LogicalPlan) -> Result<ast::Statement> {
1402        not_impl_err!("Unsupported plan: {plan:?}")
1403    }
1404
1405    /// Generates appropriate projection expression for empty projection lists.
1406    /// Returns an empty vec for dialects supporting empty select lists,
1407    /// or a dummy literal `1` for other dialects.
1408    fn empty_projection_fallback(&self) -> Vec<Expr> {
1409        if self.dialect.supports_empty_select_list() {
1410            Vec::new()
1411        } else {
1412            vec![Expr::Literal(ScalarValue::Int64(Some(1)), None)]
1413        }
1414    }
1415}
1416
1417impl From<BuilderError> for DataFusionError {
1418    fn from(e: BuilderError) -> Self {
1419        DataFusionError::External(Box::new(e))
1420    }
1421}
1422
1423/// The type of the input to the UNNEST table factor.
1424#[derive(Debug)]
1425enum UnnestInputType {
1426    /// The input is a column reference. It will be presented like `outer_ref(column_name)`.
1427    OuterReference,
1428    /// The input is a scalar value. It will be presented like a scalar array or struct.
1429    Scalar,
1430}