Skip to main content

datafusion_sql/unparser/
plan.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use super::{
19    Unparser,
20    ast::{
21        BuilderError, DerivedRelationBuilder, QueryBuilder, RelationBuilder,
22        SelectBuilder, TableRelationBuilder, TableWithJoinsBuilder,
23    },
24    rewrite::{
25        TableAliasRewriter, inject_column_aliases_into_subquery, normalize_union_schema,
26        rewrite_plan_for_sort_on_non_projected_fields,
27        subquery_alias_inner_query_and_columns,
28    },
29    utils::{
30        find_agg_node_within_select, find_unnest_node_within_select,
31        find_window_nodes_within_select, try_transform_to_simple_table_scan_with_filters,
32        unproject_sort_expr, unproject_unnest_expr,
33        unproject_unnest_expr_as_flatten_value, unproject_window_exprs,
34    },
35};
36use crate::unparser::extension_unparser::{
37    UnparseToStatementResult, UnparseWithinStatementResult,
38};
39use crate::unparser::utils::{find_unnest_node_until_relation, unproject_agg_exprs};
40use crate::unparser::{
41    ast::FlattenRelationBuilder, ast::UnnestRelationBuilder, rewrite::rewrite_qualify,
42};
43use crate::utils::UNNEST_PLACEHOLDER;
44use datafusion_common::{
45    Column, DataFusionError, Result, ScalarValue, TableReference, assert_or_internal_err,
46    internal_datafusion_err, internal_err, not_impl_err,
47    tree_node::{Transformed, TransformedResult, TreeNode, TreeNodeRecursion},
48};
49use datafusion_expr::expr::{OUTER_REFERENCE_COLUMN_PREFIX, UNNEST_COLUMN_PREFIX};
50use datafusion_expr::{
51    Aggregate, BinaryExpr, Distinct, Expr, JoinConstraint, JoinType, LogicalPlan,
52    LogicalPlanBuilder, Operator, Projection, SortExpr, TableScan, Unnest,
53    UserDefinedLogicalNode, Window, expr::Alias,
54};
55use sqlparser::ast::{self, Ident, OrderByKind, SetExpr, TableAliasColumnDef};
56use std::{sync::Arc, vec};
57
58/// Convert a DataFusion [`LogicalPlan`] to [`ast::Statement`]
59///
60/// This function is the opposite of [`SqlToRel::sql_statement_to_plan`] and can
61/// be used to, among other things, to convert `LogicalPlan`s to SQL strings.
62///
63/// # Errors
64///
65/// This function returns an error if the plan cannot be converted to SQL.
66///
67/// # See Also
68///
69/// * [`expr_to_sql`] for converting [`Expr`], a single expression to SQL
70///
71/// # Example
72/// ```
73/// use arrow::datatypes::{DataType, Field, Schema};
74/// use datafusion_expr::{col, logical_plan::table_scan};
75/// use datafusion_sql::unparser::plan_to_sql;
76/// let schema = Schema::new(vec![
77///     Field::new("id", DataType::Utf8, false),
78///     Field::new("value", DataType::Utf8, false),
79/// ]);
80/// // Scan 'table' and select columns 'id' and 'value'
81/// let plan = table_scan(Some("table"), &schema, None)
82///     .unwrap()
83///     .project(vec![col("id"), col("value")])
84///     .unwrap()
85///     .build()
86///     .unwrap();
87/// // convert to AST
88/// let sql = plan_to_sql(&plan).unwrap();
89/// // use the Display impl to convert to SQL text
90/// assert_eq!(
91///     sql.to_string(),
92///     "SELECT \"table\".id, \"table\".\"value\" FROM \"table\""
93/// )
94/// ```
95///
96/// [`SqlToRel::sql_statement_to_plan`]: crate::planner::SqlToRel::sql_statement_to_plan
97/// [`expr_to_sql`]: crate::unparser::expr_to_sql
98pub fn plan_to_sql(plan: &LogicalPlan) -> Result<ast::Statement> {
99    let unparser = Unparser::default();
100    unparser.plan_to_sql(plan)
101}
102
103impl Unparser<'_> {
104    pub fn plan_to_sql(&self, plan: &LogicalPlan) -> Result<ast::Statement> {
105        let mut plan = normalize_union_schema(plan)?;
106        if !self.dialect.supports_qualify() {
107            plan = rewrite_qualify(plan)?;
108        }
109
110        match plan {
111            LogicalPlan::Projection(_)
112            | LogicalPlan::Filter(_)
113            | LogicalPlan::Window(_)
114            | LogicalPlan::Aggregate(_)
115            | LogicalPlan::Sort(_)
116            | LogicalPlan::Join(_)
117            | LogicalPlan::Repartition(_)
118            | LogicalPlan::Union(_)
119            | LogicalPlan::TableScan(_)
120            | LogicalPlan::EmptyRelation(_)
121            | LogicalPlan::Subquery(_)
122            | LogicalPlan::SubqueryAlias(_)
123            | LogicalPlan::Limit(_)
124            | LogicalPlan::Statement(_)
125            | LogicalPlan::Values(_)
126            | LogicalPlan::Distinct(_) => self.select_to_sql_statement(&plan),
127            LogicalPlan::Dml(_) => self.dml_to_sql(&plan),
128            LogicalPlan::Extension(extension) => {
129                self.extension_to_statement(extension.node.as_ref())
130            }
131            LogicalPlan::Explain(_)
132            | LogicalPlan::Analyze(_)
133            | LogicalPlan::Ddl(_)
134            | LogicalPlan::Copy(_)
135            | LogicalPlan::DescribeTable(_)
136            | LogicalPlan::RecursiveQuery(_)
137            | LogicalPlan::Unnest(_) => not_impl_err!("Unsupported plan: {plan:?}"),
138        }
139    }
140
141    /// Try to unparse a [UserDefinedLogicalNode] to a SQL statement.
142    /// If multiple unparsers are registered for the same [UserDefinedLogicalNode],
143    /// the first unparsing result will be returned.
144    fn extension_to_statement(
145        &self,
146        node: &dyn UserDefinedLogicalNode,
147    ) -> Result<ast::Statement> {
148        let mut statement = None;
149        for unparser in &self.extension_unparsers {
150            match unparser.unparse_to_statement(node, self)? {
151                UnparseToStatementResult::Modified(stmt) => {
152                    statement = Some(stmt);
153                    break;
154                }
155                UnparseToStatementResult::Unmodified => {}
156            }
157        }
158        if let Some(statement) = statement {
159            Ok(statement)
160        } else {
161            not_impl_err!("Unsupported extension node: {node:?}")
162        }
163    }
164
165    /// Try to unparse a [UserDefinedLogicalNode] to a SQL statement.
166    /// If multiple unparsers are registered for the same [UserDefinedLogicalNode],
167    /// the first unparser supporting the node will be used.
168    fn extension_to_sql(
169        &self,
170        node: &dyn UserDefinedLogicalNode,
171        query: &mut Option<&mut QueryBuilder>,
172        select: &mut Option<&mut SelectBuilder>,
173        relation: &mut Option<&mut RelationBuilder>,
174    ) -> Result<()> {
175        for unparser in &self.extension_unparsers {
176            match unparser.unparse(node, self, query, select, relation)? {
177                UnparseWithinStatementResult::Modified => return Ok(()),
178                UnparseWithinStatementResult::Unmodified => {}
179            }
180        }
181        not_impl_err!("Unsupported extension node: {node:?}")
182    }
183
184    fn select_to_sql_statement(&self, plan: &LogicalPlan) -> Result<ast::Statement> {
185        let mut query_builder = Some(QueryBuilder::default());
186
187        let body = self.select_to_sql_expr(plan, &mut query_builder)?;
188
189        let query = query_builder.unwrap().body(Box::new(body)).build()?;
190
191        Ok(ast::Statement::Query(Box::new(query)))
192    }
193
194    fn select_to_sql_expr(
195        &self,
196        plan: &LogicalPlan,
197        query: &mut Option<QueryBuilder>,
198    ) -> Result<SetExpr> {
199        let mut select_builder = SelectBuilder::default();
200        select_builder.push_from(TableWithJoinsBuilder::default());
201        let mut relation_builder = RelationBuilder::default();
202        self.select_to_sql_recursively(
203            plan,
204            query,
205            &mut select_builder,
206            &mut relation_builder,
207        )?;
208
209        // If we were able to construct a full body (i.e. UNION ALL), return it
210        if let Some(body) = query.as_mut().and_then(|q| q.take_body()) {
211            return Ok(*body);
212        }
213
214        // If no projection is set, add a wildcard projection to the select
215        // which will be translated to `SELECT *` in the SQL statement
216        if !select_builder.already_projected() {
217            select_builder.projection(vec![ast::SelectItem::Wildcard(
218                ast::WildcardAdditionalOptions::default(),
219            )]);
220        }
221
222        let mut twj = select_builder.pop_from().unwrap();
223        twj.relation(relation_builder);
224        select_builder.push_from(twj);
225
226        Ok(SetExpr::Select(Box::new(select_builder.build()?)))
227    }
228
229    /// Reconstructs a SELECT SQL statement from a logical plan by unprojecting column expressions
230    /// found in a [Projection] node. This requires scanning the plan tree for relevant Aggregate
231    /// and Window nodes and matching column expressions to the appropriate agg or window expressions.
232    fn reconstruct_select_statement(
233        &self,
234        plan: &LogicalPlan,
235        p: &Projection,
236        select: &mut SelectBuilder,
237    ) -> Result<()> {
238        let mut exprs = p.expr.clone();
239
240        // If an Unnest node is found within the select, find and unproject the unnest column
241        let flatten_alias = select.current_flatten_alias();
242        if let Some(unnest) = find_unnest_node_within_select(plan) {
243            if let Some(ref alias) = flatten_alias {
244                exprs = exprs
245                    .into_iter()
246                    .map(|e| unproject_unnest_expr_as_flatten_value(e, unnest, alias))
247                    .collect::<Result<Vec<_>>>()?;
248            } else {
249                exprs = exprs
250                    .into_iter()
251                    .map(|e| unproject_unnest_expr(e, unnest))
252                    .collect::<Result<Vec<_>>>()?;
253            }
254        };
255
256        // Rewrite column references that point to FLATTEN table aliases:
257        // in Snowflake, FLATTEN output is accessed via .VALUE, not the
258        // original column name.
259        if !select.flatten_table_aliases_empty() {
260            exprs = exprs
261                .into_iter()
262                .map(|e| {
263                    e.transform(|expr| {
264                        if let Expr::Column(ref col) = expr
265                            && let Some(ref relation) = col.relation
266                            && select.is_flatten_table_alias(relation.table())
267                        {
268                            return Ok(Transformed::yes(Expr::Column(Column::new(
269                                Some(relation.clone()),
270                                "VALUE",
271                            ))));
272                        }
273                        Ok(Transformed::no(expr))
274                    })
275                    .map(|t| t.data)
276                })
277                .collect::<Result<Vec<_>>>()?;
278        }
279
280        match (
281            find_agg_node_within_select(plan, true),
282            find_window_nodes_within_select(plan, None, true),
283        ) {
284            (Some(agg), window) => {
285                let window_option = window.as_deref();
286                let items = exprs
287                    .into_iter()
288                    .map(|proj_expr| {
289                        let unproj = unproject_agg_exprs(proj_expr, agg, window_option)?;
290                        self.select_item_to_sql(&unproj)
291                    })
292                    .collect::<Result<Vec<_>>>()?;
293
294                select.projection(items);
295                select.group_by(ast::GroupByExpr::Expressions(
296                    agg.group_expr
297                        .iter()
298                        .map(|expr| self.expr_to_sql(expr))
299                        .collect::<Result<Vec<_>>>()?,
300                    vec![],
301                ));
302            }
303            (None, Some(window)) => {
304                let items = exprs
305                    .into_iter()
306                    .map(|proj_expr| {
307                        let unproj = unproject_window_exprs(proj_expr, &window)?;
308                        self.select_item_to_sql(&unproj)
309                    })
310                    .collect::<Result<Vec<_>>>()?;
311
312                select.projection(items);
313            }
314            _ => {
315                let items = exprs
316                    .iter()
317                    .map(|e| {
318                        // After unproject_unnest_expr_as_flatten_value, an
319                        // internal UNNEST display-name alias may still wrap
320                        // the rewritten _unnest.VALUE column. Replace it
321                        // with the bare FLATTEN VALUE select item.
322                        if let Some(ref alias) = flatten_alias
323                            && Self::has_internal_unnest_alias(e)
324                        {
325                            return Ok(self.build_flatten_value_select_item(alias, None));
326                        }
327                        self.select_item_to_sql(e)
328                    })
329                    .collect::<Result<Vec<_>>>()?;
330                select.projection(items);
331            }
332        }
333        Ok(())
334    }
335
336    fn derive(
337        &self,
338        plan: &LogicalPlan,
339        relation: &mut RelationBuilder,
340        alias: Option<ast::TableAlias>,
341        lateral: bool,
342    ) -> Result<()> {
343        let mut derived_builder = DerivedRelationBuilder::default();
344        derived_builder.lateral(lateral).alias(alias).subquery({
345            let inner_statement = self.plan_to_sql(plan)?;
346            if let ast::Statement::Query(inner_query) = inner_statement {
347                inner_query
348            } else {
349                return internal_err!(
350                    "Subquery must be a Query, but found {inner_statement:?}"
351                );
352            }
353        });
354        relation.derived(derived_builder);
355
356        Ok(())
357    }
358
359    fn derive_with_dialect_alias(
360        &self,
361        alias: &str,
362        plan: &LogicalPlan,
363        relation: &mut RelationBuilder,
364        lateral: bool,
365        columns: Vec<Ident>,
366    ) -> Result<()> {
367        if self.dialect.requires_derived_table_alias() || !columns.is_empty() {
368            self.derive(
369                plan,
370                relation,
371                Some(self.new_table_alias(alias.to_string(), columns)),
372                lateral,
373            )
374        } else {
375            self.derive(plan, relation, None, lateral)
376        }
377    }
378
379    /// Projection unparsing when [`super::dialect::Dialect::unnest_as_lateral_flatten`] is enabled:
380    /// Snowflake-style `LATERAL FLATTEN` for unnest (not other dialect spellings).
381    ///
382    /// [`Self::peel_to_unnest_with_modifiers`] walks through any intermediate
383    /// Limit/Sort nodes (the optimizer can insert these between the Projection
384    /// and the Unnest), applies their modifiers to the query, and returns the
385    /// Unnest plus the [`LogicalPlan`] ref to recurse into. This bypasses the
386    /// normal Limit/Sort handlers which would wrap the subtree in a derived
387    /// subquery.
388    ///
389    /// SELECT rendering is delegated to [`Self::reconstruct_select_statement`],
390    /// which rewrites placeholder columns to `alias."VALUE"` via
391    /// [`unproject_unnest_expr_as_flatten_value`].
392    ///
393    /// Returns `Ok(true)` when this path fully handled the projection.
394    fn try_projection_unnest_as_lateral_flatten(
395        &self,
396        plan: &LogicalPlan,
397        p: &Projection,
398        query: &mut Option<QueryBuilder>,
399        select: &mut SelectBuilder,
400        relation: &mut RelationBuilder,
401        unnest_input_type: Option<&UnnestInputType>,
402    ) -> Result<bool> {
403        // unnest_as_lateral_flatten: Snowflake LATERAL FLATTEN
404        //
405        // Generate the alias up front so that peel_to_unnest_with_modifiers
406        // can rewrite ORDER BY placeholder columns to alias.VALUE.
407        if self.dialect.unnest_as_lateral_flatten() && unnest_input_type.is_some() {
408            let flatten_alias_name = if !select.already_projected() {
409                select.next_flatten_alias()
410            } else {
411                select
412                    .current_flatten_alias()
413                    .unwrap_or_else(|| select.next_flatten_alias())
414            };
415
416            if let Some((unnest, unnest_plan)) = self.peel_to_unnest_with_modifiers(
417                p.input.as_ref(),
418                query,
419                Some(&flatten_alias_name),
420            )? && let Some(mut flatten) =
421                self.try_unnest_to_lateral_flatten_sql(unnest)?
422            {
423                let inner_projection = Self::peel_to_inner_projection(
424                    unnest.input.as_ref(),
425                )
426                .ok_or_else(|| {
427                    internal_datafusion_err!(
428                        "Unnest input is not a Projection: {:?}",
429                        unnest.input
430                    )
431                })?;
432
433                flatten.alias(Some(ast::TableAlias {
434                    name: Ident::with_quote('"', &flatten_alias_name),
435                    columns: vec![],
436                    explicit: true,
437                    at: None,
438                }));
439
440                if !select.already_projected() {
441                    self.reconstruct_select_statement(plan, p, select)?;
442                }
443
444                if matches!(
445                    inner_projection.input.as_ref(),
446                    LogicalPlan::EmptyRelation(_)
447                ) {
448                    relation.flatten(flatten);
449                    self.select_to_sql_recursively(unnest_plan, query, select, relation)?;
450                    return Ok(true);
451                }
452
453                self.select_to_sql_recursively(unnest_plan, query, select, relation)?;
454
455                let flatten_factor = flatten.build().map_err(|e| {
456                    internal_datafusion_err!("Failed to build FLATTEN: {e}")
457                })?;
458                let cross_join = ast::Join {
459                    relation: flatten_factor,
460                    global: false,
461                    join_operator: ast::JoinOperator::CrossJoin(
462                        ast::JoinConstraint::None,
463                    ),
464                };
465                if let Some(mut from) = select.pop_from() {
466                    from.push_join(cross_join);
467                    select.push_from(from);
468                } else {
469                    let mut twj = TableWithJoinsBuilder::default();
470                    twj.push_join(cross_join);
471                    select.push_from(twj);
472                }
473
474                return Ok(true);
475            }
476        }
477
478        Ok(false)
479    }
480
481    fn project_window_output(
482        &self,
483        window_expr: &[Expr],
484        select: &mut SelectBuilder,
485        agg: Option<&Aggregate>,
486    ) -> Result<()> {
487        let mut items = if select.already_projected() {
488            select.pop_projections()
489        } else {
490            vec![ast::SelectItem::Wildcard(
491                ast::WildcardAdditionalOptions::default(),
492            )]
493        };
494
495        items.extend(
496            window_expr
497                .iter()
498                .map(|expr| {
499                    let expr = if let Some(agg) = agg {
500                        unproject_agg_exprs(expr.clone(), agg, None)?
501                    } else {
502                        expr.clone()
503                    };
504                    self.select_item_to_sql(&expr)
505                })
506                .collect::<Result<Vec<_>>>()?,
507        );
508        select.projection(items);
509
510        Ok(())
511    }
512
513    fn window_input_requires_derived_subquery(plan: &LogicalPlan) -> bool {
514        // These operators either produce a SELECT list or apply SQL clauses
515        // that are evaluated after window functions in a single SELECT block.
516        // Keep them below the Window node by emitting a derived table.
517        matches!(
518            plan,
519            LogicalPlan::Projection(_)
520                | LogicalPlan::Distinct(_)
521                | LogicalPlan::Limit(_)
522                | LogicalPlan::Sort(_)
523                | LogicalPlan::Union(_)
524        )
525    }
526
527    fn window_to_sql_with_derived_input(
528        &self,
529        window: &Window,
530        select: &mut SelectBuilder,
531        relation: &mut RelationBuilder,
532    ) -> Result<()> {
533        let input_alias = "derived_window_input";
534        self.derive(
535            window.input.as_ref(),
536            relation,
537            Some(self.new_table_alias(input_alias.to_string(), vec![])),
538            false,
539        )?;
540
541        let input_schema = window.input.schema();
542        let mut alias_rewriter = TableAliasRewriter {
543            table_schema: input_schema.as_arrow(),
544            alias_name: TableReference::bare(input_alias),
545        };
546        let window_expr = window
547            .window_expr
548            .iter()
549            .map(|expr| expr.clone().rewrite(&mut alias_rewriter).data())
550            .collect::<Result<Vec<_>>>()?;
551
552        self.project_window_output(&window_expr, select, None)
553    }
554
555    #[cfg_attr(feature = "recursive_protection", recursive::recursive)]
556    fn select_to_sql_recursively(
557        &self,
558        plan: &LogicalPlan,
559        query: &mut Option<QueryBuilder>,
560        select: &mut SelectBuilder,
561        relation: &mut RelationBuilder,
562    ) -> Result<()> {
563        match plan {
564            LogicalPlan::TableScan(scan) => {
565                if let Some(unparsed_table_scan) = self.unparse_table_scan_pushdown(
566                    plan,
567                    None,
568                    select.already_projected(),
569                )? {
570                    return self.select_to_sql_recursively(
571                        &unparsed_table_scan,
572                        query,
573                        select,
574                        relation,
575                    );
576                }
577                let mut builder = TableRelationBuilder::default();
578                let mut table_parts = vec![];
579                if let Some(catalog_name) = scan.table_name.catalog() {
580                    table_parts
581                        .push(self.new_ident_quoted_if_needs(catalog_name.to_string()));
582                }
583                if let Some(schema_name) = scan.table_name.schema() {
584                    table_parts
585                        .push(self.new_ident_quoted_if_needs(schema_name.to_string()));
586                }
587                table_parts.push(
588                    self.new_ident_quoted_if_needs(scan.table_name.table().to_string()),
589                );
590                builder.name(ast::ObjectName::from(table_parts));
591                relation.table(builder);
592
593                Ok(())
594            }
595            LogicalPlan::Projection(p) => {
596                if let Some(new_plan) = rewrite_plan_for_sort_on_non_projected_fields(p) {
597                    return self
598                        .select_to_sql_recursively(&new_plan, query, select, relation);
599                }
600
601                // Projection can be top-level plan for unnest relation.
602                // The projection generated by the `RecursiveUnnestRewriter`
603                // will have at least one expression referencing an unnest
604                // placeholder column.
605                let unnest_input_type: Option<UnnestInputType> =
606                    p.expr.iter().find_map(Self::find_unnest_placeholder);
607
608                // --- UNNEST table factor path (BigQuery, etc.) ---
609                // Only fires for a single bare-placeholder projection.
610                // Uses peel_to_unnest_with_modifiers (rather than matching
611                // p.input directly) to handle Limit/Sort between Projection
612                // and Unnest.
613                if self.dialect.unnest_as_table_factor()
614                    && p.expr.len() == 1
615                    && Self::is_bare_unnest_placeholder(&p.expr[0])
616                    && let Some((unnest, unnest_plan)) =
617                        self.peel_to_unnest_with_modifiers(p.input.as_ref(), query, None)?
618                    && let Some(unnest_relation) =
619                        self.try_unnest_to_table_factor_sql(unnest)?
620                {
621                    relation.unnest(unnest_relation);
622                    return self.select_to_sql_recursively(
623                        unnest_plan,
624                        query,
625                        select,
626                        relation,
627                    );
628                }
629
630                if self.try_projection_unnest_as_lateral_flatten(
631                    plan,
632                    p,
633                    query,
634                    select,
635                    relation,
636                    unnest_input_type.as_ref(),
637                )? {
638                    return Ok(());
639                }
640
641                // If it's a unnest projection, we should provide the table column alias
642                // to provide a column name for the unnest relation.
643                let columns = if unnest_input_type.is_some() {
644                    p.expr
645                        .iter()
646                        .map(|e| {
647                            self.new_ident_quoted_if_needs(e.schema_name().to_string())
648                        })
649                        .collect()
650                } else {
651                    vec![]
652                };
653                // Projection can be top-level plan for derived table
654                if select.already_projected() {
655                    return self.derive_with_dialect_alias(
656                        "derived_projection",
657                        plan,
658                        relation,
659                        unnest_input_type
660                            .filter(|t| matches!(t, UnnestInputType::OuterReference))
661                            .is_some(),
662                        columns,
663                    );
664                }
665                // For Snowflake FLATTEN: when the outer Projection has
666                // UNNEST(...) display-name columns (from SELECT * / SELECT
667                // UNNEST(...)), generate a flatten alias now so that
668                // reconstruct_select_statement and the downstream Unnest
669                // handler both use the same alias.
670                if self.dialect.unnest_as_lateral_flatten()
671                    && p.expr.iter().any(Self::has_internal_unnest_alias)
672                {
673                    select.next_flatten_alias();
674                }
675                // Pre-register FLATTEN table aliases from SubqueryAlias
676                // nodes in the plan tree so that
677                // reconstruct_select_statement can rewrite column
678                // references (e.g. a.col → a.VALUE) before the
679                // SubqueryAlias handler runs.
680                if self.dialect.unnest_as_lateral_flatten() {
681                    Self::collect_flatten_aliases(p.input.as_ref(), select);
682                }
683                self.reconstruct_select_statement(plan, p, select)?;
684                self.select_to_sql_recursively(p.input.as_ref(), query, select, relation)
685            }
686            LogicalPlan::Filter(filter) => {
687                let window = find_window_nodes_within_select(
688                    plan,
689                    None,
690                    select.already_projected(),
691                );
692                let agg = find_agg_node_within_select(plan, select.already_projected());
693
694                if let (Some(window), true) =
695                    (window.as_deref(), self.dialect.supports_qualify())
696                {
697                    let mut unprojected =
698                        unproject_window_exprs(filter.predicate.clone(), window)?;
699                    if let Some(agg) = agg {
700                        unprojected = unproject_agg_exprs(unprojected, agg, None)?;
701                    }
702                    let filter_expr = self.expr_to_sql(&unprojected)?;
703                    select.qualify(Some(filter_expr));
704                } else if let Some(agg) = agg {
705                    let unprojected =
706                        unproject_agg_exprs(filter.predicate.clone(), agg, None)?;
707                    let filter_expr = self.expr_to_sql(&unprojected)?;
708                    select.having(Some(filter_expr));
709                } else {
710                    let filter_expr = self.expr_to_sql(&filter.predicate)?;
711                    select.selection(Some(filter_expr));
712                }
713
714                self.select_to_sql_recursively(
715                    filter.input.as_ref(),
716                    query,
717                    select,
718                    relation,
719                )
720            }
721            LogicalPlan::Limit(limit) => {
722                // Limit can be top-level plan for derived table
723                if select.already_projected() {
724                    return self.derive_with_dialect_alias(
725                        "derived_limit",
726                        plan,
727                        relation,
728                        false,
729                        vec![],
730                    );
731                }
732                if let Some(fetch) = &limit.fetch {
733                    let Some(query) = query.as_mut() else {
734                        return internal_err!(
735                            "Limit operator only valid in a statement context."
736                        );
737                    };
738                    query.limit(Some(self.expr_to_sql(fetch)?));
739                }
740
741                if let Some(skip) = &limit.skip {
742                    let Some(query) = query.as_mut() else {
743                        return internal_err!(
744                            "Offset operator only valid in a statement context."
745                        );
746                    };
747
748                    query.offset(Some(ast::Offset {
749                        rows: ast::OffsetRows::None,
750                        value: self.expr_to_sql(skip)?,
751                    }));
752                }
753
754                self.select_to_sql_recursively(
755                    limit.input.as_ref(),
756                    query,
757                    select,
758                    relation,
759                )
760            }
761            LogicalPlan::Sort(sort) => {
762                // Sort can be top-level plan for derived table
763                if select.already_projected() {
764                    return self.derive_with_dialect_alias(
765                        "derived_sort",
766                        plan,
767                        relation,
768                        false,
769                        vec![],
770                    );
771                }
772
773                let Some(query_ref) = query else {
774                    return internal_err!(
775                        "Sort operator only valid in a statement context."
776                    );
777                };
778
779                if let Some(fetch) = sort.fetch {
780                    query_ref.limit(Some(ast::Expr::value(ast::Value::Number(
781                        fetch.to_string(),
782                        false,
783                    ))));
784                };
785
786                let agg = find_agg_node_within_select(plan, select.already_projected());
787                // unproject sort expressions
788                let sort_exprs: Vec<SortExpr> = sort
789                    .expr
790                    .iter()
791                    .map(|sort_expr| {
792                        unproject_sort_expr(sort_expr.clone(), agg, sort.input.as_ref())
793                    })
794                    .collect::<Result<Vec<_>>>()?;
795
796                query_ref.order_by(self.sorts_to_sql(&sort_exprs)?);
797
798                self.select_to_sql_recursively(
799                    sort.input.as_ref(),
800                    query,
801                    select,
802                    relation,
803                )
804            }
805            LogicalPlan::Aggregate(agg) => {
806                // Aggregation can be already handled in the projection case
807                if !select.already_projected() {
808                    // The query returns aggregate and group expressions. If that weren't the case,
809                    // the aggregate would have been placed inside a projection, making the check above^ false
810                    let exprs: Vec<_> = agg
811                        .aggr_expr
812                        .iter()
813                        .chain(agg.group_expr.iter())
814                        .map(|expr| self.select_item_to_sql(expr))
815                        .collect::<Result<Vec<_>>>()?;
816                    select.projection(exprs);
817
818                    select.group_by(ast::GroupByExpr::Expressions(
819                        agg.group_expr
820                            .iter()
821                            .map(|expr| self.expr_to_sql(expr))
822                            .collect::<Result<Vec<_>>>()?,
823                        vec![],
824                    ));
825                }
826
827                self.select_to_sql_recursively(
828                    agg.input.as_ref(),
829                    query,
830                    select,
831                    relation,
832                )
833            }
834            LogicalPlan::Distinct(distinct) => {
835                // Distinct can be top-level plan for derived table
836                if select.already_projected() {
837                    return self.derive_with_dialect_alias(
838                        "derived_distinct",
839                        plan,
840                        relation,
841                        false,
842                        vec![],
843                    );
844                }
845
846                // If this distinct is the parent of a Union and we're in a query context,
847                // then we need to unparse as a `UNION` rather than a `UNION ALL`.
848                if let Distinct::All(input) = distinct
849                    && matches!(input.as_ref(), LogicalPlan::Union(_))
850                    && let Some(query_mut) = query.as_mut()
851                {
852                    query_mut.distinct_union();
853                    return self.select_to_sql_recursively(
854                        input.as_ref(),
855                        query,
856                        select,
857                        relation,
858                    );
859                }
860
861                let (select_distinct, input) = match distinct {
862                    Distinct::All(input) => (ast::Distinct::Distinct, input.as_ref()),
863                    Distinct::On(on) => {
864                        let exprs = on
865                            .on_expr
866                            .iter()
867                            .map(|e| self.expr_to_sql(e))
868                            .collect::<Result<Vec<_>>>()?;
869                        let items = on
870                            .select_expr
871                            .iter()
872                            .map(|e| self.select_item_to_sql(e))
873                            .collect::<Result<Vec<_>>>()?;
874                        if let Some(sort_expr) = &on.sort_expr {
875                            if let Some(query_ref) = query {
876                                query_ref.order_by(self.sorts_to_sql(sort_expr)?);
877                            } else {
878                                return internal_err!(
879                                    "Sort operator only valid in a statement context."
880                                );
881                            }
882                        }
883                        select.projection(items);
884                        (ast::Distinct::On(exprs), on.input.as_ref())
885                    }
886                };
887                select.distinct(Some(select_distinct));
888                self.select_to_sql_recursively(input, query, select, relation)
889            }
890            LogicalPlan::Join(join) => {
891                let mut table_scan_filters = vec![];
892                let (left_plan, right_plan) = match join.join_type {
893                    JoinType::RightSemi | JoinType::RightAnti => {
894                        (&join.right, &join.left)
895                    }
896                    _ => (&join.left, &join.right),
897                };
898                // If there's an outer projection plan, it will already set up the projection.
899                // In that case, we don't need to worry about setting up the projection here.
900                // The outer projection plan will handle projecting the correct columns.
901                let already_projected = select.already_projected();
902
903                let left_plan =
904                    match try_transform_to_simple_table_scan_with_filters(left_plan)? {
905                        Some((plan, filters)) => {
906                            table_scan_filters.extend(filters);
907                            Arc::new(plan)
908                        }
909                        None => Arc::clone(left_plan),
910                    };
911
912                self.select_to_sql_recursively(
913                    left_plan.as_ref(),
914                    query,
915                    select,
916                    relation,
917                )?;
918
919                let left_projection: Option<Vec<ast::SelectItem>> = if !already_projected
920                {
921                    Some(select.pop_projections())
922                } else {
923                    None
924                };
925
926                let right_plan =
927                    match try_transform_to_simple_table_scan_with_filters(right_plan)? {
928                        Some((plan, filters)) => {
929                            table_scan_filters.extend(filters);
930                            Arc::new(plan)
931                        }
932                        None => Arc::clone(right_plan),
933                    };
934
935                let mut right_relation = RelationBuilder::default();
936
937                self.select_to_sql_recursively(
938                    right_plan.as_ref(),
939                    query,
940                    select,
941                    &mut right_relation,
942                )?;
943
944                let (join_filters, where_filters) = Self::split_join_on_and_where_filters(
945                    join.join_type,
946                    &join.filter,
947                    table_scan_filters,
948                );
949                for filter in where_filters {
950                    let filter_expr = self.expr_to_sql(&filter)?;
951                    select.selection(Some(filter_expr));
952                }
953
954                let join_constraint = self.join_constraint_to_sql(
955                    join.join_constraint,
956                    &join.on,
957                    join_filters.as_ref(),
958                )?;
959
960                let right_projection: Option<Vec<ast::SelectItem>> = if !already_projected
961                {
962                    Some(select.pop_projections())
963                } else {
964                    None
965                };
966
967                match join.join_type {
968                    JoinType::LeftSemi
969                    | JoinType::LeftAnti
970                    | JoinType::LeftMark
971                    | JoinType::RightSemi
972                    | JoinType::RightAnti
973                    | JoinType::RightMark => {
974                        let mut query_builder = QueryBuilder::default();
975                        let mut from = TableWithJoinsBuilder::default();
976                        let mut exists_select: SelectBuilder = SelectBuilder::default();
977                        from.relation(right_relation);
978                        exists_select.push_from(from);
979                        if let Some(filter) = &join.filter {
980                            exists_select.selection(Some(self.expr_to_sql(filter)?));
981                        }
982                        for (left, right) in &join.on {
983                            exists_select.selection(Some(
984                                self.expr_to_sql(&left.clone().eq(right.clone()))?,
985                            ));
986                        }
987                        exists_select.projection(vec![ast::SelectItem::UnnamedExpr(
988                            ast::Expr::value(ast::Value::Number("1".to_string(), false)),
989                        )]);
990                        query_builder.body(Box::new(SetExpr::Select(Box::new(
991                            exists_select.build()?,
992                        ))));
993
994                        let negated = match join.join_type {
995                            JoinType::LeftSemi
996                            | JoinType::RightSemi
997                            | JoinType::LeftMark
998                            | JoinType::RightMark => false,
999                            JoinType::LeftAnti | JoinType::RightAnti => true,
1000                            _ => unreachable!(),
1001                        };
1002                        let exists_expr = ast::Expr::Exists {
1003                            subquery: Box::new(query_builder.build()?),
1004                            negated,
1005                        };
1006
1007                        match join.join_type {
1008                            JoinType::LeftMark | JoinType::RightMark => {
1009                                let source_schema =
1010                                    if join.join_type == JoinType::LeftMark {
1011                                        right_plan.schema()
1012                                    } else {
1013                                        left_plan.schema()
1014                                    };
1015                                let (table_ref, _) = source_schema.qualified_field(0);
1016                                let column = self.col_to_sql(&Column::new(
1017                                    table_ref.cloned(),
1018                                    "mark",
1019                                ))?;
1020                                select.replace_mark(&column, &exists_expr);
1021                            }
1022                            _ => {
1023                                select.selection(Some(exists_expr));
1024                            }
1025                        }
1026                        if let Some(projection) = left_projection {
1027                            select.projection(projection);
1028                        }
1029                    }
1030                    JoinType::Inner
1031                    | JoinType::Left
1032                    | JoinType::Right
1033                    | JoinType::Full => {
1034                        let Ok(Some(relation)) = right_relation.build() else {
1035                            return internal_err!("Failed to build right relation");
1036                        };
1037                        let ast_join = ast::Join {
1038                            relation,
1039                            global: false,
1040                            join_operator: self
1041                                .join_operator_to_sql(join.join_type, join_constraint)?,
1042                        };
1043                        let mut from = select.pop_from().unwrap();
1044                        from.push_join(ast_join);
1045                        select.push_from(from);
1046                        if !already_projected {
1047                            let Some(left_projection) = left_projection else {
1048                                return internal_err!("Left projection is missing");
1049                            };
1050
1051                            let Some(right_projection) = right_projection else {
1052                                return internal_err!("Right projection is missing");
1053                            };
1054
1055                            let projection = left_projection
1056                                .into_iter()
1057                                .chain(right_projection)
1058                                .collect();
1059                            select.projection(projection);
1060                        }
1061                    }
1062                };
1063
1064                Ok(())
1065            }
1066            LogicalPlan::SubqueryAlias(plan_alias) => {
1067                let (plan, mut columns) =
1068                    subquery_alias_inner_query_and_columns(plan_alias);
1069                let unparsed_table_scan = self.unparse_table_scan_pushdown(
1070                    plan,
1071                    Some(plan_alias.alias.clone()),
1072                    select.already_projected(),
1073                )?;
1074
1075                // If the (possibly rewritten) inner plan builds its own
1076                // SELECT clauses (e.g. Aggregate adds GROUP BY, Window adds
1077                // OVER, etc.) and unparse_table_scan_pushdown couldn't reduce it,
1078                // we must emit a derived subquery: (SELECT ...) AS alias.
1079                // Without this, the recursive handler would merge those clauses
1080                // into the outer SELECT, losing the subquery structure entirely.
1081                if unparsed_table_scan.is_none() && Self::requires_derived_subquery(plan)
1082                {
1083                    // When the dialect does not support column aliases in
1084                    // table aliases (e.g. SQLite), inject the aliases into
1085                    // the inner projection before wrapping as a derived
1086                    // subquery.
1087                    if !columns.is_empty()
1088                        && !self.dialect.supports_column_alias_in_table_alias()
1089                    {
1090                        let Ok(rewritten_plan) =
1091                            inject_column_aliases_into_subquery(plan.clone(), columns)
1092                        else {
1093                            return internal_err!(
1094                                "Failed to transform SubqueryAlias plan"
1095                            );
1096                        };
1097                        return self.derive(
1098                            &rewritten_plan,
1099                            relation,
1100                            Some(self.new_table_alias(
1101                                plan_alias.alias.table().to_string(),
1102                                vec![],
1103                            )),
1104                            false,
1105                        );
1106                    }
1107                    return self.derive(
1108                        plan,
1109                        relation,
1110                        Some(self.new_table_alias(
1111                            plan_alias.alias.table().to_string(),
1112                            columns,
1113                        )),
1114                        false,
1115                    );
1116                }
1117
1118                // if the child plan is a TableScan with pushdown operations, we don't need to
1119                // create an additional subquery for it
1120                if !select.already_projected() && unparsed_table_scan.is_none() {
1121                    select.projection(vec![ast::SelectItem::Wildcard(
1122                        ast::WildcardAdditionalOptions::default(),
1123                    )]);
1124                }
1125                let plan = unparsed_table_scan.unwrap_or_else(|| plan.clone());
1126                if !columns.is_empty()
1127                    && !self.dialect.supports_column_alias_in_table_alias()
1128                {
1129                    // Instead of specifying column aliases as part of the outer table, inject them directly into the inner projection
1130                    let rewritten_plan =
1131                        match inject_column_aliases_into_subquery(plan, columns) {
1132                            Ok(p) => p,
1133                            Err(e) => {
1134                                return internal_err!(
1135                                    "Failed to transform SubqueryAlias plan: {e}"
1136                                );
1137                            }
1138                        };
1139
1140                    columns = vec![];
1141
1142                    self.select_to_sql_recursively(
1143                        &rewritten_plan,
1144                        query,
1145                        select,
1146                        relation,
1147                    )?;
1148                } else {
1149                    self.select_to_sql_recursively(&plan, query, select, relation)?;
1150                }
1151
1152                relation.alias(Some(
1153                    self.new_table_alias(plan_alias.alias.table().to_string(), columns),
1154                ));
1155
1156                // If this SubqueryAlias wraps a FLATTEN (Snowflake unnest),
1157                // register the alias so the outer Projection can rewrite
1158                // column references to use VALUE.
1159                if self.dialect.unnest_as_lateral_flatten()
1160                    && find_unnest_node_until_relation(plan_alias.input.as_ref())
1161                        .is_some()
1162                {
1163                    select.add_flatten_table_alias(plan_alias.alias.table().to_string());
1164                }
1165
1166                Ok(())
1167            }
1168            LogicalPlan::Union(union) => {
1169                // Covers cases where the UNION is a subquery and the projection is at the top level
1170                if select.already_projected() {
1171                    return self.derive_with_dialect_alias(
1172                        "derived_union",
1173                        plan,
1174                        relation,
1175                        false,
1176                        vec![],
1177                    );
1178                }
1179
1180                let input_exprs: Vec<SetExpr> = union
1181                    .inputs
1182                    .iter()
1183                    .map(|input| self.select_to_sql_expr(input, query))
1184                    .collect::<Result<Vec<_>>>()?;
1185
1186                assert_or_internal_err!(
1187                    input_exprs.len() >= 2,
1188                    "UNION operator requires at least 2 inputs"
1189                );
1190
1191                let set_quantifier =
1192                    if query.as_ref().is_some_and(|q| q.is_distinct_union()) {
1193                        // Setting the SetQuantifier to None will unparse as a `UNION`
1194                        // rather than a `UNION ALL`.
1195                        ast::SetQuantifier::None
1196                    } else {
1197                        ast::SetQuantifier::All
1198                    };
1199
1200                // Build the union expression tree bottom-up by reversing the order
1201                // note that we are also swapping left and right inputs because of the rev
1202                let union_expr = input_exprs
1203                    .into_iter()
1204                    .rev()
1205                    .reduce(|a, b| SetExpr::SetOperation {
1206                        op: ast::SetOperator::Union,
1207                        set_quantifier,
1208                        left: Box::new(b),
1209                        right: Box::new(a),
1210                    })
1211                    .unwrap();
1212
1213                let Some(query) = query.as_mut() else {
1214                    return internal_err!(
1215                        "UNION ALL operator only valid in a statement context"
1216                    );
1217                };
1218                query.body(Box::new(union_expr));
1219
1220                Ok(())
1221            }
1222            LogicalPlan::Window(window) => {
1223                // Window nodes are usually handled simultaneously with Projection
1224                // nodes, where projected columns are unprojected back into their
1225                // corresponding window expressions. Manually built plans can have
1226                // Window nodes without an enclosing Projection, so in that case
1227                // the Window node itself must contribute its output expressions.
1228                let project_window_output = !select.already_projected();
1229                if project_window_output
1230                    && Self::window_input_requires_derived_subquery(window.input.as_ref())
1231                {
1232                    return self
1233                        .window_to_sql_with_derived_input(window, select, relation);
1234                }
1235
1236                let agg = if project_window_output {
1237                    find_agg_node_within_select(plan, false)
1238                } else {
1239                    None
1240                };
1241
1242                self.select_to_sql_recursively(
1243                    window.input.as_ref(),
1244                    query,
1245                    select,
1246                    relation,
1247                )?;
1248
1249                if project_window_output {
1250                    self.project_window_output(&window.window_expr, select, agg)?;
1251                }
1252
1253                Ok(())
1254            }
1255            LogicalPlan::EmptyRelation(_) => {
1256                // An EmptyRelation could be behind an UNNEST node. If the dialect supports UNNEST as a table factor,
1257                // a TableRelationBuilder will be created for the UNNEST node first.
1258                if !relation.has_relation() {
1259                    relation.empty();
1260                }
1261                Ok(())
1262            }
1263            LogicalPlan::Extension(extension) => {
1264                if let Some(query) = query.as_mut() {
1265                    self.extension_to_sql(
1266                        extension.node.as_ref(),
1267                        &mut Some(query),
1268                        &mut Some(select),
1269                        &mut Some(relation),
1270                    )
1271                } else {
1272                    self.extension_to_sql(
1273                        extension.node.as_ref(),
1274                        &mut None,
1275                        &mut Some(select),
1276                        &mut Some(relation),
1277                    )
1278                }
1279            }
1280            LogicalPlan::Unnest(unnest) => {
1281                if !unnest.struct_type_columns.is_empty() {
1282                    if self.dialect.unnest_as_lateral_flatten() {
1283                        return not_impl_err!(
1284                            "Snowflake FLATTEN cannot unparse struct unnest: \
1285                             DataFusion expands struct fields into columns (horizontal), \
1286                             but Snowflake FLATTEN expands them into rows (vertical). \
1287                             Columns: {:?}",
1288                            unnest.struct_type_columns
1289                        );
1290                    }
1291                    return internal_err!(
1292                        "Struct type columns are not currently supported in UNNEST: {:?}",
1293                        unnest.struct_type_columns
1294                    );
1295                }
1296
1297                // For Snowflake FLATTEN: if the relation hasn't been set yet
1298                // (UNNEST was in SELECT clause, not FROM clause), set the FLATTEN
1299                // relation here so the FROM clause is emitted.
1300                if self.dialect.unnest_as_lateral_flatten()
1301                    && !relation.has_relation()
1302                    && let Some(mut flatten_relation) =
1303                        self.try_unnest_to_lateral_flatten_sql(unnest)?
1304                {
1305                    // Use the alias already generated by the Projection
1306                    // handler so SELECT items and the FLATTEN relation
1307                    // reference the same name.
1308                    if let Some(alias) = select.current_flatten_alias() {
1309                        flatten_relation.alias(Some(ast::TableAlias {
1310                            name: Ident::with_quote('"', &alias),
1311                            columns: vec![],
1312                            explicit: true,
1313                            at: None,
1314                        }));
1315                    }
1316                    relation.flatten(flatten_relation);
1317                }
1318
1319                // In the case of UNNEST, the Unnest node is followed by a duplicate Projection node that we should skip.
1320                // Otherwise, there will be a duplicate SELECT clause.
1321                // | Projection: table.col1, UNNEST(table.col2)
1322                // |   Unnest: UNNEST(table.col2)
1323                // |     Projection: table.col1, table.col2 AS UNNEST(table.col2)
1324                // |       Filter: table.col3 = Int64(3)
1325                // |         TableScan: table projection=None
1326                if let Some(p) = Self::peel_to_inner_projection(unnest.input.as_ref()) {
1327                    // Skip the inner Projection (synthetic rewriter node)
1328                    // and continue with its input.
1329                    self.select_to_sql_recursively(&p.input, query, select, relation)
1330                } else {
1331                    internal_err!("Unnest input is not a Projection: {unnest:?}")
1332                }
1333            }
1334            LogicalPlan::Subquery(subquery)
1335                if find_unnest_node_until_relation(subquery.subquery.as_ref())
1336                    .is_some() =>
1337            {
1338                if self.dialect.unnest_as_table_factor()
1339                    || self.dialect.unnest_as_lateral_flatten()
1340                {
1341                    self.select_to_sql_recursively(
1342                        subquery.subquery.as_ref(),
1343                        query,
1344                        select,
1345                        relation,
1346                    )
1347                } else {
1348                    self.derive_with_dialect_alias(
1349                        "derived_unnest",
1350                        subquery.subquery.as_ref(),
1351                        relation,
1352                        true,
1353                        vec![],
1354                    )
1355                }
1356            }
1357            _ => {
1358                not_impl_err!("Unsupported operator: {plan:?}")
1359            }
1360        }
1361    }
1362
1363    /// Walk through transparent nodes (SubqueryAlias) to find the inner
1364    /// Projection that feeds an Unnest node.
1365    ///
1366    /// The inner Projection is created atomically by the
1367    /// `RecursiveUnnestRewriter` and contains the array expression that the
1368    /// Unnest operates on. A `SubqueryAlias` (e.g. from a virtual/passthrough
1369    /// table) may wrap the Projection.
1370    fn peel_to_inner_projection(plan: &LogicalPlan) -> Option<&Projection> {
1371        match plan {
1372            LogicalPlan::Projection(p) => Some(p),
1373            LogicalPlan::SubqueryAlias(alias) => {
1374                Self::peel_to_inner_projection(alias.input.as_ref())
1375            }
1376            _ => None,
1377        }
1378    }
1379
1380    /// Walk through transparent nodes (Limit, Sort) between the outer
1381    /// Projection and the Unnest, applying their SQL modifiers (LIMIT,
1382    /// OFFSET, ORDER BY) to the query builder. Returns the `Unnest` node
1383    /// and a reference to the enclosing `LogicalPlan` for recursion, or
1384    /// `Ok(None)` if no Unnest is found.
1385    ///
1386    /// By processing Limit/Sort inline and then recursing into the Unnest
1387    /// plan directly, we bypass the normal Limit/Sort handlers which would
1388    /// create unwanted derived subqueries (since `already_projected` is
1389    /// set at the point this is called).
1390    fn peel_to_unnest_with_modifiers<'a>(
1391        &self,
1392        plan: &'a LogicalPlan,
1393        query: &mut Option<QueryBuilder>,
1394        flatten_alias: Option<&str>,
1395    ) -> Result<Option<(&'a Unnest, &'a LogicalPlan)>> {
1396        match plan {
1397            LogicalPlan::Unnest(unnest) => Ok(Some((unnest, plan))),
1398            LogicalPlan::Limit(limit) => {
1399                if let Some(fetch) = &limit.fetch
1400                    && let Some(q) = query.as_mut()
1401                {
1402                    q.limit(Some(self.expr_to_sql(fetch)?));
1403                }
1404                if let Some(skip) = &limit.skip
1405                    && let Some(q) = query.as_mut()
1406                {
1407                    q.offset(Some(ast::Offset {
1408                        rows: ast::OffsetRows::None,
1409                        value: self.expr_to_sql(skip)?,
1410                    }));
1411                }
1412                self.peel_to_unnest_with_modifiers(
1413                    limit.input.as_ref(),
1414                    query,
1415                    flatten_alias,
1416                )
1417            }
1418            LogicalPlan::Sort(sort) => {
1419                let Some(query_ref) = query.as_mut() else {
1420                    return internal_err!(
1421                        "Sort between Projection and Unnest requires a statement context."
1422                    );
1423                };
1424                if let Some(fetch) = sort.fetch {
1425                    query_ref.limit(Some(ast::Expr::value(ast::Value::Number(
1426                        fetch.to_string(),
1427                        false,
1428                    ))));
1429                }
1430                // When a flatten_alias is provided, rewrite
1431                // __unnest_placeholder(...) columns in sort expressions to
1432                // alias.VALUE so ORDER BY references the FLATTEN output.
1433                let unnest_node = match sort.input.as_ref() {
1434                    LogicalPlan::Unnest(u) => Some(u),
1435                    _ => find_unnest_node_within_select(sort.input.as_ref()),
1436                };
1437                let sort_exprs = if let Some(alias) = flatten_alias
1438                    && let Some(unnest) = unnest_node
1439                {
1440                    sort.expr
1441                        .iter()
1442                        .map(|s| {
1443                            let rewritten = unproject_unnest_expr_as_flatten_value(
1444                                s.expr.clone(),
1445                                unnest,
1446                                alias,
1447                            )?;
1448                            Ok(SortExpr {
1449                                expr: rewritten,
1450                                ..s.clone()
1451                            })
1452                        })
1453                        .collect::<Result<Vec<_>>>()?
1454                } else {
1455                    sort.expr.clone()
1456                };
1457                query_ref.order_by(self.sorts_to_sql(&sort_exprs)?);
1458                self.peel_to_unnest_with_modifiers(
1459                    sort.input.as_ref(),
1460                    query,
1461                    flatten_alias,
1462                )
1463            }
1464            _ => Ok(None),
1465        }
1466    }
1467
1468    /// Search an expression tree for an unnest placeholder column reference.
1469    ///
1470    /// Returns the [`UnnestInputType`] if any sub-expression is a column
1471    /// whose name starts with `__unnest_placeholder`. The placeholder may
1472    /// be at the top level (bare), inside a function call, or one of several
1473    /// expressions — this function finds it regardless.
1474    fn find_unnest_placeholder(expr: &Expr) -> Option<UnnestInputType> {
1475        let mut result = None;
1476        let _ = expr.apply(|e| {
1477            if let Some(t) = Self::classify_placeholder_column(e) {
1478                result = Some(t);
1479                return Ok(TreeNodeRecursion::Stop);
1480            }
1481            Ok(TreeNodeRecursion::Continue)
1482        });
1483        result
1484    }
1485
1486    /// Returns true if `expr` is a placeholder column, optionally wrapped
1487    /// in a single alias (the rewriter's internal `UNNEST(...)` name).
1488    /// Does NOT match when a user alias wraps the internal alias
1489    /// (e.g. `Alias("c1", Alias("UNNEST(...)", Column(placeholder)))`),
1490    /// so the table-factor path correctly falls through to
1491    /// `reconstruct_select_statement` which preserves user aliases.
1492    fn is_bare_unnest_placeholder(expr: &Expr) -> bool {
1493        // Peel at most one alias layer (the rewriter's internal name).
1494        let inner = match expr {
1495            Expr::Alias(Alias { expr, .. }) => expr.as_ref(),
1496            other => other,
1497        };
1498        Self::classify_placeholder_column(inner).is_some()
1499    }
1500
1501    /// If `expr` is a `Column` whose name starts with `__unnest_placeholder`,
1502    /// classify it as [`UnnestInputType::OuterReference`] or
1503    /// [`UnnestInputType::Scalar`].
1504    fn classify_placeholder_column(expr: &Expr) -> Option<UnnestInputType> {
1505        if let Expr::Column(Column { name, .. }) = expr
1506            && let Some(prefix) = name.strip_prefix(UNNEST_PLACEHOLDER)
1507        {
1508            if prefix.starts_with(&format!("({OUTER_REFERENCE_COLUMN_PREFIX}(")) {
1509                return Some(UnnestInputType::OuterReference);
1510            }
1511            return Some(UnnestInputType::Scalar);
1512        }
1513        None
1514    }
1515
1516    /// Check whether an expression carries an internal `UNNEST(...)` display
1517    /// name as its column name or outermost alias. After
1518    /// [`unproject_unnest_expr_as_flatten_value`] rewrites the placeholder
1519    /// column to `_unnest.VALUE`, the internal alias may still linger
1520    /// (e.g. `Alias("UNNEST(make_array(...))", Column("_unnest.VALUE"))`).
1521    /// Callers use this to replace the expression with a clean
1522    /// `_unnest."VALUE"` select item.
1523    fn has_internal_unnest_alias(expr: &Expr) -> bool {
1524        match expr {
1525            Expr::Column(col) => {
1526                col.name.starts_with(&format!("{UNNEST_COLUMN_PREFIX}("))
1527            }
1528            Expr::Alias(Alias { name, .. }) => {
1529                name.starts_with(&format!("{UNNEST_COLUMN_PREFIX}("))
1530            }
1531            _ => false,
1532        }
1533    }
1534
1535    /// Walk the plan tree and register any SubqueryAlias that wraps an
1536    /// unnest as a FLATTEN table alias on the SelectBuilder. This allows
1537    /// `reconstruct_select_statement` to rewrite column references (e.g.
1538    /// `a.col` → `a.VALUE`) before the SubqueryAlias handler runs.
1539    /// Returns true if a plan tree contains an Unnest node, searching
1540    /// through Projection, Subquery, and SubqueryAlias wrappers.
1541    fn contains_unnest(plan: &LogicalPlan) -> bool {
1542        match plan {
1543            LogicalPlan::Unnest(_) => true,
1544            LogicalPlan::Projection(p) => Self::contains_unnest(&p.input),
1545            LogicalPlan::Subquery(s) => Self::contains_unnest(&s.subquery),
1546            LogicalPlan::SubqueryAlias(a) => Self::contains_unnest(&a.input),
1547            _ => false,
1548        }
1549    }
1550
1551    fn collect_flatten_aliases(plan: &LogicalPlan, select: &mut SelectBuilder) {
1552        match plan {
1553            LogicalPlan::SubqueryAlias(alias)
1554                if Self::contains_unnest(alias.input.as_ref()) =>
1555            {
1556                select.add_flatten_table_alias(alias.alias.table().to_string());
1557            }
1558            LogicalPlan::Join(join) => {
1559                Self::collect_flatten_aliases(&join.left, select);
1560                Self::collect_flatten_aliases(&join.right, select);
1561            }
1562            _ => {}
1563        }
1564    }
1565
1566    fn try_unnest_to_table_factor_sql(
1567        &self,
1568        unnest: &Unnest,
1569    ) -> Result<Option<UnnestRelationBuilder>> {
1570        let mut unnest_relation = UnnestRelationBuilder::default();
1571        let LogicalPlan::Projection(projection) = unnest.input.as_ref() else {
1572            return Ok(None);
1573        };
1574
1575        if !matches!(projection.input.as_ref(), LogicalPlan::EmptyRelation(_)) {
1576            // It may be possible that UNNEST is used as a source for the query.
1577            // However, at this point, we don't yet know if it is just a single expression
1578            // from another source or if it's from UNNEST.
1579            //
1580            // Unnest(Projection(EmptyRelation)) denotes a case with `UNNEST([...])`,
1581            // which is normally safe to unnest as a table factor.
1582            // However, in the future, more comprehensive checks can be added here.
1583            return Ok(None);
1584        };
1585
1586        let exprs = projection
1587            .expr
1588            .iter()
1589            .map(|e| self.expr_to_sql(e))
1590            .collect::<Result<Vec<_>>>()?;
1591        unnest_relation.array_exprs(exprs);
1592
1593        Ok(Some(unnest_relation))
1594    }
1595
1596    /// Build a `SELECT alias."VALUE"` item for Snowflake FLATTEN output.
1597    fn build_flatten_value_select_item(
1598        &self,
1599        flatten_alias: &str,
1600        user_alias: Option<&str>,
1601    ) -> ast::SelectItem {
1602        let compound = ast::Expr::CompoundIdentifier(vec![
1603            self.new_ident_quoted_if_needs(flatten_alias.to_string()),
1604            Ident::with_quote('"', "VALUE"),
1605        ]);
1606        match user_alias {
1607            Some(alias) => ast::SelectItem::ExprWithAlias {
1608                expr: compound,
1609                alias: self.new_ident_quoted_if_needs(alias.to_string()),
1610            },
1611            None => ast::SelectItem::UnnamedExpr(compound),
1612        }
1613    }
1614
1615    /// Convert an `Unnest` logical plan node to a `LATERAL FLATTEN(INPUT => expr, ...)`
1616    /// table factor for Snowflake-style SQL output.
1617    fn try_unnest_to_lateral_flatten_sql(
1618        &self,
1619        unnest: &Unnest,
1620    ) -> Result<Option<FlattenRelationBuilder>> {
1621        let Some(projection) = Self::peel_to_inner_projection(unnest.input.as_ref())
1622        else {
1623            return Ok(None);
1624        };
1625
1626        // For now, handle the simple case of a single expression to flatten.
1627        // Multi-expression would require multiple LATERAL FLATTEN calls chained together.
1628        let Some(first_expr) = projection.expr.first() else {
1629            return Ok(None);
1630        };
1631
1632        let input_expr = self.expr_to_sql(first_expr)?;
1633
1634        let mut flatten = FlattenRelationBuilder::default();
1635        flatten.input_expr(input_expr);
1636        flatten.outer(unnest.options.preserve_nulls);
1637
1638        Ok(Some(flatten))
1639    }
1640
1641    fn is_scan_with_pushdown(scan: &TableScan) -> bool {
1642        scan.projection.is_some() || !scan.filters.is_empty() || scan.fetch.is_some()
1643    }
1644
    /// Returns true if a plan, when used as the direct child of a SubqueryAlias,
    /// must be emitted as a derived subquery `(SELECT ...) AS alias`.
    ///
    /// Plans like Aggregate or Window build their own SELECT clauses (GROUP BY,
    /// window functions). Sort, Limit and Union are treated the same way;
    /// every other plan kind returns false.
    fn requires_derived_subquery(plan: &LogicalPlan) -> bool {
        matches!(
            plan,
            LogicalPlan::Aggregate(_)
                | LogicalPlan::Window(_)
                | LogicalPlan::Sort(_)
                | LogicalPlan::Limit(_)
                | LogicalPlan::Union(_)
        )
    }
