1use super::{
19 ast::{
20 BuilderError, DerivedRelationBuilder, QueryBuilder, RelationBuilder,
21 SelectBuilder, TableRelationBuilder, TableWithJoinsBuilder,
22 },
23 rewrite::{
24 inject_column_aliases_into_subquery, normalize_union_schema,
25 rewrite_plan_for_sort_on_non_projected_fields,
26 subquery_alias_inner_query_and_columns, TableAliasRewriter,
27 },
28 utils::{
29 find_agg_node_within_select, find_unnest_node_within_select,
30 find_window_nodes_within_select, try_transform_to_simple_table_scan_with_filters,
31 unproject_sort_expr, unproject_unnest_expr, unproject_window_exprs,
32 },
33 Unparser,
34};
35use crate::unparser::ast::UnnestRelationBuilder;
36use crate::unparser::extension_unparser::{
37 UnparseToStatementResult, UnparseWithinStatementResult,
38};
39use crate::unparser::utils::{find_unnest_node_until_relation, unproject_agg_exprs};
40use crate::utils::UNNEST_PLACEHOLDER;
41use datafusion_common::{
42 internal_err, not_impl_err,
43 tree_node::{TransformedResult, TreeNode},
44 Column, DataFusionError, Result, ScalarValue, TableReference,
45};
46use datafusion_expr::expr::OUTER_REFERENCE_COLUMN_PREFIX;
47use datafusion_expr::{
48 expr::Alias, BinaryExpr, Distinct, Expr, JoinConstraint, JoinType, LogicalPlan,
49 LogicalPlanBuilder, Operator, Projection, SortExpr, TableScan, Unnest,
50 UserDefinedLogicalNode,
51};
52use sqlparser::ast::{self, Ident, OrderByKind, SetExpr, TableAliasColumnDef};
53use std::{sync::Arc, vec};
54
55pub fn plan_to_sql(plan: &LogicalPlan) -> Result<ast::Statement> {
92 let unparser = Unparser::default();
93 unparser.plan_to_sql(plan)
94}
95
96impl Unparser<'_> {
97 pub fn plan_to_sql(&self, plan: &LogicalPlan) -> Result<ast::Statement> {
98 let plan = normalize_union_schema(plan)?;
99
100 match plan {
101 LogicalPlan::Projection(_)
102 | LogicalPlan::Filter(_)
103 | LogicalPlan::Window(_)
104 | LogicalPlan::Aggregate(_)
105 | LogicalPlan::Sort(_)
106 | LogicalPlan::Join(_)
107 | LogicalPlan::Repartition(_)
108 | LogicalPlan::Union(_)
109 | LogicalPlan::TableScan(_)
110 | LogicalPlan::EmptyRelation(_)
111 | LogicalPlan::Subquery(_)
112 | LogicalPlan::SubqueryAlias(_)
113 | LogicalPlan::Limit(_)
114 | LogicalPlan::Statement(_)
115 | LogicalPlan::Values(_)
116 | LogicalPlan::Distinct(_) => self.select_to_sql_statement(&plan),
117 LogicalPlan::Dml(_) => self.dml_to_sql(&plan),
118 LogicalPlan::Extension(extension) => {
119 self.extension_to_statement(extension.node.as_ref())
120 }
121 LogicalPlan::Explain(_)
122 | LogicalPlan::Analyze(_)
123 | LogicalPlan::Ddl(_)
124 | LogicalPlan::Copy(_)
125 | LogicalPlan::DescribeTable(_)
126 | LogicalPlan::RecursiveQuery(_)
127 | LogicalPlan::Unnest(_) => not_impl_err!("Unsupported plan: {plan:?}"),
128 }
129 }
130
131 fn extension_to_statement(
135 &self,
136 node: &dyn UserDefinedLogicalNode,
137 ) -> Result<ast::Statement> {
138 let mut statement = None;
139 for unparser in &self.extension_unparsers {
140 match unparser.unparse_to_statement(node, self)? {
141 UnparseToStatementResult::Modified(stmt) => {
142 statement = Some(stmt);
143 break;
144 }
145 UnparseToStatementResult::Unmodified => {}
146 }
147 }
148 if let Some(statement) = statement {
149 Ok(statement)
150 } else {
151 not_impl_err!("Unsupported extension node: {node:?}")
152 }
153 }
154
155 fn extension_to_sql(
159 &self,
160 node: &dyn UserDefinedLogicalNode,
161 query: &mut Option<&mut QueryBuilder>,
162 select: &mut Option<&mut SelectBuilder>,
163 relation: &mut Option<&mut RelationBuilder>,
164 ) -> Result<()> {
165 for unparser in &self.extension_unparsers {
166 match unparser.unparse(node, self, query, select, relation)? {
167 UnparseWithinStatementResult::Modified => return Ok(()),
168 UnparseWithinStatementResult::Unmodified => {}
169 }
170 }
171 not_impl_err!("Unsupported extension node: {node:?}")
172 }
173
174 fn select_to_sql_statement(&self, plan: &LogicalPlan) -> Result<ast::Statement> {
175 let mut query_builder = Some(QueryBuilder::default());
176
177 let body = self.select_to_sql_expr(plan, &mut query_builder)?;
178
179 let query = query_builder.unwrap().body(Box::new(body)).build()?;
180
181 Ok(ast::Statement::Query(Box::new(query)))
182 }
183
184 fn select_to_sql_expr(
185 &self,
186 plan: &LogicalPlan,
187 query: &mut Option<QueryBuilder>,
188 ) -> Result<SetExpr> {
189 let mut select_builder = SelectBuilder::default();
190 select_builder.push_from(TableWithJoinsBuilder::default());
191 let mut relation_builder = RelationBuilder::default();
192 self.select_to_sql_recursively(
193 plan,
194 query,
195 &mut select_builder,
196 &mut relation_builder,
197 )?;
198
199 if let Some(body) = query.as_mut().and_then(|q| q.take_body()) {
201 return Ok(*body);
202 }
203
204 if !select_builder.already_projected() {
207 select_builder.projection(vec![ast::SelectItem::Wildcard(
208 ast::WildcardAdditionalOptions::default(),
209 )]);
210 }
211
212 let mut twj = select_builder.pop_from().unwrap();
213 twj.relation(relation_builder);
214 select_builder.push_from(twj);
215
216 Ok(SetExpr::Select(Box::new(select_builder.build()?)))
217 }
218
219 fn reconstruct_select_statement(
223 &self,
224 plan: &LogicalPlan,
225 p: &Projection,
226 select: &mut SelectBuilder,
227 ) -> Result<()> {
228 let mut exprs = p.expr.clone();
229
230 if let Some(unnest) = find_unnest_node_within_select(plan) {
232 exprs = exprs
233 .into_iter()
234 .map(|e| unproject_unnest_expr(e, unnest))
235 .collect::<Result<Vec<_>>>()?;
236 };
237
238 match (
239 find_agg_node_within_select(plan, true),
240 find_window_nodes_within_select(plan, None, true),
241 ) {
242 (Some(agg), window) => {
243 let window_option = window.as_deref();
244 let items = exprs
245 .into_iter()
246 .map(|proj_expr| {
247 let unproj = unproject_agg_exprs(proj_expr, agg, window_option)?;
248 self.select_item_to_sql(&unproj)
249 })
250 .collect::<Result<Vec<_>>>()?;
251
252 select.projection(items);
253 select.group_by(ast::GroupByExpr::Expressions(
254 agg.group_expr
255 .iter()
256 .map(|expr| self.expr_to_sql(expr))
257 .collect::<Result<Vec<_>>>()?,
258 vec![],
259 ));
260 }
261 (None, Some(window)) => {
262 let items = exprs
263 .into_iter()
264 .map(|proj_expr| {
265 let unproj = unproject_window_exprs(proj_expr, &window)?;
266 self.select_item_to_sql(&unproj)
267 })
268 .collect::<Result<Vec<_>>>()?;
269
270 select.projection(items);
271 }
272 _ => {
273 let items = exprs
274 .iter()
275 .map(|e| self.select_item_to_sql(e))
276 .collect::<Result<Vec<_>>>()?;
277 select.projection(items);
278 }
279 }
280 Ok(())
281 }
282
283 fn derive(
284 &self,
285 plan: &LogicalPlan,
286 relation: &mut RelationBuilder,
287 alias: Option<ast::TableAlias>,
288 lateral: bool,
289 ) -> Result<()> {
290 let mut derived_builder = DerivedRelationBuilder::default();
291 derived_builder.lateral(lateral).alias(alias).subquery({
292 let inner_statement = self.plan_to_sql(plan)?;
293 if let ast::Statement::Query(inner_query) = inner_statement {
294 inner_query
295 } else {
296 return internal_err!(
297 "Subquery must be a Query, but found {inner_statement:?}"
298 );
299 }
300 });
301 relation.derived(derived_builder);
302
303 Ok(())
304 }
305
306 fn derive_with_dialect_alias(
307 &self,
308 alias: &str,
309 plan: &LogicalPlan,
310 relation: &mut RelationBuilder,
311 lateral: bool,
312 columns: Vec<Ident>,
313 ) -> Result<()> {
314 if self.dialect.requires_derived_table_alias() || !columns.is_empty() {
315 self.derive(
316 plan,
317 relation,
318 Some(self.new_table_alias(alias.to_string(), columns)),
319 lateral,
320 )
321 } else {
322 self.derive(plan, relation, None, lateral)
323 }
324 }
325
/// Walks `plan` top-down and records each node's contribution on the SQL
/// builders.
///
/// * `query` receives query-level clauses (`LIMIT`, `OFFSET`, `ORDER BY`,
///   a complete `UNION` body). It is optional; nodes that need it return an
///   internal error when it is `None`.
/// * `select` receives the projection, selection/having, `GROUP BY`,
///   `DISTINCT` and joins.
/// * `relation` receives the relation produced by the leaf of this branch
///   (table, derived table, unnest, or empty).
///
/// Several node kinds check `select.already_projected()`: when a projection
/// has already been recorded, the node is unparsed as a derived table
/// (subquery) via `derive_with_dialect_alias` instead of being merged into
/// the current `SELECT`.
///
/// When the `recursive_protection` feature is enabled the function is
/// wrapped with `recursive::recursive`.
///
/// # Errors
///
/// Returns a "not implemented" error for unsupported nodes, internal errors
/// for structurally unexpected plans, and propagates conversion errors.
#[cfg_attr(feature = "recursive_protection", recursive::recursive)]
fn select_to_sql_recursively(
    &self,
    plan: &LogicalPlan,
    query: &mut Option<QueryBuilder>,
    select: &mut SelectBuilder,
    relation: &mut RelationBuilder,
) -> Result<()> {
    match plan {
        LogicalPlan::TableScan(scan) => {
            // A scan carrying pushed-down projection/filters/fetch is first
            // rewritten into an explicit plan, which is then unparsed instead.
            if let Some(unparsed_table_scan) = Self::unparse_table_scan_pushdown(
                plan,
                None,
                select.already_projected(),
            )? {
                return self.select_to_sql_recursively(
                    &unparsed_table_scan,
                    query,
                    select,
                    relation,
                );
            }
            // Plain scan: emit `[catalog.][schema.]table` as the relation.
            let mut builder = TableRelationBuilder::default();
            let mut table_parts = vec![];
            if let Some(catalog_name) = scan.table_name.catalog() {
                table_parts
                    .push(self.new_ident_quoted_if_needs(catalog_name.to_string()));
            }
            if let Some(schema_name) = scan.table_name.schema() {
                table_parts
                    .push(self.new_ident_quoted_if_needs(schema_name.to_string()));
            }
            table_parts.push(
                self.new_ident_quoted_if_needs(scan.table_name.table().to_string()),
            );
            builder.name(ast::ObjectName::from(table_parts));
            relation.table(builder);

            Ok(())
        }
        LogicalPlan::Projection(p) => {
            // If the rewrite helper produces a replacement plan, unparse that
            // plan instead of this projection.
            if let Some(new_plan) = rewrite_plan_for_sort_on_non_projected_fields(p) {
                return self
                    .select_to_sql_recursively(&new_plan, query, select, relation);
            }

            // A single-expression projection may be an unnest placeholder;
            // classify it (outer reference vs. scalar) if so.
            let unnest_input_type = if p.expr.len() == 1 {
                Self::check_unnest_placeholder_with_outer_ref(&p.expr[0])
            } else {
                None
            };
            // Dialects that support UNNEST as a table factor get an unnest
            // relation directly, when the unnest node has the expected shape.
            if self.dialect.unnest_as_table_factor() && unnest_input_type.is_some() {
                if let LogicalPlan::Unnest(unnest) = &p.input.as_ref() {
                    if let Some(unnest_relation) =
                        self.try_unnest_to_table_factor_sql(unnest)?
                    {
                        relation.unnest(unnest_relation);
                        return self.select_to_sql_recursively(
                            p.input.as_ref(),
                            query,
                            select,
                            relation,
                        );
                    }
                }
            }

            // For an unnest projection, the projected expression names are
            // passed along as column aliases of the derived table alias.
            let columns = if unnest_input_type.is_some() {
                p.expr
                    .iter()
                    .map(|e| {
                        self.new_ident_quoted_if_needs(e.schema_name().to_string())
                    })
                    .collect()
            } else {
                vec![]
            };
            // A projection below an existing projection becomes a derived
            // table; it is LATERAL only for the outer-reference unnest case.
            if select.already_projected() {
                return self.derive_with_dialect_alias(
                    "derived_projection",
                    plan,
                    relation,
                    unnest_input_type
                        .filter(|t| matches!(t, UnnestInputType::OuterReference))
                        .is_some(),
                    columns,
                );
            }
            self.reconstruct_select_statement(plan, p, select)?;
            self.select_to_sql_recursively(p.input.as_ref(), query, select, relation)
        }
        LogicalPlan::Filter(filter) => {
            // When an aggregate node is found within this select, the
            // predicate is unprojected and recorded via `having`; otherwise
            // it is recorded via `selection`.
            if let Some(agg) =
                find_agg_node_within_select(plan, select.already_projected())
            {
                let unprojected =
                    unproject_agg_exprs(filter.predicate.clone(), agg, None)?;
                let filter_expr = self.expr_to_sql(&unprojected)?;
                select.having(Some(filter_expr));
            } else {
                let filter_expr = self.expr_to_sql(&filter.predicate)?;
                select.selection(Some(filter_expr));
            }

            self.select_to_sql_recursively(
                filter.input.as_ref(),
                query,
                select,
                relation,
            )
        }
        LogicalPlan::Limit(limit) => {
            // A limit below an existing projection becomes a derived table.
            if select.already_projected() {
                return self.derive_with_dialect_alias(
                    "derived_limit",
                    plan,
                    relation,
                    false,
                    vec![],
                );
            }
            if let Some(fetch) = &limit.fetch {
                let Some(query) = query.as_mut() else {
                    return internal_err!(
                        "Limit operator only valid in a statement context."
                    );
                };
                query.limit(Some(self.expr_to_sql(fetch)?));
            }

            if let Some(skip) = &limit.skip {
                let Some(query) = query.as_mut() else {
                    return internal_err!(
                        "Offset operator only valid in a statement context."
                    );
                };
                query.offset(Some(ast::Offset {
                    rows: ast::OffsetRows::None,
                    value: self.expr_to_sql(skip)?,
                }));
            }

            self.select_to_sql_recursively(
                limit.input.as_ref(),
                query,
                select,
                relation,
            )
        }
        LogicalPlan::Sort(sort) => {
            // A sort below an existing projection becomes a derived table.
            if select.already_projected() {
                return self.derive_with_dialect_alias(
                    "derived_sort",
                    plan,
                    relation,
                    false,
                    vec![],
                );
            }
            let Some(query_ref) = query else {
                return internal_err!(
                    "Sort operator only valid in a statement context."
                );
            };

            // A fetch attached to the sort node is recorded as the query's
            // limit, rendered as a numeric literal.
            if let Some(fetch) = sort.fetch {
                query_ref.limit(Some(ast::Expr::value(ast::Value::Number(
                    fetch.to_string(),
                    false,
                ))));
            };

            let agg = find_agg_node_within_select(plan, select.already_projected());
            // Unproject the sort expressions before converting them.
            let sort_exprs: Vec<SortExpr> = sort
                .expr
                .iter()
                .map(|sort_expr| {
                    unproject_sort_expr(sort_expr.clone(), agg, sort.input.as_ref())
                })
                .collect::<Result<Vec<_>>>()?;

            query_ref.order_by(self.sorts_to_sql(&sort_exprs)?);

            self.select_to_sql_recursively(
                sort.input.as_ref(),
                query,
                select,
                relation,
            )
        }
        LogicalPlan::Aggregate(agg) => {
            // If a projection was already recorded, nothing is added here and
            // only the input is visited.
            if !select.already_projected() {
                // No projection recorded: project the aggregate expressions
                // followed by the group expressions, and group by the latter.
                let exprs: Vec<_> = agg
                    .aggr_expr
                    .iter()
                    .chain(agg.group_expr.iter())
                    .map(|expr| self.select_item_to_sql(expr))
                    .collect::<Result<Vec<_>>>()?;
                select.projection(exprs);

                select.group_by(ast::GroupByExpr::Expressions(
                    agg.group_expr
                        .iter()
                        .map(|expr| self.expr_to_sql(expr))
                        .collect::<Result<Vec<_>>>()?,
                    vec![],
                ));
            }

            self.select_to_sql_recursively(
                agg.input.as_ref(),
                query,
                select,
                relation,
            )
        }
        LogicalPlan::Distinct(distinct) => {
            // A distinct below an existing projection becomes a derived table.
            if select.already_projected() {
                return self.derive_with_dialect_alias(
                    "derived_distinct",
                    plan,
                    relation,
                    false,
                    vec![],
                );
            }

            // `Distinct::All` directly over a `Union`, with a query builder
            // available: flag the query as a distinct union and let the Union
            // arm pick the set quantifier accordingly.
            if let Distinct::All(input) = distinct {
                if matches!(input.as_ref(), LogicalPlan::Union(_)) {
                    if let Some(query_mut) = query.as_mut() {
                        query_mut.distinct_union();
                        return self.select_to_sql_recursively(
                            input.as_ref(),
                            query,
                            select,
                            relation,
                        );
                    }
                }
            }

            let (select_distinct, input) = match distinct {
                Distinct::All(input) => (ast::Distinct::Distinct, input.as_ref()),
                Distinct::On(on) => {
                    // DISTINCT ON: convert the ON expressions and the select
                    // expressions; an attached sort is recorded as ORDER BY.
                    let exprs = on
                        .on_expr
                        .iter()
                        .map(|e| self.expr_to_sql(e))
                        .collect::<Result<Vec<_>>>()?;
                    let items = on
                        .select_expr
                        .iter()
                        .map(|e| self.select_item_to_sql(e))
                        .collect::<Result<Vec<_>>>()?;
                    if let Some(sort_expr) = &on.sort_expr {
                        if let Some(query_ref) = query {
                            query_ref.order_by(self.sorts_to_sql(sort_expr)?);
                        } else {
                            return internal_err!(
                                "Sort operator only valid in a statement context."
                            );
                        }
                    }
                    select.projection(items);
                    (ast::Distinct::On(exprs), on.input.as_ref())
                }
            };
            select.distinct(Some(select_distinct));
            self.select_to_sql_recursively(input, query, select, relation)
        }
        LogicalPlan::Join(join) => {
            let mut table_scan_filters = vec![];
            // For right semi/anti joins the two inputs are swapped, so the
            // rest of this arm treats the join's right input as "left".
            let (left_plan, right_plan) = match join.join_type {
                JoinType::RightSemi | JoinType::RightAnti => {
                    (&join.right, &join.left)
                }
                _ => (&join.left, &join.right),
            };
            // Remember whether a projection existed before visiting the
            // inputs; if not, the projections the inputs record are popped
            // and recombined below.
            let already_projected = select.already_projected();

            // Where possible, replace the input by a simple table scan and
            // collect the filters the helper extracted from it.
            let left_plan =
                match try_transform_to_simple_table_scan_with_filters(left_plan)? {
                    Some((plan, filters)) => {
                        table_scan_filters.extend(filters);
                        Arc::new(plan)
                    }
                    None => Arc::clone(left_plan),
                };

            self.select_to_sql_recursively(
                left_plan.as_ref(),
                query,
                select,
                relation,
            )?;

            let left_projection: Option<Vec<ast::SelectItem>> = if !already_projected
            {
                Some(select.pop_projections())
            } else {
                None
            };

            let right_plan =
                match try_transform_to_simple_table_scan_with_filters(right_plan)? {
                    Some((plan, filters)) => {
                        table_scan_filters.extend(filters);
                        Arc::new(plan)
                    }
                    None => Arc::clone(right_plan),
                };

            let mut right_relation = RelationBuilder::default();

            // First visit of the right input (it is visited again further
            // down, after the join constraint has been built).
            self.select_to_sql_recursively(
                right_plan.as_ref(),
                query,
                select,
                &mut right_relation,
            )?;

            let join_filters = if table_scan_filters.is_empty() {
                join.filter.clone()
            } else {
                // Fold the collected table-scan filters into one AND chain.
                let Some(combined_filters) =
                    table_scan_filters.into_iter().reduce(|acc, filter| {
                        Expr::BinaryExpr(BinaryExpr {
                            left: Box::new(acc),
                            op: Operator::And,
                            right: Box::new(filter),
                        })
                    })
                else {
                    return internal_err!("Failed to combine TableScan filters");
                };

                // AND the join's own filter (if any) with the combined ones.
                match &join.filter {
                    Some(filter) => Some(Expr::BinaryExpr(BinaryExpr {
                        left: Box::new(filter.clone()),
                        op: Operator::And,
                        right: Box::new(combined_filters),
                    })),
                    None => Some(combined_filters),
                }
            };

            let join_constraint = self.join_constraint_to_sql(
                join.join_constraint,
                &join.on,
                join_filters.as_ref(),
            )?;

            // NOTE(review): the right input is unparsed a second time into the
            // same `select` / `right_relation` builders. Confirm the repeated
            // visit is intentional and cannot duplicate recorded clauses.
            self.select_to_sql_recursively(
                right_plan.as_ref(),
                query,
                select,
                &mut right_relation,
            )?;

            let right_projection: Option<Vec<ast::SelectItem>> = if !already_projected
            {
                Some(select.pop_projections())
            } else {
                None
            };

            match join.join_type {
                // Semi / anti / mark joins are rendered with an
                // `EXISTS (SELECT 1 FROM <right> WHERE ...)` expression.
                JoinType::LeftSemi
                | JoinType::LeftAnti
                | JoinType::LeftMark
                | JoinType::RightSemi
                | JoinType::RightAnti => {
                    let mut query_builder = QueryBuilder::default();
                    let mut from = TableWithJoinsBuilder::default();
                    let mut exists_select: SelectBuilder = SelectBuilder::default();
                    from.relation(right_relation);
                    exists_select.push_from(from);
                    // NOTE(review): this reads `join.filter`, not the
                    // `join_filters` value computed above, so filters collected
                    // in `table_scan_filters` are not added here - confirm.
                    if let Some(filter) = &join.filter {
                        exists_select.selection(Some(self.expr_to_sql(filter)?));
                    }
                    for (left, right) in &join.on {
                        exists_select.selection(Some(
                            self.expr_to_sql(&left.clone().eq(right.clone()))?,
                        ));
                    }
                    exists_select.projection(vec![ast::SelectItem::UnnamedExpr(
                        ast::Expr::value(ast::Value::Number("1".to_string(), false)),
                    )]);
                    query_builder.body(Box::new(SetExpr::Select(Box::new(
                        exists_select.build()?,
                    ))));

                    // Anti joins negate the EXISTS expression.
                    let negated = match join.join_type {
                        JoinType::LeftSemi
                        | JoinType::RightSemi
                        | JoinType::LeftMark => false,
                        JoinType::LeftAnti | JoinType::RightAnti => true,
                        _ => unreachable!(),
                    };
                    let exists_expr = ast::Expr::Exists {
                        subquery: Box::new(query_builder.build()?),
                        negated,
                    };
                    if join.join_type == JoinType::LeftMark {
                        // Mark join: the `mark` column, qualified like the
                        // first field of the right plan's schema, is replaced
                        // by the EXISTS expression on the select builder.
                        let (table_ref, _) = right_plan.schema().qualified_field(0);
                        let column = self
                            .col_to_sql(&Column::new(table_ref.cloned(), "mark"))?;
                        select.replace_mark(&column, &exists_expr);
                    } else {
                        select.selection(Some(exists_expr));
                    }
                    // Only the left side's projection is restored.
                    if let Some(projection) = left_projection {
                        select.projection(projection);
                    }
                }
                // Regular joins are appended to the current FROM entry.
                JoinType::Inner
                | JoinType::Left
                | JoinType::Right
                | JoinType::Full => {
                    let Ok(Some(relation)) = right_relation.build() else {
                        return internal_err!("Failed to build right relation");
                    };
                    let ast_join = ast::Join {
                        relation,
                        global: false,
                        join_operator: self
                            .join_operator_to_sql(join.join_type, join_constraint)?,
                    };
                    let mut from = select.pop_from().unwrap();
                    from.push_join(ast_join);
                    select.push_from(from);
                    if !already_projected {
                        let Some(left_projection) = left_projection else {
                            return internal_err!("Left projection is missing");
                        };

                        let Some(right_projection) = right_projection else {
                            return internal_err!("Right projection is missing");
                        };

                        // Projection is the left items followed by the right.
                        let projection = left_projection
                            .into_iter()
                            .chain(right_projection.into_iter())
                            .collect();
                        select.projection(projection);
                    }
                }
            };

            Ok(())
        }
        LogicalPlan::SubqueryAlias(plan_alias) => {
            let (plan, mut columns) =
                subquery_alias_inner_query_and_columns(plan_alias);
            let unparsed_table_scan = Self::unparse_table_scan_pushdown(
                plan,
                Some(plan_alias.alias.clone()),
                select.already_projected(),
            )?;
            // With no projection recorded and no pushdown rewrite available,
            // record a wildcard projection before visiting the inner plan.
            if !select.already_projected() && unparsed_table_scan.is_none() {
                select.projection(vec![ast::SelectItem::Wildcard(
                    ast::WildcardAdditionalOptions::default(),
                )]);
            }
            let plan = unparsed_table_scan.unwrap_or_else(|| plan.clone());
            if !columns.is_empty()
                && !self.dialect.supports_column_alias_in_table_alias()
            {
                // The dialect cannot put column aliases on the table alias, so
                // they are injected into the inner plan instead.
                let rewritten_plan =
                    match inject_column_aliases_into_subquery(plan, columns) {
                        Ok(p) => p,
                        Err(e) => {
                            return internal_err!(
                                "Failed to transform SubqueryAlias plan: {e}"
                            )
                        }
                    };

                // The aliases now live in the inner plan; none go on the alias.
                columns = vec![];

                self.select_to_sql_recursively(
                    &rewritten_plan,
                    query,
                    select,
                    relation,
                )?;
            } else {
                self.select_to_sql_recursively(&plan, query, select, relation)?;
            }

            relation.alias(Some(
                self.new_table_alias(plan_alias.alias.table().to_string(), columns),
            ));

            Ok(())
        }
        LogicalPlan::Union(union) => {
            // A union below an existing projection becomes a derived table.
            if select.already_projected() {
                return self.derive_with_dialect_alias(
                    "derived_union",
                    plan,
                    relation,
                    false,
                    vec![],
                );
            }

            // Each input is unparsed into its own set expression.
            let input_exprs: Vec<SetExpr> = union
                .inputs
                .iter()
                .map(|input| self.select_to_sql_expr(input, query))
                .collect::<Result<Vec<_>>>()?;

            if input_exprs.len() < 2 {
                return internal_err!("UNION operator requires at least 2 inputs");
            }

            // A query flagged as distinct union (see the Distinct arm) uses
            // `SetQuantifier::None`; otherwise `SetQuantifier::All`.
            let set_quantifier =
                if query.as_ref().is_some_and(|q| q.is_distinct_union()) {
                    ast::SetQuantifier::None
                } else {
                    ast::SetQuantifier::All
                };

            // Iterate in reverse and swap left/right while reducing, so that
            // inputs [a, b, c] nest as `a UNION (b UNION c)`.
            let union_expr = input_exprs
                .into_iter()
                .rev()
                .reduce(|a, b| SetExpr::SetOperation {
                    op: ast::SetOperator::Union,
                    set_quantifier,
                    left: Box::new(b),
                    right: Box::new(a),
                })
                .unwrap();

            let Some(query) = query.as_mut() else {
                return internal_err!(
                    "UNION ALL operator only valid in a statement context"
                );
            };
            // Store the complete body on the query builder; the caller
            // (`select_to_sql_expr`) takes it from there.
            query.body(Box::new(union_expr));

            Ok(())
        }
        LogicalPlan::Window(window) => {
            // Nothing is recorded for the window node itself in this arm;
            // only its input is visited.
            self.select_to_sql_recursively(
                window.input.as_ref(),
                query,
                select,
                relation,
            )
        }
        LogicalPlan::EmptyRelation(_) => {
            // Keep a relation that was already set (e.g. by the unnest
            // table-factor path); otherwise mark the relation as empty.
            if !relation.has_relation() {
                relation.empty();
            }
            Ok(())
        }
        LogicalPlan::Extension(extension) => {
            // Hand the builders to the extension unparsers; the query builder
            // is passed only when one is present.
            if let Some(query) = query.as_mut() {
                self.extension_to_sql(
                    extension.node.as_ref(),
                    &mut Some(query),
                    &mut Some(select),
                    &mut Some(relation),
                )
            } else {
                self.extension_to_sql(
                    extension.node.as_ref(),
                    &mut None,
                    &mut Some(select),
                    &mut Some(relation),
                )
            }
        }
        LogicalPlan::Unnest(unnest) => {
            if !unnest.struct_type_columns.is_empty() {
                return internal_err!(
                    "Struct type columns are not currently supported in UNNEST: {:?}",
                    unnest.struct_type_columns
                );
            }

            // The unnest input must be a projection; that projection is
            // skipped and the walk continues with the projection's input.
            if let LogicalPlan::Projection(p) = unnest.input.as_ref() {
                self.select_to_sql_recursively(&p.input, query, select, relation)
            } else {
                internal_err!("Unnest input is not a Projection: {unnest:?}")
            }
        }
        // Only subqueries that contain an unnest node (as located by
        // `find_unnest_node_until_relation`) are handled here.
        LogicalPlan::Subquery(subquery)
            if find_unnest_node_until_relation(subquery.subquery.as_ref())
                .is_some() =>
        {
            if self.dialect.unnest_as_table_factor() {
                self.select_to_sql_recursively(
                    subquery.subquery.as_ref(),
                    query,
                    select,
                    relation,
                )
            } else {
                // Otherwise the subquery becomes a LATERAL derived table.
                self.derive_with_dialect_alias(
                    "derived_unnest",
                    subquery.subquery.as_ref(),
                    relation,
                    true,
                    vec![],
                )
            }
        }
        _ => {
            not_impl_err!("Unsupported operator: {plan:?}")
        }
    }
}
980
981 fn check_unnest_placeholder_with_outer_ref(expr: &Expr) -> Option<UnnestInputType> {
991 if let Expr::Alias(Alias { expr, .. }) = expr {
992 if let Expr::Column(Column { name, .. }) = expr.as_ref() {
993 if let Some(prefix) = name.strip_prefix(UNNEST_PLACEHOLDER) {
994 if prefix.starts_with(&format!("({OUTER_REFERENCE_COLUMN_PREFIX}(")) {
995 return Some(UnnestInputType::OuterReference);
996 }
997 return Some(UnnestInputType::Scalar);
998 }
999 }
1000 }
1001 None
1002 }
1003
1004 fn try_unnest_to_table_factor_sql(
1005 &self,
1006 unnest: &Unnest,
1007 ) -> Result<Option<UnnestRelationBuilder>> {
1008 let mut unnest_relation = UnnestRelationBuilder::default();
1009 let LogicalPlan::Projection(projection) = unnest.input.as_ref() else {
1010 return Ok(None);
1011 };
1012
1013 if !matches!(projection.input.as_ref(), LogicalPlan::EmptyRelation(_)) {
1014 return Ok(None);
1022 };
1023
1024 let exprs = projection
1025 .expr
1026 .iter()
1027 .map(|e| self.expr_to_sql(e))
1028 .collect::<Result<Vec<_>>>()?;
1029 unnest_relation.array_exprs(exprs);
1030
1031 Ok(Some(unnest_relation))
1032 }
1033
1034 fn is_scan_with_pushdown(scan: &TableScan) -> bool {
1035 scan.projection.is_some() || !scan.filters.is_empty() || scan.fetch.is_some()
1036 }
1037
1038 fn unparse_table_scan_pushdown(
1041 plan: &LogicalPlan,
1042 alias: Option<TableReference>,
1043 already_projected: bool,
1044 ) -> Result<Option<LogicalPlan>> {
1045 match plan {
1046 LogicalPlan::TableScan(table_scan) => {
1047 if !Self::is_scan_with_pushdown(table_scan) {
1048 return Ok(None);
1049 }
1050 let table_schema = table_scan.source.schema();
1051 let mut filter_alias_rewriter =
1052 alias.as_ref().map(|alias_name| TableAliasRewriter {
1053 table_schema: &table_schema,
1054 alias_name: alias_name.clone(),
1055 });
1056
1057 let mut builder = LogicalPlanBuilder::scan(
1058 table_scan.table_name.clone(),
1059 Arc::clone(&table_scan.source),
1060 None,
1061 )?;
1062 if let Some(ref alias) = alias {
1068 if table_scan.projection.is_some() || !table_scan.filters.is_empty() {
1069 builder = builder.alias(alias.clone())?;
1070 }
1071 }
1072
1073 if !already_projected {
1077 if let Some(project_vec) = &table_scan.projection {
1078 if project_vec.is_empty() {
1079 builder = builder.project(vec![Expr::Literal(
1080 ScalarValue::Int64(Some(1)),
1081 None,
1082 )])?;
1083 } else {
1084 let project_columns = project_vec
1085 .iter()
1086 .cloned()
1087 .map(|i| {
1088 let schema = table_scan.source.schema();
1089 let field = schema.field(i);
1090 if alias.is_some() {
1091 Column::new(alias.clone(), field.name().clone())
1092 } else {
1093 Column::new(
1094 Some(table_scan.table_name.clone()),
1095 field.name().clone(),
1096 )
1097 }
1098 })
1099 .collect::<Vec<_>>();
1100 builder = builder.project(project_columns)?;
1101 };
1102 }
1103 }
1104
1105 let filter_expr: Result<Option<Expr>> = table_scan
1106 .filters
1107 .iter()
1108 .cloned()
1109 .map(|expr| {
1110 if let Some(ref mut rewriter) = filter_alias_rewriter {
1111 expr.rewrite(rewriter).data()
1112 } else {
1113 Ok(expr)
1114 }
1115 })
1116 .reduce(|acc, expr_result| {
1117 acc.and_then(|acc_expr| {
1118 expr_result.map(|expr| acc_expr.and(expr))
1119 })
1120 })
1121 .transpose();
1122
1123 if let Some(filter) = filter_expr? {
1124 builder = builder.filter(filter)?;
1125 }
1126
1127 if let Some(fetch) = table_scan.fetch {
1128 builder = builder.limit(0, Some(fetch))?;
1129 }
1130
1131 if let Some(alias) = alias {
1136 if table_scan.projection.is_none() && table_scan.filters.is_empty() {
1137 builder = builder.alias(alias)?;
1138 }
1139 }
1140
1141 Ok(Some(builder.build()?))
1142 }
1143 LogicalPlan::SubqueryAlias(subquery_alias) => {
1144 let ret = Self::unparse_table_scan_pushdown(
1145 &subquery_alias.input,
1146 Some(subquery_alias.alias.clone()),
1147 already_projected,
1148 )?;
1149 if let Some(alias) = alias {
1150 if let Some(plan) = ret {
1151 let plan = LogicalPlanBuilder::new(plan).alias(alias)?.build()?;
1152 return Ok(Some(plan));
1153 }
1154 }
1155 Ok(ret)
1156 }
1157 LogicalPlan::Projection(projection) => {
1160 if let Some(plan) = Self::unparse_table_scan_pushdown(
1161 &projection.input,
1162 alias.clone(),
1163 already_projected,
1164 )? {
1165 let exprs = if alias.is_some() {
1166 let mut alias_rewriter =
1167 alias.as_ref().map(|alias_name| TableAliasRewriter {
1168 table_schema: plan.schema().as_arrow(),
1169 alias_name: alias_name.clone(),
1170 });
1171 projection
1172 .expr
1173 .iter()
1174 .cloned()
1175 .map(|expr| {
1176 if let Some(ref mut rewriter) = alias_rewriter {
1177 expr.rewrite(rewriter).data()
1178 } else {
1179 Ok(expr)
1180 }
1181 })
1182 .collect::<Result<Vec<_>>>()?
1183 } else {
1184 projection.expr.clone()
1185 };
1186 Ok(Some(
1187 LogicalPlanBuilder::from(plan).project(exprs)?.build()?,
1188 ))
1189 } else {
1190 Ok(None)
1191 }
1192 }
1193 _ => Ok(None),
1194 }
1195 }
1196
1197 fn select_item_to_sql(&self, expr: &Expr) -> Result<ast::SelectItem> {
1198 match expr {
1199 Expr::Alias(Alias { expr, name, .. }) => {
1200 let inner = self.expr_to_sql(expr)?;
1201
1202 Ok(ast::SelectItem::ExprWithAlias {
1203 expr: inner,
1204 alias: self.new_ident_quoted_if_needs(name.to_string()),
1205 })
1206 }
1207 _ => {
1208 let inner = self.expr_to_sql(expr)?;
1209
1210 Ok(ast::SelectItem::UnnamedExpr(inner))
1211 }
1212 }
1213 }
1214
1215 fn sorts_to_sql(&self, sort_exprs: &[SortExpr]) -> Result<OrderByKind> {
1216 Ok(OrderByKind::Expressions(
1217 sort_exprs
1218 .iter()
1219 .map(|sort_expr| self.sort_to_sql(sort_expr))
1220 .collect::<Result<Vec<_>>>()?,
1221 ))
1222 }
1223
1224 fn join_operator_to_sql(
1225 &self,
1226 join_type: JoinType,
1227 constraint: ast::JoinConstraint,
1228 ) -> Result<ast::JoinOperator> {
1229 Ok(match join_type {
1230 JoinType::Inner => match &constraint {
1231 ast::JoinConstraint::On(_)
1232 | ast::JoinConstraint::Using(_)
1233 | ast::JoinConstraint::Natural => ast::JoinOperator::Inner(constraint),
1234 ast::JoinConstraint::None => {
1235 ast::JoinOperator::CrossJoin
1238 }
1239 },
1240 JoinType::Left => ast::JoinOperator::LeftOuter(constraint),
1241 JoinType::Right => ast::JoinOperator::RightOuter(constraint),
1242 JoinType::Full => ast::JoinOperator::FullOuter(constraint),
1243 JoinType::LeftAnti => ast::JoinOperator::LeftAnti(constraint),
1244 JoinType::LeftSemi => ast::JoinOperator::LeftSemi(constraint),
1245 JoinType::RightAnti => ast::JoinOperator::RightAnti(constraint),
1246 JoinType::RightSemi => ast::JoinOperator::RightSemi(constraint),
1247 JoinType::LeftMark => unimplemented!("Unparsing of Left Mark join type"),
1248 })
1249 }
1250
1251 fn join_using_to_sql(
1255 &self,
1256 join_conditions: &[(Expr, Expr)],
1257 ) -> Option<ast::JoinConstraint> {
1258 let mut object_names = Vec::with_capacity(join_conditions.len());
1259 for (left, right) in join_conditions {
1260 match (left, right) {
1261 (
1262 Expr::Column(Column {
1263 relation: _,
1264 name: left_name,
1265 spans: _,
1266 }),
1267 Expr::Column(Column {
1268 relation: _,
1269 name: right_name,
1270 spans: _,
1271 }),
1272 ) if left_name == right_name => {
1273 let ident = self.new_ident_quoted_if_needs(left_name.to_string());
1277 object_names.push(ast::ObjectName::from(vec![ident]));
1278 }
1279 _ => return None,
1282 }
1283 }
1284 Some(ast::JoinConstraint::Using(object_names))
1285 }
1286
1287 fn join_constraint_to_sql(
1289 &self,
1290 constraint: JoinConstraint,
1291 conditions: &[(Expr, Expr)],
1292 filter: Option<&Expr>,
1293 ) -> Result<ast::JoinConstraint> {
1294 match (constraint, conditions, filter) {
1295 (JoinConstraint::On | JoinConstraint::Using, [], None) => {
1297 Ok(ast::JoinConstraint::None)
1298 }
1299
1300 (JoinConstraint::Using, conditions, None) => {
1301 match self.join_using_to_sql(conditions) {
1302 Some(using) => Ok(using),
1303 None => self.join_conditions_to_sql_on(conditions, None),
1306 }
1307 }
1308
1309 (JoinConstraint::On | JoinConstraint::Using, conditions, filter) => {
1317 self.join_conditions_to_sql_on(conditions, filter)
1318 }
1319 }
1320 }
1321
1322 fn join_conditions_to_sql_on(
1326 &self,
1327 join_conditions: &[(Expr, Expr)],
1328 filter: Option<&Expr>,
1329 ) -> Result<ast::JoinConstraint> {
1330 let mut condition = None;
1331 for (left, right) in join_conditions {
1333 let l = self.expr_to_sql(left)?;
1335 let r = self.expr_to_sql(right)?;
1336 let e = self.binary_op_to_sql(l, r, ast::BinaryOperator::Eq);
1337 condition = match condition {
1338 Some(expr) => Some(self.and_op_to_sql(expr, e)),
1339 None => Some(e),
1340 };
1341 }
1342
1343 condition = match (condition, filter) {
1345 (Some(expr), Some(filter)) => {
1346 Some(self.and_op_to_sql(expr, self.expr_to_sql(filter)?))
1347 }
1348 (Some(expr), None) => Some(expr),
1349 (None, Some(filter)) => Some(self.expr_to_sql(filter)?),
1350 (None, None) => None,
1351 };
1352
1353 let constraint = match condition {
1354 Some(filter) => ast::JoinConstraint::On(filter),
1355 None => ast::JoinConstraint::None,
1356 };
1357
1358 Ok(constraint)
1359 }
1360
1361 fn and_op_to_sql(&self, lhs: ast::Expr, rhs: ast::Expr) -> ast::Expr {
1362 self.binary_op_to_sql(lhs, rhs, ast::BinaryOperator::And)
1363 }
1364
1365 fn new_table_alias(&self, alias: String, columns: Vec<Ident>) -> ast::TableAlias {
1366 let columns = columns
1367 .into_iter()
1368 .map(|ident| TableAliasColumnDef {
1369 name: ident,
1370 data_type: None,
1371 })
1372 .collect();
1373 ast::TableAlias {
1374 name: self.new_ident_quoted_if_needs(alias),
1375 columns,
1376 }
1377 }
1378
/// Unparsing of DML plans is not supported here: this always returns a
/// "not implemented" error that includes the debug form of `plan`.
fn dml_to_sql(&self, plan: &LogicalPlan) -> Result<ast::Statement> {
    not_impl_err!("Unsupported plan: {plan:?}")
}
1382}
1383
1384impl From<BuilderError> for DataFusionError {
1385 fn from(e: BuilderError) -> Self {
1386 DataFusionError::External(Box::new(e))
1387 }
1388}
1389
/// Classification of an unnest placeholder column, as produced by
/// `check_unnest_placeholder_with_outer_ref`.
#[derive(Debug)]
enum UnnestInputType {
    /// The text following the placeholder marker starts with an
    /// outer-reference column prefix (`(` + `OUTER_REFERENCE_COLUMN_PREFIX` + `(`).
    OuterReference,
    /// Any other unnest placeholder column.
    Scalar,
}