1use super::{
19 ast::{
20 BuilderError, DerivedRelationBuilder, QueryBuilder, RelationBuilder,
21 SelectBuilder, TableRelationBuilder, TableWithJoinsBuilder,
22 },
23 rewrite::{
24 inject_column_aliases_into_subquery, normalize_union_schema,
25 rewrite_plan_for_sort_on_non_projected_fields,
26 subquery_alias_inner_query_and_columns, TableAliasRewriter,
27 },
28 utils::{
29 find_agg_node_within_select, find_unnest_node_within_select,
30 find_window_nodes_within_select, try_transform_to_simple_table_scan_with_filters,
31 unproject_sort_expr, unproject_unnest_expr, unproject_window_exprs,
32 },
33 Unparser,
34};
35use crate::unparser::ast::UnnestRelationBuilder;
36use crate::unparser::extension_unparser::{
37 UnparseToStatementResult, UnparseWithinStatementResult,
38};
39use crate::unparser::utils::{find_unnest_node_until_relation, unproject_agg_exprs};
40use crate::utils::UNNEST_PLACEHOLDER;
41use datafusion_common::{
42 internal_err, not_impl_err,
43 tree_node::{TransformedResult, TreeNode},
44 Column, DataFusionError, Result, ScalarValue, TableReference,
45};
46use datafusion_expr::expr::OUTER_REFERENCE_COLUMN_PREFIX;
47use datafusion_expr::{
48 expr::Alias, BinaryExpr, Distinct, Expr, JoinConstraint, JoinType, LogicalPlan,
49 LogicalPlanBuilder, Operator, Projection, SortExpr, TableScan, Unnest,
50 UserDefinedLogicalNode,
51};
52use sqlparser::ast::{self, Ident, OrderByKind, SetExpr, TableAliasColumnDef};
53use std::{sync::Arc, vec};
54
55pub fn plan_to_sql(plan: &LogicalPlan) -> Result<ast::Statement> {
92 let unparser = Unparser::default();
93 unparser.plan_to_sql(plan)
94}
95
96impl Unparser<'_> {
97 pub fn plan_to_sql(&self, plan: &LogicalPlan) -> Result<ast::Statement> {
98 let plan = normalize_union_schema(plan)?;
99
100 match plan {
101 LogicalPlan::Projection(_)
102 | LogicalPlan::Filter(_)
103 | LogicalPlan::Window(_)
104 | LogicalPlan::Aggregate(_)
105 | LogicalPlan::Sort(_)
106 | LogicalPlan::Join(_)
107 | LogicalPlan::Repartition(_)
108 | LogicalPlan::Union(_)
109 | LogicalPlan::TableScan(_)
110 | LogicalPlan::EmptyRelation(_)
111 | LogicalPlan::Subquery(_)
112 | LogicalPlan::SubqueryAlias(_)
113 | LogicalPlan::Limit(_)
114 | LogicalPlan::Statement(_)
115 | LogicalPlan::Values(_)
116 | LogicalPlan::Distinct(_) => self.select_to_sql_statement(&plan),
117 LogicalPlan::Dml(_) => self.dml_to_sql(&plan),
118 LogicalPlan::Extension(extension) => {
119 self.extension_to_statement(extension.node.as_ref())
120 }
121 LogicalPlan::Explain(_)
122 | LogicalPlan::Analyze(_)
123 | LogicalPlan::Ddl(_)
124 | LogicalPlan::Copy(_)
125 | LogicalPlan::DescribeTable(_)
126 | LogicalPlan::RecursiveQuery(_)
127 | LogicalPlan::Unnest(_) => not_impl_err!("Unsupported plan: {plan:?}"),
128 }
129 }
130
131 fn extension_to_statement(
135 &self,
136 node: &dyn UserDefinedLogicalNode,
137 ) -> Result<ast::Statement> {
138 let mut statement = None;
139 for unparser in &self.extension_unparsers {
140 match unparser.unparse_to_statement(node, self)? {
141 UnparseToStatementResult::Modified(stmt) => {
142 statement = Some(stmt);
143 break;
144 }
145 UnparseToStatementResult::Unmodified => {}
146 }
147 }
148 if let Some(statement) = statement {
149 Ok(statement)
150 } else {
151 not_impl_err!("Unsupported extension node: {node:?}")
152 }
153 }
154
155 fn extension_to_sql(
159 &self,
160 node: &dyn UserDefinedLogicalNode,
161 query: &mut Option<&mut QueryBuilder>,
162 select: &mut Option<&mut SelectBuilder>,
163 relation: &mut Option<&mut RelationBuilder>,
164 ) -> Result<()> {
165 for unparser in &self.extension_unparsers {
166 match unparser.unparse(node, self, query, select, relation)? {
167 UnparseWithinStatementResult::Modified => return Ok(()),
168 UnparseWithinStatementResult::Unmodified => {}
169 }
170 }
171 not_impl_err!("Unsupported extension node: {node:?}")
172 }
173
174 fn select_to_sql_statement(&self, plan: &LogicalPlan) -> Result<ast::Statement> {
175 let mut query_builder = Some(QueryBuilder::default());
176
177 let body = self.select_to_sql_expr(plan, &mut query_builder)?;
178
179 let query = query_builder.unwrap().body(Box::new(body)).build()?;
180
181 Ok(ast::Statement::Query(Box::new(query)))
182 }
183
184 fn select_to_sql_expr(
185 &self,
186 plan: &LogicalPlan,
187 query: &mut Option<QueryBuilder>,
188 ) -> Result<SetExpr> {
189 let mut select_builder = SelectBuilder::default();
190 select_builder.push_from(TableWithJoinsBuilder::default());
191 let mut relation_builder = RelationBuilder::default();
192 self.select_to_sql_recursively(
193 plan,
194 query,
195 &mut select_builder,
196 &mut relation_builder,
197 )?;
198
199 if let Some(body) = query.as_mut().and_then(|q| q.take_body()) {
201 return Ok(*body);
202 }
203
204 if !select_builder.already_projected() {
207 select_builder.projection(vec![ast::SelectItem::Wildcard(
208 ast::WildcardAdditionalOptions::default(),
209 )]);
210 }
211
212 let mut twj = select_builder.pop_from().unwrap();
213 twj.relation(relation_builder);
214 select_builder.push_from(twj);
215
216 Ok(SetExpr::Select(Box::new(select_builder.build()?)))
217 }
218
219 fn reconstruct_select_statement(
223 &self,
224 plan: &LogicalPlan,
225 p: &Projection,
226 select: &mut SelectBuilder,
227 ) -> Result<()> {
228 let mut exprs = p.expr.clone();
229
230 if let Some(unnest) = find_unnest_node_within_select(plan) {
232 exprs = exprs
233 .into_iter()
234 .map(|e| unproject_unnest_expr(e, unnest))
235 .collect::<Result<Vec<_>>>()?;
236 };
237
238 match (
239 find_agg_node_within_select(plan, true),
240 find_window_nodes_within_select(plan, None, true),
241 ) {
242 (Some(agg), window) => {
243 let window_option = window.as_deref();
244 let items = exprs
245 .into_iter()
246 .map(|proj_expr| {
247 let unproj = unproject_agg_exprs(proj_expr, agg, window_option)?;
248 self.select_item_to_sql(&unproj)
249 })
250 .collect::<Result<Vec<_>>>()?;
251
252 select.projection(items);
253 select.group_by(ast::GroupByExpr::Expressions(
254 agg.group_expr
255 .iter()
256 .map(|expr| self.expr_to_sql(expr))
257 .collect::<Result<Vec<_>>>()?,
258 vec![],
259 ));
260 }
261 (None, Some(window)) => {
262 let items = exprs
263 .into_iter()
264 .map(|proj_expr| {
265 let unproj = unproject_window_exprs(proj_expr, &window)?;
266 self.select_item_to_sql(&unproj)
267 })
268 .collect::<Result<Vec<_>>>()?;
269
270 select.projection(items);
271 }
272 _ => {
273 let items = exprs
274 .iter()
275 .map(|e| self.select_item_to_sql(e))
276 .collect::<Result<Vec<_>>>()?;
277 select.projection(items);
278 }
279 }
280 Ok(())
281 }
282
283 fn derive(
284 &self,
285 plan: &LogicalPlan,
286 relation: &mut RelationBuilder,
287 alias: Option<ast::TableAlias>,
288 lateral: bool,
289 ) -> Result<()> {
290 let mut derived_builder = DerivedRelationBuilder::default();
291 derived_builder.lateral(lateral).alias(alias).subquery({
292 let inner_statement = self.plan_to_sql(plan)?;
293 if let ast::Statement::Query(inner_query) = inner_statement {
294 inner_query
295 } else {
296 return internal_err!(
297 "Subquery must be a Query, but found {inner_statement:?}"
298 );
299 }
300 });
301 relation.derived(derived_builder);
302
303 Ok(())
304 }
305
306 fn derive_with_dialect_alias(
307 &self,
308 alias: &str,
309 plan: &LogicalPlan,
310 relation: &mut RelationBuilder,
311 lateral: bool,
312 columns: Vec<Ident>,
313 ) -> Result<()> {
314 if self.dialect.requires_derived_table_alias() || !columns.is_empty() {
315 self.derive(
316 plan,
317 relation,
318 Some(self.new_table_alias(alias.to_string(), columns)),
319 lateral,
320 )
321 } else {
322 self.derive(plan, relation, None, lateral)
323 }
324 }
325
/// Recursively walks `plan` from the top node down, recording each
/// supported node into the `query`, `select` and `relation` builders.
///
/// * `query` receives statement-level parts (limit, offset, order by, a
///   union body); it may be `None`, in which case nodes that need it
///   return an internal error.
/// * `select` receives projection, selection, having, group by, distinct
///   and joins.
/// * `relation` receives the FROM relation (table, derived table, unnest
///   or empty).
///
/// Several node kinds check `select.already_projected()` first and, when it
/// is true, emit themselves as a derived table (subquery) through
/// `derive_with_dialect_alias` instead of writing into the current select.
///
/// # Errors
///
/// Returns a "not implemented" error for unsupported nodes, an internal
/// error when an invariant checked below is violated, and propagates
/// errors from expression unparsing and the builders.
#[cfg_attr(feature = "recursive_protection", recursive::recursive)]
fn select_to_sql_recursively(
    &self,
    plan: &LogicalPlan,
    query: &mut Option<QueryBuilder>,
    select: &mut SelectBuilder,
    relation: &mut RelationBuilder,
) -> Result<()> {
    match plan {
        LogicalPlan::TableScan(scan) => {
            // A scan with pushed-down projection/filters/fetch is first
            // expanded into an explicit plan, which is unparsed instead.
            if let Some(unparsed_table_scan) = Self::unparse_table_scan_pushdown(
                plan,
                None,
                select.already_projected(),
            )? {
                return self.select_to_sql_recursively(
                    &unparsed_table_scan,
                    query,
                    select,
                    relation,
                );
            }
            // Plain scan: emit `[catalog.][schema.]table` as the relation.
            let mut builder = TableRelationBuilder::default();
            let mut table_parts = vec![];
            if let Some(catalog_name) = scan.table_name.catalog() {
                table_parts
                    .push(self.new_ident_quoted_if_needs(catalog_name.to_string()));
            }
            if let Some(schema_name) = scan.table_name.schema() {
                table_parts
                    .push(self.new_ident_quoted_if_needs(schema_name.to_string()));
            }
            table_parts.push(
                self.new_ident_quoted_if_needs(scan.table_name.table().to_string()),
            );
            builder.name(ast::ObjectName::from(table_parts));
            relation.table(builder);

            Ok(())
        }
        LogicalPlan::Projection(p) => {
            // If the helper produces a rewritten plan for a sort on
            // non-projected fields, unparse that plan instead.
            if let Some(new_plan) = rewrite_plan_for_sort_on_non_projected_fields(p) {
                return self
                    .select_to_sql_recursively(&new_plan, query, select, relation);
            }

            // Only a single-expression projection is checked for the
            // unnest placeholder column.
            let unnest_input_type = if p.expr.len() == 1 {
                Self::check_unnest_placeholder_with_outer_ref(&p.expr[0])
            } else {
                None
            };
            // Dialects that support it get the unnest as a table factor.
            if self.dialect.unnest_as_table_factor() && unnest_input_type.is_some() {
                if let LogicalPlan::Unnest(unnest) = &p.input.as_ref() {
                    if let Some(unnest_relation) =
                        self.try_unnest_to_table_factor_sql(unnest)?
                    {
                        relation.unnest(unnest_relation);
                        return self.select_to_sql_recursively(
                            p.input.as_ref(),
                            query,
                            select,
                            relation,
                        );
                    }
                }
            }

            // For an unnest projection, the projected expression names are
            // used as column aliases of the derived table.
            let columns = if unnest_input_type.is_some() {
                p.expr
                    .iter()
                    .map(|e| {
                        self.new_ident_quoted_if_needs(e.schema_name().to_string())
                    })
                    .collect()
            } else {
                vec![]
            };
            // A projection under an existing projection becomes a derived
            // table; it is LATERAL only for an outer-reference unnest.
            if select.already_projected() {
                return self.derive_with_dialect_alias(
                    "derived_projection",
                    plan,
                    relation,
                    unnest_input_type
                        .filter(|t| matches!(t, UnnestInputType::OuterReference))
                        .is_some(),
                    columns,
                );
            }
            self.reconstruct_select_statement(plan, p, select)?;
            self.select_to_sql_recursively(p.input.as_ref(), query, select, relation)
        }
        LogicalPlan::Filter(filter) => {
            // When an aggregate node is found for this select, the
            // predicate is emitted as HAVING (after unprojecting aggregate
            // expressions); otherwise it is a regular selection.
            if let Some(agg) =
                find_agg_node_within_select(plan, select.already_projected())
            {
                let unprojected =
                    unproject_agg_exprs(filter.predicate.clone(), agg, None)?;
                let filter_expr = self.expr_to_sql(&unprojected)?;
                select.having(Some(filter_expr));
            } else {
                let filter_expr = self.expr_to_sql(&filter.predicate)?;
                select.selection(Some(filter_expr));
            }

            self.select_to_sql_recursively(
                filter.input.as_ref(),
                query,
                select,
                relation,
            )
        }
        LogicalPlan::Limit(limit) => {
            // A limit below an existing projection becomes a derived table.
            if select.already_projected() {
                return self.derive_with_dialect_alias(
                    "derived_limit",
                    plan,
                    relation,
                    false,
                    vec![],
                );
            }
            // `fetch` and `skip` are statement-level, so both need a query
            // builder to write into.
            if let Some(fetch) = &limit.fetch {
                let Some(query) = query.as_mut() else {
                    return internal_err!(
                        "Limit operator only valid in a statement context."
                    );
                };
                query.limit(Some(self.expr_to_sql(fetch)?));
            }

            if let Some(skip) = &limit.skip {
                let Some(query) = query.as_mut() else {
                    return internal_err!(
                        "Offset operator only valid in a statement context."
                    );
                };
                query.offset(Some(ast::Offset {
                    rows: ast::OffsetRows::None,
                    value: self.expr_to_sql(skip)?,
                }));
            }

            self.select_to_sql_recursively(
                limit.input.as_ref(),
                query,
                select,
                relation,
            )
        }
        LogicalPlan::Sort(sort) => {
            // A sort below an existing projection becomes a derived table.
            if select.already_projected() {
                return self.derive_with_dialect_alias(
                    "derived_sort",
                    plan,
                    relation,
                    false,
                    vec![],
                );
            }
            let Some(query_ref) = query else {
                return internal_err!(
                    "Sort operator only valid in a statement context."
                );
            };

            // A fetch carried by the sort node is emitted as the limit.
            if let Some(fetch) = sort.fetch {
                query_ref.limit(Some(ast::Expr::value(ast::Value::Number(
                    fetch.to_string(),
                    false,
                ))));
            };

            // Sort expressions are unprojected against the aggregate (if
            // any) and the sort input before being unparsed.
            let agg = find_agg_node_within_select(plan, select.already_projected());
            let sort_exprs: Vec<SortExpr> = sort
                .expr
                .iter()
                .map(|sort_expr| {
                    unproject_sort_expr(sort_expr.clone(), agg, sort.input.as_ref())
                })
                .collect::<Result<Vec<_>>>()?;

            query_ref.order_by(self.sorts_to_sql(&sort_exprs)?);

            self.select_to_sql_recursively(
                sort.input.as_ref(),
                query,
                select,
                relation,
            )
        }
        LogicalPlan::Aggregate(agg) => {
            // Only when nothing has been projected yet does the aggregate
            // itself supply the select list (aggregate expressions followed
            // by group expressions) and the GROUP BY.
            if !select.already_projected() {
                let exprs: Vec<_> = agg
                    .aggr_expr
                    .iter()
                    .chain(agg.group_expr.iter())
                    .map(|expr| self.select_item_to_sql(expr))
                    .collect::<Result<Vec<_>>>()?;
                select.projection(exprs);

                select.group_by(ast::GroupByExpr::Expressions(
                    agg.group_expr
                        .iter()
                        .map(|expr| self.expr_to_sql(expr))
                        .collect::<Result<Vec<_>>>()?,
                    vec![],
                ));
            }

            self.select_to_sql_recursively(
                agg.input.as_ref(),
                query,
                select,
                relation,
            )
        }
        LogicalPlan::Distinct(distinct) => {
            // A distinct below an existing projection becomes a derived
            // table.
            if select.already_projected() {
                return self.derive_with_dialect_alias(
                    "derived_distinct",
                    plan,
                    relation,
                    false,
                    vec![],
                );
            }

            // DISTINCT directly over a UNION: flag the query as a distinct
            // union and let the Union branch pick the set quantifier.
            if let Distinct::All(input) = distinct {
                if matches!(input.as_ref(), LogicalPlan::Union(_)) {
                    if let Some(query_mut) = query.as_mut() {
                        query_mut.distinct_union();
                        return self.select_to_sql_recursively(
                            input.as_ref(),
                            query,
                            select,
                            relation,
                        );
                    }
                }
            }

            let (select_distinct, input) = match distinct {
                Distinct::All(input) => (ast::Distinct::Distinct, input.as_ref()),
                Distinct::On(on) => {
                    // DISTINCT ON supplies its own select list and,
                    // optionally, an ORDER BY on the query.
                    let exprs = on
                        .on_expr
                        .iter()
                        .map(|e| self.expr_to_sql(e))
                        .collect::<Result<Vec<_>>>()?;
                    let items = on
                        .select_expr
                        .iter()
                        .map(|e| self.select_item_to_sql(e))
                        .collect::<Result<Vec<_>>>()?;
                    if let Some(sort_expr) = &on.sort_expr {
                        if let Some(query_ref) = query {
                            query_ref.order_by(self.sorts_to_sql(sort_expr)?);
                        } else {
                            return internal_err!(
                                "Sort operator only valid in a statement context."
                            );
                        }
                    }
                    select.projection(items);
                    (ast::Distinct::On(exprs), on.input.as_ref())
                }
            };
            select.distinct(Some(select_distinct));
            self.select_to_sql_recursively(input, query, select, relation)
        }
        LogicalPlan::Join(join) => {
            let mut table_scan_filters = vec![];
            // For right semi/anti joins the sides are swapped, so the
            // code below always treats `left_plan` as the outer side.
            let (left_plan, right_plan) = match join.join_type {
                JoinType::RightSemi | JoinType::RightAnti => {
                    (&join.right, &join.left)
                }
                _ => (&join.left, &join.right),
            };
            // Captured before recursing, because unparsing the inputs may
            // itself set a projection on `select`.
            let already_projected = select.already_projected();

            // Filters returned by the helper for a simplified table scan
            // are collected and later merged into the join condition.
            let left_plan =
                match try_transform_to_simple_table_scan_with_filters(left_plan)? {
                    Some((plan, filters)) => {
                        table_scan_filters.extend(filters);
                        Arc::new(plan)
                    }
                    None => Arc::clone(left_plan),
                };

            self.select_to_sql_recursively(
                left_plan.as_ref(),
                query,
                select,
                relation,
            )?;

            // Capture the projection contributed by the left side so it
            // can be re-applied once the join has been assembled.
            let left_projection: Option<Vec<ast::SelectItem>> = if !already_projected
            {
                Some(select.pop_projections())
            } else {
                None
            };

            let right_plan =
                match try_transform_to_simple_table_scan_with_filters(right_plan)? {
                    Some((plan, filters)) => {
                        table_scan_filters.extend(filters);
                        Arc::new(plan)
                    }
                    None => Arc::clone(right_plan),
                };

            let mut right_relation = RelationBuilder::default();

            // NOTE(review): the right side is unparsed here and again
            // after the join constraint is built, both times into
            // `right_relation` and `select` - confirm that the double
            // pass is intentional.
            self.select_to_sql_recursively(
                right_plan.as_ref(),
                query,
                select,
                &mut right_relation,
            )?;

            let join_filters = if table_scan_filters.is_empty() {
                join.filter.clone()
            } else {
                // AND all collected table-scan filters together.
                let Some(combined_filters) =
                    table_scan_filters.into_iter().reduce(|acc, filter| {
                        Expr::BinaryExpr(BinaryExpr {
                            left: Box::new(acc),
                            op: Operator::And,
                            right: Box::new(filter),
                        })
                    })
                else {
                    return internal_err!("Failed to combine TableScan filters");
                };

                // AND the join's own filter (if any) with the combined
                // table-scan filters.
                match &join.filter {
                    Some(filter) => Some(Expr::BinaryExpr(BinaryExpr {
                        left: Box::new(filter.clone()),
                        op: Operator::And,
                        right: Box::new(combined_filters),
                    })),
                    None => Some(combined_filters),
                }
            };

            let join_constraint = self.join_constraint_to_sql(
                join.join_constraint,
                &join.on,
                join_filters.as_ref(),
            )?;

            self.select_to_sql_recursively(
                right_plan.as_ref(),
                query,
                select,
                &mut right_relation,
            )?;

            let right_projection: Option<Vec<ast::SelectItem>> = if !already_projected
            {
                Some(select.pop_projections())
            } else {
                None
            };

            match join.join_type {
                // Semi, anti and mark joins are unparsed as an
                // `[NOT] EXISTS (SELECT 1 FROM <right> WHERE ...)` subquery.
                JoinType::LeftSemi
                | JoinType::LeftAnti
                | JoinType::LeftMark
                | JoinType::RightSemi
                | JoinType::RightAnti
                | JoinType::RightMark => {
                    let mut query_builder = QueryBuilder::default();
                    let mut from = TableWithJoinsBuilder::default();
                    let mut exists_select: SelectBuilder = SelectBuilder::default();
                    from.relation(right_relation);
                    exists_select.push_from(from);
                    // The join filter and each equi-join pair are added
                    // as selections of the subquery.
                    if let Some(filter) = &join.filter {
                        exists_select.selection(Some(self.expr_to_sql(filter)?));
                    }
                    for (left, right) in &join.on {
                        exists_select.selection(Some(
                            self.expr_to_sql(&left.clone().eq(right.clone()))?,
                        ));
                    }
                    exists_select.projection(vec![ast::SelectItem::UnnamedExpr(
                        ast::Expr::value(ast::Value::Number("1".to_string(), false)),
                    )]);
                    query_builder.body(Box::new(SetExpr::Select(Box::new(
                        exists_select.build()?,
                    ))));

                    // Anti joins negate the EXISTS; semi and mark joins
                    // do not.
                    let negated = match join.join_type {
                        JoinType::LeftSemi
                        | JoinType::RightSemi
                        | JoinType::LeftMark
                        | JoinType::RightMark => false,
                        JoinType::LeftAnti | JoinType::RightAnti => true,
                        _ => unreachable!(),
                    };
                    let exists_expr = ast::Expr::Exists {
                        subquery: Box::new(query_builder.build()?),
                        negated,
                    };

                    match join.join_type {
                        JoinType::LeftMark | JoinType::RightMark => {
                            // The `mark` column is qualified with the
                            // qualifier of the first field of the chosen
                            // side's schema, and is replaced in `select`
                            // by the EXISTS expression.
                            let source_schema =
                                if join.join_type == JoinType::LeftMark {
                                    right_plan.schema()
                                } else {
                                    left_plan.schema()
                                };
                            let (table_ref, _) = source_schema.qualified_field(0);
                            let column = self.col_to_sql(&Column::new(
                                table_ref.cloned(),
                                "mark",
                            ))?;
                            select.replace_mark(&column, &exists_expr);
                        }
                        _ => {
                            select.selection(Some(exists_expr));
                        }
                    }
                    // Only the left side's projection is restored here.
                    if let Some(projection) = left_projection {
                        select.projection(projection);
                    }
                }
                // Regular joins are appended to the current FROM entry.
                JoinType::Inner
                | JoinType::Left
                | JoinType::Right
                | JoinType::Full => {
                    let Ok(Some(relation)) = right_relation.build() else {
                        return internal_err!("Failed to build right relation");
                    };
                    let ast_join = ast::Join {
                        relation,
                        global: false,
                        join_operator: self
                            .join_operator_to_sql(join.join_type, join_constraint)?,
                    };
                    let mut from = select.pop_from().unwrap();
                    from.push_join(ast_join);
                    select.push_from(from);
                    // Without an outer projection the select list is the
                    // left projection followed by the right projection.
                    if !already_projected {
                        let Some(left_projection) = left_projection else {
                            return internal_err!("Left projection is missing");
                        };

                        let Some(right_projection) = right_projection else {
                            return internal_err!("Right projection is missing");
                        };

                        let projection = left_projection
                            .into_iter()
                            .chain(right_projection.into_iter())
                            .collect();
                        select.projection(projection);
                    }
                }
            };

            Ok(())
        }
        LogicalPlan::SubqueryAlias(plan_alias) => {
            let (plan, mut columns) =
                subquery_alias_inner_query_and_columns(plan_alias);
            let unparsed_table_scan = Self::unparse_table_scan_pushdown(
                plan,
                Some(plan_alias.alias.clone()),
                select.already_projected(),
            )?;
            // With no projection yet and no pushdown rewrite, the select
            // list defaults to a wildcard.
            if !select.already_projected() && unparsed_table_scan.is_none() {
                select.projection(vec![ast::SelectItem::Wildcard(
                    ast::WildcardAdditionalOptions::default(),
                )]);
            }
            let plan = unparsed_table_scan.unwrap_or_else(|| plan.clone());
            if !columns.is_empty()
                && !self.dialect.supports_column_alias_in_table_alias()
            {
                // The dialect cannot express `alias(col, ...)`, so the
                // column aliases are injected into the subquery itself.
                let rewritten_plan =
                    match inject_column_aliases_into_subquery(plan, columns) {
                        Ok(p) => p,
                        Err(e) => {
                            return internal_err!(
                                "Failed to transform SubqueryAlias plan: {e}"
                            )
                        }
                    };

                // The aliases now live inside the subquery, so none are
                // attached to the table alias below.
                columns = vec![];

                self.select_to_sql_recursively(
                    &rewritten_plan,
                    query,
                    select,
                    relation,
                )?;
            } else {
                self.select_to_sql_recursively(&plan, query, select, relation)?;
            }

            relation.alias(Some(
                self.new_table_alias(plan_alias.alias.table().to_string(), columns),
            ));

            Ok(())
        }
        LogicalPlan::Union(union) => {
            // A union below an existing projection becomes a derived table.
            if select.already_projected() {
                return self.derive_with_dialect_alias(
                    "derived_union",
                    plan,
                    relation,
                    false,
                    vec![],
                );
            }

            // Each input is unparsed into its own set expression.
            let input_exprs: Vec<SetExpr> = union
                .inputs
                .iter()
                .map(|input| self.select_to_sql_expr(input, query))
                .collect::<Result<Vec<_>>>()?;

            if input_exprs.len() < 2 {
                return internal_err!("UNION operator requires at least 2 inputs");
            }

            // A query flagged as a distinct union (see the Distinct
            // branch) uses no quantifier; otherwise `ALL` is used.
            let set_quantifier =
                if query.as_ref().is_some_and(|q| q.is_distinct_union()) {
                    ast::SetQuantifier::None
                } else {
                    ast::SetQuantifier::All
                };

            // The inputs are folded in reverse order, with the
            // accumulator on the right-hand side of each operation.
            let union_expr = input_exprs
                .into_iter()
                .rev()
                .reduce(|a, b| SetExpr::SetOperation {
                    op: ast::SetOperator::Union,
                    set_quantifier,
                    left: Box::new(b),
                    right: Box::new(a),
                })
                .unwrap();

            let Some(query) = query.as_mut() else {
                return internal_err!(
                    "UNION ALL operator only valid in a statement context"
                );
            };
            query.body(Box::new(union_expr));

            Ok(())
        }
        LogicalPlan::Window(window) => {
            // Nothing is recorded for the window node in this branch; the
            // walk continues with its input.
            self.select_to_sql_recursively(
                window.input.as_ref(),
                query,
                select,
                relation,
            )
        }
        LogicalPlan::EmptyRelation(_) => {
            // Mark the relation as empty unless one was already set.
            if !relation.has_relation() {
                relation.empty();
            }
            Ok(())
        }
        LogicalPlan::Extension(extension) => {
            // The extension unparsers take optional builder references;
            // the query builder is passed only when one exists.
            if let Some(query) = query.as_mut() {
                self.extension_to_sql(
                    extension.node.as_ref(),
                    &mut Some(query),
                    &mut Some(select),
                    &mut Some(relation),
                )
            } else {
                self.extension_to_sql(
                    extension.node.as_ref(),
                    &mut None,
                    &mut Some(select),
                    &mut Some(relation),
                )
            }
        }
        LogicalPlan::Unnest(unnest) => {
            if !unnest.struct_type_columns.is_empty() {
                return internal_err!(
                    "Struct type columns are not currently supported in UNNEST: {:?}",
                    unnest.struct_type_columns
                );
            }

            // The unnest input must be a projection; that projection is
            // skipped and the walk continues with its input.
            if let LogicalPlan::Projection(p) = unnest.input.as_ref() {
                self.select_to_sql_recursively(&p.input, query, select, relation)
            } else {
                internal_err!("Unnest input is not a Projection: {unnest:?}")
            }
        }
        // Only subqueries in which an unnest node is found are handled.
        LogicalPlan::Subquery(subquery)
            if find_unnest_node_until_relation(subquery.subquery.as_ref())
                .is_some() =>
        {
            if self.dialect.unnest_as_table_factor() {
                self.select_to_sql_recursively(
                    subquery.subquery.as_ref(),
                    query,
                    select,
                    relation,
                )
            } else {
                // Otherwise the subquery is emitted as a LATERAL derived
                // table.
                self.derive_with_dialect_alias(
                    "derived_unnest",
                    subquery.subquery.as_ref(),
                    relation,
                    true,
                    vec![],
                )
            }
        }
        _ => {
            not_impl_err!("Unsupported operator: {plan:?}")
        }
    }
}
994
995 fn check_unnest_placeholder_with_outer_ref(expr: &Expr) -> Option<UnnestInputType> {
1005 if let Expr::Alias(Alias { expr, .. }) = expr {
1006 if let Expr::Column(Column { name, .. }) = expr.as_ref() {
1007 if let Some(prefix) = name.strip_prefix(UNNEST_PLACEHOLDER) {
1008 if prefix.starts_with(&format!("({OUTER_REFERENCE_COLUMN_PREFIX}(")) {
1009 return Some(UnnestInputType::OuterReference);
1010 }
1011 return Some(UnnestInputType::Scalar);
1012 }
1013 }
1014 }
1015 None
1016 }
1017
1018 fn try_unnest_to_table_factor_sql(
1019 &self,
1020 unnest: &Unnest,
1021 ) -> Result<Option<UnnestRelationBuilder>> {
1022 let mut unnest_relation = UnnestRelationBuilder::default();
1023 let LogicalPlan::Projection(projection) = unnest.input.as_ref() else {
1024 return Ok(None);
1025 };
1026
1027 if !matches!(projection.input.as_ref(), LogicalPlan::EmptyRelation(_)) {
1028 return Ok(None);
1036 };
1037
1038 let exprs = projection
1039 .expr
1040 .iter()
1041 .map(|e| self.expr_to_sql(e))
1042 .collect::<Result<Vec<_>>>()?;
1043 unnest_relation.array_exprs(exprs);
1044
1045 Ok(Some(unnest_relation))
1046 }
1047
1048 fn is_scan_with_pushdown(scan: &TableScan) -> bool {
1049 scan.projection.is_some() || !scan.filters.is_empty() || scan.fetch.is_some()
1050 }
1051
/// Rewrites a table scan that carries pushed-down operations (projection,
/// filters, fetch) into an equivalent plan with explicit nodes built
/// through `LogicalPlanBuilder`.
///
/// * `plan` - the node to inspect; `TableScan`, `SubqueryAlias` and
///   `Projection` are handled, looking through the latter two to their
///   input.
/// * `alias` - an optional alias to apply to the rebuilt scan; column
///   references in filters and projections are rewritten with it.
/// * `already_projected` - when true, the scan's projection is not
///   re-emitted.
///
/// Returns `Ok(None)` when there is nothing to rewrite: a scan without
/// pushdown, an inner plan that produced no rewrite, or any other node.
///
/// # Errors
///
/// Propagates failures from the plan builder and from expression
/// rewriting.
fn unparse_table_scan_pushdown(
    plan: &LogicalPlan,
    alias: Option<TableReference>,
    already_projected: bool,
) -> Result<Option<LogicalPlan>> {
    match plan {
        LogicalPlan::TableScan(table_scan) => {
            if !Self::is_scan_with_pushdown(table_scan) {
                return Ok(None);
            }
            let table_schema = table_scan.source.schema();
            // Present only when an alias was given; applied to the
            // pushed-down filters below.
            let mut filter_alias_rewriter =
                alias.as_ref().map(|alias_name| TableAliasRewriter {
                    table_schema: &table_schema,
                    alias_name: alias_name.clone(),
                });

            // Start from a scan without any pushed-down projection.
            let mut builder = LogicalPlanBuilder::scan(
                table_scan.table_name.clone(),
                Arc::clone(&table_scan.source),
                None,
            )?;
            // With a projection or filters, the alias is applied directly
            // on the scan, before those nodes are added.
            if let Some(ref alias) = alias {
                if table_scan.projection.is_some() || !table_scan.filters.is_empty() {
                    builder = builder.alias(alias.clone())?;
                }
            }

            if !already_projected {
                if let Some(project_vec) = &table_scan.projection {
                    if project_vec.is_empty() {
                        // An empty projection is replaced by a single
                        // literal `1`.
                        builder = builder.project(vec![Expr::Literal(
                            ScalarValue::Int64(Some(1)),
                            None,
                        )])?;
                    } else {
                        // Map projected indices to columns, qualified by
                        // the alias when given, else by the table name.
                        let project_columns = project_vec
                            .iter()
                            .cloned()
                            .map(|i| {
                                let schema = table_scan.source.schema();
                                let field = schema.field(i);
                                if alias.is_some() {
                                    Column::new(alias.clone(), field.name().clone())
                                } else {
                                    Column::new(
                                        Some(table_scan.table_name.clone()),
                                        field.name().clone(),
                                    )
                                }
                            })
                            .collect::<Vec<_>>();
                        builder = builder.project(project_columns)?;
                    };
                }
            }

            // Rewrite each filter with the alias (when present) and AND
            // them together; the result is `None` when there are no
            // filters.
            let filter_expr: Result<Option<Expr>> = table_scan
                .filters
                .iter()
                .cloned()
                .map(|expr| {
                    if let Some(ref mut rewriter) = filter_alias_rewriter {
                        expr.rewrite(rewriter).data()
                    } else {
                        Ok(expr)
                    }
                })
                .reduce(|acc, expr_result| {
                    acc.and_then(|acc_expr| {
                        expr_result.map(|expr| acc_expr.and(expr))
                    })
                })
                .transpose();

            if let Some(filter) = filter_expr? {
                builder = builder.filter(filter)?;
            }

            if let Some(fetch) = table_scan.fetch {
                builder = builder.limit(0, Some(fetch))?;
            }

            // With neither projection nor filters, the alias is applied
            // last, after the limit (if any) was added.
            if let Some(alias) = alias {
                if table_scan.projection.is_none() && table_scan.filters.is_empty() {
                    builder = builder.alias(alias)?;
                }
            }

            Ok(Some(builder.build()?))
        }
        LogicalPlan::SubqueryAlias(subquery_alias) => {
            // Rewrite the input using this node's own alias; when an
            // outer alias was also passed in, wrap the result in it.
            let ret = Self::unparse_table_scan_pushdown(
                &subquery_alias.input,
                Some(subquery_alias.alias.clone()),
                already_projected,
            )?;
            if let Some(alias) = alias {
                if let Some(plan) = ret {
                    let plan = LogicalPlanBuilder::new(plan).alias(alias)?.build()?;
                    return Ok(Some(plan));
                }
            }
            Ok(ret)
        }
        LogicalPlan::Projection(projection) => {
            // Rewrite the input first; the projection is re-applied on top
            // only when the input produced a rewritten plan.
            if let Some(plan) = Self::unparse_table_scan_pushdown(
                &projection.input,
                alias.clone(),
                already_projected,
            )? {
                // With an alias, the projection expressions are rewritten
                // to refer to it.
                let exprs = if alias.is_some() {
                    let mut alias_rewriter =
                        alias.as_ref().map(|alias_name| TableAliasRewriter {
                            table_schema: plan.schema().as_arrow(),
                            alias_name: alias_name.clone(),
                        });
                    projection
                        .expr
                        .iter()
                        .cloned()
                        .map(|expr| {
                            if let Some(ref mut rewriter) = alias_rewriter {
                                expr.rewrite(rewriter).data()
                            } else {
                                Ok(expr)
                            }
                        })
                        .collect::<Result<Vec<_>>>()?
                } else {
                    projection.expr.clone()
                };
                Ok(Some(
                    LogicalPlanBuilder::from(plan).project(exprs)?.build()?,
                ))
            } else {
                Ok(None)
            }
        }
        _ => Ok(None),
    }
}
1210
1211 fn select_item_to_sql(&self, expr: &Expr) -> Result<ast::SelectItem> {
1212 match expr {
1213 Expr::Alias(Alias { expr, name, .. }) => {
1214 let inner = self.expr_to_sql(expr)?;
1215
1216 let col_name = if let Some(rewritten_name) =
1218 self.dialect.col_alias_overrides(name)?
1219 {
1220 rewritten_name.to_string()
1221 } else {
1222 name.to_string()
1223 };
1224
1225 Ok(ast::SelectItem::ExprWithAlias {
1226 expr: inner,
1227 alias: self.new_ident_quoted_if_needs(col_name),
1228 })
1229 }
1230 _ => {
1231 let inner = self.expr_to_sql(expr)?;
1232
1233 Ok(ast::SelectItem::UnnamedExpr(inner))
1234 }
1235 }
1236 }
1237
1238 fn sorts_to_sql(&self, sort_exprs: &[SortExpr]) -> Result<OrderByKind> {
1239 Ok(OrderByKind::Expressions(
1240 sort_exprs
1241 .iter()
1242 .map(|sort_expr| self.sort_to_sql(sort_expr))
1243 .collect::<Result<Vec<_>>>()?,
1244 ))
1245 }
1246
1247 fn join_operator_to_sql(
1248 &self,
1249 join_type: JoinType,
1250 constraint: ast::JoinConstraint,
1251 ) -> Result<ast::JoinOperator> {
1252 Ok(match join_type {
1253 JoinType::Inner => match &constraint {
1254 ast::JoinConstraint::On(_)
1255 | ast::JoinConstraint::Using(_)
1256 | ast::JoinConstraint::Natural => ast::JoinOperator::Inner(constraint),
1257 ast::JoinConstraint::None => {
1258 ast::JoinOperator::CrossJoin
1261 }
1262 },
1263 JoinType::Left => ast::JoinOperator::LeftOuter(constraint),
1264 JoinType::Right => ast::JoinOperator::RightOuter(constraint),
1265 JoinType::Full => ast::JoinOperator::FullOuter(constraint),
1266 JoinType::LeftAnti => ast::JoinOperator::LeftAnti(constraint),
1267 JoinType::LeftSemi => ast::JoinOperator::LeftSemi(constraint),
1268 JoinType::RightAnti => ast::JoinOperator::RightAnti(constraint),
1269 JoinType::RightSemi => ast::JoinOperator::RightSemi(constraint),
1270 JoinType::LeftMark | JoinType::RightMark => {
1271 unimplemented!("Unparsing of Mark join type")
1272 }
1273 })
1274 }
1275
1276 fn join_using_to_sql(
1280 &self,
1281 join_conditions: &[(Expr, Expr)],
1282 ) -> Option<ast::JoinConstraint> {
1283 let mut object_names = Vec::with_capacity(join_conditions.len());
1284 for (left, right) in join_conditions {
1285 match (left, right) {
1286 (
1287 Expr::Column(Column {
1288 relation: _,
1289 name: left_name,
1290 spans: _,
1291 }),
1292 Expr::Column(Column {
1293 relation: _,
1294 name: right_name,
1295 spans: _,
1296 }),
1297 ) if left_name == right_name => {
1298 let ident = self.new_ident_quoted_if_needs(left_name.to_string());
1302 object_names.push(ast::ObjectName::from(vec![ident]));
1303 }
1304 _ => return None,
1307 }
1308 }
1309 Some(ast::JoinConstraint::Using(object_names))
1310 }
1311
1312 fn join_constraint_to_sql(
1314 &self,
1315 constraint: JoinConstraint,
1316 conditions: &[(Expr, Expr)],
1317 filter: Option<&Expr>,
1318 ) -> Result<ast::JoinConstraint> {
1319 match (constraint, conditions, filter) {
1320 (JoinConstraint::On | JoinConstraint::Using, [], None) => {
1322 Ok(ast::JoinConstraint::None)
1323 }
1324
1325 (JoinConstraint::Using, conditions, None) => {
1326 match self.join_using_to_sql(conditions) {
1327 Some(using) => Ok(using),
1328 None => self.join_conditions_to_sql_on(conditions, None),
1331 }
1332 }
1333
1334 (JoinConstraint::On | JoinConstraint::Using, conditions, filter) => {
1342 self.join_conditions_to_sql_on(conditions, filter)
1343 }
1344 }
1345 }
1346
1347 fn join_conditions_to_sql_on(
1351 &self,
1352 join_conditions: &[(Expr, Expr)],
1353 filter: Option<&Expr>,
1354 ) -> Result<ast::JoinConstraint> {
1355 let mut condition = None;
1356 for (left, right) in join_conditions {
1358 let l = self.expr_to_sql(left)?;
1360 let r = self.expr_to_sql(right)?;
1361 let e = self.binary_op_to_sql(l, r, ast::BinaryOperator::Eq);
1362 condition = match condition {
1363 Some(expr) => Some(self.and_op_to_sql(expr, e)),
1364 None => Some(e),
1365 };
1366 }
1367
1368 condition = match (condition, filter) {
1370 (Some(expr), Some(filter)) => {
1371 Some(self.and_op_to_sql(expr, self.expr_to_sql(filter)?))
1372 }
1373 (Some(expr), None) => Some(expr),
1374 (None, Some(filter)) => Some(self.expr_to_sql(filter)?),
1375 (None, None) => None,
1376 };
1377
1378 let constraint = match condition {
1379 Some(filter) => ast::JoinConstraint::On(filter),
1380 None => ast::JoinConstraint::None,
1381 };
1382
1383 Ok(constraint)
1384 }
1385
1386 fn and_op_to_sql(&self, lhs: ast::Expr, rhs: ast::Expr) -> ast::Expr {
1387 self.binary_op_to_sql(lhs, rhs, ast::BinaryOperator::And)
1388 }
1389
1390 fn new_table_alias(&self, alias: String, columns: Vec<Ident>) -> ast::TableAlias {
1391 let columns = columns
1392 .into_iter()
1393 .map(|ident| TableAliasColumnDef {
1394 name: ident,
1395 data_type: None,
1396 })
1397 .collect();
1398 ast::TableAlias {
1399 name: self.new_ident_quoted_if_needs(alias),
1400 columns,
1401 }
1402 }
1403
/// Unparsing of DML plans is not implemented.
///
/// # Errors
///
/// Always returns a "not implemented" error that includes the debug
/// representation of `plan`.
fn dml_to_sql(&self, plan: &LogicalPlan) -> Result<ast::Statement> {
    not_impl_err!("Unsupported plan: {plan:?}")
}
1407}
1408
1409impl From<BuilderError> for DataFusionError {
1410 fn from(e: BuilderError) -> Self {
1411 DataFusionError::External(Box::new(e))
1412 }
1413}
1414
/// Classification of the input behind an unnest placeholder column, as
/// produced by `check_unnest_placeholder_with_outer_ref`.
#[derive(Debug)]
enum UnnestInputType {
    /// The text after the placeholder starts with
    /// `(<OUTER_REFERENCE_COLUMN_PREFIX>(`, i.e. the unnest input is an
    /// outer reference.
    OuterReference,
    /// Any other unnest placeholder column.
    Scalar,
}