1660
    /// Try to unparse a table scan with pushdown operations into a new subquery plan.
    /// If the table scan is without any pushdown operations, return None.
    ///
    /// * `plan` - the plan to inspect. Only `TableScan`, `SubqueryAlias` and
    ///   `Projection` nodes are handled; anything else yields `Ok(None)`.
    /// * `alias` - alias to rebase column references onto, if any.
    /// * `already_projected` - when true, the scan's pushed-down projection is
    ///   not turned into a Projection node.
    ///
    /// Returns `Ok(Some(plan))` with the rebuilt plan, or `Ok(None)` when
    /// nothing needed rewriting. Errors from the plan builder or expression
    /// rewriting are propagated.
    fn unparse_table_scan_pushdown(
        &self,
        plan: &LogicalPlan,
        alias: Option<TableReference>,
        already_projected: bool,
    ) -> Result<Option<LogicalPlan>> {
        match plan {
            LogicalPlan::TableScan(table_scan) => {
                if !Self::is_scan_with_pushdown(table_scan) {
                    return Ok(None);
                }
                let table_schema = table_scan.source.schema();
                // Only built when an alias is present; used below to rewrite
                // the pushed-down filters.
                let mut filter_alias_rewriter =
                    alias.as_ref().map(|alias_name| TableAliasRewriter {
                        table_schema: &table_schema,
                        alias_name: alias_name.clone(),
                    });

                // Start from a plain scan of the same table (no projection);
                // the pushed-down operations are re-added as explicit nodes.
                let mut builder = LogicalPlanBuilder::scan(
                    table_scan.table_name.clone(),
                    Arc::clone(&table_scan.source),
                    None,
                )?;
                // We will rebase the column references to the new alias if it exists.
                // If the projection or filters are empty, we will append alias to the table scan.
                //
                // Example:
                //   select t1.c1 from t1 where t1.c1 > 1 -> select a.c1 from t1 as a where a.c1 > 1
                if let Some(ref alias) = alias
                    && (table_scan.projection.is_some() || !table_scan.filters.is_empty())
                {
                    builder = builder.alias(alias.clone())?;
                }

                // Avoid creating a duplicate Projection node, which would result in an additional subquery if a projection already exists.
                // For example, if the `optimize_projection` rule is applied, there will be a Projection node, and duplicate projection
                // information included in the TableScan node.
                if !already_projected && let Some(project_vec) = &table_scan.projection {
                    if project_vec.is_empty() {
                        // An empty pushed-down projection uses the
                        // dialect-dependent fallback select list.
                        builder = builder.project(self.empty_projection_fallback())?;
                    } else {
                        // Map each projected field index to a column
                        // qualified by the alias, or by the table name when
                        // there is no alias.
                        let project_columns = project_vec
                            .iter()
                            .cloned()
                            .map(|i| {
                                let schema = table_scan.source.schema();
                                let field = schema.field(i);
                                if alias.is_some() {
                                    Column::new(alias.clone(), field.name().clone())
                                } else {
                                    Column::new(
                                        Some(table_scan.table_name.clone()),
                                        field.name().clone(),
                                    )
                                }
                            })
                            .collect::<Vec<_>>();
                        builder = builder.project(project_columns)?;
                    };
                }

                // Rewrite each pushed-down filter through the alias rewriter
                // (when there is one) and AND them into a single predicate;
                // `None` when the scan has no filters.
                let filter_expr: Result<Option<Expr>> = table_scan
                    .filters
                    .iter()
                    .cloned()
                    .map(|expr| {
                        if let Some(ref mut rewriter) = filter_alias_rewriter {
                            expr.rewrite(rewriter).data()
                        } else {
                            Ok(expr)
                        }
                    })
                    .reduce(|acc, expr_result| {
                        acc.and_then(|acc_expr| {
                            expr_result.map(|expr| acc_expr.and(expr))
                        })
                    })
                    .transpose();

                if let Some(filter) = filter_expr? {
                    builder = builder.filter(filter)?;
                }

                // A pushed-down fetch becomes a limit with skip 0.
                if let Some(fetch) = table_scan.fetch {
                    builder = builder.limit(0, Some(fetch))?;
                }

                // If the table scan has an alias but no projection or filters, it means no column references are rebased.
                // So we will append the alias to this subquery.
                // Example:
                //   select * from t1 limit 10 -> (select * from t1 limit 10) as a
                if let Some(alias) = alias
                    && table_scan.projection.is_none()
                    && table_scan.filters.is_empty()
                {
                    builder = builder.alias(alias)?;
                }

                Ok(Some(builder.build()?))
            }
            LogicalPlan::SubqueryAlias(subquery_alias) => {
                // Recurse with this node's own alias as the rebase target.
                let ret = self.unparse_table_scan_pushdown(
                    &subquery_alias.input,
                    Some(subquery_alias.alias.clone()),
                    already_projected,
                )?;
                // If the caller supplied an alias as well, wrap the
                // rewritten plan in it.
                if let Some(alias) = alias
                    && let Some(plan) = ret
                {
                    let plan = LogicalPlanBuilder::new(plan).alias(alias)?.build()?;
                    return Ok(Some(plan));
                }
                Ok(ret)
            }
            // SubqueryAlias could be rewritten to a plan with a projection as the top node by [rewrite::subquery_alias_inner_query_and_columns].
            // The inner table scan could be a scan with pushdown operations.
            LogicalPlan::Projection(projection) => {
                if let Some(plan) = self.unparse_table_scan_pushdown(
                    &projection.input,
                    alias.clone(),
                    already_projected,
                )? {
                    // Re-apply this projection on top of the rewritten
                    // input, rewriting its expressions through the alias
                    // rewriter when an alias is present.
                    let exprs = if alias.is_some() {
                        let mut alias_rewriter =
                            alias.as_ref().map(|alias_name| TableAliasRewriter {
                                table_schema: plan.schema().as_arrow(),
                                alias_name: alias_name.clone(),
                            });
                        projection
                            .expr
                            .iter()
                            .cloned()
                            .map(|expr| {
                                if let Some(ref mut rewriter) = alias_rewriter {
                                    expr.rewrite(rewriter).data()
                                } else {
                                    Ok(expr)
                                }
                            })
                            .collect::<Result<Vec<_>>>()?
                    } else {
                        projection.expr.clone()
                    };
                    Ok(Some(
                        LogicalPlanBuilder::from(plan).project(exprs)?.build()?,
                    ))
                } else {
                    Ok(None)
                }
            }
            _ => Ok(None),
        }
    }
