1use super::{
19 Unparser,
20 ast::{
21 BuilderError, DerivedRelationBuilder, QueryBuilder, RelationBuilder,
22 SelectBuilder, TableRelationBuilder, TableWithJoinsBuilder,
23 },
24 rewrite::{
25 TableAliasRewriter, inject_column_aliases_into_subquery, normalize_union_schema,
26 rewrite_plan_for_sort_on_non_projected_fields,
27 subquery_alias_inner_query_and_columns,
28 },
29 utils::{
30 find_agg_node_within_select, find_unnest_node_within_select,
31 find_window_nodes_within_select, try_transform_to_simple_table_scan_with_filters,
32 unproject_sort_expr, unproject_unnest_expr, unproject_window_exprs,
33 },
34};
35use crate::unparser::extension_unparser::{
36 UnparseToStatementResult, UnparseWithinStatementResult,
37};
38use crate::unparser::utils::{find_unnest_node_until_relation, unproject_agg_exprs};
39use crate::unparser::{ast::UnnestRelationBuilder, rewrite::rewrite_qualify};
40use crate::utils::UNNEST_PLACEHOLDER;
41use datafusion_common::{
42 Column, DataFusionError, Result, ScalarValue, TableReference, assert_or_internal_err,
43 internal_err, not_impl_err,
44 tree_node::{TransformedResult, TreeNode},
45};
46use datafusion_expr::expr::OUTER_REFERENCE_COLUMN_PREFIX;
47use datafusion_expr::{
48 BinaryExpr, Distinct, Expr, JoinConstraint, JoinType, LogicalPlan,
49 LogicalPlanBuilder, Operator, Projection, SortExpr, TableScan, Unnest,
50 UserDefinedLogicalNode, expr::Alias,
51};
52use sqlparser::ast::{self, Ident, OrderByKind, SetExpr, TableAliasColumnDef};
53use std::{sync::Arc, vec};
54
55pub fn plan_to_sql(plan: &LogicalPlan) -> Result<ast::Statement> {
96 let unparser = Unparser::default();
97 unparser.plan_to_sql(plan)
98}
99
100impl Unparser<'_> {
101 pub fn plan_to_sql(&self, plan: &LogicalPlan) -> Result<ast::Statement> {
102 let mut plan = normalize_union_schema(plan)?;
103 if !self.dialect.supports_qualify() {
104 plan = rewrite_qualify(plan)?;
105 }
106
107 match plan {
108 LogicalPlan::Projection(_)
109 | LogicalPlan::Filter(_)
110 | LogicalPlan::Window(_)
111 | LogicalPlan::Aggregate(_)
112 | LogicalPlan::Sort(_)
113 | LogicalPlan::Join(_)
114 | LogicalPlan::Repartition(_)
115 | LogicalPlan::Union(_)
116 | LogicalPlan::TableScan(_)
117 | LogicalPlan::EmptyRelation(_)
118 | LogicalPlan::Subquery(_)
119 | LogicalPlan::SubqueryAlias(_)
120 | LogicalPlan::Limit(_)
121 | LogicalPlan::Statement(_)
122 | LogicalPlan::Values(_)
123 | LogicalPlan::Distinct(_) => self.select_to_sql_statement(&plan),
124 LogicalPlan::Dml(_) => self.dml_to_sql(&plan),
125 LogicalPlan::Extension(extension) => {
126 self.extension_to_statement(extension.node.as_ref())
127 }
128 LogicalPlan::Explain(_)
129 | LogicalPlan::Analyze(_)
130 | LogicalPlan::Ddl(_)
131 | LogicalPlan::Copy(_)
132 | LogicalPlan::DescribeTable(_)
133 | LogicalPlan::RecursiveQuery(_)
134 | LogicalPlan::Unnest(_) => not_impl_err!("Unsupported plan: {plan:?}"),
135 }
136 }
137
138 fn extension_to_statement(
142 &self,
143 node: &dyn UserDefinedLogicalNode,
144 ) -> Result<ast::Statement> {
145 let mut statement = None;
146 for unparser in &self.extension_unparsers {
147 match unparser.unparse_to_statement(node, self)? {
148 UnparseToStatementResult::Modified(stmt) => {
149 statement = Some(stmt);
150 break;
151 }
152 UnparseToStatementResult::Unmodified => {}
153 }
154 }
155 if let Some(statement) = statement {
156 Ok(statement)
157 } else {
158 not_impl_err!("Unsupported extension node: {node:?}")
159 }
160 }
161
162 fn extension_to_sql(
166 &self,
167 node: &dyn UserDefinedLogicalNode,
168 query: &mut Option<&mut QueryBuilder>,
169 select: &mut Option<&mut SelectBuilder>,
170 relation: &mut Option<&mut RelationBuilder>,
171 ) -> Result<()> {
172 for unparser in &self.extension_unparsers {
173 match unparser.unparse(node, self, query, select, relation)? {
174 UnparseWithinStatementResult::Modified => return Ok(()),
175 UnparseWithinStatementResult::Unmodified => {}
176 }
177 }
178 not_impl_err!("Unsupported extension node: {node:?}")
179 }
180
181 fn select_to_sql_statement(&self, plan: &LogicalPlan) -> Result<ast::Statement> {
182 let mut query_builder = Some(QueryBuilder::default());
183
184 let body = self.select_to_sql_expr(plan, &mut query_builder)?;
185
186 let query = query_builder.unwrap().body(Box::new(body)).build()?;
187
188 Ok(ast::Statement::Query(Box::new(query)))
189 }
190
191 fn select_to_sql_expr(
192 &self,
193 plan: &LogicalPlan,
194 query: &mut Option<QueryBuilder>,
195 ) -> Result<SetExpr> {
196 let mut select_builder = SelectBuilder::default();
197 select_builder.push_from(TableWithJoinsBuilder::default());
198 let mut relation_builder = RelationBuilder::default();
199 self.select_to_sql_recursively(
200 plan,
201 query,
202 &mut select_builder,
203 &mut relation_builder,
204 )?;
205
206 if let Some(body) = query.as_mut().and_then(|q| q.take_body()) {
208 return Ok(*body);
209 }
210
211 if !select_builder.already_projected() {
214 select_builder.projection(vec![ast::SelectItem::Wildcard(
215 ast::WildcardAdditionalOptions::default(),
216 )]);
217 }
218
219 let mut twj = select_builder.pop_from().unwrap();
220 twj.relation(relation_builder);
221 select_builder.push_from(twj);
222
223 Ok(SetExpr::Select(Box::new(select_builder.build()?)))
224 }
225
226 fn reconstruct_select_statement(
230 &self,
231 plan: &LogicalPlan,
232 p: &Projection,
233 select: &mut SelectBuilder,
234 ) -> Result<()> {
235 let mut exprs = p.expr.clone();
236
237 if let Some(unnest) = find_unnest_node_within_select(plan) {
239 exprs = exprs
240 .into_iter()
241 .map(|e| unproject_unnest_expr(e, unnest))
242 .collect::<Result<Vec<_>>>()?;
243 };
244
245 match (
246 find_agg_node_within_select(plan, true),
247 find_window_nodes_within_select(plan, None, true),
248 ) {
249 (Some(agg), window) => {
250 let window_option = window.as_deref();
251 let items = exprs
252 .into_iter()
253 .map(|proj_expr| {
254 let unproj = unproject_agg_exprs(proj_expr, agg, window_option)?;
255 self.select_item_to_sql(&unproj)
256 })
257 .collect::<Result<Vec<_>>>()?;
258
259 select.projection(items);
260 select.group_by(ast::GroupByExpr::Expressions(
261 agg.group_expr
262 .iter()
263 .map(|expr| self.expr_to_sql(expr))
264 .collect::<Result<Vec<_>>>()?,
265 vec![],
266 ));
267 }
268 (None, Some(window)) => {
269 let items = exprs
270 .into_iter()
271 .map(|proj_expr| {
272 let unproj = unproject_window_exprs(proj_expr, &window)?;
273 self.select_item_to_sql(&unproj)
274 })
275 .collect::<Result<Vec<_>>>()?;
276
277 select.projection(items);
278 }
279 _ => {
280 let items = exprs
281 .iter()
282 .map(|e| self.select_item_to_sql(e))
283 .collect::<Result<Vec<_>>>()?;
284 select.projection(items);
285 }
286 }
287 Ok(())
288 }
289
290 fn derive(
291 &self,
292 plan: &LogicalPlan,
293 relation: &mut RelationBuilder,
294 alias: Option<ast::TableAlias>,
295 lateral: bool,
296 ) -> Result<()> {
297 let mut derived_builder = DerivedRelationBuilder::default();
298 derived_builder.lateral(lateral).alias(alias).subquery({
299 let inner_statement = self.plan_to_sql(plan)?;
300 if let ast::Statement::Query(inner_query) = inner_statement {
301 inner_query
302 } else {
303 return internal_err!(
304 "Subquery must be a Query, but found {inner_statement:?}"
305 );
306 }
307 });
308 relation.derived(derived_builder);
309
310 Ok(())
311 }
312
313 fn derive_with_dialect_alias(
314 &self,
315 alias: &str,
316 plan: &LogicalPlan,
317 relation: &mut RelationBuilder,
318 lateral: bool,
319 columns: Vec<Ident>,
320 ) -> Result<()> {
321 if self.dialect.requires_derived_table_alias() || !columns.is_empty() {
322 self.derive(
323 plan,
324 relation,
325 Some(self.new_table_alias(alias.to_string(), columns)),
326 lateral,
327 )
328 } else {
329 self.derive(plan, relation, None, lateral)
330 }
331 }
332
/// Walks `plan` top-down and records what each node contributes on the three
/// builders: `query` (limit / offset / order by / a complete body), `select`
/// (projection, filters, grouping, distinct, joins) and `relation` (the FROM
/// relation: table, derived subquery, unnest, alias).
///
/// Several arms check `select.already_projected()`; when it is true the node is
/// instead unparsed as a separate subquery through `derive_with_dialect_alias`.
///
/// # Errors
///
/// Returns a "not implemented" error for plan nodes without a matching arm, an
/// internal error when a node needs a query builder but `query` is `None`, and
/// propagates any error from expression unparsing or plan rewriting.
#[cfg_attr(feature = "recursive_protection", recursive::recursive)]
fn select_to_sql_recursively(
    &self,
    plan: &LogicalPlan,
    query: &mut Option<QueryBuilder>,
    select: &mut SelectBuilder,
    relation: &mut RelationBuilder,
) -> Result<()> {
    match plan {
        LogicalPlan::TableScan(scan) => {
            // A scan carrying pushed-down projection/filters/fetch is expanded into
            // an explicit plan first, and that plan is unparsed instead.
            if let Some(unparsed_table_scan) = self.unparse_table_scan_pushdown(
                plan,
                None,
                select.already_projected(),
            )? {
                return self.select_to_sql_recursively(
                    &unparsed_table_scan,
                    query,
                    select,
                    relation,
                );
            }
            // Plain scan: emit the table name as `[catalog.][schema.]table`.
            let mut builder = TableRelationBuilder::default();
            let mut table_parts = vec![];
            if let Some(catalog_name) = scan.table_name.catalog() {
                table_parts
                    .push(self.new_ident_quoted_if_needs(catalog_name.to_string()));
            }
            if let Some(schema_name) = scan.table_name.schema() {
                table_parts
                    .push(self.new_ident_quoted_if_needs(schema_name.to_string()));
            }
            table_parts.push(
                self.new_ident_quoted_if_needs(scan.table_name.table().to_string()),
            );
            builder.name(ast::ObjectName::from(table_parts));
            relation.table(builder);

            Ok(())
        }
        LogicalPlan::Projection(p) => {
            // If the helper produces a rewritten plan for this projection, unparse
            // that plan instead.
            if let Some(new_plan) = rewrite_plan_for_sort_on_non_projected_fields(p) {
                return self
                    .select_to_sql_recursively(&new_plan, query, select, relation);
            }

            // Unnest-placeholder detection is only attempted for projections with
            // exactly one expression.
            let unnest_input_type = if p.expr.len() == 1 {
                Self::check_unnest_placeholder_with_outer_ref(&p.expr[0])
            } else {
                None
            };
            // When the dialect unparses UNNEST as a table factor and the input
            // qualifies, the unnest becomes the relation and the walk continues
            // with the projection's input.
            if self.dialect.unnest_as_table_factor()
                && unnest_input_type.is_some()
                && let LogicalPlan::Unnest(unnest) = &p.input.as_ref()
                && let Some(unnest_relation) =
                    self.try_unnest_to_table_factor_sql(unnest)?
            {
                relation.unnest(unnest_relation);
                return self.select_to_sql_recursively(
                    p.input.as_ref(),
                    query,
                    select,
                    relation,
                );
            }

            // For an unnest projection, the expression schema names are passed on
            // as column aliases of the derived table alias.
            let columns = if unnest_input_type.is_some() {
                p.expr
                    .iter()
                    .map(|e| {
                        self.new_ident_quoted_if_needs(e.schema_name().to_string())
                    })
                    .collect()
            } else {
                vec![]
            };
            // A projection below an already-projected select becomes a derived
            // table; it is marked lateral only for the outer-reference unnest case.
            if select.already_projected() {
                return self.derive_with_dialect_alias(
                    "derived_projection",
                    plan,
                    relation,
                    unnest_input_type
                        .filter(|t| matches!(t, UnnestInputType::OuterReference))
                        .is_some(),
                    columns,
                );
            }
            self.reconstruct_select_statement(plan, p, select)?;
            self.select_to_sql_recursively(p.input.as_ref(), query, select, relation)
        }
        LogicalPlan::Filter(filter) => {
            // The predicate is routed to one of three select slots:
            //   - `having`    when an aggregate node is found within this select,
            //   - `qualify`   when window nodes are found and the dialect supports it,
            //   - `selection` otherwise.
            if let Some(agg) =
                find_agg_node_within_select(plan, select.already_projected())
            {
                let unprojected =
                    unproject_agg_exprs(filter.predicate.clone(), agg, None)?;
                let filter_expr = self.expr_to_sql(&unprojected)?;
                select.having(Some(filter_expr));
            } else if let (Some(window), true) = (
                find_window_nodes_within_select(
                    plan,
                    None,
                    select.already_projected(),
                ),
                self.dialect.supports_qualify(),
            ) {
                let unprojected =
                    unproject_window_exprs(filter.predicate.clone(), &window)?;
                let filter_expr = self.expr_to_sql(&unprojected)?;
                select.qualify(Some(filter_expr));
            } else {
                let filter_expr = self.expr_to_sql(&filter.predicate)?;
                select.selection(Some(filter_expr));
            }

            self.select_to_sql_recursively(
                filter.input.as_ref(),
                query,
                select,
                relation,
            )
        }
        LogicalPlan::Limit(limit) => {
            // A limit below an already-projected select becomes a derived table.
            if select.already_projected() {
                return self.derive_with_dialect_alias(
                    "derived_limit",
                    plan,
                    relation,
                    false,
                    vec![],
                );
            }
            if let Some(fetch) = &limit.fetch {
                let Some(query) = query.as_mut() else {
                    return internal_err!(
                        "Limit operator only valid in a statement context."
                    );
                };
                query.limit(Some(self.expr_to_sql(fetch)?));
            }

            if let Some(skip) = &limit.skip {
                let Some(query) = query.as_mut() else {
                    return internal_err!(
                        "Offset operator only valid in a statement context."
                    );
                };

                query.offset(Some(ast::Offset {
                    rows: ast::OffsetRows::None,
                    value: self.expr_to_sql(skip)?,
                }));
            }

            self.select_to_sql_recursively(
                limit.input.as_ref(),
                query,
                select,
                relation,
            )
        }
        LogicalPlan::Sort(sort) => {
            // NOTE(review): unlike the Projection/Limit/Distinct/Union arms, this arm
            // has no `select.already_projected()` guard that derives a subquery, so
            // the ORDER BY (and the sort's fetch) always land on the current query
            // builder. Confirm this is intended.
            let Some(query_ref) = query else {
                return internal_err!(
                    "Sort operator only valid in a statement context."
                );
            };

            // A fetch stored on the sort node is emitted as the query's limit.
            if let Some(fetch) = sort.fetch {
                query_ref.limit(Some(ast::Expr::value(ast::Value::Number(
                    fetch.to_string(),
                    false,
                ))));
            };

            let agg = find_agg_node_within_select(plan, select.already_projected());
            // Unproject the sort expressions against the aggregate (if any) and the
            // sort's input before converting them.
            let sort_exprs: Vec<SortExpr> = sort
                .expr
                .iter()
                .map(|sort_expr| {
                    unproject_sort_expr(sort_expr.clone(), agg, sort.input.as_ref())
                })
                .collect::<Result<Vec<_>>>()?;

            query_ref.order_by(self.sorts_to_sql(&sort_exprs)?);

            self.select_to_sql_recursively(
                sort.input.as_ref(),
                query,
                select,
                relation,
            )
        }
        LogicalPlan::Aggregate(agg) => {
            // Only set projection/GROUP BY here when nothing above has projected yet.
            if !select.already_projected() {
                // The select list is the aggregate expressions followed by the
                // group expressions.
                let exprs: Vec<_> = agg
                    .aggr_expr
                    .iter()
                    .chain(agg.group_expr.iter())
                    .map(|expr| self.select_item_to_sql(expr))
                    .collect::<Result<Vec<_>>>()?;
                select.projection(exprs);

                select.group_by(ast::GroupByExpr::Expressions(
                    agg.group_expr
                        .iter()
                        .map(|expr| self.expr_to_sql(expr))
                        .collect::<Result<Vec<_>>>()?,
                    vec![],
                ));
            }

            self.select_to_sql_recursively(
                agg.input.as_ref(),
                query,
                select,
                relation,
            )
        }
        LogicalPlan::Distinct(distinct) => {
            // A distinct below an already-projected select becomes a derived table.
            if select.already_projected() {
                return self.derive_with_dialect_alias(
                    "derived_distinct",
                    plan,
                    relation,
                    false,
                    vec![],
                );
            }

            // Distinct directly over a Union, with a query builder available: flag
            // the query as a distinct union and continue with the union itself.
            if let Distinct::All(input) = distinct
                && matches!(input.as_ref(), LogicalPlan::Union(_))
                && let Some(query_mut) = query.as_mut()
            {
                query_mut.distinct_union();
                return self.select_to_sql_recursively(
                    input.as_ref(),
                    query,
                    select,
                    relation,
                );
            }

            let (select_distinct, input) = match distinct {
                Distinct::All(input) => (ast::Distinct::Distinct, input.as_ref()),
                Distinct::On(on) => {
                    let exprs = on
                        .on_expr
                        .iter()
                        .map(|e| self.expr_to_sql(e))
                        .collect::<Result<Vec<_>>>()?;
                    let items = on
                        .select_expr
                        .iter()
                        .map(|e| self.select_item_to_sql(e))
                        .collect::<Result<Vec<_>>>()?;
                    // The sort expressions of DISTINCT ON go onto the query.
                    if let Some(sort_expr) = &on.sort_expr {
                        if let Some(query_ref) = query {
                            query_ref.order_by(self.sorts_to_sql(sort_expr)?);
                        } else {
                            return internal_err!(
                                "Sort operator only valid in a statement context."
                            );
                        }
                    }
                    select.projection(items);
                    (ast::Distinct::On(exprs), on.input.as_ref())
                }
            };
            select.distinct(Some(select_distinct));
            self.select_to_sql_recursively(input, query, select, relation)
        }
        LogicalPlan::Join(join) => {
            let mut table_scan_filters = vec![];
            // For right semi/anti joins the two inputs are swapped, so the code
            // below treats `join.right` as the "left" side.
            let (left_plan, right_plan) = match join.join_type {
                JoinType::RightSemi | JoinType::RightAnti => {
                    (&join.right, &join.left)
                }
                _ => (&join.left, &join.right),
            };
            // Captured before recursing, because the recursive calls below may set
            // a projection on `select`.
            let already_projected = select.already_projected();

            // If the helper can reduce the left input to a table scan plus filters,
            // use that scan and keep the filters for the join condition.
            let left_plan =
                match try_transform_to_simple_table_scan_with_filters(left_plan)? {
                    Some((plan, filters)) => {
                        table_scan_filters.extend(filters);
                        Arc::new(plan)
                    }
                    None => Arc::clone(left_plan),
                };

            self.select_to_sql_recursively(
                left_plan.as_ref(),
                query,
                select,
                relation,
            )?;

            // Take whatever projection the left side set; it is re-applied below.
            let left_projection: Option<Vec<ast::SelectItem>> = if !already_projected
            {
                Some(select.pop_projections())
            } else {
                None
            };

            let right_plan =
                match try_transform_to_simple_table_scan_with_filters(right_plan)? {
                    Some((plan, filters)) => {
                        table_scan_filters.extend(filters);
                        Arc::new(plan)
                    }
                    None => Arc::clone(right_plan),
                };

            // The right side gets its own relation builder.
            let mut right_relation = RelationBuilder::default();

            self.select_to_sql_recursively(
                right_plan.as_ref(),
                query,
                select,
                &mut right_relation,
            )?;

            let join_filters = if table_scan_filters.is_empty() {
                join.filter.clone()
            } else {
                // AND all collected scan filters together.
                let Some(combined_filters) =
                    table_scan_filters.into_iter().reduce(|acc, filter| {
                        Expr::BinaryExpr(BinaryExpr {
                            left: Box::new(acc),
                            op: Operator::And,
                            right: Box::new(filter),
                        })
                    })
                else {
                    return internal_err!("Failed to combine TableScan filters");
                };

                // Then AND them onto the join's own filter, when there is one.
                match &join.filter {
                    Some(filter) => Some(Expr::BinaryExpr(BinaryExpr {
                        left: Box::new(filter.clone()),
                        op: Operator::And,
                        right: Box::new(combined_filters),
                    })),
                    None => Some(combined_filters),
                }
            };

            let join_constraint = self.join_constraint_to_sql(
                join.join_constraint,
                &join.on,
                join_filters.as_ref(),
            )?;

            let right_projection: Option<Vec<ast::SelectItem>> = if !already_projected
            {
                Some(select.pop_projections())
            } else {
                None
            };

            match join.join_type {
                // Semi, anti and mark joins are unparsed as `[NOT] EXISTS (SELECT 1
                // FROM <right relation> ...)` rather than as a JOIN clause.
                JoinType::LeftSemi
                | JoinType::LeftAnti
                | JoinType::LeftMark
                | JoinType::RightSemi
                | JoinType::RightAnti
                | JoinType::RightMark => {
                    let mut query_builder = QueryBuilder::default();
                    let mut from = TableWithJoinsBuilder::default();
                    let mut exists_select: SelectBuilder = SelectBuilder::default();
                    from.relation(right_relation);
                    exists_select.push_from(from);
                    // NOTE(review): the subquery condition is built from `join.filter`
                    // and `join.on` directly; `join_constraint` / `join_filters`
                    // (which include the pushed-down scan filters) are not used in
                    // this arm. Also, `selection` is called once per condition, so
                    // the result relies on how `SelectBuilder::selection` combines
                    // repeated calls. Confirm both.
                    if let Some(filter) = &join.filter {
                        exists_select.selection(Some(self.expr_to_sql(filter)?));
                    }
                    for (left, right) in &join.on {
                        exists_select.selection(Some(
                            self.expr_to_sql(&left.clone().eq(right.clone()))?,
                        ));
                    }
                    exists_select.projection(vec![ast::SelectItem::UnnamedExpr(
                        ast::Expr::value(ast::Value::Number("1".to_string(), false)),
                    )]);
                    query_builder.body(Box::new(SetExpr::Select(Box::new(
                        exists_select.build()?,
                    ))));

                    // Anti joins negate the EXISTS; semi and mark joins do not.
                    let negated = match join.join_type {
                        JoinType::LeftSemi
                        | JoinType::RightSemi
                        | JoinType::LeftMark
                        | JoinType::RightMark => false,
                        JoinType::LeftAnti | JoinType::RightAnti => true,
                        _ => unreachable!(),
                    };
                    let exists_expr = ast::Expr::Exists {
                        subquery: Box::new(query_builder.build()?),
                        negated,
                    };

                    match join.join_type {
                        JoinType::LeftMark | JoinType::RightMark => {
                            // Mark joins: the `mark` column, qualified with the
                            // table reference of the source side's first field, is
                            // replaced in the select by the EXISTS expression.
                            let source_schema =
                                if join.join_type == JoinType::LeftMark {
                                    right_plan.schema()
                                } else {
                                    left_plan.schema()
                                };
                            let (table_ref, _) = source_schema.qualified_field(0);
                            let column = self.col_to_sql(&Column::new(
                                table_ref.cloned(),
                                "mark",
                            ))?;
                            select.replace_mark(&column, &exists_expr);
                        }
                        _ => {
                            select.selection(Some(exists_expr));
                        }
                    }
                    // Only the left side's projection is restored here.
                    if let Some(projection) = left_projection {
                        select.projection(projection);
                    }
                }
                JoinType::Inner
                | JoinType::Left
                | JoinType::Right
                | JoinType::Full => {
                    let Ok(Some(relation)) = right_relation.build() else {
                        return internal_err!("Failed to build right relation");
                    };
                    let ast_join = ast::Join {
                        relation,
                        global: false,
                        join_operator: self
                            .join_operator_to_sql(join.join_type, join_constraint)?,
                    };
                    // Append the join to the current FROM entry.
                    let mut from = select.pop_from().unwrap();
                    from.push_join(ast_join);
                    select.push_from(from);
                    // Re-apply both sides' projections, left first.
                    if !already_projected {
                        let Some(left_projection) = left_projection else {
                            return internal_err!("Left projection is missing");
                        };

                        let Some(right_projection) = right_projection else {
                            return internal_err!("Right projection is missing");
                        };

                        let projection = left_projection
                            .into_iter()
                            .chain(right_projection)
                            .collect();
                        select.projection(projection);
                    }
                }
            };

            Ok(())
        }
        LogicalPlan::SubqueryAlias(plan_alias) => {
            let (plan, mut columns) =
                subquery_alias_inner_query_and_columns(plan_alias);
            let unparsed_table_scan = self.unparse_table_scan_pushdown(
                plan,
                Some(plan_alias.alias.clone()),
                select.already_projected(),
            )?;
            // Nothing projected yet and no pushdown scan to expand: project `*`.
            if !select.already_projected() && unparsed_table_scan.is_none() {
                select.projection(vec![ast::SelectItem::Wildcard(
                    ast::WildcardAdditionalOptions::default(),
                )]);
            }
            let plan = unparsed_table_scan.unwrap_or_else(|| plan.clone());
            if !columns.is_empty()
                && !self.dialect.supports_column_alias_in_table_alias()
            {
                // The dialect cannot put column aliases on the table alias, so they
                // are injected into the inner plan instead.
                let rewritten_plan =
                    match inject_column_aliases_into_subquery(plan, columns) {
                        Ok(p) => p,
                        Err(e) => {
                            return internal_err!(
                                "Failed to transform SubqueryAlias plan: {e}"
                            );
                        }
                    };

                // The aliases now live in the inner plan; none go on the table alias.
                columns = vec![];

                self.select_to_sql_recursively(
                    &rewritten_plan,
                    query,
                    select,
                    relation,
                )?;
            } else {
                self.select_to_sql_recursively(&plan, query, select, relation)?;
            }

            relation.alias(Some(
                self.new_table_alias(plan_alias.alias.table().to_string(), columns),
            ));

            Ok(())
        }
        LogicalPlan::Union(union) => {
            // A union below an already-projected select becomes a derived table.
            if select.already_projected() {
                return self.derive_with_dialect_alias(
                    "derived_union",
                    plan,
                    relation,
                    false,
                    vec![],
                );
            }

            // Each input is unparsed on its own into a set expression.
            let input_exprs: Vec<SetExpr> = union
                .inputs
                .iter()
                .map(|input| self.select_to_sql_expr(input, query))
                .collect::<Result<Vec<_>>>()?;

            assert_or_internal_err!(
                input_exprs.len() >= 2,
                "UNION operator requires at least 2 inputs"
            );

            // A query flagged as distinct union uses no quantifier; otherwise ALL.
            let set_quantifier =
                if query.as_ref().is_some_and(|q| q.is_distinct_union()) {
                    ast::SetQuantifier::None
                } else {
                    ast::SetQuantifier::All
                };

            // Fold the inputs from the last one backwards; because of the `rev`,
            // the accumulator goes on the right and the next input on the left.
            let union_expr = input_exprs
                .into_iter()
                .rev()
                .reduce(|a, b| SetExpr::SetOperation {
                    op: ast::SetOperator::Union,
                    set_quantifier,
                    left: Box::new(b),
                    right: Box::new(a),
                })
                .unwrap();

            let Some(query) = query.as_mut() else {
                return internal_err!(
                    "UNION ALL operator only valid in a statement context"
                );
            };
            // The union becomes the complete body of the query.
            query.body(Box::new(union_expr));

            Ok(())
        }
        LogicalPlan::Window(window) => {
            // The window node itself adds nothing here; continue with its input.
            self.select_to_sql_recursively(
                window.input.as_ref(),
                query,
                select,
                relation,
            )
        }
        LogicalPlan::EmptyRelation(_) => {
            // Only mark the relation as empty when no relation has been set yet.
            if !relation.has_relation() {
                relation.empty();
            }
            Ok(())
        }
        LogicalPlan::Extension(extension) => {
            // Hand all three builders to the extension unparsers; the query builder
            // is passed only when one is available.
            if let Some(query) = query.as_mut() {
                self.extension_to_sql(
                    extension.node.as_ref(),
                    &mut Some(query),
                    &mut Some(select),
                    &mut Some(relation),
                )
            } else {
                self.extension_to_sql(
                    extension.node.as_ref(),
                    &mut None,
                    &mut Some(select),
                    &mut Some(relation),
                )
            }
        }
        LogicalPlan::Unnest(unnest) => {
            assert_or_internal_err!(
                unnest.struct_type_columns.is_empty(),
                "Struct type columns are not currently supported in UNNEST: {:?}",
                unnest.struct_type_columns
            );

            // The projection directly below the unnest is skipped: the walk
            // continues with that projection's input.
            if let LogicalPlan::Projection(p) = unnest.input.as_ref() {
                self.select_to_sql_recursively(&p.input, query, select, relation)
            } else {
                internal_err!("Unnest input is not a Projection: {unnest:?}")
            }
        }
        // Only subqueries for which the helper finds an unnest node are handled;
        // other subqueries fall through to the unsupported arm below.
        LogicalPlan::Subquery(subquery)
            if find_unnest_node_until_relation(subquery.subquery.as_ref())
                .is_some() =>
        {
            if self.dialect.unnest_as_table_factor() {
                self.select_to_sql_recursively(
                    subquery.subquery.as_ref(),
                    query,
                    select,
                    relation,
                )
            } else {
                // Otherwise the subquery becomes a lateral derived table.
                self.derive_with_dialect_alias(
                    "derived_unnest",
                    subquery.subquery.as_ref(),
                    relation,
                    true,
                    vec![],
                )
            }
        }
        _ => {
            not_impl_err!("Unsupported operator: {plan:?}")
        }
    }
}
995
996 fn check_unnest_placeholder_with_outer_ref(expr: &Expr) -> Option<UnnestInputType> {
1006 if let Expr::Alias(Alias { expr, .. }) = expr
1007 && let Expr::Column(Column { name, .. }) = expr.as_ref()
1008 && let Some(prefix) = name.strip_prefix(UNNEST_PLACEHOLDER)
1009 {
1010 if prefix.starts_with(&format!("({OUTER_REFERENCE_COLUMN_PREFIX}(")) {
1011 return Some(UnnestInputType::OuterReference);
1012 }
1013 return Some(UnnestInputType::Scalar);
1014 }
1015 None
1016 }
1017
1018 fn try_unnest_to_table_factor_sql(
1019 &self,
1020 unnest: &Unnest,
1021 ) -> Result<Option<UnnestRelationBuilder>> {
1022 let mut unnest_relation = UnnestRelationBuilder::default();
1023 let LogicalPlan::Projection(projection) = unnest.input.as_ref() else {
1024 return Ok(None);
1025 };
1026
1027 if !matches!(projection.input.as_ref(), LogicalPlan::EmptyRelation(_)) {
1028 return Ok(None);
1036 };
1037
1038 let exprs = projection
1039 .expr
1040 .iter()
1041 .map(|e| self.expr_to_sql(e))
1042 .collect::<Result<Vec<_>>>()?;
1043 unnest_relation.array_exprs(exprs);
1044
1045 Ok(Some(unnest_relation))
1046 }
1047
1048 fn is_scan_with_pushdown(scan: &TableScan) -> bool {
1049 scan.projection.is_some() || !scan.filters.is_empty() || scan.fetch.is_some()
1050 }
1051
/// Expands a table scan that carries pushed-down operations (projection, filters,
/// fetch) into an explicit logical plan that can then be unparsed node by node.
///
/// `alias` is the table alias in effect, if any; column references in filters and
/// projections are rewritten to it. `already_projected` suppresses re-creating the
/// scan's projection.
///
/// Returns `Ok(None)` when `plan` is not a `TableScan`, `SubqueryAlias` or
/// `Projection` that leads to a scan with pushdown, or when the scan has nothing
/// pushed down.
///
/// # Errors
///
/// Propagates errors from `LogicalPlanBuilder` and from expression rewriting.
fn unparse_table_scan_pushdown(
    &self,
    plan: &LogicalPlan,
    alias: Option<TableReference>,
    already_projected: bool,
) -> Result<Option<LogicalPlan>> {
    match plan {
        LogicalPlan::TableScan(table_scan) => {
            if !Self::is_scan_with_pushdown(table_scan) {
                return Ok(None);
            }
            let table_schema = table_scan.source.schema();
            // Rewriter applied to the scan's filters below; only built when an
            // alias is in effect.
            let mut filter_alias_rewriter =
                alias.as_ref().map(|alias_name| TableAliasRewriter {
                    table_schema: &table_schema,
                    alias_name: alias_name.clone(),
                });

            // Start from a bare scan of the same table without any pushdown.
            let mut builder = LogicalPlanBuilder::scan(
                table_scan.table_name.clone(),
                Arc::clone(&table_scan.source),
                None,
            )?;
            // With a projection or filters present, the alias is applied directly
            // on the scan, below the projection/filter nodes added next.
            if let Some(ref alias) = alias
                && (table_scan.projection.is_some() || !table_scan.filters.is_empty())
            {
                builder = builder.alias(alias.clone())?;
            }

            // Re-create the scan's projection unless the caller already projected.
            if !already_projected && let Some(project_vec) = &table_scan.projection {
                if project_vec.is_empty() {
                    // An empty projection is replaced by the dialect-dependent
                    // fallback expression list.
                    builder = builder.project(self.empty_projection_fallback())?;
                } else {
                    // Map projected field indices to columns qualified by the alias
                    // when there is one, else by the table name.
                    let project_columns = project_vec
                        .iter()
                        .cloned()
                        .map(|i| {
                            let schema = table_scan.source.schema();
                            let field = schema.field(i);
                            if alias.is_some() {
                                Column::new(alias.clone(), field.name().clone())
                            } else {
                                Column::new(
                                    Some(table_scan.table_name.clone()),
                                    field.name().clone(),
                                )
                            }
                        })
                        .collect::<Vec<_>>();
                    builder = builder.project(project_columns)?;
                };
            }

            // Rewrite each filter to the alias (when present) and AND them all
            // together; the result is `None` when the scan has no filters.
            let filter_expr: Result<Option<Expr>> = table_scan
                .filters
                .iter()
                .cloned()
                .map(|expr| {
                    if let Some(ref mut rewriter) = filter_alias_rewriter {
                        expr.rewrite(rewriter).data()
                    } else {
                        Ok(expr)
                    }
                })
                .reduce(|acc, expr_result| {
                    acc.and_then(|acc_expr| {
                        expr_result.map(|expr| acc_expr.and(expr))
                    })
                })
                .transpose();

            if let Some(filter) = filter_expr? {
                builder = builder.filter(filter)?;
            }

            // A pushed-down fetch becomes a limit with skip 0.
            if let Some(fetch) = table_scan.fetch {
                builder = builder.limit(0, Some(fetch))?;
            }

            // With neither projection nor filters, the alias was not applied on
            // the scan above; apply it on top of the whole plan instead.
            if let Some(alias) = alias
                && table_scan.projection.is_none()
                && table_scan.filters.is_empty()
            {
                builder = builder.alias(alias)?;
            }

            Ok(Some(builder.build()?))
        }
        LogicalPlan::SubqueryAlias(subquery_alias) => {
            // Recurse into the aliased input using the inner alias.
            let ret = self.unparse_table_scan_pushdown(
                &subquery_alias.input,
                Some(subquery_alias.alias.clone()),
                already_projected,
            )?;
            // When an outer alias was passed in, wrap the expanded plan with it.
            if let Some(alias) = alias
                && let Some(plan) = ret
            {
                let plan = LogicalPlanBuilder::new(plan).alias(alias)?.build()?;
                return Ok(Some(plan));
            }
            Ok(ret)
        }
        LogicalPlan::Projection(projection) => {
            // Expand the projection's input first; the projection is only rebuilt
            // when that input expanded into something.
            if let Some(plan) = self.unparse_table_scan_pushdown(
                &projection.input,
                alias.clone(),
                already_projected,
            )? {
                // With an alias in effect, rewrite the projected expressions to it,
                // using the expanded plan's schema.
                let exprs = if alias.is_some() {
                    let mut alias_rewriter =
                        alias.as_ref().map(|alias_name| TableAliasRewriter {
                            table_schema: plan.schema().as_arrow(),
                            alias_name: alias_name.clone(),
                        });
                    projection
                        .expr
                        .iter()
                        .cloned()
                        .map(|expr| {
                            if let Some(ref mut rewriter) = alias_rewriter {
                                expr.rewrite(rewriter).data()
                            } else {
                                Ok(expr)
                            }
                        })
                        .collect::<Result<Vec<_>>>()?
                } else {
                    projection.expr.clone()
                };
                Ok(Some(
                    LogicalPlanBuilder::from(plan).project(exprs)?.build()?,
                ))
            } else {
                Ok(None)
            }
        }
        _ => Ok(None),
    }
}
1207
1208 fn select_item_to_sql(&self, expr: &Expr) -> Result<ast::SelectItem> {
1209 match expr {
1210 Expr::Alias(Alias { expr, name, .. }) => {
1211 let inner = self.expr_to_sql(expr)?;
1212
1213 let col_name = if let Some(rewritten_name) =
1215 self.dialect.col_alias_overrides(name)?
1216 {
1217 rewritten_name.to_string()
1218 } else {
1219 name.to_string()
1220 };
1221
1222 Ok(ast::SelectItem::ExprWithAlias {
1223 expr: inner,
1224 alias: self.new_ident_quoted_if_needs(col_name),
1225 })
1226 }
1227 _ => {
1228 let inner = self.expr_to_sql(expr)?;
1229
1230 Ok(ast::SelectItem::UnnamedExpr(inner))
1231 }
1232 }
1233 }
1234
1235 fn sorts_to_sql(&self, sort_exprs: &[SortExpr]) -> Result<OrderByKind> {
1236 Ok(OrderByKind::Expressions(
1237 sort_exprs
1238 .iter()
1239 .map(|sort_expr| self.sort_to_sql(sort_expr))
1240 .collect::<Result<Vec<_>>>()?,
1241 ))
1242 }
1243
1244 fn join_operator_to_sql(
1245 &self,
1246 join_type: JoinType,
1247 constraint: ast::JoinConstraint,
1248 ) -> Result<ast::JoinOperator> {
1249 Ok(match join_type {
1250 JoinType::Inner => match &constraint {
1251 ast::JoinConstraint::On(_)
1252 | ast::JoinConstraint::Using(_)
1253 | ast::JoinConstraint::Natural => ast::JoinOperator::Inner(constraint),
1254 ast::JoinConstraint::None => {
1255 ast::JoinOperator::CrossJoin(constraint)
1258 }
1259 },
1260 JoinType::Left => ast::JoinOperator::LeftOuter(constraint),
1261 JoinType::Right => ast::JoinOperator::RightOuter(constraint),
1262 JoinType::Full => ast::JoinOperator::FullOuter(constraint),
1263 JoinType::LeftAnti => ast::JoinOperator::LeftAnti(constraint),
1264 JoinType::LeftSemi => ast::JoinOperator::LeftSemi(constraint),
1265 JoinType::RightAnti => ast::JoinOperator::RightAnti(constraint),
1266 JoinType::RightSemi => ast::JoinOperator::RightSemi(constraint),
1267 JoinType::LeftMark | JoinType::RightMark => {
1268 unimplemented!("Unparsing of Mark join type")
1269 }
1270 })
1271 }
1272
1273 fn join_using_to_sql(
1277 &self,
1278 join_conditions: &[(Expr, Expr)],
1279 ) -> Option<ast::JoinConstraint> {
1280 let mut object_names = Vec::with_capacity(join_conditions.len());
1281 for (left, right) in join_conditions {
1282 match (left, right) {
1283 (
1284 Expr::Column(Column {
1285 relation: _,
1286 name: left_name,
1287 spans: _,
1288 }),
1289 Expr::Column(Column {
1290 relation: _,
1291 name: right_name,
1292 spans: _,
1293 }),
1294 ) if left_name == right_name => {
1295 let ident = self.new_ident_quoted_if_needs(left_name.to_string());
1299 object_names.push(ast::ObjectName::from(vec![ident]));
1300 }
1301 _ => return None,
1304 }
1305 }
1306 Some(ast::JoinConstraint::Using(object_names))
1307 }
1308
1309 fn join_constraint_to_sql(
1311 &self,
1312 constraint: JoinConstraint,
1313 conditions: &[(Expr, Expr)],
1314 filter: Option<&Expr>,
1315 ) -> Result<ast::JoinConstraint> {
1316 match (constraint, conditions, filter) {
1317 (JoinConstraint::On | JoinConstraint::Using, [], None) => {
1319 Ok(ast::JoinConstraint::None)
1320 }
1321
1322 (JoinConstraint::Using, conditions, None) => {
1323 match self.join_using_to_sql(conditions) {
1324 Some(using) => Ok(using),
1325 None => self.join_conditions_to_sql_on(conditions, None),
1328 }
1329 }
1330
1331 (JoinConstraint::On | JoinConstraint::Using, conditions, filter) => {
1339 self.join_conditions_to_sql_on(conditions, filter)
1340 }
1341 }
1342 }
1343
1344 fn join_conditions_to_sql_on(
1348 &self,
1349 join_conditions: &[(Expr, Expr)],
1350 filter: Option<&Expr>,
1351 ) -> Result<ast::JoinConstraint> {
1352 let mut condition = None;
1353 for (left, right) in join_conditions {
1355 let l = self.expr_to_sql(left)?;
1357 let r = self.expr_to_sql(right)?;
1358 let e = self.binary_op_to_sql(l, r, ast::BinaryOperator::Eq);
1359 condition = match condition {
1360 Some(expr) => Some(self.and_op_to_sql(expr, e)),
1361 None => Some(e),
1362 };
1363 }
1364
1365 condition = match (condition, filter) {
1367 (Some(expr), Some(filter)) => {
1368 Some(self.and_op_to_sql(expr, self.expr_to_sql(filter)?))
1369 }
1370 (Some(expr), None) => Some(expr),
1371 (None, Some(filter)) => Some(self.expr_to_sql(filter)?),
1372 (None, None) => None,
1373 };
1374
1375 let constraint = match condition {
1376 Some(filter) => ast::JoinConstraint::On(filter),
1377 None => ast::JoinConstraint::None,
1378 };
1379
1380 Ok(constraint)
1381 }
1382
1383 fn and_op_to_sql(&self, lhs: ast::Expr, rhs: ast::Expr) -> ast::Expr {
1384 self.binary_op_to_sql(lhs, rhs, ast::BinaryOperator::And)
1385 }
1386
1387 fn new_table_alias(&self, alias: String, columns: Vec<Ident>) -> ast::TableAlias {
1388 let columns = columns
1389 .into_iter()
1390 .map(|ident| TableAliasColumnDef {
1391 name: ident,
1392 data_type: None,
1393 })
1394 .collect();
1395 ast::TableAlias {
1396 name: self.new_ident_quoted_if_needs(alias),
1397 columns,
1398 }
1399 }
1400
/// Entry point for unparsing DML plans.
///
/// Currently always returns a "not implemented" error that includes the debug
/// representation of `plan`.
fn dml_to_sql(&self, plan: &LogicalPlan) -> Result<ast::Statement> {
    not_impl_err!("Unsupported plan: {plan:?}")
}
1404
1405 fn empty_projection_fallback(&self) -> Vec<Expr> {
1409 if self.dialect.supports_empty_select_list() {
1410 Vec::new()
1411 } else {
1412 vec![Expr::Literal(ScalarValue::Int64(Some(1)), None)]
1413 }
1414 }
1415}
1416
1417impl From<BuilderError> for DataFusionError {
1418 fn from(e: BuilderError) -> Self {
1419 DataFusionError::External(Box::new(e))
1420 }
1421}
1422
/// Kind of input behind an UNNEST placeholder column, as classified by
/// `Unparser::check_unnest_placeholder_with_outer_ref`.
#[derive(Debug)]
enum UnnestInputType {
    /// The placeholder name continues with an outer-reference column wrapper
    /// (`(` + `OUTER_REFERENCE_COLUMN_PREFIX` + `(`).
    OuterReference,
    /// Any other UNNEST placeholder column.
    Scalar,
}