1use super::{
19 ast::{
20 BuilderError, DerivedRelationBuilder, QueryBuilder, RelationBuilder,
21 SelectBuilder, TableRelationBuilder, TableWithJoinsBuilder,
22 },
23 rewrite::{
24 inject_column_aliases_into_subquery, normalize_union_schema,
25 rewrite_plan_for_sort_on_non_projected_fields,
26 subquery_alias_inner_query_and_columns, TableAliasRewriter,
27 },
28 utils::{
29 find_agg_node_within_select, find_unnest_node_within_select,
30 find_window_nodes_within_select, try_transform_to_simple_table_scan_with_filters,
31 unproject_sort_expr, unproject_unnest_expr, unproject_window_exprs,
32 },
33 Unparser,
34};
35use crate::unparser::ast::UnnestRelationBuilder;
36use crate::unparser::extension_unparser::{
37 UnparseToStatementResult, UnparseWithinStatementResult,
38};
39use crate::unparser::utils::{find_unnest_node_until_relation, unproject_agg_exprs};
40use crate::utils::UNNEST_PLACEHOLDER;
41use datafusion_common::{
42 internal_err, not_impl_err,
43 tree_node::{TransformedResult, TreeNode},
44 Column, DataFusionError, Result, ScalarValue, TableReference,
45};
46use datafusion_expr::expr::OUTER_REFERENCE_COLUMN_PREFIX;
47use datafusion_expr::{
48 expr::Alias, BinaryExpr, Distinct, Expr, JoinConstraint, JoinType, LogicalPlan,
49 LogicalPlanBuilder, Operator, Projection, SortExpr, TableScan, Unnest,
50 UserDefinedLogicalNode,
51};
52use sqlparser::ast::{self, Ident, OrderByKind, SetExpr, TableAliasColumnDef};
53use std::{sync::Arc, vec};
54
55pub fn plan_to_sql(plan: &LogicalPlan) -> Result<ast::Statement> {
92 let unparser = Unparser::default();
93 unparser.plan_to_sql(plan)
94}
95
96impl Unparser<'_> {
97 pub fn plan_to_sql(&self, plan: &LogicalPlan) -> Result<ast::Statement> {
98 let plan = normalize_union_schema(plan)?;
99
100 match plan {
101 LogicalPlan::Projection(_)
102 | LogicalPlan::Filter(_)
103 | LogicalPlan::Window(_)
104 | LogicalPlan::Aggregate(_)
105 | LogicalPlan::Sort(_)
106 | LogicalPlan::Join(_)
107 | LogicalPlan::Repartition(_)
108 | LogicalPlan::Union(_)
109 | LogicalPlan::TableScan(_)
110 | LogicalPlan::EmptyRelation(_)
111 | LogicalPlan::Subquery(_)
112 | LogicalPlan::SubqueryAlias(_)
113 | LogicalPlan::Limit(_)
114 | LogicalPlan::Statement(_)
115 | LogicalPlan::Values(_)
116 | LogicalPlan::Distinct(_) => self.select_to_sql_statement(&plan),
117 LogicalPlan::Dml(_) => self.dml_to_sql(&plan),
118 LogicalPlan::Extension(extension) => {
119 self.extension_to_statement(extension.node.as_ref())
120 }
121 LogicalPlan::Explain(_)
122 | LogicalPlan::Analyze(_)
123 | LogicalPlan::Ddl(_)
124 | LogicalPlan::Copy(_)
125 | LogicalPlan::DescribeTable(_)
126 | LogicalPlan::RecursiveQuery(_)
127 | LogicalPlan::Unnest(_) => not_impl_err!("Unsupported plan: {plan:?}"),
128 }
129 }
130
131 fn extension_to_statement(
135 &self,
136 node: &dyn UserDefinedLogicalNode,
137 ) -> Result<ast::Statement> {
138 let mut statement = None;
139 for unparser in &self.extension_unparsers {
140 match unparser.unparse_to_statement(node, self)? {
141 UnparseToStatementResult::Modified(stmt) => {
142 statement = Some(stmt);
143 break;
144 }
145 UnparseToStatementResult::Unmodified => {}
146 }
147 }
148 if let Some(statement) = statement {
149 Ok(statement)
150 } else {
151 not_impl_err!("Unsupported extension node: {node:?}")
152 }
153 }
154
155 fn extension_to_sql(
159 &self,
160 node: &dyn UserDefinedLogicalNode,
161 query: &mut Option<&mut QueryBuilder>,
162 select: &mut Option<&mut SelectBuilder>,
163 relation: &mut Option<&mut RelationBuilder>,
164 ) -> Result<()> {
165 for unparser in &self.extension_unparsers {
166 match unparser.unparse(node, self, query, select, relation)? {
167 UnparseWithinStatementResult::Modified => return Ok(()),
168 UnparseWithinStatementResult::Unmodified => {}
169 }
170 }
171 not_impl_err!("Unsupported extension node: {node:?}")
172 }
173
174 fn select_to_sql_statement(&self, plan: &LogicalPlan) -> Result<ast::Statement> {
175 let mut query_builder = Some(QueryBuilder::default());
176
177 let body = self.select_to_sql_expr(plan, &mut query_builder)?;
178
179 let query = query_builder.unwrap().body(Box::new(body)).build()?;
180
181 Ok(ast::Statement::Query(Box::new(query)))
182 }
183
184 fn select_to_sql_expr(
185 &self,
186 plan: &LogicalPlan,
187 query: &mut Option<QueryBuilder>,
188 ) -> Result<SetExpr> {
189 let mut select_builder = SelectBuilder::default();
190 select_builder.push_from(TableWithJoinsBuilder::default());
191 let mut relation_builder = RelationBuilder::default();
192 self.select_to_sql_recursively(
193 plan,
194 query,
195 &mut select_builder,
196 &mut relation_builder,
197 )?;
198
199 if let Some(body) = query.as_mut().and_then(|q| q.take_body()) {
201 return Ok(*body);
202 }
203
204 if !select_builder.already_projected() {
207 select_builder.projection(vec![ast::SelectItem::Wildcard(
208 ast::WildcardAdditionalOptions::default(),
209 )]);
210 }
211
212 let mut twj = select_builder.pop_from().unwrap();
213 twj.relation(relation_builder);
214 select_builder.push_from(twj);
215
216 Ok(SetExpr::Select(Box::new(select_builder.build()?)))
217 }
218
219 fn reconstruct_select_statement(
223 &self,
224 plan: &LogicalPlan,
225 p: &Projection,
226 select: &mut SelectBuilder,
227 ) -> Result<()> {
228 let mut exprs = p.expr.clone();
229
230 if let Some(unnest) = find_unnest_node_within_select(plan) {
232 exprs = exprs
233 .into_iter()
234 .map(|e| unproject_unnest_expr(e, unnest))
235 .collect::<Result<Vec<_>>>()?;
236 };
237
238 match (
239 find_agg_node_within_select(plan, true),
240 find_window_nodes_within_select(plan, None, true),
241 ) {
242 (Some(agg), window) => {
243 let window_option = window.as_deref();
244 let items = exprs
245 .into_iter()
246 .map(|proj_expr| {
247 let unproj = unproject_agg_exprs(proj_expr, agg, window_option)?;
248 self.select_item_to_sql(&unproj)
249 })
250 .collect::<Result<Vec<_>>>()?;
251
252 select.projection(items);
253 select.group_by(ast::GroupByExpr::Expressions(
254 agg.group_expr
255 .iter()
256 .map(|expr| self.expr_to_sql(expr))
257 .collect::<Result<Vec<_>>>()?,
258 vec![],
259 ));
260 }
261 (None, Some(window)) => {
262 let items = exprs
263 .into_iter()
264 .map(|proj_expr| {
265 let unproj = unproject_window_exprs(proj_expr, &window)?;
266 self.select_item_to_sql(&unproj)
267 })
268 .collect::<Result<Vec<_>>>()?;
269
270 select.projection(items);
271 }
272 _ => {
273 let items = exprs
274 .iter()
275 .map(|e| self.select_item_to_sql(e))
276 .collect::<Result<Vec<_>>>()?;
277 select.projection(items);
278 }
279 }
280 Ok(())
281 }
282
283 fn derive(
284 &self,
285 plan: &LogicalPlan,
286 relation: &mut RelationBuilder,
287 alias: Option<ast::TableAlias>,
288 lateral: bool,
289 ) -> Result<()> {
290 let mut derived_builder = DerivedRelationBuilder::default();
291 derived_builder.lateral(lateral).alias(alias).subquery({
292 let inner_statement = self.plan_to_sql(plan)?;
293 if let ast::Statement::Query(inner_query) = inner_statement {
294 inner_query
295 } else {
296 return internal_err!(
297 "Subquery must be a Query, but found {inner_statement:?}"
298 );
299 }
300 });
301 relation.derived(derived_builder);
302
303 Ok(())
304 }
305
306 fn derive_with_dialect_alias(
307 &self,
308 alias: &str,
309 plan: &LogicalPlan,
310 relation: &mut RelationBuilder,
311 lateral: bool,
312 columns: Vec<Ident>,
313 ) -> Result<()> {
314 if self.dialect.requires_derived_table_alias() || !columns.is_empty() {
315 self.derive(
316 plan,
317 relation,
318 Some(self.new_table_alias(alias.to_string(), columns)),
319 lateral,
320 )
321 } else {
322 self.derive(plan, relation, None, lateral)
323 }
324 }
325
    /// Walks `plan` from its root towards its inputs, recording every operator
    /// it meets on the supplied builders.
    ///
    /// * `query` receives query-level clauses (`LIMIT`, `OFFSET`, `ORDER BY`,
    ///   a `UNION` body). It is `None` when no statement-level builder is
    ///   available; operators that need one then fail with an internal error.
    /// * `select` receives the projection, `WHERE`/`HAVING`, `GROUP BY`,
    ///   `DISTINCT`, and joins of the current `SELECT`.
    /// * `relation` receives the `FROM` relation of the current `SELECT`.
    ///
    /// Several operators (`Projection`, `Limit`, `Sort`, `Distinct`, `Union`)
    /// check `select.already_projected()`: when the current `SELECT` already
    /// has a projection, the operator cannot be folded into it and is instead
    /// unparsed as a derived table via `derive_with_dialect_alias`.
    ///
    /// # Errors
    ///
    /// Returns a "not implemented" error for plan nodes without a handler
    /// here, internal errors for the invariant violations noted inline, and
    /// propagates errors from expression unparsing and plan rewriting.
    #[cfg_attr(feature = "recursive_protection", recursive::recursive)]
    fn select_to_sql_recursively(
        &self,
        plan: &LogicalPlan,
        query: &mut Option<QueryBuilder>,
        select: &mut SelectBuilder,
        relation: &mut RelationBuilder,
    ) -> Result<()> {
        match plan {
            LogicalPlan::TableScan(scan) => {
                // A scan carrying a pushed-down projection, filters, or fetch is
                // first expanded into an equivalent explicit plan, which is then
                // unparsed in place of the scan.
                if let Some(unparsed_table_scan) = Self::unparse_table_scan_pushdown(
                    plan,
                    None,
                    select.already_projected(),
                )? {
                    return self.select_to_sql_recursively(
                        &unparsed_table_scan,
                        query,
                        select,
                        relation,
                    );
                }
                // Plain scan: emit `[catalog.][schema.]table` as the relation.
                let mut builder = TableRelationBuilder::default();
                let mut table_parts = vec![];
                if let Some(catalog_name) = scan.table_name.catalog() {
                    table_parts
                        .push(self.new_ident_quoted_if_needs(catalog_name.to_string()));
                }
                if let Some(schema_name) = scan.table_name.schema() {
                    table_parts
                        .push(self.new_ident_quoted_if_needs(schema_name.to_string()));
                }
                table_parts.push(
                    self.new_ident_quoted_if_needs(scan.table_name.table().to_string()),
                );
                builder.name(ast::ObjectName::from(table_parts));
                relation.table(builder);

                Ok(())
            }
            LogicalPlan::Projection(p) => {
                // If the helper produces a rewritten plan (judging by its name,
                // for sorts that reference fields missing from the projection),
                // unparse that plan instead of this one.
                if let Some(new_plan) = rewrite_plan_for_sort_on_non_projected_fields(p) {
                    return self
                        .select_to_sql_recursively(&new_plan, query, select, relation);
                }

                // A projection whose single expression is an aliased unnest
                // placeholder column is treated as the top of an UNNEST relation.
                let unnest_input_type = if p.expr.len() == 1 {
                    Self::check_unnest_placeholder_with_outer_ref(&p.expr[0])
                } else {
                    None
                };
                if self.dialect.unnest_as_table_factor() && unnest_input_type.is_some() {
                    if let LogicalPlan::Unnest(unnest) = &p.input.as_ref() {
                        if let Some(unnest_relation) =
                            self.try_unnest_to_table_factor_sql(unnest)?
                        {
                            // The relation is now the UNNEST table factor. The
                            // recursion below passes through the `Unnest` arm
                            // and ends at the `EmptyRelation` arm, which leaves
                            // an already-set relation untouched.
                            relation.unnest(unnest_relation);
                            return self.select_to_sql_recursively(
                                p.input.as_ref(),
                                query,
                                select,
                                relation,
                            );
                        }
                    }
                }

                // For an unnest projection, carry the output column names on the
                // table alias of the derived relation built below.
                let columns = if unnest_input_type.is_some() {
                    p.expr
                        .iter()
                        .map(|e| {
                            self.new_ident_quoted_if_needs(e.schema_name().to_string())
                        })
                        .collect()
                } else {
                    vec![]
                };
                // The current SELECT already has a projection, so this one
                // becomes a derived table; it is LATERAL only when the unnest
                // input is an outer reference.
                if select.already_projected() {
                    return self.derive_with_dialect_alias(
                        "derived_projection",
                        plan,
                        relation,
                        unnest_input_type
                            .filter(|t| matches!(t, UnnestInputType::OuterReference))
                            .is_some(),
                        columns,
                    );
                }
                self.reconstruct_select_statement(plan, p, select)?;
                self.select_to_sql_recursively(p.input.as_ref(), query, select, relation)
            }
            LogicalPlan::Filter(filter) => {
                // When an aggregate is found within this select the predicate is
                // emitted as HAVING (with aggregate columns unprojected back to
                // their expressions); otherwise it is a WHERE clause.
                if let Some(agg) =
                    find_agg_node_within_select(plan, select.already_projected())
                {
                    let unprojected =
                        unproject_agg_exprs(filter.predicate.clone(), agg, None)?;
                    let filter_expr = self.expr_to_sql(&unprojected)?;
                    select.having(Some(filter_expr));
                } else {
                    let filter_expr = self.expr_to_sql(&filter.predicate)?;
                    select.selection(Some(filter_expr));
                }

                self.select_to_sql_recursively(
                    filter.input.as_ref(),
                    query,
                    select,
                    relation,
                )
            }
            LogicalPlan::Limit(limit) => {
                // A limit below an existing projection becomes a derived table.
                if select.already_projected() {
                    return self.derive_with_dialect_alias(
                        "derived_limit",
                        plan,
                        relation,
                        false,
                        vec![],
                    );
                }
                // `fetch` maps to LIMIT; it needs a query builder.
                if let Some(fetch) = &limit.fetch {
                    let Some(query) = query.as_mut() else {
                        return internal_err!(
                            "Limit operator only valid in a statement context."
                        );
                    };
                    query.limit(Some(self.expr_to_sql(fetch)?));
                }

                // `skip` maps to OFFSET; it needs a query builder as well.
                if let Some(skip) = &limit.skip {
                    let Some(query) = query.as_mut() else {
                        return internal_err!(
                            "Offset operator only valid in a statement context."
                        );
                    };

                    query.offset(Some(ast::Offset {
                        rows: ast::OffsetRows::None,
                        value: self.expr_to_sql(skip)?,
                    }));
                }

                self.select_to_sql_recursively(
                    limit.input.as_ref(),
                    query,
                    select,
                    relation,
                )
            }
            LogicalPlan::Sort(sort) => {
                // A sort below an existing projection becomes a derived table.
                if select.already_projected() {
                    return self.derive_with_dialect_alias(
                        "derived_sort",
                        plan,
                        relation,
                        false,
                        vec![],
                    );
                }
                let Some(query_ref) = query else {
                    return internal_err!(
                        "Sort operator only valid in a statement context."
                    );
                };

                // A fetch attached to the sort is emitted as a numeric LIMIT.
                if let Some(fetch) = sort.fetch {
                    query_ref.limit(Some(ast::Expr::value(ast::Value::Number(
                        fetch.to_string(),
                        false,
                    ))));
                };

                // Sort keys are passed through `unproject_sort_expr` (together
                // with the aggregate node, if any) before being unparsed.
                let agg = find_agg_node_within_select(plan, select.already_projected());
                let sort_exprs: Vec<SortExpr> = sort
                    .expr
                    .iter()
                    .map(|sort_expr| {
                        unproject_sort_expr(sort_expr.clone(), agg, sort.input.as_ref())
                    })
                    .collect::<Result<Vec<_>>>()?;

                query_ref.order_by(self.sorts_to_sql(&sort_exprs)?);

                self.select_to_sql_recursively(
                    sort.input.as_ref(),
                    query,
                    select,
                    relation,
                )
            }
            LogicalPlan::Aggregate(agg) => {
                // With a projection already in place, the aggregate has been
                // accounted for by `reconstruct_select_statement`. Otherwise the
                // aggregate itself defines the output: aggregate expressions
                // followed by group expressions, plus the GROUP BY clause.
                if !select.already_projected() {
                    let exprs: Vec<_> = agg
                        .aggr_expr
                        .iter()
                        .chain(agg.group_expr.iter())
                        .map(|expr| self.select_item_to_sql(expr))
                        .collect::<Result<Vec<_>>>()?;
                    select.projection(exprs);

                    select.group_by(ast::GroupByExpr::Expressions(
                        agg.group_expr
                            .iter()
                            .map(|expr| self.expr_to_sql(expr))
                            .collect::<Result<Vec<_>>>()?,
                        vec![],
                    ));
                }

                self.select_to_sql_recursively(
                    agg.input.as_ref(),
                    query,
                    select,
                    relation,
                )
            }
            LogicalPlan::Distinct(distinct) => {
                // A distinct below an existing projection becomes a derived table.
                if select.already_projected() {
                    return self.derive_with_dialect_alias(
                        "derived_distinct",
                        plan,
                        relation,
                        false,
                        vec![],
                    );
                }

                // DISTINCT directly over a UNION, with a query builder present:
                // flag the query so the `Union` arm emits `UNION` rather than
                // `UNION ALL`, and emit no separate DISTINCT.
                if let Distinct::All(input) = distinct {
                    if matches!(input.as_ref(), LogicalPlan::Union(_)) {
                        if let Some(query_mut) = query.as_mut() {
                            query_mut.distinct_union();
                            return self.select_to_sql_recursively(
                                input.as_ref(),
                                query,
                                select,
                                relation,
                            );
                        }
                    }
                }

                let (select_distinct, input) = match distinct {
                    Distinct::All(input) => (ast::Distinct::Distinct, input.as_ref()),
                    // DISTINCT ON supplies its own select list and, optionally,
                    // its own ORDER BY (which needs a query builder).
                    Distinct::On(on) => {
                        let exprs = on
                            .on_expr
                            .iter()
                            .map(|e| self.expr_to_sql(e))
                            .collect::<Result<Vec<_>>>()?;
                        let items = on
                            .select_expr
                            .iter()
                            .map(|e| self.select_item_to_sql(e))
                            .collect::<Result<Vec<_>>>()?;
                        if let Some(sort_expr) = &on.sort_expr {
                            if let Some(query_ref) = query {
                                query_ref.order_by(self.sorts_to_sql(sort_expr)?);
                            } else {
                                return internal_err!(
                                    "Sort operator only valid in a statement context."
                                );
                            }
                        }
                        select.projection(items);
                        (ast::Distinct::On(exprs), on.input.as_ref())
                    }
                };
                select.distinct(Some(select_distinct));
                self.select_to_sql_recursively(input, query, select, relation)
            }
            LogicalPlan::Join(join) => {
                let mut table_scan_filters = vec![];
                // For right semi/anti joins the inputs are swapped, so the side
                // unparsed into the outer FROM is `join.right` and the side used
                // for the EXISTS subquery below is `join.left`.
                // NOTE(review): `RightMark` is handled by the same EXISTS branch
                // below but is not swapped here - confirm this is intended.
                let (left_plan, right_plan) = match join.join_type {
                    JoinType::RightSemi | JoinType::RightAnti => {
                        (&join.right, &join.left)
                    }
                    _ => (&join.left, &join.right),
                };
                // Captured before recursing: the recursive calls below may set a
                // projection on `select`, which must not be mistaken for one set
                // by an outer plan node.
                let already_projected = select.already_projected();

                // Where the helper can reduce an input to a simple table scan,
                // use that scan and collect the filters it returns separately.
                let left_plan =
                    match try_transform_to_simple_table_scan_with_filters(left_plan)? {
                        Some((plan, filters)) => {
                            table_scan_filters.extend(filters);
                            Arc::new(plan)
                        }
                        None => Arc::clone(left_plan),
                    };

                self.select_to_sql_recursively(
                    left_plan.as_ref(),
                    query,
                    select,
                    relation,
                )?;

                // Take whatever projection the left side produced so the right
                // side starts from a clean slate; the two are merged afterwards.
                let left_projection: Option<Vec<ast::SelectItem>> = if !already_projected
                {
                    Some(select.pop_projections())
                } else {
                    None
                };

                let right_plan =
                    match try_transform_to_simple_table_scan_with_filters(right_plan)? {
                        Some((plan, filters)) => {
                            table_scan_filters.extend(filters);
                            Arc::new(plan)
                        }
                        None => Arc::clone(right_plan),
                    };

                // The right side gets its own relation builder; it becomes
                // either the joined relation or the FROM of an EXISTS subquery.
                let mut right_relation = RelationBuilder::default();

                self.select_to_sql_recursively(
                    right_plan.as_ref(),
                    query,
                    select,
                    &mut right_relation,
                )?;

                let join_filters = if table_scan_filters.is_empty() {
                    join.filter.clone()
                } else {
                    // AND together all filters collected from the table scans.
                    let Some(combined_filters) =
                        table_scan_filters.into_iter().reduce(|acc, filter| {
                            Expr::BinaryExpr(BinaryExpr {
                                left: Box::new(acc),
                                op: Operator::And,
                                right: Box::new(filter),
                            })
                        })
                    else {
                        return internal_err!("Failed to combine TableScan filters");
                    };

                    // AND the join's own filter (if any) with the scan filters.
                    match &join.filter {
                        Some(filter) => Some(Expr::BinaryExpr(BinaryExpr {
                            left: Box::new(filter.clone()),
                            op: Operator::And,
                            right: Box::new(combined_filters),
                        })),
                        None => Some(combined_filters),
                    }
                };

                let join_constraint = self.join_constraint_to_sql(
                    join.join_constraint,
                    &join.on,
                    join_filters.as_ref(),
                )?;

                let right_projection: Option<Vec<ast::SelectItem>> = if !already_projected
                {
                    Some(select.pop_projections())
                } else {
                    None
                };

                match join.join_type {
                    // Semi, anti, and mark joins are unparsed as
                    // `[NOT] EXISTS (SELECT 1 FROM <right> WHERE ...)`.
                    JoinType::LeftSemi
                    | JoinType::LeftAnti
                    | JoinType::LeftMark
                    | JoinType::RightSemi
                    | JoinType::RightAnti
                    | JoinType::RightMark => {
                        let mut query_builder = QueryBuilder::default();
                        let mut from = TableWithJoinsBuilder::default();
                        let mut exists_select: SelectBuilder = SelectBuilder::default();
                        from.relation(right_relation);
                        exists_select.push_from(from);
                        // NOTE(review): only `join.filter` and `join.on` feed the
                        // subquery's WHERE; `join_constraint` (and with it any
                        // filters gathered in `table_scan_filters`) is not
                        // referenced in this branch - confirm this is intended.
                        if let Some(filter) = &join.filter {
                            exists_select.selection(Some(self.expr_to_sql(filter)?));
                        }
                        for (left, right) in &join.on {
                            exists_select.selection(Some(
                                self.expr_to_sql(&left.clone().eq(right.clone()))?,
                            ));
                        }
                        exists_select.projection(vec![ast::SelectItem::UnnamedExpr(
                            ast::Expr::value(ast::Value::Number("1".to_string(), false)),
                        )]);
                        query_builder.body(Box::new(SetExpr::Select(Box::new(
                            exists_select.build()?,
                        ))));

                        // Anti joins negate the EXISTS; semi and mark joins do not.
                        let negated = match join.join_type {
                            JoinType::LeftSemi
                            | JoinType::RightSemi
                            | JoinType::LeftMark
                            | JoinType::RightMark => false,
                            JoinType::LeftAnti | JoinType::RightAnti => true,
                            _ => unreachable!(),
                        };
                        let exists_expr = ast::Expr::Exists {
                            subquery: Box::new(query_builder.build()?),
                            negated,
                        };

                        match join.join_type {
                            // Mark joins: build a `mark` column reference
                            // qualified with the qualifier of the first field of
                            // the chosen source schema, and hand it to
                            // `replace_mark` together with the EXISTS expression.
                            JoinType::LeftMark | JoinType::RightMark => {
                                let source_schema =
                                    if join.join_type == JoinType::LeftMark {
                                        right_plan.schema()
                                    } else {
                                        left_plan.schema()
                                    };
                                let (table_ref, _) = source_schema.qualified_field(0);
                                let column = self.col_to_sql(&Column::new(
                                    table_ref.cloned(),
                                    "mark",
                                ))?;
                                select.replace_mark(&column, &exists_expr);
                            }
                            // Semi/anti joins: the EXISTS is a WHERE condition.
                            _ => {
                                select.selection(Some(exists_expr));
                            }
                        }
                        // Only the left side's projection is restored.
                        if let Some(projection) = left_projection {
                            select.projection(projection);
                        }
                    }
                    // Regular joins are appended to the current FROM entry.
                    JoinType::Inner
                    | JoinType::Left
                    | JoinType::Right
                    | JoinType::Full => {
                        let Ok(Some(relation)) = right_relation.build() else {
                            return internal_err!("Failed to build right relation");
                        };
                        let ast_join = ast::Join {
                            relation,
                            global: false,
                            join_operator: self
                                .join_operator_to_sql(join.join_type, join_constraint)?,
                        };
                        let mut from = select.pop_from().unwrap();
                        from.push_join(ast_join);
                        select.push_from(from);
                        // Without an outer projection, the join's output is the
                        // left projection followed by the right projection.
                        if !already_projected {
                            let Some(left_projection) = left_projection else {
                                return internal_err!("Left projection is missing");
                            };

                            let Some(right_projection) = right_projection else {
                                return internal_err!("Right projection is missing");
                            };

                            let projection = left_projection
                                .into_iter()
                                .chain(right_projection)
                                .collect();
                            select.projection(projection);
                        }
                    }
                };

                Ok(())
            }
            LogicalPlan::SubqueryAlias(plan_alias) => {
                // The helper yields the plan to unparse under the alias plus any
                // column aliases to be attached to it.
                let (plan, mut columns) =
                    subquery_alias_inner_query_and_columns(plan_alias);
                let unparsed_table_scan = Self::unparse_table_scan_pushdown(
                    plan,
                    Some(plan_alias.alias.clone()),
                    select.already_projected(),
                )?;
                // No projection so far and no pushdown scan to expand: select
                // everything from the aliased relation.
                if !select.already_projected() && unparsed_table_scan.is_none() {
                    select.projection(vec![ast::SelectItem::Wildcard(
                        ast::WildcardAdditionalOptions::default(),
                    )]);
                }
                let plan = unparsed_table_scan.unwrap_or_else(|| plan.clone());
                if !columns.is_empty()
                    && !self.dialect.supports_column_alias_in_table_alias()
                {
                    // The dialect cannot express `alias (col, ...)`, so the
                    // column aliases are injected into the subquery instead.
                    let rewritten_plan =
                        match inject_column_aliases_into_subquery(plan, columns) {
                            Ok(p) => p,
                            Err(e) => {
                                return internal_err!(
                                    "Failed to transform SubqueryAlias plan: {e}"
                                )
                            }
                        };

                    // The aliases now live in the subquery; the table alias
                    // below must not repeat them.
                    columns = vec![];

                    self.select_to_sql_recursively(
                        &rewritten_plan,
                        query,
                        select,
                        relation,
                    )?;
                } else {
                    self.select_to_sql_recursively(&plan, query, select, relation)?;
                }

                relation.alias(Some(
                    self.new_table_alias(plan_alias.alias.table().to_string(), columns),
                ));

                Ok(())
            }
            LogicalPlan::Union(union) => {
                // A union below an existing projection becomes a derived table.
                if select.already_projected() {
                    return self.derive_with_dialect_alias(
                        "derived_union",
                        plan,
                        relation,
                        false,
                        vec![],
                    );
                }

                // Each input is unparsed into its own set expression.
                let input_exprs: Vec<SetExpr> = union
                    .inputs
                    .iter()
                    .map(|input| self.select_to_sql_expr(input, query))
                    .collect::<Result<Vec<_>>>()?;

                if input_exprs.len() < 2 {
                    return internal_err!("UNION operator requires at least 2 inputs");
                }

                // `SetQuantifier::None` is used when the query was flagged as a
                // distinct union (see the `Distinct` arm); otherwise `All`.
                let set_quantifier =
                    if query.as_ref().is_some_and(|q| q.is_distinct_union()) {
                        ast::SetQuantifier::None
                    } else {
                        ast::SetQuantifier::All
                    };

                // Fold the inputs starting from the last one. Because the
                // iteration is reversed, the accumulator `a` holds the later
                // inputs and goes on the right; the earlier input `b` goes on
                // the left, giving `i0 UNION (i1 UNION (i2 ...))`.
                let union_expr = input_exprs
                    .into_iter()
                    .rev()
                    .reduce(|a, b| SetExpr::SetOperation {
                        op: ast::SetOperator::Union,
                        set_quantifier,
                        left: Box::new(b),
                        right: Box::new(a),
                    })
                    .unwrap();

                let Some(query) = query.as_mut() else {
                    return internal_err!(
                        "UNION ALL operator only valid in a statement context"
                    );
                };
                // The union becomes the query body; `select_to_sql_expr` takes
                // it back out of the builder.
                query.body(Box::new(union_expr));

                Ok(())
            }
            LogicalPlan::Window(window) => {
                // Nothing is emitted for the window node itself here; window
                // nodes are looked up by `reconstruct_select_statement` when the
                // enclosing projection is handled.
                self.select_to_sql_recursively(
                    window.input.as_ref(),
                    query,
                    select,
                    relation,
                )
            }
            LogicalPlan::EmptyRelation(_) => {
                // Mark the relation as empty unless one has already been set
                // (for example an UNNEST table factor).
                if !relation.has_relation() {
                    relation.empty();
                }
                Ok(())
            }
            LogicalPlan::Extension(extension) => {
                // Hand the builders to the extension unparsers; the query
                // builder is forwarded only when one exists.
                if let Some(query) = query.as_mut() {
                    self.extension_to_sql(
                        extension.node.as_ref(),
                        &mut Some(query),
                        &mut Some(select),
                        &mut Some(relation),
                    )
                } else {
                    self.extension_to_sql(
                        extension.node.as_ref(),
                        &mut None,
                        &mut Some(select),
                        &mut Some(relation),
                    )
                }
            }
            LogicalPlan::Unnest(unnest) => {
                if !unnest.struct_type_columns.is_empty() {
                    return internal_err!(
                        "Struct type columns are not currently supported in UNNEST: {:?}",
                        unnest.struct_type_columns
                    );
                }

                // The projection directly beneath the unnest is skipped and the
                // walk continues with that projection's input.
                if let LogicalPlan::Projection(p) = unnest.input.as_ref() {
                    self.select_to_sql_recursively(&p.input, query, select, relation)
                } else {
                    internal_err!("Unnest input is not a Projection: {unnest:?}")
                }
            }
            // Only subqueries for which `find_unnest_node_until_relation` finds
            // an unnest node are handled; other subqueries fall through to the
            // catch-all arm.
            LogicalPlan::Subquery(subquery)
                if find_unnest_node_until_relation(subquery.subquery.as_ref())
                    .is_some() =>
            {
                if self.dialect.unnest_as_table_factor() {
                    // Unparse the subquery inline into the current builders.
                    self.select_to_sql_recursively(
                        subquery.subquery.as_ref(),
                        query,
                        select,
                        relation,
                    )
                } else {
                    // Otherwise wrap it in a LATERAL derived table.
                    self.derive_with_dialect_alias(
                        "derived_unnest",
                        subquery.subquery.as_ref(),
                        relation,
                        true,
                        vec![],
                    )
                }
            }
            _ => {
                not_impl_err!("Unsupported operator: {plan:?}")
            }
        }
    }