1816
1817    fn select_item_to_sql(&self, expr: &Expr) -> Result<ast::SelectItem> {
1818        match expr {
1819            Expr::Alias(Alias { expr, name, .. }) => {
1820                let inner = self.expr_to_sql(expr)?;
1821
1822                // Determine the alias name to use
1823                let col_name = if let Some(rewritten_name) =
1824                    self.dialect.col_alias_overrides(name)?
1825                {
1826                    rewritten_name.to_string()
1827                } else {
1828                    name.to_string()
1829                };
1830
1831                Ok(ast::SelectItem::ExprWithAlias {
1832                    expr: inner,
1833                    alias: self.new_ident_quoted_if_needs(col_name),
1834                })
1835            }
1836            _ => {
1837                let inner = self.expr_to_sql(expr)?;
1838
1839                Ok(ast::SelectItem::UnnamedExpr(inner))
1840            }
1841        }
1842    }
1843
1844    fn sorts_to_sql(&self, sort_exprs: &[SortExpr]) -> Result<OrderByKind> {
1845        Ok(OrderByKind::Expressions(
1846            sort_exprs
1847                .iter()
1848                .map(|sort_expr| self.sort_to_sql(sort_expr))
1849                .collect::<Result<Vec<_>>>()?,
1850        ))
1851    }
1852
1853    fn join_operator_to_sql(
1854        &self,
1855        join_type: JoinType,
1856        constraint: ast::JoinConstraint,
1857    ) -> Result<ast::JoinOperator> {
1858        Ok(match join_type {
1859            JoinType::Inner => match &constraint {
1860                ast::JoinConstraint::On(_)
1861                | ast::JoinConstraint::Using(_)
1862                | ast::JoinConstraint::Natural => ast::JoinOperator::Inner(constraint),
1863                ast::JoinConstraint::None => {
1864                    // Inner joins with no conditions or filters are not valid SQL in most systems,
1865                    // return a CROSS JOIN instead
1866                    ast::JoinOperator::CrossJoin(constraint)
1867                }
1868            },
1869            JoinType::Left => ast::JoinOperator::LeftOuter(constraint),
1870            JoinType::Right => ast::JoinOperator::RightOuter(constraint),
1871            JoinType::Full => ast::JoinOperator::FullOuter(constraint),
1872            JoinType::LeftAnti => ast::JoinOperator::LeftAnti(constraint),
1873            JoinType::LeftSemi => ast::JoinOperator::LeftSemi(constraint),
1874            JoinType::RightAnti => ast::JoinOperator::RightAnti(constraint),
1875            JoinType::RightSemi => ast::JoinOperator::RightSemi(constraint),
1876            JoinType::LeftMark | JoinType::RightMark => {
1877                unimplemented!("Unparsing of Mark join type")
1878            }
1879        })
1880    }
1881
1882    /// Convert the components of a USING clause to the USING AST. Returns
1883    /// 'None' if the conditions are not compatible with a USING expression,
1884    /// e.g. non-column expressions or non-matching names.
1885    fn join_using_to_sql(
1886        &self,
1887        join_conditions: &[(Expr, Expr)],
1888    ) -> Option<ast::JoinConstraint> {
1889        let mut object_names = Vec::with_capacity(join_conditions.len());
1890        for (left, right) in join_conditions {
1891            match (left, right) {
1892                (
1893                    Expr::Column(Column {
1894                        relation: _,
1895                        name: left_name,
1896                        spans: _,
1897                    }),
1898                    Expr::Column(Column {
1899                        relation: _,
1900                        name: right_name,
1901                        spans: _,
1902                    }),
1903                ) if left_name == right_name => {
1904                    // For example, if the join condition `t1.id = t2.id`
1905                    // this is represented as two columns like `[t1.id, t2.id]`
1906                    // This code forms `id` (without relation name)
1907                    let ident = self.new_ident_quoted_if_needs(left_name.to_string());
1908                    object_names.push(ast::ObjectName::from(vec![ident]));
1909                }
1910                // USING is only valid with matching column names; arbitrary expressions
1911                // are not allowed
1912                _ => return None,
1913            }
1914        }
1915        Some(ast::JoinConstraint::Using(object_names))
1916    }
1917
1918    /// Convert a join constraint and associated conditions and filter to a SQL AST node
1919    fn join_constraint_to_sql(
1920        &self,
1921        constraint: JoinConstraint,
1922        conditions: &[(Expr, Expr)],
1923        filter: Option<&Expr>,
1924    ) -> Result<ast::JoinConstraint> {
1925        match (constraint, conditions, filter) {
1926            // No constraints
1927            (JoinConstraint::On | JoinConstraint::Using, [], None) => {
1928                Ok(ast::JoinConstraint::None)
1929            }
1930
1931            (JoinConstraint::Using, conditions, None) => {
1932                match self.join_using_to_sql(conditions) {
1933                    Some(using) => Ok(using),
1934                    // As above, this should not be reachable from parsed SQL,
1935                    // but a user could create this; we "downgrade" to ON.
1936                    None => self.join_conditions_to_sql_on(conditions, None),
1937                }
1938            }
1939
1940            // Two cases here:
1941            // 1. Straightforward ON case, with possible equi-join conditions
1942            //    and additional filters
1943            // 2. USING with additional filters; we "downgrade" to ON, because
1944            //    you can't use USING with arbitrary filters. (This should not
1945            //    be accessible from parsed SQL, but may have been a
1946            //    custom-built JOIN by a user.)
1947            (JoinConstraint::On | JoinConstraint::Using, conditions, filter) => {
1948                self.join_conditions_to_sql_on(conditions, filter)
1949            }
1950        }
1951    }
1952
1953    // Convert a list of equi0join conditions and an optional filter to a SQL ON
1954    // AST node, with the equi-join conditions and the filter merged into a
1955    // single conditional expression
1956    fn join_conditions_to_sql_on(
1957        &self,
1958        join_conditions: &[(Expr, Expr)],
1959        filter: Option<&Expr>,
1960    ) -> Result<ast::JoinConstraint> {
1961        let mut condition = None;
1962        // AND the join conditions together to create the overall condition
1963        for (left, right) in join_conditions {
1964            // Parse left and right
1965            let l = self.expr_to_sql(left)?;
1966            let r = self.expr_to_sql(right)?;
1967            let e = self.binary_op_to_sql(l, r, ast::BinaryOperator::Eq);
1968            condition = match condition {
1969                Some(expr) => Some(self.and_op_to_sql(expr, e)),
1970                None => Some(e),
1971            };
1972        }
1973
1974        // Then AND the non-equijoin filter condition as well
1975        condition = match (condition, filter) {
1976            (Some(expr), Some(filter)) => {
1977                Some(self.and_op_to_sql(expr, self.expr_to_sql(filter)?))
1978            }
1979            (Some(expr), None) => Some(expr),
1980            (None, Some(filter)) => Some(self.expr_to_sql(filter)?),
1981            (None, None) => None,
1982        };
1983
1984        let constraint = match condition {
1985            Some(filter) => ast::JoinConstraint::On(filter),
1986            None => ast::JoinConstraint::None,
1987        };
1988
1989        Ok(constraint)
1990    }
1991
    /// Combine two SQL expressions with the `AND` binary operator
    /// (`lhs AND rhs`), delegating to `binary_op_to_sql`.
    fn and_op_to_sql(&self, lhs: ast::Expr, rhs: ast::Expr) -> ast::Expr {
        self.binary_op_to_sql(lhs, rhs, ast::BinaryOperator::And)
    }
