1use super::{
19 Unparser,
20 ast::{
21 BuilderError, DerivedRelationBuilder, QueryBuilder, RelationBuilder,
22 SelectBuilder, TableRelationBuilder, TableWithJoinsBuilder,
23 },
24 rewrite::{
25 TableAliasRewriter, inject_column_aliases_into_subquery, normalize_union_schema,
26 rewrite_plan_for_sort_on_non_projected_fields,
27 subquery_alias_inner_query_and_columns,
28 },
29 utils::{
30 find_agg_node_within_select, find_unnest_node_within_select,
31 find_window_nodes_within_select, try_transform_to_simple_table_scan_with_filters,
32 unproject_sort_expr, unproject_unnest_expr,
33 unproject_unnest_expr_as_flatten_value, unproject_window_exprs,
34 },
35};
36use crate::unparser::extension_unparser::{
37 UnparseToStatementResult, UnparseWithinStatementResult,
38};
39use crate::unparser::utils::{find_unnest_node_until_relation, unproject_agg_exprs};
40use crate::unparser::{
41 ast::FlattenRelationBuilder, ast::UnnestRelationBuilder, rewrite::rewrite_qualify,
42};
43use crate::utils::UNNEST_PLACEHOLDER;
44use datafusion_common::{
45 Column, DataFusionError, Result, ScalarValue, TableReference, assert_or_internal_err,
46 internal_datafusion_err, internal_err, not_impl_err,
47 tree_node::{Transformed, TransformedResult, TreeNode, TreeNodeRecursion},
48};
49use datafusion_expr::expr::{OUTER_REFERENCE_COLUMN_PREFIX, UNNEST_COLUMN_PREFIX};
50use datafusion_expr::{
51 Aggregate, BinaryExpr, Distinct, Expr, JoinConstraint, JoinType, LogicalPlan,
52 LogicalPlanBuilder, Operator, Projection, SortExpr, TableScan, Unnest,
53 UserDefinedLogicalNode, Window, expr::Alias,
54};
55use sqlparser::ast::{self, Ident, OrderByKind, SetExpr, TableAliasColumnDef};
56use std::{sync::Arc, vec};
57
58pub fn plan_to_sql(plan: &LogicalPlan) -> Result<ast::Statement> {
99 let unparser = Unparser::default();
100 unparser.plan_to_sql(plan)
101}
102
103impl Unparser<'_> {
104 pub fn plan_to_sql(&self, plan: &LogicalPlan) -> Result<ast::Statement> {
105 let mut plan = normalize_union_schema(plan)?;
106 if !self.dialect.supports_qualify() {
107 plan = rewrite_qualify(plan)?;
108 }
109
110 match plan {
111 LogicalPlan::Projection(_)
112 | LogicalPlan::Filter(_)
113 | LogicalPlan::Window(_)
114 | LogicalPlan::Aggregate(_)
115 | LogicalPlan::Sort(_)
116 | LogicalPlan::Join(_)
117 | LogicalPlan::Repartition(_)
118 | LogicalPlan::Union(_)
119 | LogicalPlan::TableScan(_)
120 | LogicalPlan::EmptyRelation(_)
121 | LogicalPlan::Subquery(_)
122 | LogicalPlan::SubqueryAlias(_)
123 | LogicalPlan::Limit(_)
124 | LogicalPlan::Statement(_)
125 | LogicalPlan::Values(_)
126 | LogicalPlan::Distinct(_) => self.select_to_sql_statement(&plan),
127 LogicalPlan::Dml(_) => self.dml_to_sql(&plan),
128 LogicalPlan::Extension(extension) => {
129 self.extension_to_statement(extension.node.as_ref())
130 }
131 LogicalPlan::Explain(_)
132 | LogicalPlan::Analyze(_)
133 | LogicalPlan::Ddl(_)
134 | LogicalPlan::Copy(_)
135 | LogicalPlan::DescribeTable(_)
136 | LogicalPlan::RecursiveQuery(_)
137 | LogicalPlan::Unnest(_) => not_impl_err!("Unsupported plan: {plan:?}"),
138 }
139 }
140
141 fn extension_to_statement(
145 &self,
146 node: &dyn UserDefinedLogicalNode,
147 ) -> Result<ast::Statement> {
148 let mut statement = None;
149 for unparser in &self.extension_unparsers {
150 match unparser.unparse_to_statement(node, self)? {
151 UnparseToStatementResult::Modified(stmt) => {
152 statement = Some(stmt);
153 break;
154 }
155 UnparseToStatementResult::Unmodified => {}
156 }
157 }
158 if let Some(statement) = statement {
159 Ok(statement)
160 } else {
161 not_impl_err!("Unsupported extension node: {node:?}")
162 }
163 }
164
165 fn extension_to_sql(
169 &self,
170 node: &dyn UserDefinedLogicalNode,
171 query: &mut Option<&mut QueryBuilder>,
172 select: &mut Option<&mut SelectBuilder>,
173 relation: &mut Option<&mut RelationBuilder>,
174 ) -> Result<()> {
175 for unparser in &self.extension_unparsers {
176 match unparser.unparse(node, self, query, select, relation)? {
177 UnparseWithinStatementResult::Modified => return Ok(()),
178 UnparseWithinStatementResult::Unmodified => {}
179 }
180 }
181 not_impl_err!("Unsupported extension node: {node:?}")
182 }
183
184 fn select_to_sql_statement(&self, plan: &LogicalPlan) -> Result<ast::Statement> {
185 let mut query_builder = Some(QueryBuilder::default());
186
187 let body = self.select_to_sql_expr(plan, &mut query_builder)?;
188
189 let query = query_builder.unwrap().body(Box::new(body)).build()?;
190
191 Ok(ast::Statement::Query(Box::new(query)))
192 }
193
194 fn select_to_sql_expr(
195 &self,
196 plan: &LogicalPlan,
197 query: &mut Option<QueryBuilder>,
198 ) -> Result<SetExpr> {
199 let mut select_builder = SelectBuilder::default();
200 select_builder.push_from(TableWithJoinsBuilder::default());
201 let mut relation_builder = RelationBuilder::default();
202 self.select_to_sql_recursively(
203 plan,
204 query,
205 &mut select_builder,
206 &mut relation_builder,
207 )?;
208
209 if let Some(body) = query.as_mut().and_then(|q| q.take_body()) {
211 return Ok(*body);
212 }
213
214 if !select_builder.already_projected() {
217 select_builder.projection(vec![ast::SelectItem::Wildcard(
218 ast::WildcardAdditionalOptions::default(),
219 )]);
220 }
221
222 let mut twj = select_builder.pop_from().unwrap();
223 twj.relation(relation_builder);
224 select_builder.push_from(twj);
225
226 Ok(SetExpr::Select(Box::new(select_builder.build()?)))
227 }
228
229 fn reconstruct_select_statement(
233 &self,
234 plan: &LogicalPlan,
235 p: &Projection,
236 select: &mut SelectBuilder,
237 ) -> Result<()> {
238 let mut exprs = p.expr.clone();
239
240 let flatten_alias = select.current_flatten_alias();
242 if let Some(unnest) = find_unnest_node_within_select(plan) {
243 if let Some(ref alias) = flatten_alias {
244 exprs = exprs
245 .into_iter()
246 .map(|e| unproject_unnest_expr_as_flatten_value(e, unnest, alias))
247 .collect::<Result<Vec<_>>>()?;
248 } else {
249 exprs = exprs
250 .into_iter()
251 .map(|e| unproject_unnest_expr(e, unnest))
252 .collect::<Result<Vec<_>>>()?;
253 }
254 };
255
256 if !select.flatten_table_aliases_empty() {
260 exprs = exprs
261 .into_iter()
262 .map(|e| {
263 e.transform(|expr| {
264 if let Expr::Column(ref col) = expr
265 && let Some(ref relation) = col.relation
266 && select.is_flatten_table_alias(relation.table())
267 {
268 return Ok(Transformed::yes(Expr::Column(Column::new(
269 Some(relation.clone()),
270 "VALUE",
271 ))));
272 }
273 Ok(Transformed::no(expr))
274 })
275 .map(|t| t.data)
276 })
277 .collect::<Result<Vec<_>>>()?;
278 }
279
280 match (
281 find_agg_node_within_select(plan, true),
282 find_window_nodes_within_select(plan, None, true),
283 ) {
284 (Some(agg), window) => {
285 let window_option = window.as_deref();
286 let items = exprs
287 .into_iter()
288 .map(|proj_expr| {
289 let unproj = unproject_agg_exprs(proj_expr, agg, window_option)?;
290 self.select_item_to_sql(&unproj)
291 })
292 .collect::<Result<Vec<_>>>()?;
293
294 select.projection(items);
295 select.group_by(ast::GroupByExpr::Expressions(
296 agg.group_expr
297 .iter()
298 .map(|expr| self.expr_to_sql(expr))
299 .collect::<Result<Vec<_>>>()?,
300 vec![],
301 ));
302 }
303 (None, Some(window)) => {
304 let items = exprs
305 .into_iter()
306 .map(|proj_expr| {
307 let unproj = unproject_window_exprs(proj_expr, &window)?;
308 self.select_item_to_sql(&unproj)
309 })
310 .collect::<Result<Vec<_>>>()?;
311
312 select.projection(items);
313 }
314 _ => {
315 let items = exprs
316 .iter()
317 .map(|e| {
318 if let Some(ref alias) = flatten_alias
323 && Self::has_internal_unnest_alias(e)
324 {
325 return Ok(self.build_flatten_value_select_item(alias, None));
326 }
327 self.select_item_to_sql(e)
328 })
329 .collect::<Result<Vec<_>>>()?;
330 select.projection(items);
331 }
332 }
333 Ok(())
334 }
335
336 fn derive(
337 &self,
338 plan: &LogicalPlan,
339 relation: &mut RelationBuilder,
340 alias: Option<ast::TableAlias>,
341 lateral: bool,
342 ) -> Result<()> {
343 let mut derived_builder = DerivedRelationBuilder::default();
344 derived_builder.lateral(lateral).alias(alias).subquery({
345 let inner_statement = self.plan_to_sql(plan)?;
346 if let ast::Statement::Query(inner_query) = inner_statement {
347 inner_query
348 } else {
349 return internal_err!(
350 "Subquery must be a Query, but found {inner_statement:?}"
351 );
352 }
353 });
354 relation.derived(derived_builder);
355
356 Ok(())
357 }
358
359 fn derive_with_dialect_alias(
360 &self,
361 alias: &str,
362 plan: &LogicalPlan,
363 relation: &mut RelationBuilder,
364 lateral: bool,
365 columns: Vec<Ident>,
366 ) -> Result<()> {
367 if self.dialect.requires_derived_table_alias() || !columns.is_empty() {
368 self.derive(
369 plan,
370 relation,
371 Some(self.new_table_alias(alias.to_string(), columns)),
372 lateral,
373 )
374 } else {
375 self.derive(plan, relation, None, lateral)
376 }
377 }
378
379 fn try_projection_unnest_as_lateral_flatten(
395 &self,
396 plan: &LogicalPlan,
397 p: &Projection,
398 query: &mut Option<QueryBuilder>,
399 select: &mut SelectBuilder,
400 relation: &mut RelationBuilder,
401 unnest_input_type: Option<&UnnestInputType>,
402 ) -> Result<bool> {
403 if self.dialect.unnest_as_lateral_flatten() && unnest_input_type.is_some() {
408 let flatten_alias_name = if !select.already_projected() {
409 select.next_flatten_alias()
410 } else {
411 select
412 .current_flatten_alias()
413 .unwrap_or_else(|| select.next_flatten_alias())
414 };
415
416 if let Some((unnest, unnest_plan)) = self.peel_to_unnest_with_modifiers(
417 p.input.as_ref(),
418 query,
419 Some(&flatten_alias_name),
420 )? && let Some(mut flatten) =
421 self.try_unnest_to_lateral_flatten_sql(unnest)?
422 {
423 let inner_projection = Self::peel_to_inner_projection(
424 unnest.input.as_ref(),
425 )
426 .ok_or_else(|| {
427 internal_datafusion_err!(
428 "Unnest input is not a Projection: {:?}",
429 unnest.input
430 )
431 })?;
432
433 flatten.alias(Some(ast::TableAlias {
434 name: Ident::with_quote('"', &flatten_alias_name),
435 columns: vec![],
436 explicit: true,
437 at: None,
438 }));
439
440 if !select.already_projected() {
441 self.reconstruct_select_statement(plan, p, select)?;
442 }
443
444 if matches!(
445 inner_projection.input.as_ref(),
446 LogicalPlan::EmptyRelation(_)
447 ) {
448 relation.flatten(flatten);
449 self.select_to_sql_recursively(unnest_plan, query, select, relation)?;
450 return Ok(true);
451 }
452
453 self.select_to_sql_recursively(unnest_plan, query, select, relation)?;
454
455 let flatten_factor = flatten.build().map_err(|e| {
456 internal_datafusion_err!("Failed to build FLATTEN: {e}")
457 })?;
458 let cross_join = ast::Join {
459 relation: flatten_factor,
460 global: false,
461 join_operator: ast::JoinOperator::CrossJoin(
462 ast::JoinConstraint::None,
463 ),
464 };
465 if let Some(mut from) = select.pop_from() {
466 from.push_join(cross_join);
467 select.push_from(from);
468 } else {
469 let mut twj = TableWithJoinsBuilder::default();
470 twj.push_join(cross_join);
471 select.push_from(twj);
472 }
473
474 return Ok(true);
475 }
476 }
477
478 Ok(false)
479 }
480
481 fn project_window_output(
482 &self,
483 window_expr: &[Expr],
484 select: &mut SelectBuilder,
485 agg: Option<&Aggregate>,
486 ) -> Result<()> {
487 let mut items = if select.already_projected() {
488 select.pop_projections()
489 } else {
490 vec![ast::SelectItem::Wildcard(
491 ast::WildcardAdditionalOptions::default(),
492 )]
493 };
494
495 items.extend(
496 window_expr
497 .iter()
498 .map(|expr| {
499 let expr = if let Some(agg) = agg {
500 unproject_agg_exprs(expr.clone(), agg, None)?
501 } else {
502 expr.clone()
503 };
504 self.select_item_to_sql(&expr)
505 })
506 .collect::<Result<Vec<_>>>()?,
507 );
508 select.projection(items);
509
510 Ok(())
511 }
512
513 fn window_input_requires_derived_subquery(plan: &LogicalPlan) -> bool {
514 matches!(
518 plan,
519 LogicalPlan::Projection(_)
520 | LogicalPlan::Distinct(_)
521 | LogicalPlan::Limit(_)
522 | LogicalPlan::Sort(_)
523 | LogicalPlan::Union(_)
524 )
525 }
526
527 fn window_to_sql_with_derived_input(
528 &self,
529 window: &Window,
530 select: &mut SelectBuilder,
531 relation: &mut RelationBuilder,
532 ) -> Result<()> {
533 let input_alias = "derived_window_input";
534 self.derive(
535 window.input.as_ref(),
536 relation,
537 Some(self.new_table_alias(input_alias.to_string(), vec![])),
538 false,
539 )?;
540
541 let input_schema = window.input.schema();
542 let mut alias_rewriter = TableAliasRewriter {
543 table_schema: input_schema.as_arrow(),
544 alias_name: TableReference::bare(input_alias),
545 };
546 let window_expr = window
547 .window_expr
548 .iter()
549 .map(|expr| expr.clone().rewrite(&mut alias_rewriter).data())
550 .collect::<Result<Vec<_>>>()?;
551
552 self.project_window_output(&window_expr, select, None)
553 }
554
555 #[cfg_attr(feature = "recursive_protection", recursive::recursive)]
556 fn select_to_sql_recursively(
557 &self,
558 plan: &LogicalPlan,
559 query: &mut Option<QueryBuilder>,
560 select: &mut SelectBuilder,
561 relation: &mut RelationBuilder,
562 ) -> Result<()> {
563 match plan {
564 LogicalPlan::TableScan(scan) => {
565 if let Some(unparsed_table_scan) = self.unparse_table_scan_pushdown(
566 plan,
567 None,
568 select.already_projected(),
569 )? {
570 return self.select_to_sql_recursively(
571 &unparsed_table_scan,
572 query,
573 select,
574 relation,
575 );
576 }
577 let mut builder = TableRelationBuilder::default();
578 let mut table_parts = vec![];
579 if let Some(catalog_name) = scan.table_name.catalog() {
580 table_parts
581 .push(self.new_ident_quoted_if_needs(catalog_name.to_string()));
582 }
583 if let Some(schema_name) = scan.table_name.schema() {
584 table_parts
585 .push(self.new_ident_quoted_if_needs(schema_name.to_string()));
586 }
587 table_parts.push(
588 self.new_ident_quoted_if_needs(scan.table_name.table().to_string()),
589 );
590 builder.name(ast::ObjectName::from(table_parts));
591 relation.table(builder);
592
593 Ok(())
594 }
595 LogicalPlan::Projection(p) => {
596 if let Some(new_plan) = rewrite_plan_for_sort_on_non_projected_fields(p) {
597 return self
598 .select_to_sql_recursively(&new_plan, query, select, relation);
599 }
600
601 let unnest_input_type: Option<UnnestInputType> =
606 p.expr.iter().find_map(Self::find_unnest_placeholder);
607
608 if self.dialect.unnest_as_table_factor()
614 && p.expr.len() == 1
615 && Self::is_bare_unnest_placeholder(&p.expr[0])
616 && let Some((unnest, unnest_plan)) =
617 self.peel_to_unnest_with_modifiers(p.input.as_ref(), query, None)?
618 && let Some(unnest_relation) =
619 self.try_unnest_to_table_factor_sql(unnest)?
620 {
621 relation.unnest(unnest_relation);
622 return self.select_to_sql_recursively(
623 unnest_plan,
624 query,
625 select,
626 relation,
627 );
628 }
629
630 if self.try_projection_unnest_as_lateral_flatten(
631 plan,
632 p,
633 query,
634 select,
635 relation,
636 unnest_input_type.as_ref(),
637 )? {
638 return Ok(());
639 }
640
641 let columns = if unnest_input_type.is_some() {
644 p.expr
645 .iter()
646 .map(|e| {
647 self.new_ident_quoted_if_needs(e.schema_name().to_string())
648 })
649 .collect()
650 } else {
651 vec![]
652 };
653 if select.already_projected() {
655 return self.derive_with_dialect_alias(
656 "derived_projection",
657 plan,
658 relation,
659 unnest_input_type
660 .filter(|t| matches!(t, UnnestInputType::OuterReference))
661 .is_some(),
662 columns,
663 );
664 }
665 if self.dialect.unnest_as_lateral_flatten()
671 && p.expr.iter().any(Self::has_internal_unnest_alias)
672 {
673 select.next_flatten_alias();
674 }
675 if self.dialect.unnest_as_lateral_flatten() {
681 Self::collect_flatten_aliases(p.input.as_ref(), select);
682 }
683 self.reconstruct_select_statement(plan, p, select)?;
684 self.select_to_sql_recursively(p.input.as_ref(), query, select, relation)
685 }
686 LogicalPlan::Filter(filter) => {
687 let window = find_window_nodes_within_select(
688 plan,
689 None,
690 select.already_projected(),
691 );
692 let agg = find_agg_node_within_select(plan, select.already_projected());
693
694 if let (Some(window), true) =
695 (window.as_deref(), self.dialect.supports_qualify())
696 {
697 let mut unprojected =
698 unproject_window_exprs(filter.predicate.clone(), window)?;
699 if let Some(agg) = agg {
700 unprojected = unproject_agg_exprs(unprojected, agg, None)?;
701 }
702 let filter_expr = self.expr_to_sql(&unprojected)?;
703 select.qualify(Some(filter_expr));
704 } else if let Some(agg) = agg {
705 let unprojected =
706 unproject_agg_exprs(filter.predicate.clone(), agg, None)?;
707 let filter_expr = self.expr_to_sql(&unprojected)?;
708 select.having(Some(filter_expr));
709 } else {
710 let filter_expr = self.expr_to_sql(&filter.predicate)?;
711 select.selection(Some(filter_expr));
712 }
713
714 self.select_to_sql_recursively(
715 filter.input.as_ref(),
716 query,
717 select,
718 relation,
719 )
720 }
721 LogicalPlan::Limit(limit) => {
722 if select.already_projected() {
724 return self.derive_with_dialect_alias(
725 "derived_limit",
726 plan,
727 relation,
728 false,
729 vec![],
730 );
731 }
732 if let Some(fetch) = &limit.fetch {
733 let Some(query) = query.as_mut() else {
734 return internal_err!(
735 "Limit operator only valid in a statement context."
736 );
737 };
738 query.limit(Some(self.expr_to_sql(fetch)?));
739 }
740
741 if let Some(skip) = &limit.skip {
742 let Some(query) = query.as_mut() else {
743 return internal_err!(
744 "Offset operator only valid in a statement context."
745 );
746 };
747
748 query.offset(Some(ast::Offset {
749 rows: ast::OffsetRows::None,
750 value: self.expr_to_sql(skip)?,
751 }));
752 }
753
754 self.select_to_sql_recursively(
755 limit.input.as_ref(),
756 query,
757 select,
758 relation,
759 )
760 }
761 LogicalPlan::Sort(sort) => {
762 if select.already_projected() {
764 return self.derive_with_dialect_alias(
765 "derived_sort",
766 plan,
767 relation,
768 false,
769 vec![],
770 );
771 }
772
773 let Some(query_ref) = query else {
774 return internal_err!(
775 "Sort operator only valid in a statement context."
776 );
777 };
778
779 if let Some(fetch) = sort.fetch {
780 query_ref.limit(Some(ast::Expr::value(ast::Value::Number(
781 fetch.to_string(),
782 false,
783 ))));
784 };
785
786 let agg = find_agg_node_within_select(plan, select.already_projected());
787 let sort_exprs: Vec<SortExpr> = sort
789 .expr
790 .iter()
791 .map(|sort_expr| {
792 unproject_sort_expr(sort_expr.clone(), agg, sort.input.as_ref())
793 })
794 .collect::<Result<Vec<_>>>()?;
795
796 query_ref.order_by(self.sorts_to_sql(&sort_exprs)?);
797
798 self.select_to_sql_recursively(
799 sort.input.as_ref(),
800 query,
801 select,
802 relation,
803 )
804 }
805 LogicalPlan::Aggregate(agg) => {
806 if !select.already_projected() {
808 let exprs: Vec<_> = agg
811 .aggr_expr
812 .iter()
813 .chain(agg.group_expr.iter())
814 .map(|expr| self.select_item_to_sql(expr))
815 .collect::<Result<Vec<_>>>()?;
816 select.projection(exprs);
817
818 select.group_by(ast::GroupByExpr::Expressions(
819 agg.group_expr
820 .iter()
821 .map(|expr| self.expr_to_sql(expr))
822 .collect::<Result<Vec<_>>>()?,
823 vec![],
824 ));
825 }
826
827 self.select_to_sql_recursively(
828 agg.input.as_ref(),
829 query,
830 select,
831 relation,
832 )
833 }
834 LogicalPlan::Distinct(distinct) => {
835 if select.already_projected() {
837 return self.derive_with_dialect_alias(
838 "derived_distinct",
839 plan,
840 relation,
841 false,
842 vec![],
843 );
844 }
845
846 if let Distinct::All(input) = distinct
849 && matches!(input.as_ref(), LogicalPlan::Union(_))
850 && let Some(query_mut) = query.as_mut()
851 {
852 query_mut.distinct_union();
853 return self.select_to_sql_recursively(
854 input.as_ref(),
855 query,
856 select,
857 relation,
858 );
859 }
860
861 let (select_distinct, input) = match distinct {
862 Distinct::All(input) => (ast::Distinct::Distinct, input.as_ref()),
863 Distinct::On(on) => {
864 let exprs = on
865 .on_expr
866 .iter()
867 .map(|e| self.expr_to_sql(e))
868 .collect::<Result<Vec<_>>>()?;
869 let items = on
870 .select_expr
871 .iter()
872 .map(|e| self.select_item_to_sql(e))
873 .collect::<Result<Vec<_>>>()?;
874 if let Some(sort_expr) = &on.sort_expr {
875 if let Some(query_ref) = query {
876 query_ref.order_by(self.sorts_to_sql(sort_expr)?);
877 } else {
878 return internal_err!(
879 "Sort operator only valid in a statement context."
880 );
881 }
882 }
883 select.projection(items);
884 (ast::Distinct::On(exprs), on.input.as_ref())
885 }
886 };
887 select.distinct(Some(select_distinct));
888 self.select_to_sql_recursively(input, query, select, relation)
889 }
890 LogicalPlan::Join(join) => {
891 let mut table_scan_filters = vec![];
892 let (left_plan, right_plan) = match join.join_type {
893 JoinType::RightSemi | JoinType::RightAnti => {
894 (&join.right, &join.left)
895 }
896 _ => (&join.left, &join.right),
897 };
898 let already_projected = select.already_projected();
902
903 let left_plan =
904 match try_transform_to_simple_table_scan_with_filters(left_plan)? {
905 Some((plan, filters)) => {
906 table_scan_filters.extend(filters);
907 Arc::new(plan)
908 }
909 None => Arc::clone(left_plan),
910 };
911
912 self.select_to_sql_recursively(
913 left_plan.as_ref(),
914 query,
915 select,
916 relation,
917 )?;
918
919 let left_projection: Option<Vec<ast::SelectItem>> = if !already_projected
920 {
921 Some(select.pop_projections())
922 } else {
923 None
924 };
925
926 let right_plan =
927 match try_transform_to_simple_table_scan_with_filters(right_plan)? {
928 Some((plan, filters)) => {
929 table_scan_filters.extend(filters);
930 Arc::new(plan)
931 }
932 None => Arc::clone(right_plan),
933 };
934
935 let mut right_relation = RelationBuilder::default();
936
937 self.select_to_sql_recursively(
938 right_plan.as_ref(),
939 query,
940 select,
941 &mut right_relation,
942 )?;
943
944 let (join_filters, where_filters) = Self::split_join_on_and_where_filters(
945 join.join_type,
946 &join.filter,
947 table_scan_filters,
948 );
949 for filter in where_filters {
950 let filter_expr = self.expr_to_sql(&filter)?;
951 select.selection(Some(filter_expr));
952 }
953
954 let join_constraint = self.join_constraint_to_sql(
955 join.join_constraint,
956 &join.on,
957 join_filters.as_ref(),
958 )?;
959
960 let right_projection: Option<Vec<ast::SelectItem>> = if !already_projected
961 {
962 Some(select.pop_projections())
963 } else {
964 None
965 };
966
967 match join.join_type {
968 JoinType::LeftSemi
969 | JoinType::LeftAnti
970 | JoinType::LeftMark
971 | JoinType::RightSemi
972 | JoinType::RightAnti
973 | JoinType::RightMark => {
974 let mut query_builder = QueryBuilder::default();
975 let mut from = TableWithJoinsBuilder::default();
976 let mut exists_select: SelectBuilder = SelectBuilder::default();
977 from.relation(right_relation);
978 exists_select.push_from(from);
979 if let Some(filter) = &join.filter {
980 exists_select.selection(Some(self.expr_to_sql(filter)?));
981 }
982 for (left, right) in &join.on {
983 exists_select.selection(Some(
984 self.expr_to_sql(&left.clone().eq(right.clone()))?,
985 ));
986 }
987 exists_select.projection(vec![ast::SelectItem::UnnamedExpr(
988 ast::Expr::value(ast::Value::Number("1".to_string(), false)),
989 )]);
990 query_builder.body(Box::new(SetExpr::Select(Box::new(
991 exists_select.build()?,
992 ))));
993
994 let negated = match join.join_type {
995 JoinType::LeftSemi
996 | JoinType::RightSemi
997 | JoinType::LeftMark
998 | JoinType::RightMark => false,
999 JoinType::LeftAnti | JoinType::RightAnti => true,
1000 _ => unreachable!(),
1001 };
1002 let exists_expr = ast::Expr::Exists {
1003 subquery: Box::new(query_builder.build()?),
1004 negated,
1005 };
1006
1007 match join.join_type {
1008 JoinType::LeftMark | JoinType::RightMark => {
1009 let source_schema =
1010 if join.join_type == JoinType::LeftMark {
1011 right_plan.schema()
1012 } else {
1013 left_plan.schema()
1014 };
1015 let (table_ref, _) = source_schema.qualified_field(0);
1016 let column = self.col_to_sql(&Column::new(
1017 table_ref.cloned(),
1018 "mark",
1019 ))?;
1020 select.replace_mark(&column, &exists_expr);
1021 }
1022 _ => {
1023 select.selection(Some(exists_expr));
1024 }
1025 }
1026 if let Some(projection) = left_projection {
1027 select.projection(projection);
1028 }
1029 }
1030 JoinType::Inner
1031 | JoinType::Left
1032 | JoinType::Right
1033 | JoinType::Full => {
1034 let Ok(Some(relation)) = right_relation.build() else {
1035 return internal_err!("Failed to build right relation");
1036 };
1037 let ast_join = ast::Join {
1038 relation,
1039 global: false,
1040 join_operator: self
1041 .join_operator_to_sql(join.join_type, join_constraint)?,
1042 };
1043 let mut from = select.pop_from().unwrap();
1044 from.push_join(ast_join);
1045 select.push_from(from);
1046 if !already_projected {
1047 let Some(left_projection) = left_projection else {
1048 return internal_err!("Left projection is missing");
1049 };
1050
1051 let Some(right_projection) = right_projection else {
1052 return internal_err!("Right projection is missing");
1053 };
1054
1055 let projection = left_projection
1056 .into_iter()
1057 .chain(right_projection)
1058 .collect();
1059 select.projection(projection);
1060 }
1061 }
1062 };
1063
1064 Ok(())
1065 }
1066 LogicalPlan::SubqueryAlias(plan_alias) => {
1067 let (plan, mut columns) =
1068 subquery_alias_inner_query_and_columns(plan_alias);
1069 let unparsed_table_scan = self.unparse_table_scan_pushdown(
1070 plan,
1071 Some(plan_alias.alias.clone()),
1072 select.already_projected(),
1073 )?;
1074
1075 if unparsed_table_scan.is_none() && Self::requires_derived_subquery(plan)
1082 {
1083 if !columns.is_empty()
1088 && !self.dialect.supports_column_alias_in_table_alias()
1089 {
1090 let Ok(rewritten_plan) =
1091 inject_column_aliases_into_subquery(plan.clone(), columns)
1092 else {
1093 return internal_err!(
1094 "Failed to transform SubqueryAlias plan"
1095 );
1096 };
1097 return self.derive(
1098 &rewritten_plan,
1099 relation,
1100 Some(self.new_table_alias(
1101 plan_alias.alias.table().to_string(),
1102 vec![],
1103 )),
1104 false,
1105 );
1106 }
1107 return self.derive(
1108 plan,
1109 relation,
1110 Some(self.new_table_alias(
1111 plan_alias.alias.table().to_string(),
1112 columns,
1113 )),
1114 false,
1115 );
1116 }
1117
1118 if !select.already_projected() && unparsed_table_scan.is_none() {
1121 select.projection(vec![ast::SelectItem::Wildcard(
1122 ast::WildcardAdditionalOptions::default(),
1123 )]);
1124 }
1125 let plan = unparsed_table_scan.unwrap_or_else(|| plan.clone());
1126 if !columns.is_empty()
1127 && !self.dialect.supports_column_alias_in_table_alias()
1128 {
1129 let rewritten_plan =
1131 match inject_column_aliases_into_subquery(plan, columns) {
1132 Ok(p) => p,
1133 Err(e) => {
1134 return internal_err!(
1135 "Failed to transform SubqueryAlias plan: {e}"
1136 );
1137 }
1138 };
1139
1140 columns = vec![];
1141
1142 self.select_to_sql_recursively(
1143 &rewritten_plan,
1144 query,
1145 select,
1146 relation,
1147 )?;
1148 } else {
1149 self.select_to_sql_recursively(&plan, query, select, relation)?;
1150 }
1151
1152 relation.alias(Some(
1153 self.new_table_alias(plan_alias.alias.table().to_string(), columns),
1154 ));
1155
1156 if self.dialect.unnest_as_lateral_flatten()
1160 && find_unnest_node_until_relation(plan_alias.input.as_ref())
1161 .is_some()
1162 {
1163 select.add_flatten_table_alias(plan_alias.alias.table().to_string());
1164 }
1165
1166 Ok(())
1167 }
1168 LogicalPlan::Union(union) => {
1169 if select.already_projected() {
1171 return self.derive_with_dialect_alias(
1172 "derived_union",
1173 plan,
1174 relation,
1175 false,
1176 vec![],
1177 );
1178 }
1179
1180 let input_exprs: Vec<SetExpr> = union
1181 .inputs
1182 .iter()
1183 .map(|input| self.select_to_sql_expr(input, query))
1184 .collect::<Result<Vec<_>>>()?;
1185
1186 assert_or_internal_err!(
1187 input_exprs.len() >= 2,
1188 "UNION operator requires at least 2 inputs"
1189 );
1190
1191 let set_quantifier =
1192 if query.as_ref().is_some_and(|q| q.is_distinct_union()) {
1193 ast::SetQuantifier::None
1196 } else {
1197 ast::SetQuantifier::All
1198 };
1199
1200 let union_expr = input_exprs
1203 .into_iter()
1204 .rev()
1205 .reduce(|a, b| SetExpr::SetOperation {
1206 op: ast::SetOperator::Union,
1207 set_quantifier,
1208 left: Box::new(b),
1209 right: Box::new(a),
1210 })
1211 .unwrap();
1212
1213 let Some(query) = query.as_mut() else {
1214 return internal_err!(
1215 "UNION ALL operator only valid in a statement context"
1216 );
1217 };
1218 query.body(Box::new(union_expr));
1219
1220 Ok(())
1221 }
1222 LogicalPlan::Window(window) => {
1223 let project_window_output = !select.already_projected();
1229 if project_window_output
1230 && Self::window_input_requires_derived_subquery(window.input.as_ref())
1231 {
1232 return self
1233 .window_to_sql_with_derived_input(window, select, relation);
1234 }
1235
1236 let agg = if project_window_output {
1237 find_agg_node_within_select(plan, false)
1238 } else {
1239 None
1240 };
1241
1242 self.select_to_sql_recursively(
1243 window.input.as_ref(),
1244 query,
1245 select,
1246 relation,
1247 )?;
1248
1249 if project_window_output {
1250 self.project_window_output(&window.window_expr, select, agg)?;
1251 }
1252
1253 Ok(())
1254 }
1255 LogicalPlan::EmptyRelation(_) => {
1256 if !relation.has_relation() {
1259 relation.empty();
1260 }
1261 Ok(())
1262 }
1263 LogicalPlan::Extension(extension) => {
1264 if let Some(query) = query.as_mut() {
1265 self.extension_to_sql(
1266 extension.node.as_ref(),
1267 &mut Some(query),
1268 &mut Some(select),
1269 &mut Some(relation),
1270 )
1271 } else {
1272 self.extension_to_sql(
1273 extension.node.as_ref(),
1274 &mut None,
1275 &mut Some(select),
1276 &mut Some(relation),
1277 )
1278 }
1279 }
1280 LogicalPlan::Unnest(unnest) => {
1281 if !unnest.struct_type_columns.is_empty() {
1282 if self.dialect.unnest_as_lateral_flatten() {
1283 return not_impl_err!(
1284 "Snowflake FLATTEN cannot unparse struct unnest: \
1285 DataFusion expands struct fields into columns (horizontal), \
1286 but Snowflake FLATTEN expands them into rows (vertical). \
1287 Columns: {:?}",
1288 unnest.struct_type_columns
1289 );
1290 }
1291 return internal_err!(
1292 "Struct type columns are not currently supported in UNNEST: {:?}",
1293 unnest.struct_type_columns
1294 );
1295 }
1296
1297 if self.dialect.unnest_as_lateral_flatten()
1301 && !relation.has_relation()
1302 && let Some(mut flatten_relation) =
1303 self.try_unnest_to_lateral_flatten_sql(unnest)?
1304 {
1305 if let Some(alias) = select.current_flatten_alias() {
1309 flatten_relation.alias(Some(ast::TableAlias {
1310 name: Ident::with_quote('"', &alias),
1311 columns: vec![],
1312 explicit: true,
1313 at: None,
1314 }));
1315 }
1316 relation.flatten(flatten_relation);
1317 }
1318
1319 if let Some(p) = Self::peel_to_inner_projection(unnest.input.as_ref()) {
1327 self.select_to_sql_recursively(&p.input, query, select, relation)
1330 } else {
1331 internal_err!("Unnest input is not a Projection: {unnest:?}")
1332 }
1333 }
1334 LogicalPlan::Subquery(subquery)
1335 if find_unnest_node_until_relation(subquery.subquery.as_ref())
1336 .is_some() =>
1337 {
1338 if self.dialect.unnest_as_table_factor()
1339 || self.dialect.unnest_as_lateral_flatten()
1340 {
1341 self.select_to_sql_recursively(
1342 subquery.subquery.as_ref(),
1343 query,
1344 select,
1345 relation,
1346 )
1347 } else {
1348 self.derive_with_dialect_alias(
1349 "derived_unnest",
1350 subquery.subquery.as_ref(),
1351 relation,
1352 true,
1353 vec![],
1354 )
1355 }
1356 }
1357 _ => {
1358 not_impl_err!("Unsupported operator: {plan:?}")
1359 }
1360 }
1361 }
1362
1363 fn peel_to_inner_projection(plan: &LogicalPlan) -> Option<&Projection> {
1371 match plan {
1372 LogicalPlan::Projection(p) => Some(p),
1373 LogicalPlan::SubqueryAlias(alias) => {
1374 Self::peel_to_inner_projection(alias.input.as_ref())
1375 }
1376 _ => None,
1377 }
1378 }
1379
1380 fn peel_to_unnest_with_modifiers<'a>(
1391 &self,
1392 plan: &'a LogicalPlan,
1393 query: &mut Option<QueryBuilder>,
1394 flatten_alias: Option<&str>,
1395 ) -> Result<Option<(&'a Unnest, &'a LogicalPlan)>> {
1396 match plan {
1397 LogicalPlan::Unnest(unnest) => Ok(Some((unnest, plan))),
1398 LogicalPlan::Limit(limit) => {
1399 if let Some(fetch) = &limit.fetch
1400 && let Some(q) = query.as_mut()
1401 {
1402 q.limit(Some(self.expr_to_sql(fetch)?));
1403 }
1404 if let Some(skip) = &limit.skip
1405 && let Some(q) = query.as_mut()
1406 {
1407 q.offset(Some(ast::Offset {
1408 rows: ast::OffsetRows::None,
1409 value: self.expr_to_sql(skip)?,
1410 }));
1411 }
1412 self.peel_to_unnest_with_modifiers(
1413 limit.input.as_ref(),
1414 query,
1415 flatten_alias,
1416 )
1417 }
1418 LogicalPlan::Sort(sort) => {
1419 let Some(query_ref) = query.as_mut() else {
1420 return internal_err!(
1421 "Sort between Projection and Unnest requires a statement context."
1422 );
1423 };
1424 if let Some(fetch) = sort.fetch {
1425 query_ref.limit(Some(ast::Expr::value(ast::Value::Number(
1426 fetch.to_string(),
1427 false,
1428 ))));
1429 }
1430 let unnest_node = match sort.input.as_ref() {
1434 LogicalPlan::Unnest(u) => Some(u),
1435 _ => find_unnest_node_within_select(sort.input.as_ref()),
1436 };
1437 let sort_exprs = if let Some(alias) = flatten_alias
1438 && let Some(unnest) = unnest_node
1439 {
1440 sort.expr
1441 .iter()
1442 .map(|s| {
1443 let rewritten = unproject_unnest_expr_as_flatten_value(
1444 s.expr.clone(),
1445 unnest,
1446 alias,
1447 )?;
1448 Ok(SortExpr {
1449 expr: rewritten,
1450 ..s.clone()
1451 })
1452 })
1453 .collect::<Result<Vec<_>>>()?
1454 } else {
1455 sort.expr.clone()
1456 };
1457 query_ref.order_by(self.sorts_to_sql(&sort_exprs)?);
1458 self.peel_to_unnest_with_modifiers(
1459 sort.input.as_ref(),
1460 query,
1461 flatten_alias,
1462 )
1463 }
1464 _ => Ok(None),
1465 }
1466 }
1467
1468 fn find_unnest_placeholder(expr: &Expr) -> Option<UnnestInputType> {
1475 let mut result = None;
1476 let _ = expr.apply(|e| {
1477 if let Some(t) = Self::classify_placeholder_column(e) {
1478 result = Some(t);
1479 return Ok(TreeNodeRecursion::Stop);
1480 }
1481 Ok(TreeNodeRecursion::Continue)
1482 });
1483 result
1484 }
1485
1486 fn is_bare_unnest_placeholder(expr: &Expr) -> bool {
1493 let inner = match expr {
1495 Expr::Alias(Alias { expr, .. }) => expr.as_ref(),
1496 other => other,
1497 };
1498 Self::classify_placeholder_column(inner).is_some()
1499 }
1500
1501 fn classify_placeholder_column(expr: &Expr) -> Option<UnnestInputType> {
1505 if let Expr::Column(Column { name, .. }) = expr
1506 && let Some(prefix) = name.strip_prefix(UNNEST_PLACEHOLDER)
1507 {
1508 if prefix.starts_with(&format!("({OUTER_REFERENCE_COLUMN_PREFIX}(")) {
1509 return Some(UnnestInputType::OuterReference);
1510 }
1511 return Some(UnnestInputType::Scalar);
1512 }
1513 None
1514 }
1515
1516 fn has_internal_unnest_alias(expr: &Expr) -> bool {
1524 match expr {
1525 Expr::Column(col) => {
1526 col.name.starts_with(&format!("{UNNEST_COLUMN_PREFIX}("))
1527 }
1528 Expr::Alias(Alias { name, .. }) => {
1529 name.starts_with(&format!("{UNNEST_COLUMN_PREFIX}("))
1530 }
1531 _ => false,
1532 }
1533 }
1534
1535 fn contains_unnest(plan: &LogicalPlan) -> bool {
1542 match plan {
1543 LogicalPlan::Unnest(_) => true,
1544 LogicalPlan::Projection(p) => Self::contains_unnest(&p.input),
1545 LogicalPlan::Subquery(s) => Self::contains_unnest(&s.subquery),
1546 LogicalPlan::SubqueryAlias(a) => Self::contains_unnest(&a.input),
1547 _ => false,
1548 }
1549 }
1550
1551 fn collect_flatten_aliases(plan: &LogicalPlan, select: &mut SelectBuilder) {
1552 match plan {
1553 LogicalPlan::SubqueryAlias(alias)
1554 if Self::contains_unnest(alias.input.as_ref()) =>
1555 {
1556 select.add_flatten_table_alias(alias.alias.table().to_string());
1557 }
1558 LogicalPlan::Join(join) => {
1559 Self::collect_flatten_aliases(&join.left, select);
1560 Self::collect_flatten_aliases(&join.right, select);
1561 }
1562 _ => {}
1563 }
1564 }
1565
1566 fn try_unnest_to_table_factor_sql(
1567 &self,
1568 unnest: &Unnest,
1569 ) -> Result<Option<UnnestRelationBuilder>> {
1570 let mut unnest_relation = UnnestRelationBuilder::default();
1571 let LogicalPlan::Projection(projection) = unnest.input.as_ref() else {
1572 return Ok(None);
1573 };
1574
1575 if !matches!(projection.input.as_ref(), LogicalPlan::EmptyRelation(_)) {
1576 return Ok(None);
1584 };
1585
1586 let exprs = projection
1587 .expr
1588 .iter()
1589 .map(|e| self.expr_to_sql(e))
1590 .collect::<Result<Vec<_>>>()?;
1591 unnest_relation.array_exprs(exprs);
1592
1593 Ok(Some(unnest_relation))
1594 }
1595
1596 fn build_flatten_value_select_item(
1598 &self,
1599 flatten_alias: &str,
1600 user_alias: Option<&str>,
1601 ) -> ast::SelectItem {
1602 let compound = ast::Expr::CompoundIdentifier(vec![
1603 self.new_ident_quoted_if_needs(flatten_alias.to_string()),
1604 Ident::with_quote('"', "VALUE"),
1605 ]);
1606 match user_alias {
1607 Some(alias) => ast::SelectItem::ExprWithAlias {
1608 expr: compound,
1609 alias: self.new_ident_quoted_if_needs(alias.to_string()),
1610 },
1611 None => ast::SelectItem::UnnamedExpr(compound),
1612 }
1613 }
1614
1615 fn try_unnest_to_lateral_flatten_sql(
1618 &self,
1619 unnest: &Unnest,
1620 ) -> Result<Option<FlattenRelationBuilder>> {
1621 let Some(projection) = Self::peel_to_inner_projection(unnest.input.as_ref())
1622 else {
1623 return Ok(None);
1624 };
1625
1626 let Some(first_expr) = projection.expr.first() else {
1629 return Ok(None);
1630 };
1631
1632 let input_expr = self.expr_to_sql(first_expr)?;
1633
1634 let mut flatten = FlattenRelationBuilder::default();
1635 flatten.input_expr(input_expr);
1636 flatten.outer(unnest.options.preserve_nulls);
1637
1638 Ok(Some(flatten))
1639 }
1640
1641 fn is_scan_with_pushdown(scan: &TableScan) -> bool {
1642 scan.projection.is_some() || !scan.filters.is_empty() || scan.fetch.is_some()
1643 }
1644
1645 fn requires_derived_subquery(plan: &LogicalPlan) -> bool {
1651 matches!(
1652 plan,
1653 LogicalPlan::Aggregate(_)
1654 | LogicalPlan::Window(_)
1655 | LogicalPlan::Sort(_)
1656 | LogicalPlan::Limit(_)
1657 | LogicalPlan::Union(_)
1658 )
1659 }
1660
1661 fn unparse_table_scan_pushdown(
1664 &self,
1665 plan: &LogicalPlan,
1666 alias: Option<TableReference>,
1667 already_projected: bool,
1668 ) -> Result<Option<LogicalPlan>> {
1669 match plan {
1670 LogicalPlan::TableScan(table_scan) => {
1671 if !Self::is_scan_with_pushdown(table_scan) {
1672 return Ok(None);
1673 }
1674 let table_schema = table_scan.source.schema();
1675 let mut filter_alias_rewriter =
1676 alias.as_ref().map(|alias_name| TableAliasRewriter {
1677 table_schema: &table_schema,
1678 alias_name: alias_name.clone(),
1679 });
1680
1681 let mut builder = LogicalPlanBuilder::scan(
1682 table_scan.table_name.clone(),
1683 Arc::clone(&table_scan.source),
1684 None,
1685 )?;
1686 if let Some(ref alias) = alias
1692 && (table_scan.projection.is_some() || !table_scan.filters.is_empty())
1693 {
1694 builder = builder.alias(alias.clone())?;
1695 }
1696
1697 if !already_projected && let Some(project_vec) = &table_scan.projection {
1701 if project_vec.is_empty() {
1702 builder = builder.project(self.empty_projection_fallback())?;
1703 } else {
1704 let project_columns = project_vec
1705 .iter()
1706 .cloned()
1707 .map(|i| {
1708 let schema = table_scan.source.schema();
1709 let field = schema.field(i);
1710 if alias.is_some() {
1711 Column::new(alias.clone(), field.name().clone())
1712 } else {
1713 Column::new(
1714 Some(table_scan.table_name.clone()),
1715 field.name().clone(),
1716 )
1717 }
1718 })
1719 .collect::<Vec<_>>();
1720 builder = builder.project(project_columns)?;
1721 };
1722 }
1723
1724 let filter_expr: Result<Option<Expr>> = table_scan
1725 .filters
1726 .iter()
1727 .cloned()
1728 .map(|expr| {
1729 if let Some(ref mut rewriter) = filter_alias_rewriter {
1730 expr.rewrite(rewriter).data()
1731 } else {
1732 Ok(expr)
1733 }
1734 })
1735 .reduce(|acc, expr_result| {
1736 acc.and_then(|acc_expr| {
1737 expr_result.map(|expr| acc_expr.and(expr))
1738 })
1739 })
1740 .transpose();
1741
1742 if let Some(filter) = filter_expr? {
1743 builder = builder.filter(filter)?;
1744 }
1745
1746 if let Some(fetch) = table_scan.fetch {
1747 builder = builder.limit(0, Some(fetch))?;
1748 }
1749
1750 if let Some(alias) = alias
1755 && table_scan.projection.is_none()
1756 && table_scan.filters.is_empty()
1757 {
1758 builder = builder.alias(alias)?;
1759 }
1760
1761 Ok(Some(builder.build()?))
1762 }
1763 LogicalPlan::SubqueryAlias(subquery_alias) => {
1764 let ret = self.unparse_table_scan_pushdown(
1765 &subquery_alias.input,
1766 Some(subquery_alias.alias.clone()),
1767 already_projected,
1768 )?;
1769 if let Some(alias) = alias
1770 && let Some(plan) = ret
1771 {
1772 let plan = LogicalPlanBuilder::new(plan).alias(alias)?.build()?;
1773 return Ok(Some(plan));
1774 }
1775 Ok(ret)
1776 }
1777 LogicalPlan::Projection(projection) => {
1780 if let Some(plan) = self.unparse_table_scan_pushdown(
1781 &projection.input,
1782 alias.clone(),
1783 already_projected,
1784 )? {
1785 let exprs = if alias.is_some() {
1786 let mut alias_rewriter =
1787 alias.as_ref().map(|alias_name| TableAliasRewriter {
1788 table_schema: plan.schema().as_arrow(),
1789 alias_name: alias_name.clone(),
1790 });
1791 projection
1792 .expr
1793 .iter()
1794 .cloned()
1795 .map(|expr| {
1796 if let Some(ref mut rewriter) = alias_rewriter {
1797 expr.rewrite(rewriter).data()
1798 } else {
1799 Ok(expr)
1800 }
1801 })
1802 .collect::<Result<Vec<_>>>()?
1803 } else {
1804 projection.expr.clone()
1805 };
1806 Ok(Some(
1807 LogicalPlanBuilder::from(plan).project(exprs)?.build()?,
1808 ))
1809 } else {
1810 Ok(None)
1811 }
1812 }
1813 _ => Ok(None),
1814 }
1815 }
1816
1817 fn select_item_to_sql(&self, expr: &Expr) -> Result<ast::SelectItem> {
1818 match expr {
1819 Expr::Alias(Alias { expr, name, .. }) => {
1820 let inner = self.expr_to_sql(expr)?;
1821
1822 let col_name = if let Some(rewritten_name) =
1824 self.dialect.col_alias_overrides(name)?
1825 {
1826 rewritten_name.to_string()
1827 } else {
1828 name.to_string()
1829 };
1830
1831 Ok(ast::SelectItem::ExprWithAlias {
1832 expr: inner,
1833 alias: self.new_ident_quoted_if_needs(col_name),
1834 })
1835 }
1836 _ => {
1837 let inner = self.expr_to_sql(expr)?;
1838
1839 Ok(ast::SelectItem::UnnamedExpr(inner))
1840 }
1841 }
1842 }
1843
1844 fn sorts_to_sql(&self, sort_exprs: &[SortExpr]) -> Result<OrderByKind> {
1845 Ok(OrderByKind::Expressions(
1846 sort_exprs
1847 .iter()
1848 .map(|sort_expr| self.sort_to_sql(sort_expr))
1849 .collect::<Result<Vec<_>>>()?,
1850 ))
1851 }
1852
1853 fn join_operator_to_sql(
1854 &self,
1855 join_type: JoinType,
1856 constraint: ast::JoinConstraint,
1857 ) -> Result<ast::JoinOperator> {
1858 Ok(match join_type {
1859 JoinType::Inner => match &constraint {
1860 ast::JoinConstraint::On(_)
1861 | ast::JoinConstraint::Using(_)
1862 | ast::JoinConstraint::Natural => ast::JoinOperator::Inner(constraint),
1863 ast::JoinConstraint::None => {
1864 ast::JoinOperator::CrossJoin(constraint)
1867 }
1868 },
1869 JoinType::Left => ast::JoinOperator::LeftOuter(constraint),
1870 JoinType::Right => ast::JoinOperator::RightOuter(constraint),
1871 JoinType::Full => ast::JoinOperator::FullOuter(constraint),
1872 JoinType::LeftAnti => ast::JoinOperator::LeftAnti(constraint),
1873 JoinType::LeftSemi => ast::JoinOperator::LeftSemi(constraint),
1874 JoinType::RightAnti => ast::JoinOperator::RightAnti(constraint),
1875 JoinType::RightSemi => ast::JoinOperator::RightSemi(constraint),
1876 JoinType::LeftMark | JoinType::RightMark => {
1877 unimplemented!("Unparsing of Mark join type")
1878 }
1879 })
1880 }
1881
1882 fn join_using_to_sql(
1886 &self,
1887 join_conditions: &[(Expr, Expr)],
1888 ) -> Option<ast::JoinConstraint> {
1889 let mut object_names = Vec::with_capacity(join_conditions.len());
1890 for (left, right) in join_conditions {
1891 match (left, right) {
1892 (
1893 Expr::Column(Column {
1894 relation: _,
1895 name: left_name,
1896 spans: _,
1897 }),
1898 Expr::Column(Column {
1899 relation: _,
1900 name: right_name,
1901 spans: _,
1902 }),
1903 ) if left_name == right_name => {
1904 let ident = self.new_ident_quoted_if_needs(left_name.to_string());
1908 object_names.push(ast::ObjectName::from(vec![ident]));
1909 }
1910 _ => return None,
1913 }
1914 }
1915 Some(ast::JoinConstraint::Using(object_names))
1916 }
1917
1918 fn join_constraint_to_sql(
1920 &self,
1921 constraint: JoinConstraint,
1922 conditions: &[(Expr, Expr)],
1923 filter: Option<&Expr>,
1924 ) -> Result<ast::JoinConstraint> {
1925 match (constraint, conditions, filter) {
1926 (JoinConstraint::On | JoinConstraint::Using, [], None) => {
1928 Ok(ast::JoinConstraint::None)
1929 }
1930
1931 (JoinConstraint::Using, conditions, None) => {
1932 match self.join_using_to_sql(conditions) {
1933 Some(using) => Ok(using),
1934 None => self.join_conditions_to_sql_on(conditions, None),
1937 }
1938 }
1939
1940 (JoinConstraint::On | JoinConstraint::Using, conditions, filter) => {
1948 self.join_conditions_to_sql_on(conditions, filter)
1949 }
1950 }
1951 }
1952
1953 fn join_conditions_to_sql_on(
1957 &self,
1958 join_conditions: &[(Expr, Expr)],
1959 filter: Option<&Expr>,
1960 ) -> Result<ast::JoinConstraint> {
1961 let mut condition = None;
1962 for (left, right) in join_conditions {
1964 let l = self.expr_to_sql(left)?;
1966 let r = self.expr_to_sql(right)?;
1967 let e = self.binary_op_to_sql(l, r, ast::BinaryOperator::Eq);
1968 condition = match condition {
1969 Some(expr) => Some(self.and_op_to_sql(expr, e)),
1970 None => Some(e),
1971 };
1972 }
1973
1974 condition = match (condition, filter) {
1976 (Some(expr), Some(filter)) => {
1977 Some(self.and_op_to_sql(expr, self.expr_to_sql(filter)?))
1978 }
1979 (Some(expr), None) => Some(expr),
1980 (None, Some(filter)) => Some(self.expr_to_sql(filter)?),
1981 (None, None) => None,
1982 };
1983
1984 let constraint = match condition {
1985 Some(filter) => ast::JoinConstraint::On(filter),
1986 None => ast::JoinConstraint::None,
1987 };
1988
1989 Ok(constraint)
1990 }
1991
1992 fn and_op_to_sql(&self, lhs: ast::Expr, rhs: ast::Expr) -> ast::Expr {
1993 self.binary_op_to_sql(lhs, rhs, ast::BinaryOperator::And)
1994 }
1995
1996 fn new_table_alias(&self, alias: String, columns: Vec<Ident>) -> ast::TableAlias {
1997 let columns = columns
1998 .into_iter()
1999 .map(|ident| TableAliasColumnDef {
2000 name: ident,
2001 data_type: None,
2002 })
2003 .collect();
2004 ast::TableAlias {
2005 name: self.new_ident_quoted_if_needs(alias),
2006 columns,
2007 explicit: true,
2008 at: None,
2009 }
2010 }
2011
2012 fn dml_to_sql(&self, plan: &LogicalPlan) -> Result<ast::Statement> {
2013 not_impl_err!("Unsupported plan: {plan:?}")
2014 }
2015
2016 fn empty_projection_fallback(&self) -> Vec<Expr> {
2020 if self.dialect.supports_empty_select_list() {
2021 Vec::new()
2022 } else {
2023 vec![Expr::Literal(ScalarValue::Int64(Some(1)), None)]
2024 }
2025 }
2026
2027 fn split_join_on_and_where_filters(
2036 join_type: JoinType,
2037 join_filter: &Option<Expr>,
2038 table_scan_filters: Vec<Expr>,
2039 ) -> (Option<Expr>, Vec<Expr>) {
2040 if table_scan_filters.is_empty() {
2041 return (join_filter.clone(), vec![]);
2042 }
2043
2044 if join_type == JoinType::Inner {
2045 return (join_filter.clone(), table_scan_filters);
2048 }
2049
2050 let combined = table_scan_filters.into_iter().reduce(|acc, filter| {
2052 Expr::BinaryExpr(BinaryExpr {
2053 left: Box::new(acc),
2054 op: Operator::And,
2055 right: Box::new(filter),
2056 })
2057 });
2058
2059 let on_filter = match (join_filter, combined) {
2060 (Some(jf), Some(c)) => Some(Expr::BinaryExpr(BinaryExpr {
2061 left: Box::new(jf.clone()),
2062 op: Operator::And,
2063 right: Box::new(c),
2064 })),
2065 (Some(jf), None) => Some(jf.clone()),
2066 (None, Some(c)) => Some(c),
2067 (None, None) => None,
2068 };
2069
2070 (on_filter, vec![])
2071 }
2072}
2073
2074impl From<BuilderError> for DataFusionError {
2075 fn from(e: BuilderError) -> Self {
2076 DataFusionError::External(Box::new(e))
2077 }
2078}
2079
2080#[derive(Debug)]
2082enum UnnestInputType {
2083 OuterReference,
2085 Scalar,
2087}