1use super::{
19 ast::{
20 BuilderError, DerivedRelationBuilder, QueryBuilder, RelationBuilder,
21 SelectBuilder, TableRelationBuilder, TableWithJoinsBuilder,
22 },
23 rewrite::{
24 inject_column_aliases_into_subquery, normalize_union_schema,
25 rewrite_plan_for_sort_on_non_projected_fields,
26 subquery_alias_inner_query_and_columns, TableAliasRewriter,
27 },
28 utils::{
29 find_agg_node_within_select, find_unnest_node_within_select,
30 find_window_nodes_within_select, try_transform_to_simple_table_scan_with_filters,
31 unproject_sort_expr, unproject_unnest_expr, unproject_window_exprs,
32 },
33 Unparser,
34};
35use crate::unparser::extension_unparser::{
36 UnparseToStatementResult, UnparseWithinStatementResult,
37};
38use crate::unparser::utils::{find_unnest_node_until_relation, unproject_agg_exprs};
39use crate::unparser::{ast::UnnestRelationBuilder, rewrite::rewrite_qualify};
40use crate::utils::UNNEST_PLACEHOLDER;
41use datafusion_common::{
42 internal_err, not_impl_err,
43 tree_node::{TransformedResult, TreeNode},
44 Column, DataFusionError, Result, ScalarValue, TableReference,
45};
46use datafusion_expr::expr::OUTER_REFERENCE_COLUMN_PREFIX;
47use datafusion_expr::{
48 expr::Alias, BinaryExpr, Distinct, Expr, JoinConstraint, JoinType, LogicalPlan,
49 LogicalPlanBuilder, Operator, Projection, SortExpr, TableScan, Unnest,
50 UserDefinedLogicalNode,
51};
52use sqlparser::ast::{self, Ident, OrderByKind, SetExpr, TableAliasColumnDef};
53use std::{sync::Arc, vec};
54
55pub fn plan_to_sql(plan: &LogicalPlan) -> Result<ast::Statement> {
96 let unparser = Unparser::default();
97 unparser.plan_to_sql(plan)
98}
99
100impl Unparser<'_> {
101 pub fn plan_to_sql(&self, plan: &LogicalPlan) -> Result<ast::Statement> {
102 let mut plan = normalize_union_schema(plan)?;
103 if !self.dialect.supports_qualify() {
104 plan = rewrite_qualify(plan)?;
105 }
106
107 match plan {
108 LogicalPlan::Projection(_)
109 | LogicalPlan::Filter(_)
110 | LogicalPlan::Window(_)
111 | LogicalPlan::Aggregate(_)
112 | LogicalPlan::Sort(_)
113 | LogicalPlan::Join(_)
114 | LogicalPlan::Repartition(_)
115 | LogicalPlan::Union(_)
116 | LogicalPlan::TableScan(_)
117 | LogicalPlan::EmptyRelation(_)
118 | LogicalPlan::Subquery(_)
119 | LogicalPlan::SubqueryAlias(_)
120 | LogicalPlan::Limit(_)
121 | LogicalPlan::Statement(_)
122 | LogicalPlan::Values(_)
123 | LogicalPlan::Distinct(_) => self.select_to_sql_statement(&plan),
124 LogicalPlan::Dml(_) => self.dml_to_sql(&plan),
125 LogicalPlan::Extension(extension) => {
126 self.extension_to_statement(extension.node.as_ref())
127 }
128 LogicalPlan::Explain(_)
129 | LogicalPlan::Analyze(_)
130 | LogicalPlan::Ddl(_)
131 | LogicalPlan::Copy(_)
132 | LogicalPlan::DescribeTable(_)
133 | LogicalPlan::RecursiveQuery(_)
134 | LogicalPlan::Unnest(_) => not_impl_err!("Unsupported plan: {plan:?}"),
135 }
136 }
137
138 fn extension_to_statement(
142 &self,
143 node: &dyn UserDefinedLogicalNode,
144 ) -> Result<ast::Statement> {
145 let mut statement = None;
146 for unparser in &self.extension_unparsers {
147 match unparser.unparse_to_statement(node, self)? {
148 UnparseToStatementResult::Modified(stmt) => {
149 statement = Some(stmt);
150 break;
151 }
152 UnparseToStatementResult::Unmodified => {}
153 }
154 }
155 if let Some(statement) = statement {
156 Ok(statement)
157 } else {
158 not_impl_err!("Unsupported extension node: {node:?}")
159 }
160 }
161
162 fn extension_to_sql(
166 &self,
167 node: &dyn UserDefinedLogicalNode,
168 query: &mut Option<&mut QueryBuilder>,
169 select: &mut Option<&mut SelectBuilder>,
170 relation: &mut Option<&mut RelationBuilder>,
171 ) -> Result<()> {
172 for unparser in &self.extension_unparsers {
173 match unparser.unparse(node, self, query, select, relation)? {
174 UnparseWithinStatementResult::Modified => return Ok(()),
175 UnparseWithinStatementResult::Unmodified => {}
176 }
177 }
178 not_impl_err!("Unsupported extension node: {node:?}")
179 }
180
181 fn select_to_sql_statement(&self, plan: &LogicalPlan) -> Result<ast::Statement> {
182 let mut query_builder = Some(QueryBuilder::default());
183
184 let body = self.select_to_sql_expr(plan, &mut query_builder)?;
185
186 let query = query_builder.unwrap().body(Box::new(body)).build()?;
187
188 Ok(ast::Statement::Query(Box::new(query)))
189 }
190
191 fn select_to_sql_expr(
192 &self,
193 plan: &LogicalPlan,
194 query: &mut Option<QueryBuilder>,
195 ) -> Result<SetExpr> {
196 let mut select_builder = SelectBuilder::default();
197 select_builder.push_from(TableWithJoinsBuilder::default());
198 let mut relation_builder = RelationBuilder::default();
199 self.select_to_sql_recursively(
200 plan,
201 query,
202 &mut select_builder,
203 &mut relation_builder,
204 )?;
205
206 if let Some(body) = query.as_mut().and_then(|q| q.take_body()) {
208 return Ok(*body);
209 }
210
211 if !select_builder.already_projected() {
214 select_builder.projection(vec![ast::SelectItem::Wildcard(
215 ast::WildcardAdditionalOptions::default(),
216 )]);
217 }
218
219 let mut twj = select_builder.pop_from().unwrap();
220 twj.relation(relation_builder);
221 select_builder.push_from(twj);
222
223 Ok(SetExpr::Select(Box::new(select_builder.build()?)))
224 }
225
226 fn reconstruct_select_statement(
230 &self,
231 plan: &LogicalPlan,
232 p: &Projection,
233 select: &mut SelectBuilder,
234 ) -> Result<()> {
235 let mut exprs = p.expr.clone();
236
237 if let Some(unnest) = find_unnest_node_within_select(plan) {
239 exprs = exprs
240 .into_iter()
241 .map(|e| unproject_unnest_expr(e, unnest))
242 .collect::<Result<Vec<_>>>()?;
243 };
244
245 match (
246 find_agg_node_within_select(plan, true),
247 find_window_nodes_within_select(plan, None, true),
248 ) {
249 (Some(agg), window) => {
250 let window_option = window.as_deref();
251 let items = exprs
252 .into_iter()
253 .map(|proj_expr| {
254 let unproj = unproject_agg_exprs(proj_expr, agg, window_option)?;
255 self.select_item_to_sql(&unproj)
256 })
257 .collect::<Result<Vec<_>>>()?;
258
259 select.projection(items);
260 select.group_by(ast::GroupByExpr::Expressions(
261 agg.group_expr
262 .iter()
263 .map(|expr| self.expr_to_sql(expr))
264 .collect::<Result<Vec<_>>>()?,
265 vec![],
266 ));
267 }
268 (None, Some(window)) => {
269 let items = exprs
270 .into_iter()
271 .map(|proj_expr| {
272 let unproj = unproject_window_exprs(proj_expr, &window)?;
273 self.select_item_to_sql(&unproj)
274 })
275 .collect::<Result<Vec<_>>>()?;
276
277 select.projection(items);
278 }
279 _ => {
280 let items = exprs
281 .iter()
282 .map(|e| self.select_item_to_sql(e))
283 .collect::<Result<Vec<_>>>()?;
284 select.projection(items);
285 }
286 }
287 Ok(())
288 }
289
290 fn derive(
291 &self,
292 plan: &LogicalPlan,
293 relation: &mut RelationBuilder,
294 alias: Option<ast::TableAlias>,
295 lateral: bool,
296 ) -> Result<()> {
297 let mut derived_builder = DerivedRelationBuilder::default();
298 derived_builder.lateral(lateral).alias(alias).subquery({
299 let inner_statement = self.plan_to_sql(plan)?;
300 if let ast::Statement::Query(inner_query) = inner_statement {
301 inner_query
302 } else {
303 return internal_err!(
304 "Subquery must be a Query, but found {inner_statement:?}"
305 );
306 }
307 });
308 relation.derived(derived_builder);
309
310 Ok(())
311 }
312
313 fn derive_with_dialect_alias(
314 &self,
315 alias: &str,
316 plan: &LogicalPlan,
317 relation: &mut RelationBuilder,
318 lateral: bool,
319 columns: Vec<Ident>,
320 ) -> Result<()> {
321 if self.dialect.requires_derived_table_alias() || !columns.is_empty() {
322 self.derive(
323 plan,
324 relation,
325 Some(self.new_table_alias(alias.to_string(), columns)),
326 lateral,
327 )
328 } else {
329 self.derive(plan, relation, None, lateral)
330 }
331 }
332
333 #[cfg_attr(feature = "recursive_protection", recursive::recursive)]
334 fn select_to_sql_recursively(
335 &self,
336 plan: &LogicalPlan,
337 query: &mut Option<QueryBuilder>,
338 select: &mut SelectBuilder,
339 relation: &mut RelationBuilder,
340 ) -> Result<()> {
341 match plan {
342 LogicalPlan::TableScan(scan) => {
343 if let Some(unparsed_table_scan) = Self::unparse_table_scan_pushdown(
344 plan,
345 None,
346 select.already_projected(),
347 )? {
348 return self.select_to_sql_recursively(
349 &unparsed_table_scan,
350 query,
351 select,
352 relation,
353 );
354 }
355 let mut builder = TableRelationBuilder::default();
356 let mut table_parts = vec![];
357 if let Some(catalog_name) = scan.table_name.catalog() {
358 table_parts
359 .push(self.new_ident_quoted_if_needs(catalog_name.to_string()));
360 }
361 if let Some(schema_name) = scan.table_name.schema() {
362 table_parts
363 .push(self.new_ident_quoted_if_needs(schema_name.to_string()));
364 }
365 table_parts.push(
366 self.new_ident_quoted_if_needs(scan.table_name.table().to_string()),
367 );
368 builder.name(ast::ObjectName::from(table_parts));
369 relation.table(builder);
370
371 Ok(())
372 }
373 LogicalPlan::Projection(p) => {
374 if let Some(new_plan) = rewrite_plan_for_sort_on_non_projected_fields(p) {
375 return self
376 .select_to_sql_recursively(&new_plan, query, select, relation);
377 }
378
379 let unnest_input_type = if p.expr.len() == 1 {
383 Self::check_unnest_placeholder_with_outer_ref(&p.expr[0])
384 } else {
385 None
386 };
387 if self.dialect.unnest_as_table_factor() && unnest_input_type.is_some() {
388 if let LogicalPlan::Unnest(unnest) = &p.input.as_ref() {
389 if let Some(unnest_relation) =
390 self.try_unnest_to_table_factor_sql(unnest)?
391 {
392 relation.unnest(unnest_relation);
393 return self.select_to_sql_recursively(
394 p.input.as_ref(),
395 query,
396 select,
397 relation,
398 );
399 }
400 }
401 }
402
403 let columns = if unnest_input_type.is_some() {
406 p.expr
407 .iter()
408 .map(|e| {
409 self.new_ident_quoted_if_needs(e.schema_name().to_string())
410 })
411 .collect()
412 } else {
413 vec![]
414 };
415 if select.already_projected() {
417 return self.derive_with_dialect_alias(
418 "derived_projection",
419 plan,
420 relation,
421 unnest_input_type
422 .filter(|t| matches!(t, UnnestInputType::OuterReference))
423 .is_some(),
424 columns,
425 );
426 }
427 self.reconstruct_select_statement(plan, p, select)?;
428 self.select_to_sql_recursively(p.input.as_ref(), query, select, relation)
429 }
430 LogicalPlan::Filter(filter) => {
431 if let Some(agg) =
432 find_agg_node_within_select(plan, select.already_projected())
433 {
434 let unprojected =
435 unproject_agg_exprs(filter.predicate.clone(), agg, None)?;
436 let filter_expr = self.expr_to_sql(&unprojected)?;
437 select.having(Some(filter_expr));
438 } else if let (Some(window), true) = (
439 find_window_nodes_within_select(
440 plan,
441 None,
442 select.already_projected(),
443 ),
444 self.dialect.supports_qualify(),
445 ) {
446 let unprojected =
447 unproject_window_exprs(filter.predicate.clone(), &window)?;
448 let filter_expr = self.expr_to_sql(&unprojected)?;
449 select.qualify(Some(filter_expr));
450 } else {
451 let filter_expr = self.expr_to_sql(&filter.predicate)?;
452 select.selection(Some(filter_expr));
453 }
454
455 self.select_to_sql_recursively(
456 filter.input.as_ref(),
457 query,
458 select,
459 relation,
460 )
461 }
462 LogicalPlan::Limit(limit) => {
463 if select.already_projected() {
465 return self.derive_with_dialect_alias(
466 "derived_limit",
467 plan,
468 relation,
469 false,
470 vec![],
471 );
472 }
473 if let Some(fetch) = &limit.fetch {
474 let Some(query) = query.as_mut() else {
475 return internal_err!(
476 "Limit operator only valid in a statement context."
477 );
478 };
479 query.limit(Some(self.expr_to_sql(fetch)?));
480 }
481
482 if let Some(skip) = &limit.skip {
483 let Some(query) = query.as_mut() else {
484 return internal_err!(
485 "Offset operator only valid in a statement context."
486 );
487 };
488
489 query.offset(Some(ast::Offset {
490 rows: ast::OffsetRows::None,
491 value: self.expr_to_sql(skip)?,
492 }));
493 }
494
495 self.select_to_sql_recursively(
496 limit.input.as_ref(),
497 query,
498 select,
499 relation,
500 )
501 }
502 LogicalPlan::Sort(sort) => {
503 if select.already_projected() {
505 return self.derive_with_dialect_alias(
506 "derived_sort",
507 plan,
508 relation,
509 false,
510 vec![],
511 );
512 }
513 let Some(query_ref) = query else {
514 return internal_err!(
515 "Sort operator only valid in a statement context."
516 );
517 };
518
519 if let Some(fetch) = sort.fetch {
520 query_ref.limit(Some(ast::Expr::value(ast::Value::Number(
521 fetch.to_string(),
522 false,
523 ))));
524 };
525
526 let agg = find_agg_node_within_select(plan, select.already_projected());
527 let sort_exprs: Vec<SortExpr> = sort
529 .expr
530 .iter()
531 .map(|sort_expr| {
532 unproject_sort_expr(sort_expr.clone(), agg, sort.input.as_ref())
533 })
534 .collect::<Result<Vec<_>>>()?;
535
536 query_ref.order_by(self.sorts_to_sql(&sort_exprs)?);
537
538 self.select_to_sql_recursively(
539 sort.input.as_ref(),
540 query,
541 select,
542 relation,
543 )
544 }
545 LogicalPlan::Aggregate(agg) => {
546 if !select.already_projected() {
548 let exprs: Vec<_> = agg
551 .aggr_expr
552 .iter()
553 .chain(agg.group_expr.iter())
554 .map(|expr| self.select_item_to_sql(expr))
555 .collect::<Result<Vec<_>>>()?;
556 select.projection(exprs);
557
558 select.group_by(ast::GroupByExpr::Expressions(
559 agg.group_expr
560 .iter()
561 .map(|expr| self.expr_to_sql(expr))
562 .collect::<Result<Vec<_>>>()?,
563 vec![],
564 ));
565 }
566
567 self.select_to_sql_recursively(
568 agg.input.as_ref(),
569 query,
570 select,
571 relation,
572 )
573 }
574 LogicalPlan::Distinct(distinct) => {
575 if select.already_projected() {
577 return self.derive_with_dialect_alias(
578 "derived_distinct",
579 plan,
580 relation,
581 false,
582 vec![],
583 );
584 }
585
586 if let Distinct::All(input) = distinct {
589 if matches!(input.as_ref(), LogicalPlan::Union(_)) {
590 if let Some(query_mut) = query.as_mut() {
591 query_mut.distinct_union();
592 return self.select_to_sql_recursively(
593 input.as_ref(),
594 query,
595 select,
596 relation,
597 );
598 }
599 }
600 }
601
602 let (select_distinct, input) = match distinct {
603 Distinct::All(input) => (ast::Distinct::Distinct, input.as_ref()),
604 Distinct::On(on) => {
605 let exprs = on
606 .on_expr
607 .iter()
608 .map(|e| self.expr_to_sql(e))
609 .collect::<Result<Vec<_>>>()?;
610 let items = on
611 .select_expr
612 .iter()
613 .map(|e| self.select_item_to_sql(e))
614 .collect::<Result<Vec<_>>>()?;
615 if let Some(sort_expr) = &on.sort_expr {
616 if let Some(query_ref) = query {
617 query_ref.order_by(self.sorts_to_sql(sort_expr)?);
618 } else {
619 return internal_err!(
620 "Sort operator only valid in a statement context."
621 );
622 }
623 }
624 select.projection(items);
625 (ast::Distinct::On(exprs), on.input.as_ref())
626 }
627 };
628 select.distinct(Some(select_distinct));
629 self.select_to_sql_recursively(input, query, select, relation)
630 }
631 LogicalPlan::Join(join) => {
632 let mut table_scan_filters = vec![];
633 let (left_plan, right_plan) = match join.join_type {
634 JoinType::RightSemi | JoinType::RightAnti => {
635 (&join.right, &join.left)
636 }
637 _ => (&join.left, &join.right),
638 };
639 let already_projected = select.already_projected();
643
644 let left_plan =
645 match try_transform_to_simple_table_scan_with_filters(left_plan)? {
646 Some((plan, filters)) => {
647 table_scan_filters.extend(filters);
648 Arc::new(plan)
649 }
650 None => Arc::clone(left_plan),
651 };
652
653 self.select_to_sql_recursively(
654 left_plan.as_ref(),
655 query,
656 select,
657 relation,
658 )?;
659
660 let left_projection: Option<Vec<ast::SelectItem>> = if !already_projected
661 {
662 Some(select.pop_projections())
663 } else {
664 None
665 };
666
667 let right_plan =
668 match try_transform_to_simple_table_scan_with_filters(right_plan)? {
669 Some((plan, filters)) => {
670 table_scan_filters.extend(filters);
671 Arc::new(plan)
672 }
673 None => Arc::clone(right_plan),
674 };
675
676 let mut right_relation = RelationBuilder::default();
677
678 self.select_to_sql_recursively(
679 right_plan.as_ref(),
680 query,
681 select,
682 &mut right_relation,
683 )?;
684
685 let join_filters = if table_scan_filters.is_empty() {
686 join.filter.clone()
687 } else {
688 let Some(combined_filters) =
690 table_scan_filters.into_iter().reduce(|acc, filter| {
691 Expr::BinaryExpr(BinaryExpr {
692 left: Box::new(acc),
693 op: Operator::And,
694 right: Box::new(filter),
695 })
696 })
697 else {
698 return internal_err!("Failed to combine TableScan filters");
699 };
700
701 match &join.filter {
703 Some(filter) => Some(Expr::BinaryExpr(BinaryExpr {
704 left: Box::new(filter.clone()),
705 op: Operator::And,
706 right: Box::new(combined_filters),
707 })),
708 None => Some(combined_filters),
709 }
710 };
711
712 let join_constraint = self.join_constraint_to_sql(
713 join.join_constraint,
714 &join.on,
715 join_filters.as_ref(),
716 )?;
717
718 let right_projection: Option<Vec<ast::SelectItem>> = if !already_projected
719 {
720 Some(select.pop_projections())
721 } else {
722 None
723 };
724
725 match join.join_type {
726 JoinType::LeftSemi
727 | JoinType::LeftAnti
728 | JoinType::LeftMark
729 | JoinType::RightSemi
730 | JoinType::RightAnti
731 | JoinType::RightMark => {
732 let mut query_builder = QueryBuilder::default();
733 let mut from = TableWithJoinsBuilder::default();
734 let mut exists_select: SelectBuilder = SelectBuilder::default();
735 from.relation(right_relation);
736 exists_select.push_from(from);
737 if let Some(filter) = &join.filter {
738 exists_select.selection(Some(self.expr_to_sql(filter)?));
739 }
740 for (left, right) in &join.on {
741 exists_select.selection(Some(
742 self.expr_to_sql(&left.clone().eq(right.clone()))?,
743 ));
744 }
745 exists_select.projection(vec![ast::SelectItem::UnnamedExpr(
746 ast::Expr::value(ast::Value::Number("1".to_string(), false)),
747 )]);
748 query_builder.body(Box::new(SetExpr::Select(Box::new(
749 exists_select.build()?,
750 ))));
751
752 let negated = match join.join_type {
753 JoinType::LeftSemi
754 | JoinType::RightSemi
755 | JoinType::LeftMark
756 | JoinType::RightMark => false,
757 JoinType::LeftAnti | JoinType::RightAnti => true,
758 _ => unreachable!(),
759 };
760 let exists_expr = ast::Expr::Exists {
761 subquery: Box::new(query_builder.build()?),
762 negated,
763 };
764
765 match join.join_type {
766 JoinType::LeftMark | JoinType::RightMark => {
767 let source_schema =
768 if join.join_type == JoinType::LeftMark {
769 right_plan.schema()
770 } else {
771 left_plan.schema()
772 };
773 let (table_ref, _) = source_schema.qualified_field(0);
774 let column = self.col_to_sql(&Column::new(
775 table_ref.cloned(),
776 "mark",
777 ))?;
778 select.replace_mark(&column, &exists_expr);
779 }
780 _ => {
781 select.selection(Some(exists_expr));
782 }
783 }
784 if let Some(projection) = left_projection {
785 select.projection(projection);
786 }
787 }
788 JoinType::Inner
789 | JoinType::Left
790 | JoinType::Right
791 | JoinType::Full => {
792 let Ok(Some(relation)) = right_relation.build() else {
793 return internal_err!("Failed to build right relation");
794 };
795 let ast_join = ast::Join {
796 relation,
797 global: false,
798 join_operator: self
799 .join_operator_to_sql(join.join_type, join_constraint)?,
800 };
801 let mut from = select.pop_from().unwrap();
802 from.push_join(ast_join);
803 select.push_from(from);
804 if !already_projected {
805 let Some(left_projection) = left_projection else {
806 return internal_err!("Left projection is missing");
807 };
808
809 let Some(right_projection) = right_projection else {
810 return internal_err!("Right projection is missing");
811 };
812
813 let projection = left_projection
814 .into_iter()
815 .chain(right_projection)
816 .collect();
817 select.projection(projection);
818 }
819 }
820 };
821
822 Ok(())
823 }
824 LogicalPlan::SubqueryAlias(plan_alias) => {
825 let (plan, mut columns) =
826 subquery_alias_inner_query_and_columns(plan_alias);
827 let unparsed_table_scan = Self::unparse_table_scan_pushdown(
828 plan,
829 Some(plan_alias.alias.clone()),
830 select.already_projected(),
831 )?;
832 if !select.already_projected() && unparsed_table_scan.is_none() {
835 select.projection(vec![ast::SelectItem::Wildcard(
836 ast::WildcardAdditionalOptions::default(),
837 )]);
838 }
839 let plan = unparsed_table_scan.unwrap_or_else(|| plan.clone());
840 if !columns.is_empty()
841 && !self.dialect.supports_column_alias_in_table_alias()
842 {
843 let rewritten_plan =
845 match inject_column_aliases_into_subquery(plan, columns) {
846 Ok(p) => p,
847 Err(e) => {
848 return internal_err!(
849 "Failed to transform SubqueryAlias plan: {e}"
850 )
851 }
852 };
853
854 columns = vec![];
855
856 self.select_to_sql_recursively(
857 &rewritten_plan,
858 query,
859 select,
860 relation,
861 )?;
862 } else {
863 self.select_to_sql_recursively(&plan, query, select, relation)?;
864 }
865
866 relation.alias(Some(
867 self.new_table_alias(plan_alias.alias.table().to_string(), columns),
868 ));
869
870 Ok(())
871 }
872 LogicalPlan::Union(union) => {
873 if select.already_projected() {
875 return self.derive_with_dialect_alias(
876 "derived_union",
877 plan,
878 relation,
879 false,
880 vec![],
881 );
882 }
883
884 let input_exprs: Vec<SetExpr> = union
885 .inputs
886 .iter()
887 .map(|input| self.select_to_sql_expr(input, query))
888 .collect::<Result<Vec<_>>>()?;
889
890 if input_exprs.len() < 2 {
891 return internal_err!("UNION operator requires at least 2 inputs");
892 }
893
894 let set_quantifier =
895 if query.as_ref().is_some_and(|q| q.is_distinct_union()) {
896 ast::SetQuantifier::None
899 } else {
900 ast::SetQuantifier::All
901 };
902
903 let union_expr = input_exprs
906 .into_iter()
907 .rev()
908 .reduce(|a, b| SetExpr::SetOperation {
909 op: ast::SetOperator::Union,
910 set_quantifier,
911 left: Box::new(b),
912 right: Box::new(a),
913 })
914 .unwrap();
915
916 let Some(query) = query.as_mut() else {
917 return internal_err!(
918 "UNION ALL operator only valid in a statement context"
919 );
920 };
921 query.body(Box::new(union_expr));
922
923 Ok(())
924 }
925 LogicalPlan::Window(window) => {
926 self.select_to_sql_recursively(
928 window.input.as_ref(),
929 query,
930 select,
931 relation,
932 )
933 }
934 LogicalPlan::EmptyRelation(_) => {
935 if !relation.has_relation() {
938 relation.empty();
939 }
940 Ok(())
941 }
942 LogicalPlan::Extension(extension) => {
943 if let Some(query) = query.as_mut() {
944 self.extension_to_sql(
945 extension.node.as_ref(),
946 &mut Some(query),
947 &mut Some(select),
948 &mut Some(relation),
949 )
950 } else {
951 self.extension_to_sql(
952 extension.node.as_ref(),
953 &mut None,
954 &mut Some(select),
955 &mut Some(relation),
956 )
957 }
958 }
959 LogicalPlan::Unnest(unnest) => {
960 if !unnest.struct_type_columns.is_empty() {
961 return internal_err!(
962 "Struct type columns are not currently supported in UNNEST: {:?}",
963 unnest.struct_type_columns
964 );
965 }
966
967 if let LogicalPlan::Projection(p) = unnest.input.as_ref() {
975 self.select_to_sql_recursively(&p.input, query, select, relation)
977 } else {
978 internal_err!("Unnest input is not a Projection: {unnest:?}")
979 }
980 }
981 LogicalPlan::Subquery(subquery)
982 if find_unnest_node_until_relation(subquery.subquery.as_ref())
983 .is_some() =>
984 {
985 if self.dialect.unnest_as_table_factor() {
986 self.select_to_sql_recursively(
987 subquery.subquery.as_ref(),
988 query,
989 select,
990 relation,
991 )
992 } else {
993 self.derive_with_dialect_alias(
994 "derived_unnest",
995 subquery.subquery.as_ref(),
996 relation,
997 true,
998 vec![],
999 )
1000 }
1001 }
1002 _ => {
1003 not_impl_err!("Unsupported operator: {plan:?}")
1004 }
1005 }
1006 }
1007
1008 fn check_unnest_placeholder_with_outer_ref(expr: &Expr) -> Option<UnnestInputType> {
1018 if let Expr::Alias(Alias { expr, .. }) = expr {
1019 if let Expr::Column(Column { name, .. }) = expr.as_ref() {
1020 if let Some(prefix) = name.strip_prefix(UNNEST_PLACEHOLDER) {
1021 if prefix.starts_with(&format!("({OUTER_REFERENCE_COLUMN_PREFIX}(")) {
1022 return Some(UnnestInputType::OuterReference);
1023 }
1024 return Some(UnnestInputType::Scalar);
1025 }
1026 }
1027 }
1028 None
1029 }
1030
1031 fn try_unnest_to_table_factor_sql(
1032 &self,
1033 unnest: &Unnest,
1034 ) -> Result<Option<UnnestRelationBuilder>> {
1035 let mut unnest_relation = UnnestRelationBuilder::default();
1036 let LogicalPlan::Projection(projection) = unnest.input.as_ref() else {
1037 return Ok(None);
1038 };
1039
1040 if !matches!(projection.input.as_ref(), LogicalPlan::EmptyRelation(_)) {
1041 return Ok(None);
1049 };
1050
1051 let exprs = projection
1052 .expr
1053 .iter()
1054 .map(|e| self.expr_to_sql(e))
1055 .collect::<Result<Vec<_>>>()?;
1056 unnest_relation.array_exprs(exprs);
1057
1058 Ok(Some(unnest_relation))
1059 }
1060
1061 fn is_scan_with_pushdown(scan: &TableScan) -> bool {
1062 scan.projection.is_some() || !scan.filters.is_empty() || scan.fetch.is_some()
1063 }
1064
1065 fn unparse_table_scan_pushdown(
1068 plan: &LogicalPlan,
1069 alias: Option<TableReference>,
1070 already_projected: bool,
1071 ) -> Result<Option<LogicalPlan>> {
1072 match plan {
1073 LogicalPlan::TableScan(table_scan) => {
1074 if !Self::is_scan_with_pushdown(table_scan) {
1075 return Ok(None);
1076 }
1077 let table_schema = table_scan.source.schema();
1078 let mut filter_alias_rewriter =
1079 alias.as_ref().map(|alias_name| TableAliasRewriter {
1080 table_schema: &table_schema,
1081 alias_name: alias_name.clone(),
1082 });
1083
1084 let mut builder = LogicalPlanBuilder::scan(
1085 table_scan.table_name.clone(),
1086 Arc::clone(&table_scan.source),
1087 None,
1088 )?;
1089 if let Some(ref alias) = alias {
1095 if table_scan.projection.is_some() || !table_scan.filters.is_empty() {
1096 builder = builder.alias(alias.clone())?;
1097 }
1098 }
1099
1100 if !already_projected {
1104 if let Some(project_vec) = &table_scan.projection {
1105 if project_vec.is_empty() {
1106 builder = builder.project(vec![Expr::Literal(
1107 ScalarValue::Int64(Some(1)),
1108 None,
1109 )])?;
1110 } else {
1111 let project_columns = project_vec
1112 .iter()
1113 .cloned()
1114 .map(|i| {
1115 let schema = table_scan.source.schema();
1116 let field = schema.field(i);
1117 if alias.is_some() {
1118 Column::new(alias.clone(), field.name().clone())
1119 } else {
1120 Column::new(
1121 Some(table_scan.table_name.clone()),
1122 field.name().clone(),
1123 )
1124 }
1125 })
1126 .collect::<Vec<_>>();
1127 builder = builder.project(project_columns)?;
1128 };
1129 }
1130 }
1131
1132 let filter_expr: Result<Option<Expr>> = table_scan
1133 .filters
1134 .iter()
1135 .cloned()
1136 .map(|expr| {
1137 if let Some(ref mut rewriter) = filter_alias_rewriter {
1138 expr.rewrite(rewriter).data()
1139 } else {
1140 Ok(expr)
1141 }
1142 })
1143 .reduce(|acc, expr_result| {
1144 acc.and_then(|acc_expr| {
1145 expr_result.map(|expr| acc_expr.and(expr))
1146 })
1147 })
1148 .transpose();
1149
1150 if let Some(filter) = filter_expr? {
1151 builder = builder.filter(filter)?;
1152 }
1153
1154 if let Some(fetch) = table_scan.fetch {
1155 builder = builder.limit(0, Some(fetch))?;
1156 }
1157
1158 if let Some(alias) = alias {
1163 if table_scan.projection.is_none() && table_scan.filters.is_empty() {
1164 builder = builder.alias(alias)?;
1165 }
1166 }
1167
1168 Ok(Some(builder.build()?))
1169 }
1170 LogicalPlan::SubqueryAlias(subquery_alias) => {
1171 let ret = Self::unparse_table_scan_pushdown(
1172 &subquery_alias.input,
1173 Some(subquery_alias.alias.clone()),
1174 already_projected,
1175 )?;
1176 if let Some(alias) = alias {
1177 if let Some(plan) = ret {
1178 let plan = LogicalPlanBuilder::new(plan).alias(alias)?.build()?;
1179 return Ok(Some(plan));
1180 }
1181 }
1182 Ok(ret)
1183 }
1184 LogicalPlan::Projection(projection) => {
1187 if let Some(plan) = Self::unparse_table_scan_pushdown(
1188 &projection.input,
1189 alias.clone(),
1190 already_projected,
1191 )? {
1192 let exprs = if alias.is_some() {
1193 let mut alias_rewriter =
1194 alias.as_ref().map(|alias_name| TableAliasRewriter {
1195 table_schema: plan.schema().as_arrow(),
1196 alias_name: alias_name.clone(),
1197 });
1198 projection
1199 .expr
1200 .iter()
1201 .cloned()
1202 .map(|expr| {
1203 if let Some(ref mut rewriter) = alias_rewriter {
1204 expr.rewrite(rewriter).data()
1205 } else {
1206 Ok(expr)
1207 }
1208 })
1209 .collect::<Result<Vec<_>>>()?
1210 } else {
1211 projection.expr.clone()
1212 };
1213 Ok(Some(
1214 LogicalPlanBuilder::from(plan).project(exprs)?.build()?,
1215 ))
1216 } else {
1217 Ok(None)
1218 }
1219 }
1220 _ => Ok(None),
1221 }
1222 }
1223
1224 fn select_item_to_sql(&self, expr: &Expr) -> Result<ast::SelectItem> {
1225 match expr {
1226 Expr::Alias(Alias { expr, name, .. }) => {
1227 let inner = self.expr_to_sql(expr)?;
1228
1229 let col_name = if let Some(rewritten_name) =
1231 self.dialect.col_alias_overrides(name)?
1232 {
1233 rewritten_name.to_string()
1234 } else {
1235 name.to_string()
1236 };
1237
1238 Ok(ast::SelectItem::ExprWithAlias {
1239 expr: inner,
1240 alias: self.new_ident_quoted_if_needs(col_name),
1241 })
1242 }
1243 _ => {
1244 let inner = self.expr_to_sql(expr)?;
1245
1246 Ok(ast::SelectItem::UnnamedExpr(inner))
1247 }
1248 }
1249 }
1250
1251 fn sorts_to_sql(&self, sort_exprs: &[SortExpr]) -> Result<OrderByKind> {
1252 Ok(OrderByKind::Expressions(
1253 sort_exprs
1254 .iter()
1255 .map(|sort_expr| self.sort_to_sql(sort_expr))
1256 .collect::<Result<Vec<_>>>()?,
1257 ))
1258 }
1259
1260 fn join_operator_to_sql(
1261 &self,
1262 join_type: JoinType,
1263 constraint: ast::JoinConstraint,
1264 ) -> Result<ast::JoinOperator> {
1265 Ok(match join_type {
1266 JoinType::Inner => match &constraint {
1267 ast::JoinConstraint::On(_)
1268 | ast::JoinConstraint::Using(_)
1269 | ast::JoinConstraint::Natural => ast::JoinOperator::Inner(constraint),
1270 ast::JoinConstraint::None => {
1271 ast::JoinOperator::CrossJoin(constraint)
1274 }
1275 },
1276 JoinType::Left => ast::JoinOperator::LeftOuter(constraint),
1277 JoinType::Right => ast::JoinOperator::RightOuter(constraint),
1278 JoinType::Full => ast::JoinOperator::FullOuter(constraint),
1279 JoinType::LeftAnti => ast::JoinOperator::LeftAnti(constraint),
1280 JoinType::LeftSemi => ast::JoinOperator::LeftSemi(constraint),
1281 JoinType::RightAnti => ast::JoinOperator::RightAnti(constraint),
1282 JoinType::RightSemi => ast::JoinOperator::RightSemi(constraint),
1283 JoinType::LeftMark | JoinType::RightMark => {
1284 unimplemented!("Unparsing of Mark join type")
1285 }
1286 })
1287 }
1288
1289 fn join_using_to_sql(
1293 &self,
1294 join_conditions: &[(Expr, Expr)],
1295 ) -> Option<ast::JoinConstraint> {
1296 let mut object_names = Vec::with_capacity(join_conditions.len());
1297 for (left, right) in join_conditions {
1298 match (left, right) {
1299 (
1300 Expr::Column(Column {
1301 relation: _,
1302 name: left_name,
1303 spans: _,
1304 }),
1305 Expr::Column(Column {
1306 relation: _,
1307 name: right_name,
1308 spans: _,
1309 }),
1310 ) if left_name == right_name => {
1311 let ident = self.new_ident_quoted_if_needs(left_name.to_string());
1315 object_names.push(ast::ObjectName::from(vec![ident]));
1316 }
1317 _ => return None,
1320 }
1321 }
1322 Some(ast::JoinConstraint::Using(object_names))
1323 }
1324
1325 fn join_constraint_to_sql(
1327 &self,
1328 constraint: JoinConstraint,
1329 conditions: &[(Expr, Expr)],
1330 filter: Option<&Expr>,
1331 ) -> Result<ast::JoinConstraint> {
1332 match (constraint, conditions, filter) {
1333 (JoinConstraint::On | JoinConstraint::Using, [], None) => {
1335 Ok(ast::JoinConstraint::None)
1336 }
1337
1338 (JoinConstraint::Using, conditions, None) => {
1339 match self.join_using_to_sql(conditions) {
1340 Some(using) => Ok(using),
1341 None => self.join_conditions_to_sql_on(conditions, None),
1344 }
1345 }
1346
1347 (JoinConstraint::On | JoinConstraint::Using, conditions, filter) => {
1355 self.join_conditions_to_sql_on(conditions, filter)
1356 }
1357 }
1358 }
1359
1360 fn join_conditions_to_sql_on(
1364 &self,
1365 join_conditions: &[(Expr, Expr)],
1366 filter: Option<&Expr>,
1367 ) -> Result<ast::JoinConstraint> {
1368 let mut condition = None;
1369 for (left, right) in join_conditions {
1371 let l = self.expr_to_sql(left)?;
1373 let r = self.expr_to_sql(right)?;
1374 let e = self.binary_op_to_sql(l, r, ast::BinaryOperator::Eq);
1375 condition = match condition {
1376 Some(expr) => Some(self.and_op_to_sql(expr, e)),
1377 None => Some(e),
1378 };
1379 }
1380
1381 condition = match (condition, filter) {
1383 (Some(expr), Some(filter)) => {
1384 Some(self.and_op_to_sql(expr, self.expr_to_sql(filter)?))
1385 }
1386 (Some(expr), None) => Some(expr),
1387 (None, Some(filter)) => Some(self.expr_to_sql(filter)?),
1388 (None, None) => None,
1389 };
1390
1391 let constraint = match condition {
1392 Some(filter) => ast::JoinConstraint::On(filter),
1393 None => ast::JoinConstraint::None,
1394 };
1395
1396 Ok(constraint)
1397 }
1398
1399 fn and_op_to_sql(&self, lhs: ast::Expr, rhs: ast::Expr) -> ast::Expr {
1400 self.binary_op_to_sql(lhs, rhs, ast::BinaryOperator::And)
1401 }
1402
1403 fn new_table_alias(&self, alias: String, columns: Vec<Ident>) -> ast::TableAlias {
1404 let columns = columns
1405 .into_iter()
1406 .map(|ident| TableAliasColumnDef {
1407 name: ident,
1408 data_type: None,
1409 })
1410 .collect();
1411 ast::TableAlias {
1412 name: self.new_ident_quoted_if_needs(alias),
1413 columns,
1414 }
1415 }
1416
1417 fn dml_to_sql(&self, plan: &LogicalPlan) -> Result<ast::Statement> {
1418 not_impl_err!("Unsupported plan: {plan:?}")
1419 }
1420}
1421
1422impl From<BuilderError> for DataFusionError {
1423 fn from(e: BuilderError) -> Self {
1424 DataFusionError::External(Box::new(e))
1425 }
1426}
1427
/// Classification returned by `check_unnest_placeholder_with_outer_ref`
/// for a column whose name starts with the unnest placeholder.
#[derive(Debug)]
enum UnnestInputType {
    /// The text after the placeholder starts with an outer-reference
    /// column prefix.
    OuterReference,
    /// Any other unnest placeholder column.
    Scalar,
}