1995
1996    fn new_table_alias(&self, alias: String, columns: Vec<Ident>) -> ast::TableAlias {
1997        let columns = columns
1998            .into_iter()
1999            .map(|ident| TableAliasColumnDef {
2000                name: ident,
2001                data_type: None,
2002            })
2003            .collect();
2004        ast::TableAlias {
2005            name: self.new_ident_quoted_if_needs(alias),
2006            columns,
2007            explicit: true,
2008            at: None,
2009        }
2010    }
2011
    /// Placeholder for unparsing DML plans: always returns a
    /// "not implemented" error that includes the debug form of `plan`.
    fn dml_to_sql(&self, plan: &LogicalPlan) -> Result<ast::Statement> {
        not_impl_err!("Unsupported plan: {plan:?}")
    }
2015
2016    /// Generates appropriate projection expression for empty projection lists.
2017    /// Returns an empty vec for dialects supporting empty select lists,
2018    /// or a dummy literal `1` for other dialects.
2019    fn empty_projection_fallback(&self) -> Vec<Expr> {
2020        if self.dialect.supports_empty_select_list() {
2021            Vec::new()
2022        } else {
2023            vec![Expr::Literal(ScalarValue::Int64(Some(1)), None)]
2024        }
2025    }
2026
2027    /// Decides where extracted table-scan filters belong in the unparsed SQL:
2028    /// in the `JOIN ON` clause or in `WHERE`.
2029    ///
2030    /// For inner joins the two are semantically equivalent, so filters go to
2031    /// `WHERE` (some dialects reject subqueries inside `JOIN ON`).
2032    /// For outer joins the filters are AND-folded into `ON` to preserve correctness.
2033    ///
2034    /// Returns `(on_filter, where_filters)`.
2035    fn split_join_on_and_where_filters(
2036        join_type: JoinType,
2037        join_filter: &Option<Expr>,
2038        table_scan_filters: Vec<Expr>,
2039    ) -> (Option<Expr>, Vec<Expr>) {
2040        if table_scan_filters.is_empty() {
2041            return (join_filter.clone(), vec![]);
2042        }
2043
2044        if join_type == JoinType::Inner {
2045            // ON and WHERE are equivalent for inner joins; prefer WHERE
2046            // because some dialects reject subqueries inside JOIN ON.
2047            return (join_filter.clone(), table_scan_filters);
2048        }
2049
2050        // Outer joins: fold table-scan filters into ON to preserve semantics.
2051        let combined = table_scan_filters.into_iter().reduce(|acc, filter| {
2052            Expr::BinaryExpr(BinaryExpr {
2053                left: Box::new(acc),
2054                op: Operator::And,
2055                right: Box::new(filter),
2056            })
2057        });
2058
2059        let on_filter = match (join_filter, combined) {
2060            (Some(jf), Some(c)) => Some(Expr::BinaryExpr(BinaryExpr {
2061                left: Box::new(jf.clone()),
2062                op: Operator::And,
2063                right: Box::new(c),
2064            })),
2065            (Some(jf), None) => Some(jf.clone()),
2066            (None, Some(c)) => Some(c),
2067            (None, None) => None,
2068        };
2069
2070        (on_filter, vec![])
2071    }
2072}
2073
2074impl From<BuilderError> for DataFusionError {
2075    fn from(e: BuilderError) -> Self {
2076        DataFusionError::External(Box::new(e))
2077    }
2078}
2079
/// The type of the input to the UNNEST table factor.
///
/// Produced by classifying `__unnest_placeholder...` column names.
#[derive(Debug)]
enum UnnestInputType {
    /// The input is a column reference. It will be presented like `outer_ref(column_name)`.
    OuterReference,
    /// The input is a scalar value. It will be presented like a scalar array or struct.
    Scalar,
}