1use super::{
19 Unparser,
20 ast::{
21 BuilderError, DerivedRelationBuilder, QueryBuilder, RelationBuilder,
22 SelectBuilder, TableRelationBuilder, TableWithJoinsBuilder,
23 },
24 rewrite::{
25 TableAliasRewriter, inject_column_aliases_into_subquery, normalize_union_schema,
26 rewrite_plan_for_sort_on_non_projected_fields,
27 subquery_alias_inner_query_and_columns,
28 },
29 utils::{
30 find_agg_node_within_select, find_unnest_node_within_select,
31 find_window_nodes_within_select, try_transform_to_simple_table_scan_with_filters,
32 unproject_sort_expr, unproject_unnest_expr, unproject_window_exprs,
33 },
34};
35use crate::unparser::extension_unparser::{
36 UnparseToStatementResult, UnparseWithinStatementResult,
37};
38use crate::unparser::utils::{find_unnest_node_until_relation, unproject_agg_exprs};
39use crate::unparser::{ast::UnnestRelationBuilder, rewrite::rewrite_qualify};
40use crate::utils::UNNEST_PLACEHOLDER;
41use datafusion_common::{
42 Column, DataFusionError, Result, ScalarValue, TableReference, assert_or_internal_err,
43 internal_err, not_impl_err,
44 tree_node::{TransformedResult, TreeNode},
45};
46use datafusion_expr::expr::OUTER_REFERENCE_COLUMN_PREFIX;
47use datafusion_expr::{
48 BinaryExpr, Distinct, Expr, JoinConstraint, JoinType, LogicalPlan,
49 LogicalPlanBuilder, Operator, Projection, SortExpr, TableScan, Unnest,
50 UserDefinedLogicalNode, expr::Alias,
51};
52use sqlparser::ast::{self, Ident, OrderByKind, SetExpr, TableAliasColumnDef};
53use std::{sync::Arc, vec};
54
55pub fn plan_to_sql(plan: &LogicalPlan) -> Result<ast::Statement> {
96 let unparser = Unparser::default();
97 unparser.plan_to_sql(plan)
98}
99
100impl Unparser<'_> {
101 pub fn plan_to_sql(&self, plan: &LogicalPlan) -> Result<ast::Statement> {
102 let mut plan = normalize_union_schema(plan)?;
103 if !self.dialect.supports_qualify() {
104 plan = rewrite_qualify(plan)?;
105 }
106
107 match plan {
108 LogicalPlan::Projection(_)
109 | LogicalPlan::Filter(_)
110 | LogicalPlan::Window(_)
111 | LogicalPlan::Aggregate(_)
112 | LogicalPlan::Sort(_)
113 | LogicalPlan::Join(_)
114 | LogicalPlan::Repartition(_)
115 | LogicalPlan::Union(_)
116 | LogicalPlan::TableScan(_)
117 | LogicalPlan::EmptyRelation(_)
118 | LogicalPlan::Subquery(_)
119 | LogicalPlan::SubqueryAlias(_)
120 | LogicalPlan::Limit(_)
121 | LogicalPlan::Statement(_)
122 | LogicalPlan::Values(_)
123 | LogicalPlan::Distinct(_) => self.select_to_sql_statement(&plan),
124 LogicalPlan::Dml(_) => self.dml_to_sql(&plan),
125 LogicalPlan::Extension(extension) => {
126 self.extension_to_statement(extension.node.as_ref())
127 }
128 LogicalPlan::Explain(_)
129 | LogicalPlan::Analyze(_)
130 | LogicalPlan::Ddl(_)
131 | LogicalPlan::Copy(_)
132 | LogicalPlan::DescribeTable(_)
133 | LogicalPlan::RecursiveQuery(_)
134 | LogicalPlan::Unnest(_) => not_impl_err!("Unsupported plan: {plan:?}"),
135 }
136 }
137
138 fn extension_to_statement(
142 &self,
143 node: &dyn UserDefinedLogicalNode,
144 ) -> Result<ast::Statement> {
145 let mut statement = None;
146 for unparser in &self.extension_unparsers {
147 match unparser.unparse_to_statement(node, self)? {
148 UnparseToStatementResult::Modified(stmt) => {
149 statement = Some(stmt);
150 break;
151 }
152 UnparseToStatementResult::Unmodified => {}
153 }
154 }
155 if let Some(statement) = statement {
156 Ok(statement)
157 } else {
158 not_impl_err!("Unsupported extension node: {node:?}")
159 }
160 }
161
162 fn extension_to_sql(
166 &self,
167 node: &dyn UserDefinedLogicalNode,
168 query: &mut Option<&mut QueryBuilder>,
169 select: &mut Option<&mut SelectBuilder>,
170 relation: &mut Option<&mut RelationBuilder>,
171 ) -> Result<()> {
172 for unparser in &self.extension_unparsers {
173 match unparser.unparse(node, self, query, select, relation)? {
174 UnparseWithinStatementResult::Modified => return Ok(()),
175 UnparseWithinStatementResult::Unmodified => {}
176 }
177 }
178 not_impl_err!("Unsupported extension node: {node:?}")
179 }
180
181 fn select_to_sql_statement(&self, plan: &LogicalPlan) -> Result<ast::Statement> {
182 let mut query_builder = Some(QueryBuilder::default());
183
184 let body = self.select_to_sql_expr(plan, &mut query_builder)?;
185
186 let query = query_builder.unwrap().body(Box::new(body)).build()?;
187
188 Ok(ast::Statement::Query(Box::new(query)))
189 }
190
191 fn select_to_sql_expr(
192 &self,
193 plan: &LogicalPlan,
194 query: &mut Option<QueryBuilder>,
195 ) -> Result<SetExpr> {
196 let mut select_builder = SelectBuilder::default();
197 select_builder.push_from(TableWithJoinsBuilder::default());
198 let mut relation_builder = RelationBuilder::default();
199 self.select_to_sql_recursively(
200 plan,
201 query,
202 &mut select_builder,
203 &mut relation_builder,
204 )?;
205
206 if let Some(body) = query.as_mut().and_then(|q| q.take_body()) {
208 return Ok(*body);
209 }
210
211 if !select_builder.already_projected() {
214 select_builder.projection(vec![ast::SelectItem::Wildcard(
215 ast::WildcardAdditionalOptions::default(),
216 )]);
217 }
218
219 let mut twj = select_builder.pop_from().unwrap();
220 twj.relation(relation_builder);
221 select_builder.push_from(twj);
222
223 Ok(SetExpr::Select(Box::new(select_builder.build()?)))
224 }
225
226 fn reconstruct_select_statement(
230 &self,
231 plan: &LogicalPlan,
232 p: &Projection,
233 select: &mut SelectBuilder,
234 ) -> Result<()> {
235 let mut exprs = p.expr.clone();
236
237 if let Some(unnest) = find_unnest_node_within_select(plan) {
239 exprs = exprs
240 .into_iter()
241 .map(|e| unproject_unnest_expr(e, unnest))
242 .collect::<Result<Vec<_>>>()?;
243 };
244
245 match (
246 find_agg_node_within_select(plan, true),
247 find_window_nodes_within_select(plan, None, true),
248 ) {
249 (Some(agg), window) => {
250 let window_option = window.as_deref();
251 let items = exprs
252 .into_iter()
253 .map(|proj_expr| {
254 let unproj = unproject_agg_exprs(proj_expr, agg, window_option)?;
255 self.select_item_to_sql(&unproj)
256 })
257 .collect::<Result<Vec<_>>>()?;
258
259 select.projection(items);
260 select.group_by(ast::GroupByExpr::Expressions(
261 agg.group_expr
262 .iter()
263 .map(|expr| self.expr_to_sql(expr))
264 .collect::<Result<Vec<_>>>()?,
265 vec![],
266 ));
267 }
268 (None, Some(window)) => {
269 let items = exprs
270 .into_iter()
271 .map(|proj_expr| {
272 let unproj = unproject_window_exprs(proj_expr, &window)?;
273 self.select_item_to_sql(&unproj)
274 })
275 .collect::<Result<Vec<_>>>()?;
276
277 select.projection(items);
278 }
279 _ => {
280 let items = exprs
281 .iter()
282 .map(|e| self.select_item_to_sql(e))
283 .collect::<Result<Vec<_>>>()?;
284 select.projection(items);
285 }
286 }
287 Ok(())
288 }
289
290 fn derive(
291 &self,
292 plan: &LogicalPlan,
293 relation: &mut RelationBuilder,
294 alias: Option<ast::TableAlias>,
295 lateral: bool,
296 ) -> Result<()> {
297 let mut derived_builder = DerivedRelationBuilder::default();
298 derived_builder.lateral(lateral).alias(alias).subquery({
299 let inner_statement = self.plan_to_sql(plan)?;
300 if let ast::Statement::Query(inner_query) = inner_statement {
301 inner_query
302 } else {
303 return internal_err!(
304 "Subquery must be a Query, but found {inner_statement:?}"
305 );
306 }
307 });
308 relation.derived(derived_builder);
309
310 Ok(())
311 }
312
313 fn derive_with_dialect_alias(
314 &self,
315 alias: &str,
316 plan: &LogicalPlan,
317 relation: &mut RelationBuilder,
318 lateral: bool,
319 columns: Vec<Ident>,
320 ) -> Result<()> {
321 if self.dialect.requires_derived_table_alias() || !columns.is_empty() {
322 self.derive(
323 plan,
324 relation,
325 Some(self.new_table_alias(alias.to_string(), columns)),
326 lateral,
327 )
328 } else {
329 self.derive(plan, relation, None, lateral)
330 }
331 }
332
/// Walks `plan` from the root downwards and records each node on the
/// builders that make up the SQL statement under construction.
///
/// * `query` – statement-level builder (ORDER BY, LIMIT/OFFSET, body); it is
///   `None` when the caller is not building a full statement, in which case
///   the sort, limit, offset and union paths return an internal error.
/// * `select` – builder for the current `SELECT` (projection, WHERE,
///   GROUP BY, HAVING, QUALIFY, DISTINCT, FROM/JOIN list).
/// * `relation` – builder for the relation that ends up in the current
///   `FROM` entry.
///
/// Several arms check `select.already_projected()`: once a projection has
/// been recorded, a node that cannot be merged into the current `SELECT` is
/// rendered as a derived table through `derive_with_dialect_alias` instead.
///
/// # Errors
/// Returns a "not implemented" error for unsupported plan nodes, internal
/// errors for the invariant violations spelled out in the messages below,
/// and propagates errors from expression conversion and plan rewrites.
#[cfg_attr(feature = "recursive_protection", recursive::recursive)]
fn select_to_sql_recursively(
    &self,
    plan: &LogicalPlan,
    query: &mut Option<QueryBuilder>,
    select: &mut SelectBuilder,
    relation: &mut RelationBuilder,
) -> Result<()> {
    match plan {
        LogicalPlan::TableScan(scan) => {
            // A scan carrying pushed-down projection/filters/fetch is first
            // expanded into an explicit plan, which is then unparsed instead.
            if let Some(unparsed_table_scan) = self.unparse_table_scan_pushdown(
                plan,
                None,
                select.already_projected(),
            )? {
                return self.select_to_sql_recursively(
                    &unparsed_table_scan,
                    query,
                    select,
                    relation,
                );
            }
            // Plain scan: emit `[catalog.][schema.]table` as the relation.
            let mut builder = TableRelationBuilder::default();
            let mut table_parts = vec![];
            if let Some(catalog_name) = scan.table_name.catalog() {
                table_parts
                    .push(self.new_ident_quoted_if_needs(catalog_name.to_string()));
            }
            if let Some(schema_name) = scan.table_name.schema() {
                table_parts
                    .push(self.new_ident_quoted_if_needs(schema_name.to_string()));
            }
            table_parts.push(
                self.new_ident_quoted_if_needs(scan.table_name.table().to_string()),
            );
            builder.name(ast::ObjectName::from(table_parts));
            relation.table(builder);

            Ok(())
        }
        LogicalPlan::Projection(p) => {
            // If the helper produces a rewritten plan for this projection,
            // unparse that plan instead.
            if let Some(new_plan) = rewrite_plan_for_sort_on_non_projected_fields(p) {
                return self
                    .select_to_sql_recursively(&new_plan, query, select, relation);
            }

            // Only a single-expression projection is inspected for an unnest
            // placeholder column.
            let unnest_input_type = if p.expr.len() == 1 {
                Self::check_unnest_placeholder_with_outer_ref(&p.expr[0])
            } else {
                None
            };
            // Dialects that render UNNEST as a table factor: put the unnest in
            // the relation and continue with the projection's input.
            if self.dialect.unnest_as_table_factor()
                && unnest_input_type.is_some()
                && let LogicalPlan::Unnest(unnest) = &p.input.as_ref()
                && let Some(unnest_relation) =
                    self.try_unnest_to_table_factor_sql(unnest)?
            {
                relation.unnest(unnest_relation);
                return self.select_to_sql_recursively(
                    p.input.as_ref(),
                    query,
                    select,
                    relation,
                );
            }

            // For an unnest projection, the expression schema names become the
            // column list of the derived-table alias (used below).
            let columns = if unnest_input_type.is_some() {
                p.expr
                    .iter()
                    .map(|e| {
                        self.new_ident_quoted_if_needs(e.schema_name().to_string())
                    })
                    .collect()
            } else {
                vec![]
            };
            // A second projection cannot be merged into the current SELECT, so
            // it becomes a derived table; it is LATERAL only for an unnest of
            // an outer reference.
            if select.already_projected() {
                return self.derive_with_dialect_alias(
                    "derived_projection",
                    plan,
                    relation,
                    unnest_input_type
                        .filter(|t| matches!(t, UnnestInputType::OuterReference))
                        .is_some(),
                    columns,
                );
            }
            self.reconstruct_select_statement(plan, p, select)?;
            self.select_to_sql_recursively(p.input.as_ref(), query, select, relation)
        }
        LogicalPlan::Filter(filter) => {
            // The predicate lands in HAVING when an aggregate is found within
            // this select, in QUALIFY when window nodes are found and the
            // dialect supports it, and in WHERE otherwise.
            if let Some(agg) =
                find_agg_node_within_select(plan, select.already_projected())
            {
                let unprojected =
                    unproject_agg_exprs(filter.predicate.clone(), agg, None)?;
                let filter_expr = self.expr_to_sql(&unprojected)?;
                select.having(Some(filter_expr));
            } else if let (Some(window), true) = (
                find_window_nodes_within_select(
                    plan,
                    None,
                    select.already_projected(),
                ),
                self.dialect.supports_qualify(),
            ) {
                let unprojected =
                    unproject_window_exprs(filter.predicate.clone(), &window)?;
                let filter_expr = self.expr_to_sql(&unprojected)?;
                select.qualify(Some(filter_expr));
            } else {
                let filter_expr = self.expr_to_sql(&filter.predicate)?;
                select.selection(Some(filter_expr));
            }

            self.select_to_sql_recursively(
                filter.input.as_ref(),
                query,
                select,
                relation,
            )
        }
        LogicalPlan::Limit(limit) => {
            // A limit above an already-projected select becomes a derived table.
            if select.already_projected() {
                return self.derive_with_dialect_alias(
                    "derived_limit",
                    plan,
                    relation,
                    false,
                    vec![],
                );
            }
            if let Some(fetch) = &limit.fetch {
                let Some(query) = query.as_mut() else {
                    return internal_err!(
                        "Limit operator only valid in a statement context."
                    );
                };
                query.limit(Some(self.expr_to_sql(fetch)?));
            }

            if let Some(skip) = &limit.skip {
                let Some(query) = query.as_mut() else {
                    return internal_err!(
                        "Offset operator only valid in a statement context."
                    );
                };

                query.offset(Some(ast::Offset {
                    rows: ast::OffsetRows::None,
                    value: self.expr_to_sql(skip)?,
                }));
            }

            self.select_to_sql_recursively(
                limit.input.as_ref(),
                query,
                select,
                relation,
            )
        }
        LogicalPlan::Sort(sort) => {
            // A sort above an already-projected select becomes a derived table.
            if select.already_projected() {
                return self.derive_with_dialect_alias(
                    "derived_sort",
                    plan,
                    relation,
                    false,
                    vec![],
                );
            }

            let Some(query_ref) = query else {
                return internal_err!(
                    "Sort operator only valid in a statement context."
                );
            };

            // A fetch attached to the sort node is emitted as a numeric LIMIT.
            if let Some(fetch) = sort.fetch {
                query_ref.limit(Some(ast::Expr::value(ast::Value::Number(
                    fetch.to_string(),
                    false,
                ))));
            };

            // Sort keys are un-projected against the aggregate (if any) and
            // the sort input before being rendered.
            let agg = find_agg_node_within_select(plan, select.already_projected());
            let sort_exprs: Vec<SortExpr> = sort
                .expr
                .iter()
                .map(|sort_expr| {
                    unproject_sort_expr(sort_expr.clone(), agg, sort.input.as_ref())
                })
                .collect::<Result<Vec<_>>>()?;

            query_ref.order_by(self.sorts_to_sql(&sort_exprs)?);

            self.select_to_sql_recursively(
                sort.input.as_ref(),
                query,
                select,
                relation,
            )
        }
        LogicalPlan::Aggregate(agg) => {
            // With no projection recorded yet, the aggregate itself supplies
            // the select list (aggregate expressions first, then the group
            // expressions) and the GROUP BY.
            if !select.already_projected() {
                let exprs: Vec<_> = agg
                    .aggr_expr
                    .iter()
                    .chain(agg.group_expr.iter())
                    .map(|expr| self.select_item_to_sql(expr))
                    .collect::<Result<Vec<_>>>()?;
                select.projection(exprs);

                select.group_by(ast::GroupByExpr::Expressions(
                    agg.group_expr
                        .iter()
                        .map(|expr| self.expr_to_sql(expr))
                        .collect::<Result<Vec<_>>>()?,
                    vec![],
                ));
            }

            self.select_to_sql_recursively(
                agg.input.as_ref(),
                query,
                select,
                relation,
            )
        }
        LogicalPlan::Distinct(distinct) => {
            // A distinct above an already-projected select becomes a derived table.
            if select.already_projected() {
                return self.derive_with_dialect_alias(
                    "derived_distinct",
                    plan,
                    relation,
                    false,
                    vec![],
                );
            }

            // DISTINCT directly over a UNION: flag the query builder so the
            // Union arm picks the matching set quantifier, then unparse the
            // union itself.
            if let Distinct::All(input) = distinct
                && matches!(input.as_ref(), LogicalPlan::Union(_))
                && let Some(query_mut) = query.as_mut()
            {
                query_mut.distinct_union();
                return self.select_to_sql_recursively(
                    input.as_ref(),
                    query,
                    select,
                    relation,
                );
            }

            let (select_distinct, input) = match distinct {
                Distinct::All(input) => (ast::Distinct::Distinct, input.as_ref()),
                Distinct::On(on) => {
                    // DISTINCT ON carries its own select list and optional
                    // ordering, which are recorded here.
                    let exprs = on
                        .on_expr
                        .iter()
                        .map(|e| self.expr_to_sql(e))
                        .collect::<Result<Vec<_>>>()?;
                    let items = on
                        .select_expr
                        .iter()
                        .map(|e| self.select_item_to_sql(e))
                        .collect::<Result<Vec<_>>>()?;
                    if let Some(sort_expr) = &on.sort_expr {
                        if let Some(query_ref) = query {
                            query_ref.order_by(self.sorts_to_sql(sort_expr)?);
                        } else {
                            return internal_err!(
                                "Sort operator only valid in a statement context."
                            );
                        }
                    }
                    select.projection(items);
                    (ast::Distinct::On(exprs), on.input.as_ref())
                }
            };
            select.distinct(Some(select_distinct));
            self.select_to_sql_recursively(input, query, select, relation)
        }
        LogicalPlan::Join(join) => {
            let mut table_scan_filters = vec![];
            // For right semi/anti joins the two inputs are swapped, so the
            // plan called `left_plan` below is `join.right`.
            let (left_plan, right_plan) = match join.join_type {
                JoinType::RightSemi | JoinType::RightAnti => {
                    (&join.right, &join.left)
                }
                _ => (&join.left, &join.right),
            };
            // Captured before recursing: the recursive calls below may record
            // a projection on `select`.
            let already_projected = select.already_projected();

            // When the helper can reduce an input to a simple table scan plus
            // a list of filters, use that scan and collect the filters.
            let left_plan =
                match try_transform_to_simple_table_scan_with_filters(left_plan)? {
                    Some((plan, filters)) => {
                        table_scan_filters.extend(filters);
                        Arc::new(plan)
                    }
                    None => Arc::clone(left_plan),
                };

            self.select_to_sql_recursively(
                left_plan.as_ref(),
                query,
                select,
                relation,
            )?;

            // Set aside whatever projection the left side recorded.
            let left_projection: Option<Vec<ast::SelectItem>> = if !already_projected
            {
                Some(select.pop_projections())
            } else {
                None
            };

            let right_plan =
                match try_transform_to_simple_table_scan_with_filters(right_plan)? {
                    Some((plan, filters)) => {
                        table_scan_filters.extend(filters);
                        Arc::new(plan)
                    }
                    None => Arc::clone(right_plan),
                };

            // The right side is unparsed into its own relation builder.
            let mut right_relation = RelationBuilder::default();

            self.select_to_sql_recursively(
                right_plan.as_ref(),
                query,
                select,
                &mut right_relation,
            )?;

            // Fold the collected scan filters (AND-ed together) into the
            // join's own filter.
            // NOTE(review): `join_filters` only feeds `join_constraint`, which
            // is used solely by the Inner/Left/Right/Full arm below. The
            // semi/anti/mark arm reads `join.filter` directly, so filters
            // collected from the table scans appear not to be emitted there.
            // Confirm whether that is intended.
            let join_filters = if table_scan_filters.is_empty() {
                join.filter.clone()
            } else {
                let Some(combined_filters) =
                    table_scan_filters.into_iter().reduce(|acc, filter| {
                        Expr::BinaryExpr(BinaryExpr {
                            left: Box::new(acc),
                            op: Operator::And,
                            right: Box::new(filter),
                        })
                    })
                else {
                    return internal_err!("Failed to combine TableScan filters");
                };

                match &join.filter {
                    Some(filter) => Some(Expr::BinaryExpr(BinaryExpr {
                        left: Box::new(filter.clone()),
                        op: Operator::And,
                        right: Box::new(combined_filters),
                    })),
                    None => Some(combined_filters),
                }
            };

            let join_constraint = self.join_constraint_to_sql(
                join.join_constraint,
                &join.on,
                join_filters.as_ref(),
            )?;

            // Set aside whatever projection the right side recorded.
            let right_projection: Option<Vec<ast::SelectItem>> = if !already_projected
            {
                Some(select.pop_projections())
            } else {
                None
            };

            match join.join_type {
                // Semi, anti and mark joins are rendered as
                // `[NOT] EXISTS (SELECT 1 FROM <right> WHERE ...)`.
                JoinType::LeftSemi
                | JoinType::LeftAnti
                | JoinType::LeftMark
                | JoinType::RightSemi
                | JoinType::RightAnti
                | JoinType::RightMark => {
                    let mut query_builder = QueryBuilder::default();
                    let mut from = TableWithJoinsBuilder::default();
                    let mut exists_select: SelectBuilder = SelectBuilder::default();
                    from.relation(right_relation);
                    exists_select.push_from(from);
                    // `selection` is called once for the filter and once per
                    // equi-join pair.
                    // NOTE(review): this relies on `SelectBuilder::selection`
                    // combining repeated calls rather than overwriting —
                    // verify against the builder.
                    if let Some(filter) = &join.filter {
                        exists_select.selection(Some(self.expr_to_sql(filter)?));
                    }
                    for (left, right) in &join.on {
                        exists_select.selection(Some(
                            self.expr_to_sql(&left.clone().eq(right.clone()))?,
                        ));
                    }
                    exists_select.projection(vec![ast::SelectItem::UnnamedExpr(
                        ast::Expr::value(ast::Value::Number("1".to_string(), false)),
                    )]);
                    query_builder.body(Box::new(SetExpr::Select(Box::new(
                        exists_select.build()?,
                    ))));

                    // Anti joins negate the EXISTS; semi and mark joins do not.
                    let negated = match join.join_type {
                        JoinType::LeftSemi
                        | JoinType::RightSemi
                        | JoinType::LeftMark
                        | JoinType::RightMark => false,
                        JoinType::LeftAnti | JoinType::RightAnti => true,
                        _ => unreachable!(),
                    };
                    let exists_expr = ast::Expr::Exists {
                        subquery: Box::new(query_builder.build()?),
                        negated,
                    };

                    match join.join_type {
                        JoinType::LeftMark | JoinType::RightMark => {
                            // Mark joins: the `mark` column, qualified with
                            // the table reference of the first field of the
                            // chosen side's schema, is replaced by the EXISTS
                            // expression.
                            let source_schema =
                                if join.join_type == JoinType::LeftMark {
                                    right_plan.schema()
                                } else {
                                    left_plan.schema()
                                };
                            let (table_ref, _) = source_schema.qualified_field(0);
                            let column = self.col_to_sql(&Column::new(
                                table_ref.cloned(),
                                "mark",
                            ))?;
                            select.replace_mark(&column, &exists_expr);
                        }
                        _ => {
                            // Semi/anti joins: the EXISTS becomes a WHERE predicate.
                            select.selection(Some(exists_expr));
                        }
                    }
                    // Only the left side's projection is restored; the right
                    // side lives inside the EXISTS subquery.
                    if let Some(projection) = left_projection {
                        select.projection(projection);
                    }
                }
                JoinType::Inner
                | JoinType::Left
                | JoinType::Right
                | JoinType::Full => {
                    let Ok(Some(relation)) = right_relation.build() else {
                        return internal_err!("Failed to build right relation");
                    };
                    let ast_join = ast::Join {
                        relation,
                        global: false,
                        join_operator: self
                            .join_operator_to_sql(join.join_type, join_constraint)?,
                    };
                    // Append the join to the current FROM entry.
                    let mut from = select.pop_from().unwrap();
                    from.push_join(ast_join);
                    select.push_from(from);
                    // Re-install the two sides' projections, left then right.
                    if !already_projected {
                        let Some(left_projection) = left_projection else {
                            return internal_err!("Left projection is missing");
                        };

                        let Some(right_projection) = right_projection else {
                            return internal_err!("Right projection is missing");
                        };

                        let projection = left_projection
                            .into_iter()
                            .chain(right_projection)
                            .collect();
                        select.projection(projection);
                    }
                }
            };

            Ok(())
        }
        LogicalPlan::SubqueryAlias(plan_alias) => {
            let (plan, mut columns) =
                subquery_alias_inner_query_and_columns(plan_alias);
            let unparsed_table_scan = self.unparse_table_scan_pushdown(
                plan,
                Some(plan_alias.alias.clone()),
                select.already_projected(),
            )?;
            // With no projection yet and no expanded table scan, record a
            // wildcard projection before descending into the aliased plan.
            if !select.already_projected() && unparsed_table_scan.is_none() {
                select.projection(vec![ast::SelectItem::Wildcard(
                    ast::WildcardAdditionalOptions::default(),
                )]);
            }
            let plan = unparsed_table_scan.unwrap_or_else(|| plan.clone());
            // Dialects without `alias(col, ...)` support get the column
            // aliases injected into the subquery instead, and the alias
            // column list is cleared.
            if !columns.is_empty()
                && !self.dialect.supports_column_alias_in_table_alias()
            {
                let rewritten_plan =
                    match inject_column_aliases_into_subquery(plan, columns) {
                        Ok(p) => p,
                        Err(e) => {
                            return internal_err!(
                                "Failed to transform SubqueryAlias plan: {e}"
                            );
                        }
                    };

                columns = vec![];

                self.select_to_sql_recursively(
                    &rewritten_plan,
                    query,
                    select,
                    relation,
                )?;
            } else {
                self.select_to_sql_recursively(&plan, query, select, relation)?;
            }

            relation.alias(Some(
                self.new_table_alias(plan_alias.alias.table().to_string(), columns),
            ));

            Ok(())
        }
        LogicalPlan::Union(union) => {
            // A union above an already-projected select becomes a derived table.
            if select.already_projected() {
                return self.derive_with_dialect_alias(
                    "derived_union",
                    plan,
                    relation,
                    false,
                    vec![],
                );
            }

            // Each input is unparsed into its own set expression.
            let input_exprs: Vec<SetExpr> = union
                .inputs
                .iter()
                .map(|input| self.select_to_sql_expr(input, query))
                .collect::<Result<Vec<_>>>()?;

            assert_or_internal_err!(
                input_exprs.len() >= 2,
                "UNION operator requires at least 2 inputs"
            );

            // No quantifier when the Distinct arm flagged this query as a
            // distinct union; `ALL` otherwise.
            let set_quantifier =
                if query.as_ref().is_some_and(|q| q.is_distinct_union()) {
                    ast::SetQuantifier::None
                } else {
                    ast::SetQuantifier::All
                };

            // Folding from the last input backwards nests the operations to
            // the right: `in1 UNION (in2 UNION (in3 ...))`.
            let union_expr = input_exprs
                .into_iter()
                .rev()
                .reduce(|a, b| SetExpr::SetOperation {
                    op: ast::SetOperator::Union,
                    set_quantifier,
                    left: Box::new(b),
                    right: Box::new(a),
                })
                .unwrap();

            let Some(query) = query.as_mut() else {
                return internal_err!(
                    "UNION ALL operator only valid in a statement context"
                );
            };
            query.body(Box::new(union_expr));

            Ok(())
        }
        LogicalPlan::Window(window) => {
            // The window node records nothing itself here; only its input is
            // visited.
            self.select_to_sql_recursively(
                window.input.as_ref(),
                query,
                select,
                relation,
            )
        }
        LogicalPlan::EmptyRelation(_) => {
            // Mark the relation as empty unless one was already recorded.
            if !relation.has_relation() {
                relation.empty();
            }
            Ok(())
        }
        LogicalPlan::Extension(extension) => {
            // Hand the node to the extension unparsers, passing the query
            // builder along only when one is present.
            if let Some(query) = query.as_mut() {
                self.extension_to_sql(
                    extension.node.as_ref(),
                    &mut Some(query),
                    &mut Some(select),
                    &mut Some(relation),
                )
            } else {
                self.extension_to_sql(
                    extension.node.as_ref(),
                    &mut None,
                    &mut Some(select),
                    &mut Some(relation),
                )
            }
        }
        LogicalPlan::Unnest(unnest) => {
            assert_or_internal_err!(
                unnest.struct_type_columns.is_empty(),
                "Struct type columns are not currently supported in UNNEST: {:?}",
                unnest.struct_type_columns
            );

            // The unnest's input must be a projection; that projection is
            // skipped and unparsing continues with its input.
            if let LogicalPlan::Projection(p) = unnest.input.as_ref() {
                self.select_to_sql_recursively(&p.input, query, select, relation)
            } else {
                internal_err!("Unnest input is not a Projection: {unnest:?}")
            }
        }
        // Only subqueries in which an unnest node is found are handled here.
        LogicalPlan::Subquery(subquery)
            if find_unnest_node_until_relation(subquery.subquery.as_ref())
                .is_some() =>
        {
            if self.dialect.unnest_as_table_factor() {
                self.select_to_sql_recursively(
                    subquery.subquery.as_ref(),
                    query,
                    select,
                    relation,
                )
            } else {
                // Otherwise the subquery is rendered as a LATERAL derived table.
                self.derive_with_dialect_alias(
                    "derived_unnest",
                    subquery.subquery.as_ref(),
                    relation,
                    true,
                    vec![],
                )
            }
        }
        _ => {
            not_impl_err!("Unsupported operator: {plan:?}")
        }
    }
}
1006
1007 fn check_unnest_placeholder_with_outer_ref(expr: &Expr) -> Option<UnnestInputType> {
1017 if let Expr::Alias(Alias { expr, .. }) = expr
1018 && let Expr::Column(Column { name, .. }) = expr.as_ref()
1019 && let Some(prefix) = name.strip_prefix(UNNEST_PLACEHOLDER)
1020 {
1021 if prefix.starts_with(&format!("({OUTER_REFERENCE_COLUMN_PREFIX}(")) {
1022 return Some(UnnestInputType::OuterReference);
1023 }
1024 return Some(UnnestInputType::Scalar);
1025 }
1026 None
1027 }
1028
1029 fn try_unnest_to_table_factor_sql(
1030 &self,
1031 unnest: &Unnest,
1032 ) -> Result<Option<UnnestRelationBuilder>> {
1033 let mut unnest_relation = UnnestRelationBuilder::default();
1034 let LogicalPlan::Projection(projection) = unnest.input.as_ref() else {
1035 return Ok(None);
1036 };
1037
1038 if !matches!(projection.input.as_ref(), LogicalPlan::EmptyRelation(_)) {
1039 return Ok(None);
1047 };
1048
1049 let exprs = projection
1050 .expr
1051 .iter()
1052 .map(|e| self.expr_to_sql(e))
1053 .collect::<Result<Vec<_>>>()?;
1054 unnest_relation.array_exprs(exprs);
1055
1056 Ok(Some(unnest_relation))
1057 }
1058
1059 fn is_scan_with_pushdown(scan: &TableScan) -> bool {
1060 scan.projection.is_some() || !scan.filters.is_empty() || scan.fetch.is_some()
1061 }
1062
1063 fn unparse_table_scan_pushdown(
1066 &self,
1067 plan: &LogicalPlan,
1068 alias: Option<TableReference>,
1069 already_projected: bool,
1070 ) -> Result<Option<LogicalPlan>> {
1071 match plan {
1072 LogicalPlan::TableScan(table_scan) => {
1073 if !Self::is_scan_with_pushdown(table_scan) {
1074 return Ok(None);
1075 }
1076 let table_schema = table_scan.source.schema();
1077 let mut filter_alias_rewriter =
1078 alias.as_ref().map(|alias_name| TableAliasRewriter {
1079 table_schema: &table_schema,
1080 alias_name: alias_name.clone(),
1081 });
1082
1083 let mut builder = LogicalPlanBuilder::scan(
1084 table_scan.table_name.clone(),
1085 Arc::clone(&table_scan.source),
1086 None,
1087 )?;
1088 if let Some(ref alias) = alias
1094 && (table_scan.projection.is_some() || !table_scan.filters.is_empty())
1095 {
1096 builder = builder.alias(alias.clone())?;
1097 }
1098
1099 if !already_projected && let Some(project_vec) = &table_scan.projection {
1103 if project_vec.is_empty() {
1104 builder = builder.project(self.empty_projection_fallback())?;
1105 } else {
1106 let project_columns = project_vec
1107 .iter()
1108 .cloned()
1109 .map(|i| {
1110 let schema = table_scan.source.schema();
1111 let field = schema.field(i);
1112 if alias.is_some() {
1113 Column::new(alias.clone(), field.name().clone())
1114 } else {
1115 Column::new(
1116 Some(table_scan.table_name.clone()),
1117 field.name().clone(),
1118 )
1119 }
1120 })
1121 .collect::<Vec<_>>();
1122 builder = builder.project(project_columns)?;
1123 };
1124 }
1125
1126 let filter_expr: Result<Option<Expr>> = table_scan
1127 .filters
1128 .iter()
1129 .cloned()
1130 .map(|expr| {
1131 if let Some(ref mut rewriter) = filter_alias_rewriter {
1132 expr.rewrite(rewriter).data()
1133 } else {
1134 Ok(expr)
1135 }
1136 })
1137 .reduce(|acc, expr_result| {
1138 acc.and_then(|acc_expr| {
1139 expr_result.map(|expr| acc_expr.and(expr))
1140 })
1141 })
1142 .transpose();
1143
1144 if let Some(filter) = filter_expr? {
1145 builder = builder.filter(filter)?;
1146 }
1147
1148 if let Some(fetch) = table_scan.fetch {
1149 builder = builder.limit(0, Some(fetch))?;
1150 }
1151
1152 if let Some(alias) = alias
1157 && table_scan.projection.is_none()
1158 && table_scan.filters.is_empty()
1159 {
1160 builder = builder.alias(alias)?;
1161 }
1162
1163 Ok(Some(builder.build()?))
1164 }
1165 LogicalPlan::SubqueryAlias(subquery_alias) => {
1166 let ret = self.unparse_table_scan_pushdown(
1167 &subquery_alias.input,
1168 Some(subquery_alias.alias.clone()),
1169 already_projected,
1170 )?;
1171 if let Some(alias) = alias
1172 && let Some(plan) = ret
1173 {
1174 let plan = LogicalPlanBuilder::new(plan).alias(alias)?.build()?;
1175 return Ok(Some(plan));
1176 }
1177 Ok(ret)
1178 }
1179 LogicalPlan::Projection(projection) => {
1182 if let Some(plan) = self.unparse_table_scan_pushdown(
1183 &projection.input,
1184 alias.clone(),
1185 already_projected,
1186 )? {
1187 let exprs = if alias.is_some() {
1188 let mut alias_rewriter =
1189 alias.as_ref().map(|alias_name| TableAliasRewriter {
1190 table_schema: plan.schema().as_arrow(),
1191 alias_name: alias_name.clone(),
1192 });
1193 projection
1194 .expr
1195 .iter()
1196 .cloned()
1197 .map(|expr| {
1198 if let Some(ref mut rewriter) = alias_rewriter {
1199 expr.rewrite(rewriter).data()
1200 } else {
1201 Ok(expr)
1202 }
1203 })
1204 .collect::<Result<Vec<_>>>()?
1205 } else {
1206 projection.expr.clone()
1207 };
1208 Ok(Some(
1209 LogicalPlanBuilder::from(plan).project(exprs)?.build()?,
1210 ))
1211 } else {
1212 Ok(None)
1213 }
1214 }
1215 _ => Ok(None),
1216 }
1217 }
1218
1219 fn select_item_to_sql(&self, expr: &Expr) -> Result<ast::SelectItem> {
1220 match expr {
1221 Expr::Alias(Alias { expr, name, .. }) => {
1222 let inner = self.expr_to_sql(expr)?;
1223
1224 let col_name = if let Some(rewritten_name) =
1226 self.dialect.col_alias_overrides(name)?
1227 {
1228 rewritten_name.to_string()
1229 } else {
1230 name.to_string()
1231 };
1232
1233 Ok(ast::SelectItem::ExprWithAlias {
1234 expr: inner,
1235 alias: self.new_ident_quoted_if_needs(col_name),
1236 })
1237 }
1238 _ => {
1239 let inner = self.expr_to_sql(expr)?;
1240
1241 Ok(ast::SelectItem::UnnamedExpr(inner))
1242 }
1243 }
1244 }
1245
1246 fn sorts_to_sql(&self, sort_exprs: &[SortExpr]) -> Result<OrderByKind> {
1247 Ok(OrderByKind::Expressions(
1248 sort_exprs
1249 .iter()
1250 .map(|sort_expr| self.sort_to_sql(sort_expr))
1251 .collect::<Result<Vec<_>>>()?,
1252 ))
1253 }
1254
1255 fn join_operator_to_sql(
1256 &self,
1257 join_type: JoinType,
1258 constraint: ast::JoinConstraint,
1259 ) -> Result<ast::JoinOperator> {
1260 Ok(match join_type {
1261 JoinType::Inner => match &constraint {
1262 ast::JoinConstraint::On(_)
1263 | ast::JoinConstraint::Using(_)
1264 | ast::JoinConstraint::Natural => ast::JoinOperator::Inner(constraint),
1265 ast::JoinConstraint::None => {
1266 ast::JoinOperator::CrossJoin(constraint)
1269 }
1270 },
1271 JoinType::Left => ast::JoinOperator::LeftOuter(constraint),
1272 JoinType::Right => ast::JoinOperator::RightOuter(constraint),
1273 JoinType::Full => ast::JoinOperator::FullOuter(constraint),
1274 JoinType::LeftAnti => ast::JoinOperator::LeftAnti(constraint),
1275 JoinType::LeftSemi => ast::JoinOperator::LeftSemi(constraint),
1276 JoinType::RightAnti => ast::JoinOperator::RightAnti(constraint),
1277 JoinType::RightSemi => ast::JoinOperator::RightSemi(constraint),
1278 JoinType::LeftMark | JoinType::RightMark => {
1279 unimplemented!("Unparsing of Mark join type")
1280 }
1281 })
1282 }
1283
1284 fn join_using_to_sql(
1288 &self,
1289 join_conditions: &[(Expr, Expr)],
1290 ) -> Option<ast::JoinConstraint> {
1291 let mut object_names = Vec::with_capacity(join_conditions.len());
1292 for (left, right) in join_conditions {
1293 match (left, right) {
1294 (
1295 Expr::Column(Column {
1296 relation: _,
1297 name: left_name,
1298 spans: _,
1299 }),
1300 Expr::Column(Column {
1301 relation: _,
1302 name: right_name,
1303 spans: _,
1304 }),
1305 ) if left_name == right_name => {
1306 let ident = self.new_ident_quoted_if_needs(left_name.to_string());
1310 object_names.push(ast::ObjectName::from(vec![ident]));
1311 }
1312 _ => return None,
1315 }
1316 }
1317 Some(ast::JoinConstraint::Using(object_names))
1318 }
1319
1320 fn join_constraint_to_sql(
1322 &self,
1323 constraint: JoinConstraint,
1324 conditions: &[(Expr, Expr)],
1325 filter: Option<&Expr>,
1326 ) -> Result<ast::JoinConstraint> {
1327 match (constraint, conditions, filter) {
1328 (JoinConstraint::On | JoinConstraint::Using, [], None) => {
1330 Ok(ast::JoinConstraint::None)
1331 }
1332
1333 (JoinConstraint::Using, conditions, None) => {
1334 match self.join_using_to_sql(conditions) {
1335 Some(using) => Ok(using),
1336 None => self.join_conditions_to_sql_on(conditions, None),
1339 }
1340 }
1341
1342 (JoinConstraint::On | JoinConstraint::Using, conditions, filter) => {
1350 self.join_conditions_to_sql_on(conditions, filter)
1351 }
1352 }
1353 }
1354
1355 fn join_conditions_to_sql_on(
1359 &self,
1360 join_conditions: &[(Expr, Expr)],
1361 filter: Option<&Expr>,
1362 ) -> Result<ast::JoinConstraint> {
1363 let mut condition = None;
1364 for (left, right) in join_conditions {
1366 let l = self.expr_to_sql(left)?;
1368 let r = self.expr_to_sql(right)?;
1369 let e = self.binary_op_to_sql(l, r, ast::BinaryOperator::Eq);
1370 condition = match condition {
1371 Some(expr) => Some(self.and_op_to_sql(expr, e)),
1372 None => Some(e),
1373 };
1374 }
1375
1376 condition = match (condition, filter) {
1378 (Some(expr), Some(filter)) => {
1379 Some(self.and_op_to_sql(expr, self.expr_to_sql(filter)?))
1380 }
1381 (Some(expr), None) => Some(expr),
1382 (None, Some(filter)) => Some(self.expr_to_sql(filter)?),
1383 (None, None) => None,
1384 };
1385
1386 let constraint = match condition {
1387 Some(filter) => ast::JoinConstraint::On(filter),
1388 None => ast::JoinConstraint::None,
1389 };
1390
1391 Ok(constraint)
1392 }
1393
1394 fn and_op_to_sql(&self, lhs: ast::Expr, rhs: ast::Expr) -> ast::Expr {
1395 self.binary_op_to_sql(lhs, rhs, ast::BinaryOperator::And)
1396 }
1397
1398 fn new_table_alias(&self, alias: String, columns: Vec<Ident>) -> ast::TableAlias {
1399 let columns = columns
1400 .into_iter()
1401 .map(|ident| TableAliasColumnDef {
1402 name: ident,
1403 data_type: None,
1404 })
1405 .collect();
1406 ast::TableAlias {
1407 name: self.new_ident_quoted_if_needs(alias),
1408 columns,
1409 explicit: true,
1410 }
1411 }
1412
/// Placeholder for unparsing DML plans: always returns a "not implemented"
/// error that includes the debug rendering of `plan`.
fn dml_to_sql(&self, plan: &LogicalPlan) -> Result<ast::Statement> {
    not_impl_err!("Unsupported plan: {plan:?}")
}
1416
1417 fn empty_projection_fallback(&self) -> Vec<Expr> {
1421 if self.dialect.supports_empty_select_list() {
1422 Vec::new()
1423 } else {
1424 vec![Expr::Literal(ScalarValue::Int64(Some(1)), None)]
1425 }
1426 }
1427}
1428
1429impl From<BuilderError> for DataFusionError {
1430 fn from(e: BuilderError) -> Self {
1431 DataFusionError::External(Box::new(e))
1432 }
1433}
1434
/// Classification of an unnest placeholder column, as produced by
/// `check_unnest_placeholder_with_outer_ref`.
#[derive(Debug)]
enum UnnestInputType {
    /// The text following the unnest placeholder prefix starts with an
    /// outer-reference marker.
    OuterReference,
    /// Any other unnest placeholder column.
    Scalar,
}