988
989 fn check_unnest_placeholder_with_outer_ref(expr: &Expr) -> Option<UnnestInputType> {
999 if let Expr::Alias(Alias { expr, .. }) = expr {
1000 if let Expr::Column(Column { name, .. }) = expr.as_ref() {
1001 if let Some(prefix) = name.strip_prefix(UNNEST_PLACEHOLDER) {
1002 if prefix.starts_with(&format!("({OUTER_REFERENCE_COLUMN_PREFIX}(")) {
1003 return Some(UnnestInputType::OuterReference);
1004 }
1005 return Some(UnnestInputType::Scalar);
1006 }
1007 }
1008 }
1009 None
1010 }
1011
1012 fn try_unnest_to_table_factor_sql(
1013 &self,
1014 unnest: &Unnest,
1015 ) -> Result<Option<UnnestRelationBuilder>> {
1016 let mut unnest_relation = UnnestRelationBuilder::default();
1017 let LogicalPlan::Projection(projection) = unnest.input.as_ref() else {
1018 return Ok(None);
1019 };
1020
1021 if !matches!(projection.input.as_ref(), LogicalPlan::EmptyRelation(_)) {
1022 return Ok(None);
1030 };
1031
1032 let exprs = projection
1033 .expr
1034 .iter()
1035 .map(|e| self.expr_to_sql(e))
1036 .collect::<Result<Vec<_>>>()?;
1037 unnest_relation.array_exprs(exprs);
1038
1039 Ok(Some(unnest_relation))
1040 }
1041
1042 fn is_scan_with_pushdown(scan: &TableScan) -> bool {
1043 scan.projection.is_some() || !scan.filters.is_empty() || scan.fetch.is_some()
1044 }
1045
    /// Rebuilds a table scan that carries pushed-down operations (projection,
    /// filters, fetch) as an explicit logical plan that the rest of the
    /// unparser can handle.
    ///
    /// * `plan` - the plan to inspect. Only `TableScan`, `SubqueryAlias`, and
    ///   `Projection` roots are considered.
    /// * `alias` - an alias to apply to the scan, if any. Column references in
    ///   the scan's filters (and in an enclosing projection) are rewritten
    ///   with a `TableAliasRewriter` for this alias.
    /// * `already_projected` - when `true`, the scan's pushed-down projection
    ///   is not turned into a `Projection` node.
    ///
    /// Returns `Ok(None)` when there is nothing to rewrite: the root is a scan
    /// without pushdown, the scan reached through an alias or projection has no
    /// pushdown, or the root is any other kind of node.
    fn unparse_table_scan_pushdown(
        plan: &LogicalPlan,
        alias: Option<TableReference>,
        already_projected: bool,
    ) -> Result<Option<LogicalPlan>> {
        match plan {
            LogicalPlan::TableScan(table_scan) => {
                if !Self::is_scan_with_pushdown(table_scan) {
                    return Ok(None);
                }
                let table_schema = table_scan.source.schema();
                // Rewriter applied to the pushed-down filters when an alias is
                // supplied.
                let mut filter_alias_rewriter =
                    alias.as_ref().map(|alias_name| TableAliasRewriter {
                        table_schema: &table_schema,
                        alias_name: alias_name.clone(),
                    });

                // Start from a scan of the same table and source with no
                // projection; the pushdowns are re-applied as plan nodes below.
                let mut builder = LogicalPlanBuilder::scan(
                    table_scan.table_name.clone(),
                    Arc::clone(&table_scan.source),
                    None,
                )?;
                // With a projection or filters present, the alias goes directly
                // on the scan, beneath the projection/filter nodes added next,
                // whose column references use the alias.
                if let Some(ref alias) = alias {
                    if table_scan.projection.is_some() || !table_scan.filters.is_empty() {
                        builder = builder.alias(alias.clone())?;
                    }
                }

                // The pushed-down projection is materialized only when the
                // caller has not already produced a projection.
                if !already_projected {
                    if let Some(project_vec) = &table_scan.projection {
                        if project_vec.is_empty() {
                            // An empty projection is emitted as the literal `1`.
                            builder = builder.project(vec![Expr::Literal(
                                ScalarValue::Int64(Some(1)),
                                None,
                            )])?;
                        } else {
                            // Map projected field indices to columns qualified
                            // by the alias when present, else by the table name.
                            let project_columns = project_vec
                                .iter()
                                .cloned()
                                .map(|i| {
                                    let schema = table_scan.source.schema();
                                    let field = schema.field(i);
                                    if alias.is_some() {
                                        Column::new(alias.clone(), field.name().clone())
                                    } else {
                                        Column::new(
                                            Some(table_scan.table_name.clone()),
                                            field.name().clone(),
                                        )
                                    }
                                })
                                .collect::<Vec<_>>();
                            builder = builder.project(project_columns)?;
                        };
                    }
                }

                // Rewrite each pushed-down filter for the alias (if any) and AND
                // them together. `reduce` yields `None` when there are no
                // filters, and the first rewrite error is preserved.
                let filter_expr: Result<Option<Expr>> = table_scan
                    .filters
                    .iter()
                    .cloned()
                    .map(|expr| {
                        if let Some(ref mut rewriter) = filter_alias_rewriter {
                            expr.rewrite(rewriter).data()
                        } else {
                            Ok(expr)
                        }
                    })
                    .reduce(|acc, expr_result| {
                        acc.and_then(|acc_expr| {
                            expr_result.map(|expr| acc_expr.and(expr))
                        })
                    })
                    .transpose();

                if let Some(filter) = filter_expr? {
                    builder = builder.filter(filter)?;
                }

                // A pushed-down fetch becomes a limit with skip 0.
                if let Some(fetch) = table_scan.fetch {
                    builder = builder.limit(0, Some(fetch))?;
                }

                // With neither projection nor filters, the alias was not applied
                // to the scan above; apply it here instead, on top of the plan
                // built so far.
                if let Some(alias) = alias {
                    if table_scan.projection.is_none() && table_scan.filters.is_empty() {
                        builder = builder.alias(alias)?;
                    }
                }

                Ok(Some(builder.build()?))
            }
            LogicalPlan::SubqueryAlias(subquery_alias) => {
                // Rewrite the input using this node's own alias, then wrap the
                // result in the caller-supplied alias when there is one.
                let ret = Self::unparse_table_scan_pushdown(
                    &subquery_alias.input,
                    Some(subquery_alias.alias.clone()),
                    already_projected,
                )?;
                if let Some(alias) = alias {
                    if let Some(plan) = ret {
                        let plan = LogicalPlanBuilder::new(plan).alias(alias)?.build()?;
                        return Ok(Some(plan));
                    }
                }
                Ok(ret)
            }
            LogicalPlan::Projection(projection) => {
                // A projection is rewritten only if its input was; its own
                // expressions are then re-applied on top of the rewritten input,
                // rewritten for the alias when one is supplied.
                if let Some(plan) = Self::unparse_table_scan_pushdown(
                    &projection.input,
                    alias.clone(),
                    already_projected,
                )? {
                    let exprs = if alias.is_some() {
                        let mut alias_rewriter =
                            alias.as_ref().map(|alias_name| TableAliasRewriter {
                                table_schema: plan.schema().as_arrow(),
                                alias_name: alias_name.clone(),
                            });
                        projection
                            .expr
                            .iter()
                            .cloned()
                            .map(|expr| {
                                if let Some(ref mut rewriter) = alias_rewriter {
                                    expr.rewrite(rewriter).data()
                                } else {
                                    Ok(expr)
                                }
                            })
                            .collect::<Result<Vec<_>>>()?
                    } else {
                        projection.expr.clone()
                    };
                    Ok(Some(
                        LogicalPlanBuilder::from(plan).project(exprs)?.build()?,
                    ))
                } else {
                    Ok(None)
                }
            }
            _ => Ok(None),
        }
    }
