datafusion_sql/unparser/
plan.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use super::{
19    ast::{
20        BuilderError, DerivedRelationBuilder, QueryBuilder, RelationBuilder,
21        SelectBuilder, TableRelationBuilder, TableWithJoinsBuilder,
22    },
23    rewrite::{
24        inject_column_aliases_into_subquery, normalize_union_schema,
25        rewrite_plan_for_sort_on_non_projected_fields,
26        subquery_alias_inner_query_and_columns, TableAliasRewriter,
27    },
28    utils::{
29        find_agg_node_within_select, find_unnest_node_within_select,
30        find_window_nodes_within_select, try_transform_to_simple_table_scan_with_filters,
31        unproject_sort_expr, unproject_unnest_expr, unproject_window_exprs,
32    },
33    Unparser,
34};
35use crate::unparser::ast::UnnestRelationBuilder;
36use crate::unparser::extension_unparser::{
37    UnparseToStatementResult, UnparseWithinStatementResult,
38};
39use crate::unparser::utils::{find_unnest_node_until_relation, unproject_agg_exprs};
40use crate::utils::UNNEST_PLACEHOLDER;
41use datafusion_common::{
42    internal_err, not_impl_err,
43    tree_node::{TransformedResult, TreeNode},
44    Column, DataFusionError, Result, ScalarValue, TableReference,
45};
46use datafusion_expr::expr::OUTER_REFERENCE_COLUMN_PREFIX;
47use datafusion_expr::{
48    expr::Alias, BinaryExpr, Distinct, Expr, JoinConstraint, JoinType, LogicalPlan,
49    LogicalPlanBuilder, Operator, Projection, SortExpr, TableScan, Unnest,
50    UserDefinedLogicalNode,
51};
52use sqlparser::ast::{self, Ident, SetExpr, TableAliasColumnDef};
53use std::sync::Arc;
54
55/// Convert a DataFusion [`LogicalPlan`] to [`ast::Statement`]
56///
57/// This function is the opposite of [`SqlToRel::sql_statement_to_plan`] and can
58/// be used to, among other things, to convert `LogicalPlan`s to SQL strings.
59///
60/// # Errors
61///
62/// This function returns an error if the plan cannot be converted to SQL.
63///
64/// # See Also
65///
66/// * [`expr_to_sql`] for converting [`Expr`], a single expression to SQL
67///
68/// # Example
69/// ```
70/// use arrow::datatypes::{DataType, Field, Schema};
71/// use datafusion_expr::{col, logical_plan::table_scan};
72/// use datafusion_sql::unparser::plan_to_sql;
73/// let schema = Schema::new(vec![
74///     Field::new("id", DataType::Utf8, false),
75///     Field::new("value", DataType::Utf8, false),
76/// ]);
77/// // Scan 'table' and select columns 'id' and 'value'
78/// let plan = table_scan(Some("table"), &schema, None)
79///     .unwrap()
80///     .project(vec![col("id"), col("value")])
81///     .unwrap()
82///     .build()
83///     .unwrap();
84/// let sql = plan_to_sql(&plan).unwrap(); // convert to AST
85/// // use the Display impl to convert to SQL text
86/// assert_eq!(sql.to_string(), "SELECT \"table\".id, \"table\".\"value\" FROM \"table\"")
87/// ```
88///
89/// [`SqlToRel::sql_statement_to_plan`]: crate::planner::SqlToRel::sql_statement_to_plan
90/// [`expr_to_sql`]: crate::unparser::expr_to_sql
91pub fn plan_to_sql(plan: &LogicalPlan) -> Result<ast::Statement> {
92    let unparser = Unparser::default();
93    unparser.plan_to_sql(plan)
94}
95
96impl Unparser<'_> {
97    pub fn plan_to_sql(&self, plan: &LogicalPlan) -> Result<ast::Statement> {
98        let plan = normalize_union_schema(plan)?;
99
100        match plan {
101            LogicalPlan::Projection(_)
102            | LogicalPlan::Filter(_)
103            | LogicalPlan::Window(_)
104            | LogicalPlan::Aggregate(_)
105            | LogicalPlan::Sort(_)
106            | LogicalPlan::Join(_)
107            | LogicalPlan::Repartition(_)
108            | LogicalPlan::Union(_)
109            | LogicalPlan::TableScan(_)
110            | LogicalPlan::EmptyRelation(_)
111            | LogicalPlan::Subquery(_)
112            | LogicalPlan::SubqueryAlias(_)
113            | LogicalPlan::Limit(_)
114            | LogicalPlan::Statement(_)
115            | LogicalPlan::Values(_)
116            | LogicalPlan::Distinct(_) => self.select_to_sql_statement(&plan),
117            LogicalPlan::Dml(_) => self.dml_to_sql(&plan),
118            LogicalPlan::Extension(extension) => {
119                self.extension_to_statement(extension.node.as_ref())
120            }
121            LogicalPlan::Explain(_)
122            | LogicalPlan::Analyze(_)
123            | LogicalPlan::Ddl(_)
124            | LogicalPlan::Copy(_)
125            | LogicalPlan::DescribeTable(_)
126            | LogicalPlan::RecursiveQuery(_)
127            | LogicalPlan::Unnest(_) => not_impl_err!("Unsupported plan: {plan:?}"),
128        }
129    }
130
131    /// Try to unparse a [UserDefinedLogicalNode] to a SQL statement.
132    /// If multiple unparsers are registered for the same [UserDefinedLogicalNode],
133    /// the first unparsing result will be returned.
134    fn extension_to_statement(
135        &self,
136        node: &dyn UserDefinedLogicalNode,
137    ) -> Result<ast::Statement> {
138        let mut statement = None;
139        for unparser in &self.extension_unparsers {
140            match unparser.unparse_to_statement(node, self)? {
141                UnparseToStatementResult::Modified(stmt) => {
142                    statement = Some(stmt);
143                    break;
144                }
145                UnparseToStatementResult::Unmodified => {}
146            }
147        }
148        if let Some(statement) = statement {
149            Ok(statement)
150        } else {
151            not_impl_err!("Unsupported extension node: {node:?}")
152        }
153    }
154
155    /// Try to unparse a [UserDefinedLogicalNode] to a SQL statement.
156    /// If multiple unparsers are registered for the same [UserDefinedLogicalNode],
157    /// the first unparser supporting the node will be used.
158    fn extension_to_sql(
159        &self,
160        node: &dyn UserDefinedLogicalNode,
161        query: &mut Option<&mut QueryBuilder>,
162        select: &mut Option<&mut SelectBuilder>,
163        relation: &mut Option<&mut RelationBuilder>,
164    ) -> Result<()> {
165        for unparser in &self.extension_unparsers {
166            match unparser.unparse(node, self, query, select, relation)? {
167                UnparseWithinStatementResult::Modified => return Ok(()),
168                UnparseWithinStatementResult::Unmodified => {}
169            }
170        }
171        not_impl_err!("Unsupported extension node: {node:?}")
172    }
173
174    fn select_to_sql_statement(&self, plan: &LogicalPlan) -> Result<ast::Statement> {
175        let mut query_builder = Some(QueryBuilder::default());
176
177        let body = self.select_to_sql_expr(plan, &mut query_builder)?;
178
179        let query = query_builder.unwrap().body(Box::new(body)).build()?;
180
181        Ok(ast::Statement::Query(Box::new(query)))
182    }
183
184    fn select_to_sql_expr(
185        &self,
186        plan: &LogicalPlan,
187        query: &mut Option<QueryBuilder>,
188    ) -> Result<SetExpr> {
189        let mut select_builder = SelectBuilder::default();
190        select_builder.push_from(TableWithJoinsBuilder::default());
191        let mut relation_builder = RelationBuilder::default();
192        self.select_to_sql_recursively(
193            plan,
194            query,
195            &mut select_builder,
196            &mut relation_builder,
197        )?;
198
199        // If we were able to construct a full body (i.e. UNION ALL), return it
200        if let Some(body) = query.as_mut().and_then(|q| q.take_body()) {
201            return Ok(*body);
202        }
203
204        // If no projection is set, add a wildcard projection to the select
205        // which will be translated to `SELECT *` in the SQL statement
206        if !select_builder.already_projected() {
207            select_builder.projection(vec![ast::SelectItem::Wildcard(
208                ast::WildcardAdditionalOptions::default(),
209            )]);
210        }
211
212        let mut twj = select_builder.pop_from().unwrap();
213        twj.relation(relation_builder);
214        select_builder.push_from(twj);
215
216        Ok(SetExpr::Select(Box::new(select_builder.build()?)))
217    }
218
219    /// Reconstructs a SELECT SQL statement from a logical plan by unprojecting column expressions
220    /// found in a [Projection] node. This requires scanning the plan tree for relevant Aggregate
221    /// and Window nodes and matching column expressions to the appropriate agg or window expressions.
222    fn reconstruct_select_statement(
223        &self,
224        plan: &LogicalPlan,
225        p: &Projection,
226        select: &mut SelectBuilder,
227    ) -> Result<()> {
228        let mut exprs = p.expr.clone();
229
230        // If an Unnest node is found within the select, find and unproject the unnest column
231        if let Some(unnest) = find_unnest_node_within_select(plan) {
232            exprs = exprs
233                .into_iter()
234                .map(|e| unproject_unnest_expr(e, unnest))
235                .collect::<Result<Vec<_>>>()?;
236        };
237
238        match (
239            find_agg_node_within_select(plan, true),
240            find_window_nodes_within_select(plan, None, true),
241        ) {
242            (Some(agg), window) => {
243                let window_option = window.as_deref();
244                let items = exprs
245                    .into_iter()
246                    .map(|proj_expr| {
247                        let unproj = unproject_agg_exprs(proj_expr, agg, window_option)?;
248                        self.select_item_to_sql(&unproj)
249                    })
250                    .collect::<Result<Vec<_>>>()?;
251
252                select.projection(items);
253                select.group_by(ast::GroupByExpr::Expressions(
254                    agg.group_expr
255                        .iter()
256                        .map(|expr| self.expr_to_sql(expr))
257                        .collect::<Result<Vec<_>>>()?,
258                    vec![],
259                ));
260            }
261            (None, Some(window)) => {
262                let items = exprs
263                    .into_iter()
264                    .map(|proj_expr| {
265                        let unproj = unproject_window_exprs(proj_expr, &window)?;
266                        self.select_item_to_sql(&unproj)
267                    })
268                    .collect::<Result<Vec<_>>>()?;
269
270                select.projection(items);
271            }
272            _ => {
273                let items = exprs
274                    .iter()
275                    .map(|e| self.select_item_to_sql(e))
276                    .collect::<Result<Vec<_>>>()?;
277                select.projection(items);
278            }
279        }
280        Ok(())
281    }
282
283    fn derive(
284        &self,
285        plan: &LogicalPlan,
286        relation: &mut RelationBuilder,
287        alias: Option<ast::TableAlias>,
288        lateral: bool,
289    ) -> Result<()> {
290        let mut derived_builder = DerivedRelationBuilder::default();
291        derived_builder.lateral(lateral).alias(alias).subquery({
292            let inner_statement = self.plan_to_sql(plan)?;
293            if let ast::Statement::Query(inner_query) = inner_statement {
294                inner_query
295            } else {
296                return internal_err!(
297                    "Subquery must be a Query, but found {inner_statement:?}"
298                );
299            }
300        });
301        relation.derived(derived_builder);
302
303        Ok(())
304    }
305
306    fn derive_with_dialect_alias(
307        &self,
308        alias: &str,
309        plan: &LogicalPlan,
310        relation: &mut RelationBuilder,
311        lateral: bool,
312    ) -> Result<()> {
313        if self.dialect.requires_derived_table_alias() {
314            self.derive(
315                plan,
316                relation,
317                Some(self.new_table_alias(alias.to_string(), vec![])),
318                lateral,
319            )
320        } else {
321            self.derive(plan, relation, None, lateral)
322        }
323    }
324
325    fn select_to_sql_recursively(
326        &self,
327        plan: &LogicalPlan,
328        query: &mut Option<QueryBuilder>,
329        select: &mut SelectBuilder,
330        relation: &mut RelationBuilder,
331    ) -> Result<()> {
332        match plan {
333            LogicalPlan::TableScan(scan) => {
334                if let Some(unparsed_table_scan) = Self::unparse_table_scan_pushdown(
335                    plan,
336                    None,
337                    select.already_projected(),
338                )? {
339                    return self.select_to_sql_recursively(
340                        &unparsed_table_scan,
341                        query,
342                        select,
343                        relation,
344                    );
345                }
346                let mut builder = TableRelationBuilder::default();
347                let mut table_parts = vec![];
348                if let Some(catalog_name) = scan.table_name.catalog() {
349                    table_parts
350                        .push(self.new_ident_quoted_if_needs(catalog_name.to_string()));
351                }
352                if let Some(schema_name) = scan.table_name.schema() {
353                    table_parts
354                        .push(self.new_ident_quoted_if_needs(schema_name.to_string()));
355                }
356                table_parts.push(
357                    self.new_ident_quoted_if_needs(scan.table_name.table().to_string()),
358                );
359                builder.name(ast::ObjectName(table_parts));
360                relation.table(builder);
361
362                Ok(())
363            }
364            LogicalPlan::Projection(p) => {
365                if let Some(new_plan) = rewrite_plan_for_sort_on_non_projected_fields(p) {
366                    return self
367                        .select_to_sql_recursively(&new_plan, query, select, relation);
368                }
369
370                // Projection can be top-level plan for unnest relation
371                // The projection generated by the `RecursiveUnnestRewriter` from a UNNEST relation will have
372                // only one expression, which is the placeholder column generated by the rewriter.
373                let unnest_input_type = if p.expr.len() == 1 {
374                    Self::check_unnest_placeholder_with_outer_ref(&p.expr[0])
375                } else {
376                    None
377                };
378                if self.dialect.unnest_as_table_factor() && unnest_input_type.is_some() {
379                    if let LogicalPlan::Unnest(unnest) = &p.input.as_ref() {
380                        return self
381                            .unnest_to_table_factor_sql(unnest, query, select, relation);
382                    }
383                }
384
385                // Projection can be top-level plan for derived table
386                if select.already_projected() {
387                    return self.derive_with_dialect_alias(
388                        "derived_projection",
389                        plan,
390                        relation,
391                        unnest_input_type
392                            .filter(|t| matches!(t, UnnestInputType::OuterReference))
393                            .is_some(),
394                    );
395                }
396                self.reconstruct_select_statement(plan, p, select)?;
397                self.select_to_sql_recursively(p.input.as_ref(), query, select, relation)
398            }
399            LogicalPlan::Filter(filter) => {
400                if let Some(agg) =
401                    find_agg_node_within_select(plan, select.already_projected())
402                {
403                    let unprojected =
404                        unproject_agg_exprs(filter.predicate.clone(), agg, None)?;
405                    let filter_expr = self.expr_to_sql(&unprojected)?;
406                    select.having(Some(filter_expr));
407                } else {
408                    let filter_expr = self.expr_to_sql(&filter.predicate)?;
409                    select.selection(Some(filter_expr));
410                }
411
412                self.select_to_sql_recursively(
413                    filter.input.as_ref(),
414                    query,
415                    select,
416                    relation,
417                )
418            }
419            LogicalPlan::Limit(limit) => {
420                // Limit can be top-level plan for derived table
421                if select.already_projected() {
422                    return self.derive_with_dialect_alias(
423                        "derived_limit",
424                        plan,
425                        relation,
426                        false,
427                    );
428                }
429                if let Some(fetch) = &limit.fetch {
430                    let Some(query) = query.as_mut() else {
431                        return internal_err!(
432                            "Limit operator only valid in a statement context."
433                        );
434                    };
435                    query.limit(Some(self.expr_to_sql(fetch)?));
436                }
437
438                if let Some(skip) = &limit.skip {
439                    let Some(query) = query.as_mut() else {
440                        return internal_err!(
441                            "Offset operator only valid in a statement context."
442                        );
443                    };
444                    query.offset(Some(ast::Offset {
445                        rows: ast::OffsetRows::None,
446                        value: self.expr_to_sql(skip)?,
447                    }));
448                }
449
450                self.select_to_sql_recursively(
451                    limit.input.as_ref(),
452                    query,
453                    select,
454                    relation,
455                )
456            }
457            LogicalPlan::Sort(sort) => {
458                // Sort can be top-level plan for derived table
459                if select.already_projected() {
460                    return self.derive_with_dialect_alias(
461                        "derived_sort",
462                        plan,
463                        relation,
464                        false,
465                    );
466                }
467                let Some(query_ref) = query else {
468                    return internal_err!(
469                        "Sort operator only valid in a statement context."
470                    );
471                };
472
473                if let Some(fetch) = sort.fetch {
474                    query_ref.limit(Some(ast::Expr::Value(ast::Value::Number(
475                        fetch.to_string(),
476                        false,
477                    ))));
478                };
479
480                let agg = find_agg_node_within_select(plan, select.already_projected());
481                // unproject sort expressions
482                let sort_exprs: Vec<SortExpr> = sort
483                    .expr
484                    .iter()
485                    .map(|sort_expr| {
486                        unproject_sort_expr(sort_expr, agg, sort.input.as_ref())
487                    })
488                    .collect::<Result<Vec<_>>>()?;
489
490                query_ref.order_by(self.sorts_to_sql(&sort_exprs)?);
491
492                self.select_to_sql_recursively(
493                    sort.input.as_ref(),
494                    query,
495                    select,
496                    relation,
497                )
498            }
499            LogicalPlan::Aggregate(agg) => {
500                // Aggregation can be already handled in the projection case
501                if !select.already_projected() {
502                    // The query returns aggregate and group expressions. If that weren't the case,
503                    // the aggregate would have been placed inside a projection, making the check above^ false
504                    let exprs: Vec<_> = agg
505                        .aggr_expr
506                        .iter()
507                        .chain(agg.group_expr.iter())
508                        .map(|expr| self.select_item_to_sql(expr))
509                        .collect::<Result<Vec<_>>>()?;
510                    select.projection(exprs);
511
512                    select.group_by(ast::GroupByExpr::Expressions(
513                        agg.group_expr
514                            .iter()
515                            .map(|expr| self.expr_to_sql(expr))
516                            .collect::<Result<Vec<_>>>()?,
517                        vec![],
518                    ));
519                }
520
521                self.select_to_sql_recursively(
522                    agg.input.as_ref(),
523                    query,
524                    select,
525                    relation,
526                )
527            }
528            LogicalPlan::Distinct(distinct) => {
529                // Distinct can be top-level plan for derived table
530                if select.already_projected() {
531                    return self.derive_with_dialect_alias(
532                        "derived_distinct",
533                        plan,
534                        relation,
535                        false,
536                    );
537                }
538                let (select_distinct, input) = match distinct {
539                    Distinct::All(input) => (ast::Distinct::Distinct, input.as_ref()),
540                    Distinct::On(on) => {
541                        let exprs = on
542                            .on_expr
543                            .iter()
544                            .map(|e| self.expr_to_sql(e))
545                            .collect::<Result<Vec<_>>>()?;
546                        let items = on
547                            .select_expr
548                            .iter()
549                            .map(|e| self.select_item_to_sql(e))
550                            .collect::<Result<Vec<_>>>()?;
551                        if let Some(sort_expr) = &on.sort_expr {
552                            if let Some(query_ref) = query {
553                                query_ref.order_by(self.sorts_to_sql(sort_expr)?);
554                            } else {
555                                return internal_err!(
556                                    "Sort operator only valid in a statement context."
557                                );
558                            }
559                        }
560                        select.projection(items);
561                        (ast::Distinct::On(exprs), on.input.as_ref())
562                    }
563                };
564                select.distinct(Some(select_distinct));
565                self.select_to_sql_recursively(input, query, select, relation)
566            }
567            LogicalPlan::Join(join) => {
568                let mut table_scan_filters = vec![];
569
570                let left_plan =
571                    match try_transform_to_simple_table_scan_with_filters(&join.left)? {
572                        Some((plan, filters)) => {
573                            table_scan_filters.extend(filters);
574                            Arc::new(plan)
575                        }
576                        None => Arc::clone(&join.left),
577                    };
578
579                self.select_to_sql_recursively(
580                    left_plan.as_ref(),
581                    query,
582                    select,
583                    relation,
584                )?;
585
586                let right_plan =
587                    match try_transform_to_simple_table_scan_with_filters(&join.right)? {
588                        Some((plan, filters)) => {
589                            table_scan_filters.extend(filters);
590                            Arc::new(plan)
591                        }
592                        None => Arc::clone(&join.right),
593                    };
594
595                let mut right_relation = RelationBuilder::default();
596
597                self.select_to_sql_recursively(
598                    right_plan.as_ref(),
599                    query,
600                    select,
601                    &mut right_relation,
602                )?;
603
604                let join_filters = if table_scan_filters.is_empty() {
605                    join.filter.clone()
606                } else {
607                    // Combine `table_scan_filters` into a single filter using `AND`
608                    let Some(combined_filters) =
609                        table_scan_filters.into_iter().reduce(|acc, filter| {
610                            Expr::BinaryExpr(BinaryExpr {
611                                left: Box::new(acc),
612                                op: Operator::And,
613                                right: Box::new(filter),
614                            })
615                        })
616                    else {
617                        return internal_err!("Failed to combine TableScan filters");
618                    };
619
620                    // Combine `join.filter` with `combined_filters` using `AND`
621                    match &join.filter {
622                        Some(filter) => Some(Expr::BinaryExpr(BinaryExpr {
623                            left: Box::new(filter.clone()),
624                            op: Operator::And,
625                            right: Box::new(combined_filters),
626                        })),
627                        None => Some(combined_filters),
628                    }
629                };
630
631                let join_constraint = self.join_constraint_to_sql(
632                    join.join_constraint,
633                    &join.on,
634                    join_filters.as_ref(),
635                )?;
636
637                self.select_to_sql_recursively(
638                    right_plan.as_ref(),
639                    query,
640                    select,
641                    &mut right_relation,
642                )?;
643
644                let Ok(Some(relation)) = right_relation.build() else {
645                    return internal_err!("Failed to build right relation");
646                };
647
648                let ast_join = ast::Join {
649                    relation,
650                    global: false,
651                    join_operator: self
652                        .join_operator_to_sql(join.join_type, join_constraint)?,
653                };
654                let mut from = select.pop_from().unwrap();
655                from.push_join(ast_join);
656                select.push_from(from);
657
658                Ok(())
659            }
660            LogicalPlan::SubqueryAlias(plan_alias) => {
661                let (plan, mut columns) =
662                    subquery_alias_inner_query_and_columns(plan_alias);
663                let unparsed_table_scan = Self::unparse_table_scan_pushdown(
664                    plan,
665                    Some(plan_alias.alias.clone()),
666                    select.already_projected(),
667                )?;
668                // if the child plan is a TableScan with pushdown operations, we don't need to
669                // create an additional subquery for it
670                if !select.already_projected() && unparsed_table_scan.is_none() {
671                    select.projection(vec![ast::SelectItem::Wildcard(
672                        ast::WildcardAdditionalOptions::default(),
673                    )]);
674                }
675                let plan = unparsed_table_scan.unwrap_or_else(|| plan.clone());
676                if !columns.is_empty()
677                    && !self.dialect.supports_column_alias_in_table_alias()
678                {
679                    // Instead of specifying column aliases as part of the outer table, inject them directly into the inner projection
680                    let rewritten_plan =
681                        match inject_column_aliases_into_subquery(plan, columns) {
682                            Ok(p) => p,
683                            Err(e) => {
684                                return internal_err!(
685                                    "Failed to transform SubqueryAlias plan: {e}"
686                                )
687                            }
688                        };
689
690                    columns = vec![];
691
692                    self.select_to_sql_recursively(
693                        &rewritten_plan,
694                        query,
695                        select,
696                        relation,
697                    )?;
698                } else {
699                    self.select_to_sql_recursively(&plan, query, select, relation)?;
700                }
701
702                relation.alias(Some(
703                    self.new_table_alias(plan_alias.alias.table().to_string(), columns),
704                ));
705
706                Ok(())
707            }
708            LogicalPlan::Union(union) => {
709                // Covers cases where the UNION is a subquery and the projection is at the top level
710                if select.already_projected() {
711                    return self.derive_with_dialect_alias(
712                        "derived_union",
713                        plan,
714                        relation,
715                        false,
716                    );
717                }
718
719                let input_exprs: Vec<SetExpr> = union
720                    .inputs
721                    .iter()
722                    .map(|input| self.select_to_sql_expr(input, query))
723                    .collect::<Result<Vec<_>>>()?;
724
725                if input_exprs.len() < 2 {
726                    return internal_err!("UNION operator requires at least 2 inputs");
727                }
728
729                // Build the union expression tree bottom-up by reversing the order
730                // note that we are also swapping left and right inputs because of the rev
731                let union_expr = input_exprs
732                    .into_iter()
733                    .rev()
734                    .reduce(|a, b| SetExpr::SetOperation {
735                        op: ast::SetOperator::Union,
736                        set_quantifier: ast::SetQuantifier::All,
737                        left: Box::new(b),
738                        right: Box::new(a),
739                    })
740                    .unwrap();
741
742                let Some(query) = query.as_mut() else {
743                    return internal_err!(
744                        "UNION ALL operator only valid in a statement context"
745                    );
746                };
747                query.body(Box::new(union_expr));
748
749                Ok(())
750            }
751            LogicalPlan::Window(window) => {
752                // Window nodes are handled simultaneously with Projection nodes
753                self.select_to_sql_recursively(
754                    window.input.as_ref(),
755                    query,
756                    select,
757                    relation,
758                )
759            }
760            LogicalPlan::EmptyRelation(_) => {
761                // An EmptyRelation could be behind an UNNEST node. If the dialect supports UNNEST as a table factor,
762                // a TableRelationBuilder will be created for the UNNEST node first.
763                if !relation.has_relation() {
764                    relation.empty();
765                }
766                Ok(())
767            }
768            LogicalPlan::Extension(extension) => {
769                if let Some(query) = query.as_mut() {
770                    self.extension_to_sql(
771                        extension.node.as_ref(),
772                        &mut Some(query),
773                        &mut Some(select),
774                        &mut Some(relation),
775                    )
776                } else {
777                    self.extension_to_sql(
778                        extension.node.as_ref(),
779                        &mut None,
780                        &mut Some(select),
781                        &mut Some(relation),
782                    )
783                }
784            }
785            LogicalPlan::Unnest(unnest) => {
786                if !unnest.struct_type_columns.is_empty() {
787                    return internal_err!(
788                        "Struct type columns are not currently supported in UNNEST: {:?}",
789                        unnest.struct_type_columns
790                    );
791                }
792
793                // In the case of UNNEST, the Unnest node is followed by a duplicate Projection node that we should skip.
794                // Otherwise, there will be a duplicate SELECT clause.
795                // | Projection: table.col1, UNNEST(table.col2)
796                // |   Unnest: UNNEST(table.col2)
797                // |     Projection: table.col1, table.col2 AS UNNEST(table.col2)
798                // |       Filter: table.col3 = Int64(3)
799                // |         TableScan: table projection=None
800                if let LogicalPlan::Projection(p) = unnest.input.as_ref() {
801                    // continue with projection input
802                    self.select_to_sql_recursively(&p.input, query, select, relation)
803                } else {
804                    internal_err!("Unnest input is not a Projection: {unnest:?}")
805                }
806            }
807            LogicalPlan::Subquery(subquery)
808                if find_unnest_node_until_relation(subquery.subquery.as_ref())
809                    .is_some() =>
810            {
811                if self.dialect.unnest_as_table_factor() {
812                    self.select_to_sql_recursively(
813                        subquery.subquery.as_ref(),
814                        query,
815                        select,
816                        relation,
817                    )
818                } else {
819                    self.derive_with_dialect_alias(
820                        "derived_unnest",
821                        subquery.subquery.as_ref(),
822                        relation,
823                        true,
824                    )
825                }
826            }
827            _ => {
828                not_impl_err!("Unsupported operator: {plan:?}")
829            }
830        }
831    }
832
833    /// Try to find the placeholder column name generated by `RecursiveUnnestRewriter`.
834    ///
835    /// - If the column is a placeholder column match the pattern `Expr::Alias(Expr::Column("__unnest_placeholder(...)"))`,
836    ///     it means it is a scalar column, return [UnnestInputType::Scalar].
837    /// - If the column is a placeholder column match the pattern `Expr::Alias(Expr::Column("__unnest_placeholder(outer_ref(...)))")`,
838    ///     it means it is an outer reference column, return [UnnestInputType::OuterReference].
839    /// - If the column is not a placeholder column, return [None].
840    ///
841    /// `outer_ref` is the display result of [Expr::OuterReferenceColumn]
842    fn check_unnest_placeholder_with_outer_ref(expr: &Expr) -> Option<UnnestInputType> {
843        if let Expr::Alias(Alias { expr, .. }) = expr {
844            if let Expr::Column(Column { name, .. }) = expr.as_ref() {
845                if let Some(prefix) = name.strip_prefix(UNNEST_PLACEHOLDER) {
846                    if prefix.starts_with(&format!("({}(", OUTER_REFERENCE_COLUMN_PREFIX))
847                    {
848                        return Some(UnnestInputType::OuterReference);
849                    }
850                    return Some(UnnestInputType::Scalar);
851                }
852            }
853        }
854        None
855    }
856
857    fn unnest_to_table_factor_sql(
858        &self,
859        unnest: &Unnest,
860        query: &mut Option<QueryBuilder>,
861        select: &mut SelectBuilder,
862        relation: &mut RelationBuilder,
863    ) -> Result<()> {
864        let mut unnest_relation = UnnestRelationBuilder::default();
865        let LogicalPlan::Projection(p) = unnest.input.as_ref() else {
866            return internal_err!("Unnest input is not a Projection: {unnest:?}");
867        };
868        let exprs = p
869            .expr
870            .iter()
871            .map(|e| self.expr_to_sql(e))
872            .collect::<Result<Vec<_>>>()?;
873        unnest_relation.array_exprs(exprs);
874        relation.unnest(unnest_relation);
875        self.select_to_sql_recursively(p.input.as_ref(), query, select, relation)
876    }
877
878    fn is_scan_with_pushdown(scan: &TableScan) -> bool {
879        scan.projection.is_some() || !scan.filters.is_empty() || scan.fetch.is_some()
880    }
881
882    /// Try to unparse a table scan with pushdown operations into a new subquery plan.
883    /// If the table scan is without any pushdown operations, return None.
884    fn unparse_table_scan_pushdown(
885        plan: &LogicalPlan,
886        alias: Option<TableReference>,
887        already_projected: bool,
888    ) -> Result<Option<LogicalPlan>> {
889        match plan {
890            LogicalPlan::TableScan(table_scan) => {
891                if !Self::is_scan_with_pushdown(table_scan) {
892                    return Ok(None);
893                }
894                let table_schema = table_scan.source.schema();
895                let mut filter_alias_rewriter =
896                    alias.as_ref().map(|alias_name| TableAliasRewriter {
897                        table_schema: &table_schema,
898                        alias_name: alias_name.clone(),
899                    });
900
901                let mut builder = LogicalPlanBuilder::scan(
902                    table_scan.table_name.clone(),
903                    Arc::clone(&table_scan.source),
904                    None,
905                )?;
906                // We will rebase the column references to the new alias if it exists.
907                // If the projection or filters are empty, we will append alias to the table scan.
908                //
909                // Example:
910                //   select t1.c1 from t1 where t1.c1 > 1 -> select a.c1 from t1 as a where a.c1 > 1
911                if let Some(ref alias) = alias {
912                    if table_scan.projection.is_some() || !table_scan.filters.is_empty() {
913                        builder = builder.alias(alias.clone())?;
914                    }
915                }
916
917                // Avoid creating a duplicate Projection node, which would result in an additional subquery if a projection already exists.
918                // For example, if the `optimize_projection` rule is applied, there will be a Projection node, and duplicate projection
919                // information included in the TableScan node.
920                if !already_projected {
921                    if let Some(project_vec) = &table_scan.projection {
922                        if project_vec.is_empty() {
923                            builder = builder.project(vec![Expr::Literal(
924                                ScalarValue::Int64(Some(1)),
925                            )])?;
926                        } else {
927                            let project_columns = project_vec
928                                .iter()
929                                .cloned()
930                                .map(|i| {
931                                    let schema = table_scan.source.schema();
932                                    let field = schema.field(i);
933                                    if alias.is_some() {
934                                        Column::new(alias.clone(), field.name().clone())
935                                    } else {
936                                        Column::new(
937                                            Some(table_scan.table_name.clone()),
938                                            field.name().clone(),
939                                        )
940                                    }
941                                })
942                                .collect::<Vec<_>>();
943                            builder = builder.project(project_columns)?;
944                        };
945                    }
946                }
947
948                let filter_expr: Result<Option<Expr>> = table_scan
949                    .filters
950                    .iter()
951                    .cloned()
952                    .map(|expr| {
953                        if let Some(ref mut rewriter) = filter_alias_rewriter {
954                            expr.rewrite(rewriter).data()
955                        } else {
956                            Ok(expr)
957                        }
958                    })
959                    .reduce(|acc, expr_result| {
960                        acc.and_then(|acc_expr| {
961                            expr_result.map(|expr| acc_expr.and(expr))
962                        })
963                    })
964                    .transpose();
965
966                if let Some(filter) = filter_expr? {
967                    builder = builder.filter(filter)?;
968                }
969
970                if let Some(fetch) = table_scan.fetch {
971                    builder = builder.limit(0, Some(fetch))?;
972                }
973
974                // If the table scan has an alias but no projection or filters, it means no column references are rebased.
975                // So we will append the alias to this subquery.
976                // Example:
977                //   select * from t1 limit 10 -> (select * from t1 limit 10) as a
978                if let Some(alias) = alias {
979                    if table_scan.projection.is_none() && table_scan.filters.is_empty() {
980                        builder = builder.alias(alias)?;
981                    }
982                }
983
984                Ok(Some(builder.build()?))
985            }
986            LogicalPlan::SubqueryAlias(subquery_alias) => {
987                Self::unparse_table_scan_pushdown(
988                    &subquery_alias.input,
989                    Some(subquery_alias.alias.clone()),
990                    already_projected,
991                )
992            }
993            // SubqueryAlias could be rewritten to a plan with a projection as the top node by [rewrite::subquery_alias_inner_query_and_columns].
994            // The inner table scan could be a scan with pushdown operations.
995            LogicalPlan::Projection(projection) => {
996                if let Some(plan) = Self::unparse_table_scan_pushdown(
997                    &projection.input,
998                    alias.clone(),
999                    already_projected,
1000                )? {
1001                    let exprs = if alias.is_some() {
1002                        let mut alias_rewriter =
1003                            alias.as_ref().map(|alias_name| TableAliasRewriter {
1004                                table_schema: plan.schema().as_arrow(),
1005                                alias_name: alias_name.clone(),
1006                            });
1007                        projection
1008                            .expr
1009                            .iter()
1010                            .cloned()
1011                            .map(|expr| {
1012                                if let Some(ref mut rewriter) = alias_rewriter {
1013                                    expr.rewrite(rewriter).data()
1014                                } else {
1015                                    Ok(expr)
1016                                }
1017                            })
1018                            .collect::<Result<Vec<_>>>()?
1019                    } else {
1020                        projection.expr.clone()
1021                    };
1022                    Ok(Some(
1023                        LogicalPlanBuilder::from(plan).project(exprs)?.build()?,
1024                    ))
1025                } else {
1026                    Ok(None)
1027                }
1028            }
1029            _ => Ok(None),
1030        }
1031    }
1032
1033    fn select_item_to_sql(&self, expr: &Expr) -> Result<ast::SelectItem> {
1034        match expr {
1035            Expr::Alias(Alias { expr, name, .. }) => {
1036                let inner = self.expr_to_sql(expr)?;
1037
1038                Ok(ast::SelectItem::ExprWithAlias {
1039                    expr: inner,
1040                    alias: self.new_ident_quoted_if_needs(name.to_string()),
1041                })
1042            }
1043            _ => {
1044                let inner = self.expr_to_sql(expr)?;
1045
1046                Ok(ast::SelectItem::UnnamedExpr(inner))
1047            }
1048        }
1049    }
1050
1051    fn sorts_to_sql(&self, sort_exprs: &[SortExpr]) -> Result<Vec<ast::OrderByExpr>> {
1052        sort_exprs
1053            .iter()
1054            .map(|sort_expr| self.sort_to_sql(sort_expr))
1055            .collect::<Result<Vec<_>>>()
1056    }
1057
1058    fn join_operator_to_sql(
1059        &self,
1060        join_type: JoinType,
1061        constraint: ast::JoinConstraint,
1062    ) -> Result<ast::JoinOperator> {
1063        Ok(match join_type {
1064            JoinType::Inner => match &constraint {
1065                ast::JoinConstraint::On(_)
1066                | ast::JoinConstraint::Using(_)
1067                | ast::JoinConstraint::Natural => ast::JoinOperator::Inner(constraint),
1068                ast::JoinConstraint::None => {
1069                    // Inner joins with no conditions or filters are not valid SQL in most systems,
1070                    // return a CROSS JOIN instead
1071                    ast::JoinOperator::CrossJoin
1072                }
1073            },
1074            JoinType::Left => ast::JoinOperator::LeftOuter(constraint),
1075            JoinType::Right => ast::JoinOperator::RightOuter(constraint),
1076            JoinType::Full => ast::JoinOperator::FullOuter(constraint),
1077            JoinType::LeftAnti => ast::JoinOperator::LeftAnti(constraint),
1078            JoinType::LeftSemi => ast::JoinOperator::LeftSemi(constraint),
1079            JoinType::RightAnti => ast::JoinOperator::RightAnti(constraint),
1080            JoinType::RightSemi => ast::JoinOperator::RightSemi(constraint),
1081            JoinType::LeftMark => unimplemented!("Unparsing of Left Mark join type"),
1082        })
1083    }
1084
1085    /// Convert the components of a USING clause to the USING AST. Returns
1086    /// 'None' if the conditions are not compatible with a USING expression,
1087    /// e.g. non-column expressions or non-matching names.
1088    fn join_using_to_sql(
1089        &self,
1090        join_conditions: &[(Expr, Expr)],
1091    ) -> Option<ast::JoinConstraint> {
1092        let mut object_names = Vec::with_capacity(join_conditions.len());
1093        for (left, right) in join_conditions {
1094            match (left, right) {
1095                (
1096                    Expr::Column(Column {
1097                        relation: _,
1098                        name: left_name,
1099                        spans: _,
1100                    }),
1101                    Expr::Column(Column {
1102                        relation: _,
1103                        name: right_name,
1104                        spans: _,
1105                    }),
1106                ) if left_name == right_name => {
1107                    // For example, if the join condition `t1.id = t2.id`
1108                    // this is represented as two columns like `[t1.id, t2.id]`
1109                    // This code forms `id` (without relation name)
1110                    let ident = self.new_ident_quoted_if_needs(left_name.to_string());
1111                    object_names.push(ast::ObjectName(vec![ident]));
1112                }
1113                // USING is only valid with matching column names; arbitrary expressions
1114                // are not allowed
1115                _ => return None,
1116            }
1117        }
1118        Some(ast::JoinConstraint::Using(object_names))
1119    }
1120
1121    /// Convert a join constraint and associated conditions and filter to a SQL AST node
1122    fn join_constraint_to_sql(
1123        &self,
1124        constraint: JoinConstraint,
1125        conditions: &[(Expr, Expr)],
1126        filter: Option<&Expr>,
1127    ) -> Result<ast::JoinConstraint> {
1128        match (constraint, conditions, filter) {
1129            // No constraints
1130            (JoinConstraint::On | JoinConstraint::Using, [], None) => {
1131                Ok(ast::JoinConstraint::None)
1132            }
1133
1134            (JoinConstraint::Using, conditions, None) => {
1135                match self.join_using_to_sql(conditions) {
1136                    Some(using) => Ok(using),
1137                    // As above, this should not be reachable from parsed SQL,
1138                    // but a user could create this; we "downgrade" to ON.
1139                    None => self.join_conditions_to_sql_on(conditions, None),
1140                }
1141            }
1142
1143            // Two cases here:
1144            // 1. Straightforward ON case, with possible equi-join conditions
1145            //    and additional filters
1146            // 2. USING with additional filters; we "downgrade" to ON, because
1147            //    you can't use USING with arbitrary filters. (This should not
1148            //    be accessible from parsed SQL, but may have been a
1149            //    custom-built JOIN by a user.)
1150            (JoinConstraint::On | JoinConstraint::Using, conditions, filter) => {
1151                self.join_conditions_to_sql_on(conditions, filter)
1152            }
1153        }
1154    }
1155
1156    // Convert a list of equi0join conditions and an optional filter to a SQL ON
1157    // AST node, with the equi-join conditions and the filter merged into a
1158    // single conditional expression
1159    fn join_conditions_to_sql_on(
1160        &self,
1161        join_conditions: &[(Expr, Expr)],
1162        filter: Option<&Expr>,
1163    ) -> Result<ast::JoinConstraint> {
1164        let mut condition = None;
1165        // AND the join conditions together to create the overall condition
1166        for (left, right) in join_conditions {
1167            // Parse left and right
1168            let l = self.expr_to_sql(left)?;
1169            let r = self.expr_to_sql(right)?;
1170            let e = self.binary_op_to_sql(l, r, ast::BinaryOperator::Eq);
1171            condition = match condition {
1172                Some(expr) => Some(self.and_op_to_sql(expr, e)),
1173                None => Some(e),
1174            };
1175        }
1176
1177        // Then AND the non-equijoin filter condition as well
1178        condition = match (condition, filter) {
1179            (Some(expr), Some(filter)) => {
1180                Some(self.and_op_to_sql(expr, self.expr_to_sql(filter)?))
1181            }
1182            (Some(expr), None) => Some(expr),
1183            (None, Some(filter)) => Some(self.expr_to_sql(filter)?),
1184            (None, None) => None,
1185        };
1186
1187        let constraint = match condition {
1188            Some(filter) => ast::JoinConstraint::On(filter),
1189            None => ast::JoinConstraint::None,
1190        };
1191
1192        Ok(constraint)
1193    }
1194
1195    fn and_op_to_sql(&self, lhs: ast::Expr, rhs: ast::Expr) -> ast::Expr {
1196        self.binary_op_to_sql(lhs, rhs, ast::BinaryOperator::And)
1197    }
1198
1199    fn new_table_alias(&self, alias: String, columns: Vec<Ident>) -> ast::TableAlias {
1200        let columns = columns
1201            .into_iter()
1202            .map(|ident| TableAliasColumnDef {
1203                name: ident,
1204                data_type: None,
1205            })
1206            .collect();
1207        ast::TableAlias {
1208            name: self.new_ident_quoted_if_needs(alias),
1209            columns,
1210        }
1211    }
1212
    /// Placeholder for unparsing plans routed here: it is not implemented yet.
    ///
    /// Always returns a `NotImplemented` error that includes the debug form of `plan`.
    fn dml_to_sql(&self, plan: &LogicalPlan) -> Result<ast::Statement> {
        not_impl_err!("Unsupported plan: {plan:?}")
    }
1216}
1217
1218impl From<BuilderError> for DataFusionError {
1219    fn from(e: BuilderError) -> Self {
1220        DataFusionError::External(Box::new(e))
1221    }
1222}
1223
/// The kind of input fed to an UNNEST table factor.
///
/// This is a small fieldless enum, so it is `Copy` and comparable with `==`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum UnnestInputType {
    /// The input is a column reference. It will be presented like `outer_ref(column_name)`.
    OuterReference,
    /// The input is a scalar value. It will be presented like a scalar array or struct.
    Scalar,
}