Skip to main content

datafusion_sql/unparser/
plan.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use super::{
19    Unparser,
20    ast::{
21        BuilderError, DerivedRelationBuilder, QueryBuilder, RelationBuilder,
22        SelectBuilder, TableRelationBuilder, TableWithJoinsBuilder,
23    },
24    rewrite::{
25        TableAliasRewriter, inject_column_aliases_into_subquery, normalize_union_schema,
26        rewrite_plan_for_sort_on_non_projected_fields,
27        subquery_alias_inner_query_and_columns,
28    },
29    utils::{
30        find_agg_node_within_select, find_unnest_node_within_select,
31        find_window_nodes_within_select, try_transform_to_simple_table_scan_with_filters,
32        unproject_sort_expr, unproject_unnest_expr, unproject_window_exprs,
33    },
34};
35use crate::unparser::extension_unparser::{
36    UnparseToStatementResult, UnparseWithinStatementResult,
37};
38use crate::unparser::utils::{find_unnest_node_until_relation, unproject_agg_exprs};
39use crate::unparser::{ast::UnnestRelationBuilder, rewrite::rewrite_qualify};
40use crate::utils::UNNEST_PLACEHOLDER;
41use datafusion_common::{
42    Column, DataFusionError, Result, ScalarValue, TableReference, assert_or_internal_err,
43    internal_err, not_impl_err,
44    tree_node::{TransformedResult, TreeNode},
45};
46use datafusion_expr::expr::OUTER_REFERENCE_COLUMN_PREFIX;
47use datafusion_expr::{
48    BinaryExpr, Distinct, Expr, JoinConstraint, JoinType, LogicalPlan,
49    LogicalPlanBuilder, Operator, Projection, SortExpr, TableScan, Unnest,
50    UserDefinedLogicalNode, expr::Alias,
51};
52use sqlparser::ast::{self, Ident, OrderByKind, SetExpr, TableAliasColumnDef};
53use std::{sync::Arc, vec};
54
55/// Convert a DataFusion [`LogicalPlan`] to [`ast::Statement`]
56///
57/// This function is the opposite of [`SqlToRel::sql_statement_to_plan`] and can
58/// be used to, among other things, to convert `LogicalPlan`s to SQL strings.
59///
60/// # Errors
61///
62/// This function returns an error if the plan cannot be converted to SQL.
63///
64/// # See Also
65///
66/// * [`expr_to_sql`] for converting [`Expr`], a single expression to SQL
67///
68/// # Example
69/// ```
70/// use arrow::datatypes::{DataType, Field, Schema};
71/// use datafusion_expr::{col, logical_plan::table_scan};
72/// use datafusion_sql::unparser::plan_to_sql;
73/// let schema = Schema::new(vec![
74///     Field::new("id", DataType::Utf8, false),
75///     Field::new("value", DataType::Utf8, false),
76/// ]);
77/// // Scan 'table' and select columns 'id' and 'value'
78/// let plan = table_scan(Some("table"), &schema, None)
79///     .unwrap()
80///     .project(vec![col("id"), col("value")])
81///     .unwrap()
82///     .build()
83///     .unwrap();
84/// // convert to AST
85/// let sql = plan_to_sql(&plan).unwrap();
86/// // use the Display impl to convert to SQL text
87/// assert_eq!(
88///     sql.to_string(),
89///     "SELECT \"table\".id, \"table\".\"value\" FROM \"table\""
90/// )
91/// ```
92///
93/// [`SqlToRel::sql_statement_to_plan`]: crate::planner::SqlToRel::sql_statement_to_plan
94/// [`expr_to_sql`]: crate::unparser::expr_to_sql
95pub fn plan_to_sql(plan: &LogicalPlan) -> Result<ast::Statement> {
96    let unparser = Unparser::default();
97    unparser.plan_to_sql(plan)
98}
99
100impl Unparser<'_> {
101    pub fn plan_to_sql(&self, plan: &LogicalPlan) -> Result<ast::Statement> {
102        let mut plan = normalize_union_schema(plan)?;
103        if !self.dialect.supports_qualify() {
104            plan = rewrite_qualify(plan)?;
105        }
106
107        match plan {
108            LogicalPlan::Projection(_)
109            | LogicalPlan::Filter(_)
110            | LogicalPlan::Window(_)
111            | LogicalPlan::Aggregate(_)
112            | LogicalPlan::Sort(_)
113            | LogicalPlan::Join(_)
114            | LogicalPlan::Repartition(_)
115            | LogicalPlan::Union(_)
116            | LogicalPlan::TableScan(_)
117            | LogicalPlan::EmptyRelation(_)
118            | LogicalPlan::Subquery(_)
119            | LogicalPlan::SubqueryAlias(_)
120            | LogicalPlan::Limit(_)
121            | LogicalPlan::Statement(_)
122            | LogicalPlan::Values(_)
123            | LogicalPlan::Distinct(_) => self.select_to_sql_statement(&plan),
124            LogicalPlan::Dml(_) => self.dml_to_sql(&plan),
125            LogicalPlan::Extension(extension) => {
126                self.extension_to_statement(extension.node.as_ref())
127            }
128            LogicalPlan::Explain(_)
129            | LogicalPlan::Analyze(_)
130            | LogicalPlan::Ddl(_)
131            | LogicalPlan::Copy(_)
132            | LogicalPlan::DescribeTable(_)
133            | LogicalPlan::RecursiveQuery(_)
134            | LogicalPlan::Unnest(_) => not_impl_err!("Unsupported plan: {plan:?}"),
135        }
136    }
137
138    /// Try to unparse a [UserDefinedLogicalNode] to a SQL statement.
139    /// If multiple unparsers are registered for the same [UserDefinedLogicalNode],
140    /// the first unparsing result will be returned.
141    fn extension_to_statement(
142        &self,
143        node: &dyn UserDefinedLogicalNode,
144    ) -> Result<ast::Statement> {
145        let mut statement = None;
146        for unparser in &self.extension_unparsers {
147            match unparser.unparse_to_statement(node, self)? {
148                UnparseToStatementResult::Modified(stmt) => {
149                    statement = Some(stmt);
150                    break;
151                }
152                UnparseToStatementResult::Unmodified => {}
153            }
154        }
155        if let Some(statement) = statement {
156            Ok(statement)
157        } else {
158            not_impl_err!("Unsupported extension node: {node:?}")
159        }
160    }
161
162    /// Try to unparse a [UserDefinedLogicalNode] to a SQL statement.
163    /// If multiple unparsers are registered for the same [UserDefinedLogicalNode],
164    /// the first unparser supporting the node will be used.
165    fn extension_to_sql(
166        &self,
167        node: &dyn UserDefinedLogicalNode,
168        query: &mut Option<&mut QueryBuilder>,
169        select: &mut Option<&mut SelectBuilder>,
170        relation: &mut Option<&mut RelationBuilder>,
171    ) -> Result<()> {
172        for unparser in &self.extension_unparsers {
173            match unparser.unparse(node, self, query, select, relation)? {
174                UnparseWithinStatementResult::Modified => return Ok(()),
175                UnparseWithinStatementResult::Unmodified => {}
176            }
177        }
178        not_impl_err!("Unsupported extension node: {node:?}")
179    }
180
181    fn select_to_sql_statement(&self, plan: &LogicalPlan) -> Result<ast::Statement> {
182        let mut query_builder = Some(QueryBuilder::default());
183
184        let body = self.select_to_sql_expr(plan, &mut query_builder)?;
185
186        let query = query_builder.unwrap().body(Box::new(body)).build()?;
187
188        Ok(ast::Statement::Query(Box::new(query)))
189    }
190
191    fn select_to_sql_expr(
192        &self,
193        plan: &LogicalPlan,
194        query: &mut Option<QueryBuilder>,
195    ) -> Result<SetExpr> {
196        let mut select_builder = SelectBuilder::default();
197        select_builder.push_from(TableWithJoinsBuilder::default());
198        let mut relation_builder = RelationBuilder::default();
199        self.select_to_sql_recursively(
200            plan,
201            query,
202            &mut select_builder,
203            &mut relation_builder,
204        )?;
205
206        // If we were able to construct a full body (i.e. UNION ALL), return it
207        if let Some(body) = query.as_mut().and_then(|q| q.take_body()) {
208            return Ok(*body);
209        }
210
211        // If no projection is set, add a wildcard projection to the select
212        // which will be translated to `SELECT *` in the SQL statement
213        if !select_builder.already_projected() {
214            select_builder.projection(vec![ast::SelectItem::Wildcard(
215                ast::WildcardAdditionalOptions::default(),
216            )]);
217        }
218
219        let mut twj = select_builder.pop_from().unwrap();
220        twj.relation(relation_builder);
221        select_builder.push_from(twj);
222
223        Ok(SetExpr::Select(Box::new(select_builder.build()?)))
224    }
225
226    /// Reconstructs a SELECT SQL statement from a logical plan by unprojecting column expressions
227    /// found in a [Projection] node. This requires scanning the plan tree for relevant Aggregate
228    /// and Window nodes and matching column expressions to the appropriate agg or window expressions.
229    fn reconstruct_select_statement(
230        &self,
231        plan: &LogicalPlan,
232        p: &Projection,
233        select: &mut SelectBuilder,
234    ) -> Result<()> {
235        let mut exprs = p.expr.clone();
236
237        // If an Unnest node is found within the select, find and unproject the unnest column
238        if let Some(unnest) = find_unnest_node_within_select(plan) {
239            exprs = exprs
240                .into_iter()
241                .map(|e| unproject_unnest_expr(e, unnest))
242                .collect::<Result<Vec<_>>>()?;
243        };
244
245        match (
246            find_agg_node_within_select(plan, true),
247            find_window_nodes_within_select(plan, None, true),
248        ) {
249            (Some(agg), window) => {
250                let window_option = window.as_deref();
251                let items = exprs
252                    .into_iter()
253                    .map(|proj_expr| {
254                        let unproj = unproject_agg_exprs(proj_expr, agg, window_option)?;
255                        self.select_item_to_sql(&unproj)
256                    })
257                    .collect::<Result<Vec<_>>>()?;
258
259                select.projection(items);
260                select.group_by(ast::GroupByExpr::Expressions(
261                    agg.group_expr
262                        .iter()
263                        .map(|expr| self.expr_to_sql(expr))
264                        .collect::<Result<Vec<_>>>()?,
265                    vec![],
266                ));
267            }
268            (None, Some(window)) => {
269                let items = exprs
270                    .into_iter()
271                    .map(|proj_expr| {
272                        let unproj = unproject_window_exprs(proj_expr, &window)?;
273                        self.select_item_to_sql(&unproj)
274                    })
275                    .collect::<Result<Vec<_>>>()?;
276
277                select.projection(items);
278            }
279            _ => {
280                let items = exprs
281                    .iter()
282                    .map(|e| self.select_item_to_sql(e))
283                    .collect::<Result<Vec<_>>>()?;
284                select.projection(items);
285            }
286        }
287        Ok(())
288    }
289
290    fn derive(
291        &self,
292        plan: &LogicalPlan,
293        relation: &mut RelationBuilder,
294        alias: Option<ast::TableAlias>,
295        lateral: bool,
296    ) -> Result<()> {
297        let mut derived_builder = DerivedRelationBuilder::default();
298        derived_builder.lateral(lateral).alias(alias).subquery({
299            let inner_statement = self.plan_to_sql(plan)?;
300            if let ast::Statement::Query(inner_query) = inner_statement {
301                inner_query
302            } else {
303                return internal_err!(
304                    "Subquery must be a Query, but found {inner_statement:?}"
305                );
306            }
307        });
308        relation.derived(derived_builder);
309
310        Ok(())
311    }
312
313    fn derive_with_dialect_alias(
314        &self,
315        alias: &str,
316        plan: &LogicalPlan,
317        relation: &mut RelationBuilder,
318        lateral: bool,
319        columns: Vec<Ident>,
320    ) -> Result<()> {
321        if self.dialect.requires_derived_table_alias() || !columns.is_empty() {
322            self.derive(
323                plan,
324                relation,
325                Some(self.new_table_alias(alias.to_string(), columns)),
326                lateral,
327            )
328        } else {
329            self.derive(plan, relation, None, lateral)
330        }
331    }
332
333    #[cfg_attr(feature = "recursive_protection", recursive::recursive)]
334    fn select_to_sql_recursively(
335        &self,
336        plan: &LogicalPlan,
337        query: &mut Option<QueryBuilder>,
338        select: &mut SelectBuilder,
339        relation: &mut RelationBuilder,
340    ) -> Result<()> {
341        match plan {
342            LogicalPlan::TableScan(scan) => {
343                if let Some(unparsed_table_scan) = self.unparse_table_scan_pushdown(
344                    plan,
345                    None,
346                    select.already_projected(),
347                )? {
348                    return self.select_to_sql_recursively(
349                        &unparsed_table_scan,
350                        query,
351                        select,
352                        relation,
353                    );
354                }
355                let mut builder = TableRelationBuilder::default();
356                let mut table_parts = vec![];
357                if let Some(catalog_name) = scan.table_name.catalog() {
358                    table_parts
359                        .push(self.new_ident_quoted_if_needs(catalog_name.to_string()));
360                }
361                if let Some(schema_name) = scan.table_name.schema() {
362                    table_parts
363                        .push(self.new_ident_quoted_if_needs(schema_name.to_string()));
364                }
365                table_parts.push(
366                    self.new_ident_quoted_if_needs(scan.table_name.table().to_string()),
367                );
368                builder.name(ast::ObjectName::from(table_parts));
369                relation.table(builder);
370
371                Ok(())
372            }
373            LogicalPlan::Projection(p) => {
374                if let Some(new_plan) = rewrite_plan_for_sort_on_non_projected_fields(p) {
375                    return self
376                        .select_to_sql_recursively(&new_plan, query, select, relation);
377                }
378
379                // Projection can be top-level plan for unnest relation
380                // The projection generated by the `RecursiveUnnestRewriter` from a UNNEST relation will have
381                // only one expression, which is the placeholder column generated by the rewriter.
382                let unnest_input_type = if p.expr.len() == 1 {
383                    Self::check_unnest_placeholder_with_outer_ref(&p.expr[0])
384                } else {
385                    None
386                };
387                if self.dialect.unnest_as_table_factor()
388                    && unnest_input_type.is_some()
389                    && let LogicalPlan::Unnest(unnest) = &p.input.as_ref()
390                    && let Some(unnest_relation) =
391                        self.try_unnest_to_table_factor_sql(unnest)?
392                {
393                    relation.unnest(unnest_relation);
394                    return self.select_to_sql_recursively(
395                        p.input.as_ref(),
396                        query,
397                        select,
398                        relation,
399                    );
400                }
401
402                // If it's a unnest projection, we should provide the table column alias
403                // to provide a column name for the unnest relation.
404                let columns = if unnest_input_type.is_some() {
405                    p.expr
406                        .iter()
407                        .map(|e| {
408                            self.new_ident_quoted_if_needs(e.schema_name().to_string())
409                        })
410                        .collect()
411                } else {
412                    vec![]
413                };
414                // Projection can be top-level plan for derived table
415                if select.already_projected() {
416                    return self.derive_with_dialect_alias(
417                        "derived_projection",
418                        plan,
419                        relation,
420                        unnest_input_type
421                            .filter(|t| matches!(t, UnnestInputType::OuterReference))
422                            .is_some(),
423                        columns,
424                    );
425                }
426                self.reconstruct_select_statement(plan, p, select)?;
427                self.select_to_sql_recursively(p.input.as_ref(), query, select, relation)
428            }
429            LogicalPlan::Filter(filter) => {
430                if let Some(agg) =
431                    find_agg_node_within_select(plan, select.already_projected())
432                {
433                    let unprojected =
434                        unproject_agg_exprs(filter.predicate.clone(), agg, None)?;
435                    let filter_expr = self.expr_to_sql(&unprojected)?;
436                    select.having(Some(filter_expr));
437                } else if let (Some(window), true) = (
438                    find_window_nodes_within_select(
439                        plan,
440                        None,
441                        select.already_projected(),
442                    ),
443                    self.dialect.supports_qualify(),
444                ) {
445                    let unprojected =
446                        unproject_window_exprs(filter.predicate.clone(), &window)?;
447                    let filter_expr = self.expr_to_sql(&unprojected)?;
448                    select.qualify(Some(filter_expr));
449                } else {
450                    let filter_expr = self.expr_to_sql(&filter.predicate)?;
451                    select.selection(Some(filter_expr));
452                }
453
454                self.select_to_sql_recursively(
455                    filter.input.as_ref(),
456                    query,
457                    select,
458                    relation,
459                )
460            }
461            LogicalPlan::Limit(limit) => {
462                // Limit can be top-level plan for derived table
463                if select.already_projected() {
464                    return self.derive_with_dialect_alias(
465                        "derived_limit",
466                        plan,
467                        relation,
468                        false,
469                        vec![],
470                    );
471                }
472                if let Some(fetch) = &limit.fetch {
473                    let Some(query) = query.as_mut() else {
474                        return internal_err!(
475                            "Limit operator only valid in a statement context."
476                        );
477                    };
478                    query.limit(Some(self.expr_to_sql(fetch)?));
479                }
480
481                if let Some(skip) = &limit.skip {
482                    let Some(query) = query.as_mut() else {
483                        return internal_err!(
484                            "Offset operator only valid in a statement context."
485                        );
486                    };
487
488                    query.offset(Some(ast::Offset {
489                        rows: ast::OffsetRows::None,
490                        value: self.expr_to_sql(skip)?,
491                    }));
492                }
493
494                self.select_to_sql_recursively(
495                    limit.input.as_ref(),
496                    query,
497                    select,
498                    relation,
499                )
500            }
501            LogicalPlan::Sort(sort) => {
502                // Sort can be top-level plan for derived table
503                if select.already_projected() {
504                    return self.derive_with_dialect_alias(
505                        "derived_sort",
506                        plan,
507                        relation,
508                        false,
509                        vec![],
510                    );
511                }
512
513                let Some(query_ref) = query else {
514                    return internal_err!(
515                        "Sort operator only valid in a statement context."
516                    );
517                };
518
519                if let Some(fetch) = sort.fetch {
520                    query_ref.limit(Some(ast::Expr::value(ast::Value::Number(
521                        fetch.to_string(),
522                        false,
523                    ))));
524                };
525
526                let agg = find_agg_node_within_select(plan, select.already_projected());
527                // unproject sort expressions
528                let sort_exprs: Vec<SortExpr> = sort
529                    .expr
530                    .iter()
531                    .map(|sort_expr| {
532                        unproject_sort_expr(sort_expr.clone(), agg, sort.input.as_ref())
533                    })
534                    .collect::<Result<Vec<_>>>()?;
535
536                query_ref.order_by(self.sorts_to_sql(&sort_exprs)?);
537
538                self.select_to_sql_recursively(
539                    sort.input.as_ref(),
540                    query,
541                    select,
542                    relation,
543                )
544            }
545            LogicalPlan::Aggregate(agg) => {
546                // Aggregation can be already handled in the projection case
547                if !select.already_projected() {
548                    // The query returns aggregate and group expressions. If that weren't the case,
549                    // the aggregate would have been placed inside a projection, making the check above^ false
550                    let exprs: Vec<_> = agg
551                        .aggr_expr
552                        .iter()
553                        .chain(agg.group_expr.iter())
554                        .map(|expr| self.select_item_to_sql(expr))
555                        .collect::<Result<Vec<_>>>()?;
556                    select.projection(exprs);
557
558                    select.group_by(ast::GroupByExpr::Expressions(
559                        agg.group_expr
560                            .iter()
561                            .map(|expr| self.expr_to_sql(expr))
562                            .collect::<Result<Vec<_>>>()?,
563                        vec![],
564                    ));
565                }
566
567                self.select_to_sql_recursively(
568                    agg.input.as_ref(),
569                    query,
570                    select,
571                    relation,
572                )
573            }
574            LogicalPlan::Distinct(distinct) => {
575                // Distinct can be top-level plan for derived table
576                if select.already_projected() {
577                    return self.derive_with_dialect_alias(
578                        "derived_distinct",
579                        plan,
580                        relation,
581                        false,
582                        vec![],
583                    );
584                }
585
586                // If this distinct is the parent of a Union and we're in a query context,
587                // then we need to unparse as a `UNION` rather than a `UNION ALL`.
588                if let Distinct::All(input) = distinct
589                    && matches!(input.as_ref(), LogicalPlan::Union(_))
590                    && let Some(query_mut) = query.as_mut()
591                {
592                    query_mut.distinct_union();
593                    return self.select_to_sql_recursively(
594                        input.as_ref(),
595                        query,
596                        select,
597                        relation,
598                    );
599                }
600
601                let (select_distinct, input) = match distinct {
602                    Distinct::All(input) => (ast::Distinct::Distinct, input.as_ref()),
603                    Distinct::On(on) => {
604                        let exprs = on
605                            .on_expr
606                            .iter()
607                            .map(|e| self.expr_to_sql(e))
608                            .collect::<Result<Vec<_>>>()?;
609                        let items = on
610                            .select_expr
611                            .iter()
612                            .map(|e| self.select_item_to_sql(e))
613                            .collect::<Result<Vec<_>>>()?;
614                        if let Some(sort_expr) = &on.sort_expr {
615                            if let Some(query_ref) = query {
616                                query_ref.order_by(self.sorts_to_sql(sort_expr)?);
617                            } else {
618                                return internal_err!(
619                                    "Sort operator only valid in a statement context."
620                                );
621                            }
622                        }
623                        select.projection(items);
624                        (ast::Distinct::On(exprs), on.input.as_ref())
625                    }
626                };
627                select.distinct(Some(select_distinct));
628                self.select_to_sql_recursively(input, query, select, relation)
629            }
630            LogicalPlan::Join(join) => {
631                let mut table_scan_filters = vec![];
632                let (left_plan, right_plan) = match join.join_type {
633                    JoinType::RightSemi | JoinType::RightAnti => {
634                        (&join.right, &join.left)
635                    }
636                    _ => (&join.left, &join.right),
637                };
638                // If there's an outer projection plan, it will already set up the projection.
639                // In that case, we don't need to worry about setting up the projection here.
640                // The outer projection plan will handle projecting the correct columns.
641                let already_projected = select.already_projected();
642
643                let left_plan =
644                    match try_transform_to_simple_table_scan_with_filters(left_plan)? {
645                        Some((plan, filters)) => {
646                            table_scan_filters.extend(filters);
647                            Arc::new(plan)
648                        }
649                        None => Arc::clone(left_plan),
650                    };
651
652                self.select_to_sql_recursively(
653                    left_plan.as_ref(),
654                    query,
655                    select,
656                    relation,
657                )?;
658
659                let left_projection: Option<Vec<ast::SelectItem>> = if !already_projected
660                {
661                    Some(select.pop_projections())
662                } else {
663                    None
664                };
665
666                let right_plan =
667                    match try_transform_to_simple_table_scan_with_filters(right_plan)? {
668                        Some((plan, filters)) => {
669                            table_scan_filters.extend(filters);
670                            Arc::new(plan)
671                        }
672                        None => Arc::clone(right_plan),
673                    };
674
675                let mut right_relation = RelationBuilder::default();
676
677                self.select_to_sql_recursively(
678                    right_plan.as_ref(),
679                    query,
680                    select,
681                    &mut right_relation,
682                )?;
683
684                let join_filters = if table_scan_filters.is_empty() {
685                    join.filter.clone()
686                } else {
687                    // Combine `table_scan_filters` into a single filter using `AND`
688                    let Some(combined_filters) =
689                        table_scan_filters.into_iter().reduce(|acc, filter| {
690                            Expr::BinaryExpr(BinaryExpr {
691                                left: Box::new(acc),
692                                op: Operator::And,
693                                right: Box::new(filter),
694                            })
695                        })
696                    else {
697                        return internal_err!("Failed to combine TableScan filters");
698                    };
699
700                    // Combine `join.filter` with `combined_filters` using `AND`
701                    match &join.filter {
702                        Some(filter) => Some(Expr::BinaryExpr(BinaryExpr {
703                            left: Box::new(filter.clone()),
704                            op: Operator::And,
705                            right: Box::new(combined_filters),
706                        })),
707                        None => Some(combined_filters),
708                    }
709                };
710
711                let join_constraint = self.join_constraint_to_sql(
712                    join.join_constraint,
713                    &join.on,
714                    join_filters.as_ref(),
715                )?;
716
717                let right_projection: Option<Vec<ast::SelectItem>> = if !already_projected
718                {
719                    Some(select.pop_projections())
720                } else {
721                    None
722                };
723
724                match join.join_type {
725                    JoinType::LeftSemi
726                    | JoinType::LeftAnti
727                    | JoinType::LeftMark
728                    | JoinType::RightSemi
729                    | JoinType::RightAnti
730                    | JoinType::RightMark => {
731                        let mut query_builder = QueryBuilder::default();
732                        let mut from = TableWithJoinsBuilder::default();
733                        let mut exists_select: SelectBuilder = SelectBuilder::default();
734                        from.relation(right_relation);
735                        exists_select.push_from(from);
736                        if let Some(filter) = &join.filter {
737                            exists_select.selection(Some(self.expr_to_sql(filter)?));
738                        }
739                        for (left, right) in &join.on {
740                            exists_select.selection(Some(
741                                self.expr_to_sql(&left.clone().eq(right.clone()))?,
742                            ));
743                        }
744                        exists_select.projection(vec![ast::SelectItem::UnnamedExpr(
745                            ast::Expr::value(ast::Value::Number("1".to_string(), false)),
746                        )]);
747                        query_builder.body(Box::new(SetExpr::Select(Box::new(
748                            exists_select.build()?,
749                        ))));
750
751                        let negated = match join.join_type {
752                            JoinType::LeftSemi
753                            | JoinType::RightSemi
754                            | JoinType::LeftMark
755                            | JoinType::RightMark => false,
756                            JoinType::LeftAnti | JoinType::RightAnti => true,
757                            _ => unreachable!(),
758                        };
759                        let exists_expr = ast::Expr::Exists {
760                            subquery: Box::new(query_builder.build()?),
761                            negated,
762                        };
763
764                        match join.join_type {
765                            JoinType::LeftMark | JoinType::RightMark => {
766                                let source_schema =
767                                    if join.join_type == JoinType::LeftMark {
768                                        right_plan.schema()
769                                    } else {
770                                        left_plan.schema()
771                                    };
772                                let (table_ref, _) = source_schema.qualified_field(0);
773                                let column = self.col_to_sql(&Column::new(
774                                    table_ref.cloned(),
775                                    "mark",
776                                ))?;
777                                select.replace_mark(&column, &exists_expr);
778                            }
779                            _ => {
780                                select.selection(Some(exists_expr));
781                            }
782                        }
783                        if let Some(projection) = left_projection {
784                            select.projection(projection);
785                        }
786                    }
787                    JoinType::Inner
788                    | JoinType::Left
789                    | JoinType::Right
790                    | JoinType::Full => {
791                        let Ok(Some(relation)) = right_relation.build() else {
792                            return internal_err!("Failed to build right relation");
793                        };
794                        let ast_join = ast::Join {
795                            relation,
796                            global: false,
797                            join_operator: self
798                                .join_operator_to_sql(join.join_type, join_constraint)?,
799                        };
800                        let mut from = select.pop_from().unwrap();
801                        from.push_join(ast_join);
802                        select.push_from(from);
803                        if !already_projected {
804                            let Some(left_projection) = left_projection else {
805                                return internal_err!("Left projection is missing");
806                            };
807
808                            let Some(right_projection) = right_projection else {
809                                return internal_err!("Right projection is missing");
810                            };
811
812                            let projection = left_projection
813                                .into_iter()
814                                .chain(right_projection)
815                                .collect();
816                            select.projection(projection);
817                        }
818                    }
819                };
820
821                Ok(())
822            }
823            LogicalPlan::SubqueryAlias(plan_alias) => {
824                let (plan, mut columns) =
825                    subquery_alias_inner_query_and_columns(plan_alias);
826                let unparsed_table_scan = self.unparse_table_scan_pushdown(
827                    plan,
828                    Some(plan_alias.alias.clone()),
829                    select.already_projected(),
830                )?;
831                // if the child plan is a TableScan with pushdown operations, we don't need to
832                // create an additional subquery for it
833                if !select.already_projected() && unparsed_table_scan.is_none() {
834                    select.projection(vec![ast::SelectItem::Wildcard(
835                        ast::WildcardAdditionalOptions::default(),
836                    )]);
837                }
838                let plan = unparsed_table_scan.unwrap_or_else(|| plan.clone());
839                if !columns.is_empty()
840                    && !self.dialect.supports_column_alias_in_table_alias()
841                {
842                    // Instead of specifying column aliases as part of the outer table, inject them directly into the inner projection
843                    let rewritten_plan =
844                        match inject_column_aliases_into_subquery(plan, columns) {
845                            Ok(p) => p,
846                            Err(e) => {
847                                return internal_err!(
848                                    "Failed to transform SubqueryAlias plan: {e}"
849                                );
850                            }
851                        };
852
853                    columns = vec![];
854
855                    self.select_to_sql_recursively(
856                        &rewritten_plan,
857                        query,
858                        select,
859                        relation,
860                    )?;
861                } else {
862                    self.select_to_sql_recursively(&plan, query, select, relation)?;
863                }
864
865                relation.alias(Some(
866                    self.new_table_alias(plan_alias.alias.table().to_string(), columns),
867                ));
868
869                Ok(())
870            }
871            LogicalPlan::Union(union) => {
872                // Covers cases where the UNION is a subquery and the projection is at the top level
873                if select.already_projected() {
874                    return self.derive_with_dialect_alias(
875                        "derived_union",
876                        plan,
877                        relation,
878                        false,
879                        vec![],
880                    );
881                }
882
883                let input_exprs: Vec<SetExpr> = union
884                    .inputs
885                    .iter()
886                    .map(|input| self.select_to_sql_expr(input, query))
887                    .collect::<Result<Vec<_>>>()?;
888
889                assert_or_internal_err!(
890                    input_exprs.len() >= 2,
891                    "UNION operator requires at least 2 inputs"
892                );
893
894                let set_quantifier =
895                    if query.as_ref().is_some_and(|q| q.is_distinct_union()) {
896                        // Setting the SetQuantifier to None will unparse as a `UNION`
897                        // rather than a `UNION ALL`.
898                        ast::SetQuantifier::None
899                    } else {
900                        ast::SetQuantifier::All
901                    };
902
903                // Build the union expression tree bottom-up by reversing the order
904                // note that we are also swapping left and right inputs because of the rev
905                let union_expr = input_exprs
906                    .into_iter()
907                    .rev()
908                    .reduce(|a, b| SetExpr::SetOperation {
909                        op: ast::SetOperator::Union,
910                        set_quantifier,
911                        left: Box::new(b),
912                        right: Box::new(a),
913                    })
914                    .unwrap();
915
916                let Some(query) = query.as_mut() else {
917                    return internal_err!(
918                        "UNION ALL operator only valid in a statement context"
919                    );
920                };
921                query.body(Box::new(union_expr));
922
923                Ok(())
924            }
925            LogicalPlan::Window(window) => {
926                // Window nodes are handled simultaneously with Projection nodes
927                self.select_to_sql_recursively(
928                    window.input.as_ref(),
929                    query,
930                    select,
931                    relation,
932                )
933            }
934            LogicalPlan::EmptyRelation(_) => {
935                // An EmptyRelation could be behind an UNNEST node. If the dialect supports UNNEST as a table factor,
936                // a TableRelationBuilder will be created for the UNNEST node first.
937                if !relation.has_relation() {
938                    relation.empty();
939                }
940                Ok(())
941            }
942            LogicalPlan::Extension(extension) => {
943                if let Some(query) = query.as_mut() {
944                    self.extension_to_sql(
945                        extension.node.as_ref(),
946                        &mut Some(query),
947                        &mut Some(select),
948                        &mut Some(relation),
949                    )
950                } else {
951                    self.extension_to_sql(
952                        extension.node.as_ref(),
953                        &mut None,
954                        &mut Some(select),
955                        &mut Some(relation),
956                    )
957                }
958            }
959            LogicalPlan::Unnest(unnest) => {
960                assert_or_internal_err!(
961                    unnest.struct_type_columns.is_empty(),
962                    "Struct type columns are not currently supported in UNNEST: {:?}",
963                    unnest.struct_type_columns
964                );
965
966                // In the case of UNNEST, the Unnest node is followed by a duplicate Projection node that we should skip.
967                // Otherwise, there will be a duplicate SELECT clause.
968                // | Projection: table.col1, UNNEST(table.col2)
969                // |   Unnest: UNNEST(table.col2)
970                // |     Projection: table.col1, table.col2 AS UNNEST(table.col2)
971                // |       Filter: table.col3 = Int64(3)
972                // |         TableScan: table projection=None
973                if let LogicalPlan::Projection(p) = unnest.input.as_ref() {
974                    // continue with projection input
975                    self.select_to_sql_recursively(&p.input, query, select, relation)
976                } else {
977                    internal_err!("Unnest input is not a Projection: {unnest:?}")
978                }
979            }
980            LogicalPlan::Subquery(subquery)
981                if find_unnest_node_until_relation(subquery.subquery.as_ref())
982                    .is_some() =>
983            {
984                if self.dialect.unnest_as_table_factor() {
985                    self.select_to_sql_recursively(
986                        subquery.subquery.as_ref(),
987                        query,
988                        select,
989                        relation,
990                    )
991                } else {
992                    self.derive_with_dialect_alias(
993                        "derived_unnest",
994                        subquery.subquery.as_ref(),
995                        relation,
996                        true,
997                        vec![],
998                    )
999                }
1000            }
1001            _ => {
1002                not_impl_err!("Unsupported operator: {plan:?}")
1003            }
1004        }
1005    }
1006
1007    /// Try to find the placeholder column name generated by `RecursiveUnnestRewriter`.
1008    ///
1009    /// - If the column is a placeholder column match the pattern `Expr::Alias(Expr::Column("__unnest_placeholder(...)"))`,
1010    ///   it means it is a scalar column, return [UnnestInputType::Scalar].
1011    /// - If the column is a placeholder column match the pattern `Expr::Alias(Expr::Column("__unnest_placeholder(outer_ref(...)))")`,
1012    ///   it means it is an outer reference column, return [UnnestInputType::OuterReference].
1013    /// - If the column is not a placeholder column, return [None].
1014    ///
1015    /// `outer_ref` is the display result of [Expr::OuterReferenceColumn]
1016    fn check_unnest_placeholder_with_outer_ref(expr: &Expr) -> Option<UnnestInputType> {
1017        if let Expr::Alias(Alias { expr, .. }) = expr
1018            && let Expr::Column(Column { name, .. }) = expr.as_ref()
1019            && let Some(prefix) = name.strip_prefix(UNNEST_PLACEHOLDER)
1020        {
1021            if prefix.starts_with(&format!("({OUTER_REFERENCE_COLUMN_PREFIX}(")) {
1022                return Some(UnnestInputType::OuterReference);
1023            }
1024            return Some(UnnestInputType::Scalar);
1025        }
1026        None
1027    }
1028
1029    fn try_unnest_to_table_factor_sql(
1030        &self,
1031        unnest: &Unnest,
1032    ) -> Result<Option<UnnestRelationBuilder>> {
1033        let mut unnest_relation = UnnestRelationBuilder::default();
1034        let LogicalPlan::Projection(projection) = unnest.input.as_ref() else {
1035            return Ok(None);
1036        };
1037
1038        if !matches!(projection.input.as_ref(), LogicalPlan::EmptyRelation(_)) {
1039            // It may be possible that UNNEST is used as a source for the query.
1040            // However, at this point, we don't yet know if it is just a single expression
1041            // from another source or if it's from UNNEST.
1042            //
1043            // Unnest(Projection(EmptyRelation)) denotes a case with `UNNEST([...])`,
1044            // which is normally safe to unnest as a table factor.
1045            // However, in the future, more comprehensive checks can be added here.
1046            return Ok(None);
1047        };
1048
1049        let exprs = projection
1050            .expr
1051            .iter()
1052            .map(|e| self.expr_to_sql(e))
1053            .collect::<Result<Vec<_>>>()?;
1054        unnest_relation.array_exprs(exprs);
1055
1056        Ok(Some(unnest_relation))
1057    }
1058
1059    fn is_scan_with_pushdown(scan: &TableScan) -> bool {
1060        scan.projection.is_some() || !scan.filters.is_empty() || scan.fetch.is_some()
1061    }
1062
1063    /// Try to unparse a table scan with pushdown operations into a new subquery plan.
1064    /// If the table scan is without any pushdown operations, return None.
1065    fn unparse_table_scan_pushdown(
1066        &self,
1067        plan: &LogicalPlan,
1068        alias: Option<TableReference>,
1069        already_projected: bool,
1070    ) -> Result<Option<LogicalPlan>> {
1071        match plan {
1072            LogicalPlan::TableScan(table_scan) => {
1073                if !Self::is_scan_with_pushdown(table_scan) {
1074                    return Ok(None);
1075                }
1076                let table_schema = table_scan.source.schema();
1077                let mut filter_alias_rewriter =
1078                    alias.as_ref().map(|alias_name| TableAliasRewriter {
1079                        table_schema: &table_schema,
1080                        alias_name: alias_name.clone(),
1081                    });
1082
1083                let mut builder = LogicalPlanBuilder::scan(
1084                    table_scan.table_name.clone(),
1085                    Arc::clone(&table_scan.source),
1086                    None,
1087                )?;
1088                // We will rebase the column references to the new alias if it exists.
1089                // If the projection or filters are empty, we will append alias to the table scan.
1090                //
1091                // Example:
1092                //   select t1.c1 from t1 where t1.c1 > 1 -> select a.c1 from t1 as a where a.c1 > 1
1093                if let Some(ref alias) = alias
1094                    && (table_scan.projection.is_some() || !table_scan.filters.is_empty())
1095                {
1096                    builder = builder.alias(alias.clone())?;
1097                }
1098
1099                // Avoid creating a duplicate Projection node, which would result in an additional subquery if a projection already exists.
1100                // For example, if the `optimize_projection` rule is applied, there will be a Projection node, and duplicate projection
1101                // information included in the TableScan node.
1102                if !already_projected && let Some(project_vec) = &table_scan.projection {
1103                    if project_vec.is_empty() {
1104                        builder = builder.project(self.empty_projection_fallback())?;
1105                    } else {
1106                        let project_columns = project_vec
1107                            .iter()
1108                            .cloned()
1109                            .map(|i| {
1110                                let schema = table_scan.source.schema();
1111                                let field = schema.field(i);
1112                                if alias.is_some() {
1113                                    Column::new(alias.clone(), field.name().clone())
1114                                } else {
1115                                    Column::new(
1116                                        Some(table_scan.table_name.clone()),
1117                                        field.name().clone(),
1118                                    )
1119                                }
1120                            })
1121                            .collect::<Vec<_>>();
1122                        builder = builder.project(project_columns)?;
1123                    };
1124                }
1125
1126                let filter_expr: Result<Option<Expr>> = table_scan
1127                    .filters
1128                    .iter()
1129                    .cloned()
1130                    .map(|expr| {
1131                        if let Some(ref mut rewriter) = filter_alias_rewriter {
1132                            expr.rewrite(rewriter).data()
1133                        } else {
1134                            Ok(expr)
1135                        }
1136                    })
1137                    .reduce(|acc, expr_result| {
1138                        acc.and_then(|acc_expr| {
1139                            expr_result.map(|expr| acc_expr.and(expr))
1140                        })
1141                    })
1142                    .transpose();
1143
1144                if let Some(filter) = filter_expr? {
1145                    builder = builder.filter(filter)?;
1146                }
1147
1148                if let Some(fetch) = table_scan.fetch {
1149                    builder = builder.limit(0, Some(fetch))?;
1150                }
1151
1152                // If the table scan has an alias but no projection or filters, it means no column references are rebased.
1153                // So we will append the alias to this subquery.
1154                // Example:
1155                //   select * from t1 limit 10 -> (select * from t1 limit 10) as a
1156                if let Some(alias) = alias
1157                    && table_scan.projection.is_none()
1158                    && table_scan.filters.is_empty()
1159                {
1160                    builder = builder.alias(alias)?;
1161                }
1162
1163                Ok(Some(builder.build()?))
1164            }
1165            LogicalPlan::SubqueryAlias(subquery_alias) => {
1166                let ret = self.unparse_table_scan_pushdown(
1167                    &subquery_alias.input,
1168                    Some(subquery_alias.alias.clone()),
1169                    already_projected,
1170                )?;
1171                if let Some(alias) = alias
1172                    && let Some(plan) = ret
1173                {
1174                    let plan = LogicalPlanBuilder::new(plan).alias(alias)?.build()?;
1175                    return Ok(Some(plan));
1176                }
1177                Ok(ret)
1178            }
1179            // SubqueryAlias could be rewritten to a plan with a projection as the top node by [rewrite::subquery_alias_inner_query_and_columns].
1180            // The inner table scan could be a scan with pushdown operations.
1181            LogicalPlan::Projection(projection) => {
1182                if let Some(plan) = self.unparse_table_scan_pushdown(
1183                    &projection.input,
1184                    alias.clone(),
1185                    already_projected,
1186                )? {
1187                    let exprs = if alias.is_some() {
1188                        let mut alias_rewriter =
1189                            alias.as_ref().map(|alias_name| TableAliasRewriter {
1190                                table_schema: plan.schema().as_arrow(),
1191                                alias_name: alias_name.clone(),
1192                            });
1193                        projection
1194                            .expr
1195                            .iter()
1196                            .cloned()
1197                            .map(|expr| {
1198                                if let Some(ref mut rewriter) = alias_rewriter {
1199                                    expr.rewrite(rewriter).data()
1200                                } else {
1201                                    Ok(expr)
1202                                }
1203                            })
1204                            .collect::<Result<Vec<_>>>()?
1205                    } else {
1206                        projection.expr.clone()
1207                    };
1208                    Ok(Some(
1209                        LogicalPlanBuilder::from(plan).project(exprs)?.build()?,
1210                    ))
1211                } else {
1212                    Ok(None)
1213                }
1214            }
1215            _ => Ok(None),
1216        }
1217    }
1218
1219    fn select_item_to_sql(&self, expr: &Expr) -> Result<ast::SelectItem> {
1220        match expr {
1221            Expr::Alias(Alias { expr, name, .. }) => {
1222                let inner = self.expr_to_sql(expr)?;
1223
1224                // Determine the alias name to use
1225                let col_name = if let Some(rewritten_name) =
1226                    self.dialect.col_alias_overrides(name)?
1227                {
1228                    rewritten_name.to_string()
1229                } else {
1230                    name.to_string()
1231                };
1232
1233                Ok(ast::SelectItem::ExprWithAlias {
1234                    expr: inner,
1235                    alias: self.new_ident_quoted_if_needs(col_name),
1236                })
1237            }
1238            _ => {
1239                let inner = self.expr_to_sql(expr)?;
1240
1241                Ok(ast::SelectItem::UnnamedExpr(inner))
1242            }
1243        }
1244    }
1245
1246    fn sorts_to_sql(&self, sort_exprs: &[SortExpr]) -> Result<OrderByKind> {
1247        Ok(OrderByKind::Expressions(
1248            sort_exprs
1249                .iter()
1250                .map(|sort_expr| self.sort_to_sql(sort_expr))
1251                .collect::<Result<Vec<_>>>()?,
1252        ))
1253    }
1254
1255    fn join_operator_to_sql(
1256        &self,
1257        join_type: JoinType,
1258        constraint: ast::JoinConstraint,
1259    ) -> Result<ast::JoinOperator> {
1260        Ok(match join_type {
1261            JoinType::Inner => match &constraint {
1262                ast::JoinConstraint::On(_)
1263                | ast::JoinConstraint::Using(_)
1264                | ast::JoinConstraint::Natural => ast::JoinOperator::Inner(constraint),
1265                ast::JoinConstraint::None => {
1266                    // Inner joins with no conditions or filters are not valid SQL in most systems,
1267                    // return a CROSS JOIN instead
1268                    ast::JoinOperator::CrossJoin(constraint)
1269                }
1270            },
1271            JoinType::Left => ast::JoinOperator::LeftOuter(constraint),
1272            JoinType::Right => ast::JoinOperator::RightOuter(constraint),
1273            JoinType::Full => ast::JoinOperator::FullOuter(constraint),
1274            JoinType::LeftAnti => ast::JoinOperator::LeftAnti(constraint),
1275            JoinType::LeftSemi => ast::JoinOperator::LeftSemi(constraint),
1276            JoinType::RightAnti => ast::JoinOperator::RightAnti(constraint),
1277            JoinType::RightSemi => ast::JoinOperator::RightSemi(constraint),
1278            JoinType::LeftMark | JoinType::RightMark => {
1279                unimplemented!("Unparsing of Mark join type")
1280            }
1281        })
1282    }
1283
1284    /// Convert the components of a USING clause to the USING AST. Returns
1285    /// 'None' if the conditions are not compatible with a USING expression,
1286    /// e.g. non-column expressions or non-matching names.
1287    fn join_using_to_sql(
1288        &self,
1289        join_conditions: &[(Expr, Expr)],
1290    ) -> Option<ast::JoinConstraint> {
1291        let mut object_names = Vec::with_capacity(join_conditions.len());
1292        for (left, right) in join_conditions {
1293            match (left, right) {
1294                (
1295                    Expr::Column(Column {
1296                        relation: _,
1297                        name: left_name,
1298                        spans: _,
1299                    }),
1300                    Expr::Column(Column {
1301                        relation: _,
1302                        name: right_name,
1303                        spans: _,
1304                    }),
1305                ) if left_name == right_name => {
1306                    // For example, if the join condition `t1.id = t2.id`
1307                    // this is represented as two columns like `[t1.id, t2.id]`
1308                    // This code forms `id` (without relation name)
1309                    let ident = self.new_ident_quoted_if_needs(left_name.to_string());
1310                    object_names.push(ast::ObjectName::from(vec![ident]));
1311                }
1312                // USING is only valid with matching column names; arbitrary expressions
1313                // are not allowed
1314                _ => return None,
1315            }
1316        }
1317        Some(ast::JoinConstraint::Using(object_names))
1318    }
1319
1320    /// Convert a join constraint and associated conditions and filter to a SQL AST node
1321    fn join_constraint_to_sql(
1322        &self,
1323        constraint: JoinConstraint,
1324        conditions: &[(Expr, Expr)],
1325        filter: Option<&Expr>,
1326    ) -> Result<ast::JoinConstraint> {
1327        match (constraint, conditions, filter) {
1328            // No constraints
1329            (JoinConstraint::On | JoinConstraint::Using, [], None) => {
1330                Ok(ast::JoinConstraint::None)
1331            }
1332
1333            (JoinConstraint::Using, conditions, None) => {
1334                match self.join_using_to_sql(conditions) {
1335                    Some(using) => Ok(using),
1336                    // As above, this should not be reachable from parsed SQL,
1337                    // but a user could create this; we "downgrade" to ON.
1338                    None => self.join_conditions_to_sql_on(conditions, None),
1339                }
1340            }
1341
1342            // Two cases here:
1343            // 1. Straightforward ON case, with possible equi-join conditions
1344            //    and additional filters
1345            // 2. USING with additional filters; we "downgrade" to ON, because
1346            //    you can't use USING with arbitrary filters. (This should not
1347            //    be accessible from parsed SQL, but may have been a
1348            //    custom-built JOIN by a user.)
1349            (JoinConstraint::On | JoinConstraint::Using, conditions, filter) => {
1350                self.join_conditions_to_sql_on(conditions, filter)
1351            }
1352        }
1353    }
1354
1355    // Convert a list of equi0join conditions and an optional filter to a SQL ON
1356    // AST node, with the equi-join conditions and the filter merged into a
1357    // single conditional expression
1358    fn join_conditions_to_sql_on(
1359        &self,
1360        join_conditions: &[(Expr, Expr)],
1361        filter: Option<&Expr>,
1362    ) -> Result<ast::JoinConstraint> {
1363        let mut condition = None;
1364        // AND the join conditions together to create the overall condition
1365        for (left, right) in join_conditions {
1366            // Parse left and right
1367            let l = self.expr_to_sql(left)?;
1368            let r = self.expr_to_sql(right)?;
1369            let e = self.binary_op_to_sql(l, r, ast::BinaryOperator::Eq);
1370            condition = match condition {
1371                Some(expr) => Some(self.and_op_to_sql(expr, e)),
1372                None => Some(e),
1373            };
1374        }
1375
1376        // Then AND the non-equijoin filter condition as well
1377        condition = match (condition, filter) {
1378            (Some(expr), Some(filter)) => {
1379                Some(self.and_op_to_sql(expr, self.expr_to_sql(filter)?))
1380            }
1381            (Some(expr), None) => Some(expr),
1382            (None, Some(filter)) => Some(self.expr_to_sql(filter)?),
1383            (None, None) => None,
1384        };
1385
1386        let constraint = match condition {
1387            Some(filter) => ast::JoinConstraint::On(filter),
1388            None => ast::JoinConstraint::None,
1389        };
1390
1391        Ok(constraint)
1392    }
1393
1394    fn and_op_to_sql(&self, lhs: ast::Expr, rhs: ast::Expr) -> ast::Expr {
1395        self.binary_op_to_sql(lhs, rhs, ast::BinaryOperator::And)
1396    }
1397
1398    fn new_table_alias(&self, alias: String, columns: Vec<Ident>) -> ast::TableAlias {
1399        let columns = columns
1400            .into_iter()
1401            .map(|ident| TableAliasColumnDef {
1402                name: ident,
1403                data_type: None,
1404            })
1405            .collect();
1406        ast::TableAlias {
1407            name: self.new_ident_quoted_if_needs(alias),
1408            columns,
1409            explicit: true,
1410        }
1411    }
1412
1413    fn dml_to_sql(&self, plan: &LogicalPlan) -> Result<ast::Statement> {
1414        not_impl_err!("Unsupported plan: {plan:?}")
1415    }
1416
1417    /// Generates appropriate projection expression for empty projection lists.
1418    /// Returns an empty vec for dialects supporting empty select lists,
1419    /// or a dummy literal `1` for other dialects.
1420    fn empty_projection_fallback(&self) -> Vec<Expr> {
1421        if self.dialect.supports_empty_select_list() {
1422            Vec::new()
1423        } else {
1424            vec![Expr::Literal(ScalarValue::Int64(Some(1)), None)]
1425        }
1426    }
1427}
1428
1429impl From<BuilderError> for DataFusionError {
1430    fn from(e: BuilderError) -> Self {
1431        DataFusionError::External(Box::new(e))
1432    }
1433}
1434
/// The type of the input to the UNNEST table factor.
///
/// Produced by `check_unnest_placeholder_with_outer_ref`, which classifies an
/// aliased unnest placeholder column by the text following the placeholder prefix.
#[derive(Debug)]
enum UnnestInputType {
    /// The input is a column reference. It will be presented like `outer_ref(column_name)`.
    /// Chosen when the placeholder column name wraps an `outer_ref(...)` expression.
    OuterReference,
    /// The input is a scalar value. It will be presented like a scalar array or struct.
    /// Chosen for every other placeholder column.
    Scalar,
}