1204
1205 fn select_item_to_sql(&self, expr: &Expr) -> Result<ast::SelectItem> {
1206 match expr {
1207 Expr::Alias(Alias { expr, name, .. }) => {
1208 let inner = self.expr_to_sql(expr)?;
1209
1210 let col_name = if let Some(rewritten_name) =
1212 self.dialect.col_alias_overrides(name)?
1213 {
1214 rewritten_name.to_string()
1215 } else {
1216 name.to_string()
1217 };
1218
1219 Ok(ast::SelectItem::ExprWithAlias {
1220 expr: inner,
1221 alias: self.new_ident_quoted_if_needs(col_name),
1222 })
1223 }
1224 _ => {
1225 let inner = self.expr_to_sql(expr)?;
1226
1227 Ok(ast::SelectItem::UnnamedExpr(inner))
1228 }
1229 }
1230 }
1231
1232 fn sorts_to_sql(&self, sort_exprs: &[SortExpr]) -> Result<OrderByKind> {
1233 Ok(OrderByKind::Expressions(
1234 sort_exprs
1235 .iter()
1236 .map(|sort_expr| self.sort_to_sql(sort_expr))
1237 .collect::<Result<Vec<_>>>()?,
1238 ))
1239 }
1240
1241 fn join_operator_to_sql(
1242 &self,
1243 join_type: JoinType,
1244 constraint: ast::JoinConstraint,
1245 ) -> Result<ast::JoinOperator> {
1246 Ok(match join_type {
1247 JoinType::Inner => match &constraint {
1248 ast::JoinConstraint::On(_)
1249 | ast::JoinConstraint::Using(_)
1250 | ast::JoinConstraint::Natural => ast::JoinOperator::Inner(constraint),
1251 ast::JoinConstraint::None => {
1252 ast::JoinOperator::CrossJoin
1255 }
1256 },
1257 JoinType::Left => ast::JoinOperator::LeftOuter(constraint),
1258 JoinType::Right => ast::JoinOperator::RightOuter(constraint),
1259 JoinType::Full => ast::JoinOperator::FullOuter(constraint),
1260 JoinType::LeftAnti => ast::JoinOperator::LeftAnti(constraint),
1261 JoinType::LeftSemi => ast::JoinOperator::LeftSemi(constraint),
1262 JoinType::RightAnti => ast::JoinOperator::RightAnti(constraint),
1263 JoinType::RightSemi => ast::JoinOperator::RightSemi(constraint),
1264 JoinType::LeftMark | JoinType::RightMark => {
1265 unimplemented!("Unparsing of Mark join type")
1266 }
1267 })
1268 }
1269
1270 fn join_using_to_sql(
1274 &self,
1275 join_conditions: &[(Expr, Expr)],
1276 ) -> Option<ast::JoinConstraint> {
1277 let mut object_names = Vec::with_capacity(join_conditions.len());
1278 for (left, right) in join_conditions {
1279 match (left, right) {
1280 (
1281 Expr::Column(Column {
1282 relation: _,
1283 name: left_name,
1284 spans: _,
1285 }),
1286 Expr::Column(Column {
1287 relation: _,
1288 name: right_name,
1289 spans: _,
1290 }),
1291 ) if left_name == right_name => {
1292 let ident = self.new_ident_quoted_if_needs(left_name.to_string());
1296 object_names.push(ast::ObjectName::from(vec![ident]));
1297 }
1298 _ => return None,
1301 }
1302 }
1303 Some(ast::JoinConstraint::Using(object_names))
1304 }
1305
1306 fn join_constraint_to_sql(
1308 &self,
1309 constraint: JoinConstraint,
1310 conditions: &[(Expr, Expr)],
1311 filter: Option<&Expr>,
1312 ) -> Result<ast::JoinConstraint> {
1313 match (constraint, conditions, filter) {
1314 (JoinConstraint::On | JoinConstraint::Using, [], None) => {
1316 Ok(ast::JoinConstraint::None)
1317 }
1318
1319 (JoinConstraint::Using, conditions, None) => {
1320 match self.join_using_to_sql(conditions) {
1321 Some(using) => Ok(using),
1322 None => self.join_conditions_to_sql_on(conditions, None),
1325 }
1326 }
1327
1328 (JoinConstraint::On | JoinConstraint::Using, conditions, filter) => {
1336 self.join_conditions_to_sql_on(conditions, filter)
1337 }
1338 }
1339 }
1340
1341 fn join_conditions_to_sql_on(
1345 &self,
1346 join_conditions: &[(Expr, Expr)],
1347 filter: Option<&Expr>,
1348 ) -> Result<ast::JoinConstraint> {
1349 let mut condition = None;
1350 for (left, right) in join_conditions {
1352 let l = self.expr_to_sql(left)?;
1354 let r = self.expr_to_sql(right)?;
1355 let e = self.binary_op_to_sql(l, r, ast::BinaryOperator::Eq);
1356 condition = match condition {
1357 Some(expr) => Some(self.and_op_to_sql(expr, e)),
1358 None => Some(e),
1359 };
1360 }
1361
1362 condition = match (condition, filter) {
1364 (Some(expr), Some(filter)) => {
1365 Some(self.and_op_to_sql(expr, self.expr_to_sql(filter)?))
1366 }
1367 (Some(expr), None) => Some(expr),
1368 (None, Some(filter)) => Some(self.expr_to_sql(filter)?),
1369 (None, None) => None,
1370 };
1371
1372 let constraint = match condition {
1373 Some(filter) => ast::JoinConstraint::On(filter),
1374 None => ast::JoinConstraint::None,
1375 };
1376
1377 Ok(constraint)
1378 }
1379
1380 fn and_op_to_sql(&self, lhs: ast::Expr, rhs: ast::Expr) -> ast::Expr {
1381 self.binary_op_to_sql(lhs, rhs, ast::BinaryOperator::And)
1382 }
1383
1384 fn new_table_alias(&self, alias: String, columns: Vec<Ident>) -> ast::TableAlias {
1385 let columns = columns
1386 .into_iter()
1387 .map(|ident| TableAliasColumnDef {
1388 name: ident,
1389 data_type: None,
1390 })
1391 .collect();
1392 ast::TableAlias {
1393 name: self.new_ident_quoted_if_needs(alias),
1394 columns,
1395 }
1396 }
1397
    /// Unparses a DML plan.
    ///
    /// DML is not supported: this always returns a "not implemented" error
    /// that includes the debug representation of `plan`.
    fn dml_to_sql(&self, plan: &LogicalPlan) -> Result<ast::Statement> {
        not_impl_err!("Unsupported plan: {plan:?}")
    }
1401}
1402
1403impl From<BuilderError> for DataFusionError {
1404 fn from(e: BuilderError) -> Self {
1405 DataFusionError::External(Box::new(e))
1406 }
1407}
1408
/// Kind of input found behind an unnest placeholder column, as classified by
/// `check_unnest_placeholder_with_outer_ref`.
#[derive(Debug)]
enum UnnestInputType {
    /// The placeholder's argument starts with an outer-reference marker
    /// (`OUTER_REFERENCE_COLUMN_PREFIX`); such relations are derived as
    /// `LATERAL`.
    OuterReference,
    /// Any other placeholder argument.
    Scalar,
}