1use super::{
19 ast::{
20 BuilderError, DerivedRelationBuilder, QueryBuilder, RelationBuilder,
21 SelectBuilder, TableRelationBuilder, TableWithJoinsBuilder,
22 },
23 rewrite::{
24 inject_column_aliases_into_subquery, normalize_union_schema,
25 rewrite_plan_for_sort_on_non_projected_fields,
26 subquery_alias_inner_query_and_columns, TableAliasRewriter,
27 },
28 utils::{
29 find_agg_node_within_select, find_unnest_node_within_select,
30 find_window_nodes_within_select, try_transform_to_simple_table_scan_with_filters,
31 unproject_sort_expr, unproject_unnest_expr, unproject_window_exprs,
32 },
33 Unparser,
34};
35use crate::unparser::ast::UnnestRelationBuilder;
36use crate::unparser::extension_unparser::{
37 UnparseToStatementResult, UnparseWithinStatementResult,
38};
39use crate::unparser::utils::{find_unnest_node_until_relation, unproject_agg_exprs};
40use crate::utils::UNNEST_PLACEHOLDER;
41use datafusion_common::{
42 internal_err, not_impl_err,
43 tree_node::{TransformedResult, TreeNode},
44 Column, DataFusionError, Result, ScalarValue, TableReference,
45};
46use datafusion_expr::expr::OUTER_REFERENCE_COLUMN_PREFIX;
47use datafusion_expr::{
48 expr::Alias, BinaryExpr, Distinct, Expr, JoinConstraint, JoinType, LogicalPlan,
49 LogicalPlanBuilder, Operator, Projection, SortExpr, TableScan, Unnest,
50 UserDefinedLogicalNode,
51};
52use sqlparser::ast::{self, Ident, SetExpr, TableAliasColumnDef};
53use std::sync::Arc;
54
55pub fn plan_to_sql(plan: &LogicalPlan) -> Result<ast::Statement> {
92 let unparser = Unparser::default();
93 unparser.plan_to_sql(plan)
94}
95
96impl Unparser<'_> {
97 pub fn plan_to_sql(&self, plan: &LogicalPlan) -> Result<ast::Statement> {
98 let plan = normalize_union_schema(plan)?;
99
100 match plan {
101 LogicalPlan::Projection(_)
102 | LogicalPlan::Filter(_)
103 | LogicalPlan::Window(_)
104 | LogicalPlan::Aggregate(_)
105 | LogicalPlan::Sort(_)
106 | LogicalPlan::Join(_)
107 | LogicalPlan::Repartition(_)
108 | LogicalPlan::Union(_)
109 | LogicalPlan::TableScan(_)
110 | LogicalPlan::EmptyRelation(_)
111 | LogicalPlan::Subquery(_)
112 | LogicalPlan::SubqueryAlias(_)
113 | LogicalPlan::Limit(_)
114 | LogicalPlan::Statement(_)
115 | LogicalPlan::Values(_)
116 | LogicalPlan::Distinct(_) => self.select_to_sql_statement(&plan),
117 LogicalPlan::Dml(_) => self.dml_to_sql(&plan),
118 LogicalPlan::Extension(extension) => {
119 self.extension_to_statement(extension.node.as_ref())
120 }
121 LogicalPlan::Explain(_)
122 | LogicalPlan::Analyze(_)
123 | LogicalPlan::Ddl(_)
124 | LogicalPlan::Copy(_)
125 | LogicalPlan::DescribeTable(_)
126 | LogicalPlan::RecursiveQuery(_)
127 | LogicalPlan::Unnest(_) => not_impl_err!("Unsupported plan: {plan:?}"),
128 }
129 }
130
131 fn extension_to_statement(
135 &self,
136 node: &dyn UserDefinedLogicalNode,
137 ) -> Result<ast::Statement> {
138 let mut statement = None;
139 for unparser in &self.extension_unparsers {
140 match unparser.unparse_to_statement(node, self)? {
141 UnparseToStatementResult::Modified(stmt) => {
142 statement = Some(stmt);
143 break;
144 }
145 UnparseToStatementResult::Unmodified => {}
146 }
147 }
148 if let Some(statement) = statement {
149 Ok(statement)
150 } else {
151 not_impl_err!("Unsupported extension node: {node:?}")
152 }
153 }
154
155 fn extension_to_sql(
159 &self,
160 node: &dyn UserDefinedLogicalNode,
161 query: &mut Option<&mut QueryBuilder>,
162 select: &mut Option<&mut SelectBuilder>,
163 relation: &mut Option<&mut RelationBuilder>,
164 ) -> Result<()> {
165 for unparser in &self.extension_unparsers {
166 match unparser.unparse(node, self, query, select, relation)? {
167 UnparseWithinStatementResult::Modified => return Ok(()),
168 UnparseWithinStatementResult::Unmodified => {}
169 }
170 }
171 not_impl_err!("Unsupported extension node: {node:?}")
172 }
173
174 fn select_to_sql_statement(&self, plan: &LogicalPlan) -> Result<ast::Statement> {
175 let mut query_builder = Some(QueryBuilder::default());
176
177 let body = self.select_to_sql_expr(plan, &mut query_builder)?;
178
179 let query = query_builder.unwrap().body(Box::new(body)).build()?;
180
181 Ok(ast::Statement::Query(Box::new(query)))
182 }
183
184 fn select_to_sql_expr(
185 &self,
186 plan: &LogicalPlan,
187 query: &mut Option<QueryBuilder>,
188 ) -> Result<SetExpr> {
189 let mut select_builder = SelectBuilder::default();
190 select_builder.push_from(TableWithJoinsBuilder::default());
191 let mut relation_builder = RelationBuilder::default();
192 self.select_to_sql_recursively(
193 plan,
194 query,
195 &mut select_builder,
196 &mut relation_builder,
197 )?;
198
199 if let Some(body) = query.as_mut().and_then(|q| q.take_body()) {
201 return Ok(*body);
202 }
203
204 if !select_builder.already_projected() {
207 select_builder.projection(vec![ast::SelectItem::Wildcard(
208 ast::WildcardAdditionalOptions::default(),
209 )]);
210 }
211
212 let mut twj = select_builder.pop_from().unwrap();
213 twj.relation(relation_builder);
214 select_builder.push_from(twj);
215
216 Ok(SetExpr::Select(Box::new(select_builder.build()?)))
217 }
218
219 fn reconstruct_select_statement(
223 &self,
224 plan: &LogicalPlan,
225 p: &Projection,
226 select: &mut SelectBuilder,
227 ) -> Result<()> {
228 let mut exprs = p.expr.clone();
229
230 if let Some(unnest) = find_unnest_node_within_select(plan) {
232 exprs = exprs
233 .into_iter()
234 .map(|e| unproject_unnest_expr(e, unnest))
235 .collect::<Result<Vec<_>>>()?;
236 };
237
238 match (
239 find_agg_node_within_select(plan, true),
240 find_window_nodes_within_select(plan, None, true),
241 ) {
242 (Some(agg), window) => {
243 let window_option = window.as_deref();
244 let items = exprs
245 .into_iter()
246 .map(|proj_expr| {
247 let unproj = unproject_agg_exprs(proj_expr, agg, window_option)?;
248 self.select_item_to_sql(&unproj)
249 })
250 .collect::<Result<Vec<_>>>()?;
251
252 select.projection(items);
253 select.group_by(ast::GroupByExpr::Expressions(
254 agg.group_expr
255 .iter()
256 .map(|expr| self.expr_to_sql(expr))
257 .collect::<Result<Vec<_>>>()?,
258 vec![],
259 ));
260 }
261 (None, Some(window)) => {
262 let items = exprs
263 .into_iter()
264 .map(|proj_expr| {
265 let unproj = unproject_window_exprs(proj_expr, &window)?;
266 self.select_item_to_sql(&unproj)
267 })
268 .collect::<Result<Vec<_>>>()?;
269
270 select.projection(items);
271 }
272 _ => {
273 let items = exprs
274 .iter()
275 .map(|e| self.select_item_to_sql(e))
276 .collect::<Result<Vec<_>>>()?;
277 select.projection(items);
278 }
279 }
280 Ok(())
281 }
282
283 fn derive(
284 &self,
285 plan: &LogicalPlan,
286 relation: &mut RelationBuilder,
287 alias: Option<ast::TableAlias>,
288 lateral: bool,
289 ) -> Result<()> {
290 let mut derived_builder = DerivedRelationBuilder::default();
291 derived_builder.lateral(lateral).alias(alias).subquery({
292 let inner_statement = self.plan_to_sql(plan)?;
293 if let ast::Statement::Query(inner_query) = inner_statement {
294 inner_query
295 } else {
296 return internal_err!(
297 "Subquery must be a Query, but found {inner_statement:?}"
298 );
299 }
300 });
301 relation.derived(derived_builder);
302
303 Ok(())
304 }
305
306 fn derive_with_dialect_alias(
307 &self,
308 alias: &str,
309 plan: &LogicalPlan,
310 relation: &mut RelationBuilder,
311 lateral: bool,
312 ) -> Result<()> {
313 if self.dialect.requires_derived_table_alias() {
314 self.derive(
315 plan,
316 relation,
317 Some(self.new_table_alias(alias.to_string(), vec![])),
318 lateral,
319 )
320 } else {
321 self.derive(plan, relation, None, lateral)
322 }
323 }
324
325 fn select_to_sql_recursively(
326 &self,
327 plan: &LogicalPlan,
328 query: &mut Option<QueryBuilder>,
329 select: &mut SelectBuilder,
330 relation: &mut RelationBuilder,
331 ) -> Result<()> {
332 match plan {
333 LogicalPlan::TableScan(scan) => {
334 if let Some(unparsed_table_scan) = Self::unparse_table_scan_pushdown(
335 plan,
336 None,
337 select.already_projected(),
338 )? {
339 return self.select_to_sql_recursively(
340 &unparsed_table_scan,
341 query,
342 select,
343 relation,
344 );
345 }
346 let mut builder = TableRelationBuilder::default();
347 let mut table_parts = vec![];
348 if let Some(catalog_name) = scan.table_name.catalog() {
349 table_parts
350 .push(self.new_ident_quoted_if_needs(catalog_name.to_string()));
351 }
352 if let Some(schema_name) = scan.table_name.schema() {
353 table_parts
354 .push(self.new_ident_quoted_if_needs(schema_name.to_string()));
355 }
356 table_parts.push(
357 self.new_ident_quoted_if_needs(scan.table_name.table().to_string()),
358 );
359 builder.name(ast::ObjectName(table_parts));
360 relation.table(builder);
361
362 Ok(())
363 }
364 LogicalPlan::Projection(p) => {
365 if let Some(new_plan) = rewrite_plan_for_sort_on_non_projected_fields(p) {
366 return self
367 .select_to_sql_recursively(&new_plan, query, select, relation);
368 }
369
370 let unnest_input_type = if p.expr.len() == 1 {
374 Self::check_unnest_placeholder_with_outer_ref(&p.expr[0])
375 } else {
376 None
377 };
378 if self.dialect.unnest_as_table_factor() && unnest_input_type.is_some() {
379 if let LogicalPlan::Unnest(unnest) = &p.input.as_ref() {
380 return self
381 .unnest_to_table_factor_sql(unnest, query, select, relation);
382 }
383 }
384
385 if select.already_projected() {
387 return self.derive_with_dialect_alias(
388 "derived_projection",
389 plan,
390 relation,
391 unnest_input_type
392 .filter(|t| matches!(t, UnnestInputType::OuterReference))
393 .is_some(),
394 );
395 }
396 self.reconstruct_select_statement(plan, p, select)?;
397 self.select_to_sql_recursively(p.input.as_ref(), query, select, relation)
398 }
399 LogicalPlan::Filter(filter) => {
400 if let Some(agg) =
401 find_agg_node_within_select(plan, select.already_projected())
402 {
403 let unprojected =
404 unproject_agg_exprs(filter.predicate.clone(), agg, None)?;
405 let filter_expr = self.expr_to_sql(&unprojected)?;
406 select.having(Some(filter_expr));
407 } else {
408 let filter_expr = self.expr_to_sql(&filter.predicate)?;
409 select.selection(Some(filter_expr));
410 }
411
412 self.select_to_sql_recursively(
413 filter.input.as_ref(),
414 query,
415 select,
416 relation,
417 )
418 }
419 LogicalPlan::Limit(limit) => {
420 if select.already_projected() {
422 return self.derive_with_dialect_alias(
423 "derived_limit",
424 plan,
425 relation,
426 false,
427 );
428 }
429 if let Some(fetch) = &limit.fetch {
430 let Some(query) = query.as_mut() else {
431 return internal_err!(
432 "Limit operator only valid in a statement context."
433 );
434 };
435 query.limit(Some(self.expr_to_sql(fetch)?));
436 }
437
438 if let Some(skip) = &limit.skip {
439 let Some(query) = query.as_mut() else {
440 return internal_err!(
441 "Offset operator only valid in a statement context."
442 );
443 };
444 query.offset(Some(ast::Offset {
445 rows: ast::OffsetRows::None,
446 value: self.expr_to_sql(skip)?,
447 }));
448 }
449
450 self.select_to_sql_recursively(
451 limit.input.as_ref(),
452 query,
453 select,
454 relation,
455 )
456 }
457 LogicalPlan::Sort(sort) => {
458 if select.already_projected() {
460 return self.derive_with_dialect_alias(
461 "derived_sort",
462 plan,
463 relation,
464 false,
465 );
466 }
467 let Some(query_ref) = query else {
468 return internal_err!(
469 "Sort operator only valid in a statement context."
470 );
471 };
472
473 if let Some(fetch) = sort.fetch {
474 query_ref.limit(Some(ast::Expr::Value(ast::Value::Number(
475 fetch.to_string(),
476 false,
477 ))));
478 };
479
480 let agg = find_agg_node_within_select(plan, select.already_projected());
481 let sort_exprs: Vec<SortExpr> = sort
483 .expr
484 .iter()
485 .map(|sort_expr| {
486 unproject_sort_expr(sort_expr, agg, sort.input.as_ref())
487 })
488 .collect::<Result<Vec<_>>>()?;
489
490 query_ref.order_by(self.sorts_to_sql(&sort_exprs)?);
491
492 self.select_to_sql_recursively(
493 sort.input.as_ref(),
494 query,
495 select,
496 relation,
497 )
498 }
499 LogicalPlan::Aggregate(agg) => {
500 if !select.already_projected() {
502 let exprs: Vec<_> = agg
505 .aggr_expr
506 .iter()
507 .chain(agg.group_expr.iter())
508 .map(|expr| self.select_item_to_sql(expr))
509 .collect::<Result<Vec<_>>>()?;
510 select.projection(exprs);
511
512 select.group_by(ast::GroupByExpr::Expressions(
513 agg.group_expr
514 .iter()
515 .map(|expr| self.expr_to_sql(expr))
516 .collect::<Result<Vec<_>>>()?,
517 vec![],
518 ));
519 }
520
521 self.select_to_sql_recursively(
522 agg.input.as_ref(),
523 query,
524 select,
525 relation,
526 )
527 }
528 LogicalPlan::Distinct(distinct) => {
529 if select.already_projected() {
531 return self.derive_with_dialect_alias(
532 "derived_distinct",
533 plan,
534 relation,
535 false,
536 );
537 }
538 let (select_distinct, input) = match distinct {
539 Distinct::All(input) => (ast::Distinct::Distinct, input.as_ref()),
540 Distinct::On(on) => {
541 let exprs = on
542 .on_expr
543 .iter()
544 .map(|e| self.expr_to_sql(e))
545 .collect::<Result<Vec<_>>>()?;
546 let items = on
547 .select_expr
548 .iter()
549 .map(|e| self.select_item_to_sql(e))
550 .collect::<Result<Vec<_>>>()?;
551 if let Some(sort_expr) = &on.sort_expr {
552 if let Some(query_ref) = query {
553 query_ref.order_by(self.sorts_to_sql(sort_expr)?);
554 } else {
555 return internal_err!(
556 "Sort operator only valid in a statement context."
557 );
558 }
559 }
560 select.projection(items);
561 (ast::Distinct::On(exprs), on.input.as_ref())
562 }
563 };
564 select.distinct(Some(select_distinct));
565 self.select_to_sql_recursively(input, query, select, relation)
566 }
567 LogicalPlan::Join(join) => {
568 let mut table_scan_filters = vec![];
569
570 let left_plan =
571 match try_transform_to_simple_table_scan_with_filters(&join.left)? {
572 Some((plan, filters)) => {
573 table_scan_filters.extend(filters);
574 Arc::new(plan)
575 }
576 None => Arc::clone(&join.left),
577 };
578
579 self.select_to_sql_recursively(
580 left_plan.as_ref(),
581 query,
582 select,
583 relation,
584 )?;
585
586 let right_plan =
587 match try_transform_to_simple_table_scan_with_filters(&join.right)? {
588 Some((plan, filters)) => {
589 table_scan_filters.extend(filters);
590 Arc::new(plan)
591 }
592 None => Arc::clone(&join.right),
593 };
594
595 let mut right_relation = RelationBuilder::default();
596
597 self.select_to_sql_recursively(
598 right_plan.as_ref(),
599 query,
600 select,
601 &mut right_relation,
602 )?;
603
604 let join_filters = if table_scan_filters.is_empty() {
605 join.filter.clone()
606 } else {
607 let Some(combined_filters) =
609 table_scan_filters.into_iter().reduce(|acc, filter| {
610 Expr::BinaryExpr(BinaryExpr {
611 left: Box::new(acc),
612 op: Operator::And,
613 right: Box::new(filter),
614 })
615 })
616 else {
617 return internal_err!("Failed to combine TableScan filters");
618 };
619
620 match &join.filter {
622 Some(filter) => Some(Expr::BinaryExpr(BinaryExpr {
623 left: Box::new(filter.clone()),
624 op: Operator::And,
625 right: Box::new(combined_filters),
626 })),
627 None => Some(combined_filters),
628 }
629 };
630
631 let join_constraint = self.join_constraint_to_sql(
632 join.join_constraint,
633 &join.on,
634 join_filters.as_ref(),
635 )?;
636
637 self.select_to_sql_recursively(
638 right_plan.as_ref(),
639 query,
640 select,
641 &mut right_relation,
642 )?;
643
644 let Ok(Some(relation)) = right_relation.build() else {
645 return internal_err!("Failed to build right relation");
646 };
647
648 let ast_join = ast::Join {
649 relation,
650 global: false,
651 join_operator: self
652 .join_operator_to_sql(join.join_type, join_constraint)?,
653 };
654 let mut from = select.pop_from().unwrap();
655 from.push_join(ast_join);
656 select.push_from(from);
657
658 Ok(())
659 }
660 LogicalPlan::SubqueryAlias(plan_alias) => {
661 let (plan, mut columns) =
662 subquery_alias_inner_query_and_columns(plan_alias);
663 let unparsed_table_scan = Self::unparse_table_scan_pushdown(
664 plan,
665 Some(plan_alias.alias.clone()),
666 select.already_projected(),
667 )?;
668 if !select.already_projected() && unparsed_table_scan.is_none() {
671 select.projection(vec![ast::SelectItem::Wildcard(
672 ast::WildcardAdditionalOptions::default(),
673 )]);
674 }
675 let plan = unparsed_table_scan.unwrap_or_else(|| plan.clone());
676 if !columns.is_empty()
677 && !self.dialect.supports_column_alias_in_table_alias()
678 {
679 let rewritten_plan =
681 match inject_column_aliases_into_subquery(plan, columns) {
682 Ok(p) => p,
683 Err(e) => {
684 return internal_err!(
685 "Failed to transform SubqueryAlias plan: {e}"
686 )
687 }
688 };
689
690 columns = vec![];
691
692 self.select_to_sql_recursively(
693 &rewritten_plan,
694 query,
695 select,
696 relation,
697 )?;
698 } else {
699 self.select_to_sql_recursively(&plan, query, select, relation)?;
700 }
701
702 relation.alias(Some(
703 self.new_table_alias(plan_alias.alias.table().to_string(), columns),
704 ));
705
706 Ok(())
707 }
708 LogicalPlan::Union(union) => {
709 if select.already_projected() {
711 return self.derive_with_dialect_alias(
712 "derived_union",
713 plan,
714 relation,
715 false,
716 );
717 }
718
719 let input_exprs: Vec<SetExpr> = union
720 .inputs
721 .iter()
722 .map(|input| self.select_to_sql_expr(input, query))
723 .collect::<Result<Vec<_>>>()?;
724
725 if input_exprs.len() < 2 {
726 return internal_err!("UNION operator requires at least 2 inputs");
727 }
728
729 let union_expr = input_exprs
732 .into_iter()
733 .rev()
734 .reduce(|a, b| SetExpr::SetOperation {
735 op: ast::SetOperator::Union,
736 set_quantifier: ast::SetQuantifier::All,
737 left: Box::new(b),
738 right: Box::new(a),
739 })
740 .unwrap();
741
742 let Some(query) = query.as_mut() else {
743 return internal_err!(
744 "UNION ALL operator only valid in a statement context"
745 );
746 };
747 query.body(Box::new(union_expr));
748
749 Ok(())
750 }
751 LogicalPlan::Window(window) => {
752 self.select_to_sql_recursively(
754 window.input.as_ref(),
755 query,
756 select,
757 relation,
758 )
759 }
760 LogicalPlan::EmptyRelation(_) => {
761 if !relation.has_relation() {
764 relation.empty();
765 }
766 Ok(())
767 }
768 LogicalPlan::Extension(extension) => {
769 if let Some(query) = query.as_mut() {
770 self.extension_to_sql(
771 extension.node.as_ref(),
772 &mut Some(query),
773 &mut Some(select),
774 &mut Some(relation),
775 )
776 } else {
777 self.extension_to_sql(
778 extension.node.as_ref(),
779 &mut None,
780 &mut Some(select),
781 &mut Some(relation),
782 )
783 }
784 }
785 LogicalPlan::Unnest(unnest) => {
786 if !unnest.struct_type_columns.is_empty() {
787 return internal_err!(
788 "Struct type columns are not currently supported in UNNEST: {:?}",
789 unnest.struct_type_columns
790 );
791 }
792
793 if let LogicalPlan::Projection(p) = unnest.input.as_ref() {
801 self.select_to_sql_recursively(&p.input, query, select, relation)
803 } else {
804 internal_err!("Unnest input is not a Projection: {unnest:?}")
805 }
806 }
807 LogicalPlan::Subquery(subquery)
808 if find_unnest_node_until_relation(subquery.subquery.as_ref())
809 .is_some() =>
810 {
811 if self.dialect.unnest_as_table_factor() {
812 self.select_to_sql_recursively(
813 subquery.subquery.as_ref(),
814 query,
815 select,
816 relation,
817 )
818 } else {
819 self.derive_with_dialect_alias(
820 "derived_unnest",
821 subquery.subquery.as_ref(),
822 relation,
823 true,
824 )
825 }
826 }
827 _ => {
828 not_impl_err!("Unsupported operator: {plan:?}")
829 }
830 }
831 }
832
833 fn check_unnest_placeholder_with_outer_ref(expr: &Expr) -> Option<UnnestInputType> {
843 if let Expr::Alias(Alias { expr, .. }) = expr {
844 if let Expr::Column(Column { name, .. }) = expr.as_ref() {
845 if let Some(prefix) = name.strip_prefix(UNNEST_PLACEHOLDER) {
846 if prefix.starts_with(&format!("({}(", OUTER_REFERENCE_COLUMN_PREFIX))
847 {
848 return Some(UnnestInputType::OuterReference);
849 }
850 return Some(UnnestInputType::Scalar);
851 }
852 }
853 }
854 None
855 }
856
857 fn unnest_to_table_factor_sql(
858 &self,
859 unnest: &Unnest,
860 query: &mut Option<QueryBuilder>,
861 select: &mut SelectBuilder,
862 relation: &mut RelationBuilder,
863 ) -> Result<()> {
864 let mut unnest_relation = UnnestRelationBuilder::default();
865 let LogicalPlan::Projection(p) = unnest.input.as_ref() else {
866 return internal_err!("Unnest input is not a Projection: {unnest:?}");
867 };
868 let exprs = p
869 .expr
870 .iter()
871 .map(|e| self.expr_to_sql(e))
872 .collect::<Result<Vec<_>>>()?;
873 unnest_relation.array_exprs(exprs);
874 relation.unnest(unnest_relation);
875 self.select_to_sql_recursively(p.input.as_ref(), query, select, relation)
876 }
877
878 fn is_scan_with_pushdown(scan: &TableScan) -> bool {
879 scan.projection.is_some() || !scan.filters.is_empty() || scan.fetch.is_some()
880 }
881
882 fn unparse_table_scan_pushdown(
885 plan: &LogicalPlan,
886 alias: Option<TableReference>,
887 already_projected: bool,
888 ) -> Result<Option<LogicalPlan>> {
889 match plan {
890 LogicalPlan::TableScan(table_scan) => {
891 if !Self::is_scan_with_pushdown(table_scan) {
892 return Ok(None);
893 }
894 let table_schema = table_scan.source.schema();
895 let mut filter_alias_rewriter =
896 alias.as_ref().map(|alias_name| TableAliasRewriter {
897 table_schema: &table_schema,
898 alias_name: alias_name.clone(),
899 });
900
901 let mut builder = LogicalPlanBuilder::scan(
902 table_scan.table_name.clone(),
903 Arc::clone(&table_scan.source),
904 None,
905 )?;
906 if let Some(ref alias) = alias {
912 if table_scan.projection.is_some() || !table_scan.filters.is_empty() {
913 builder = builder.alias(alias.clone())?;
914 }
915 }
916
917 if !already_projected {
921 if let Some(project_vec) = &table_scan.projection {
922 if project_vec.is_empty() {
923 builder = builder.project(vec![Expr::Literal(
924 ScalarValue::Int64(Some(1)),
925 )])?;
926 } else {
927 let project_columns = project_vec
928 .iter()
929 .cloned()
930 .map(|i| {
931 let schema = table_scan.source.schema();
932 let field = schema.field(i);
933 if alias.is_some() {
934 Column::new(alias.clone(), field.name().clone())
935 } else {
936 Column::new(
937 Some(table_scan.table_name.clone()),
938 field.name().clone(),
939 )
940 }
941 })
942 .collect::<Vec<_>>();
943 builder = builder.project(project_columns)?;
944 };
945 }
946 }
947
948 let filter_expr: Result<Option<Expr>> = table_scan
949 .filters
950 .iter()
951 .cloned()
952 .map(|expr| {
953 if let Some(ref mut rewriter) = filter_alias_rewriter {
954 expr.rewrite(rewriter).data()
955 } else {
956 Ok(expr)
957 }
958 })
959 .reduce(|acc, expr_result| {
960 acc.and_then(|acc_expr| {
961 expr_result.map(|expr| acc_expr.and(expr))
962 })
963 })
964 .transpose();
965
966 if let Some(filter) = filter_expr? {
967 builder = builder.filter(filter)?;
968 }
969
970 if let Some(fetch) = table_scan.fetch {
971 builder = builder.limit(0, Some(fetch))?;
972 }
973
974 if let Some(alias) = alias {
979 if table_scan.projection.is_none() && table_scan.filters.is_empty() {
980 builder = builder.alias(alias)?;
981 }
982 }
983
984 Ok(Some(builder.build()?))
985 }
986 LogicalPlan::SubqueryAlias(subquery_alias) => {
987 Self::unparse_table_scan_pushdown(
988 &subquery_alias.input,
989 Some(subquery_alias.alias.clone()),
990 already_projected,
991 )
992 }
993 LogicalPlan::Projection(projection) => {
996 if let Some(plan) = Self::unparse_table_scan_pushdown(
997 &projection.input,
998 alias.clone(),
999 already_projected,
1000 )? {
1001 let exprs = if alias.is_some() {
1002 let mut alias_rewriter =
1003 alias.as_ref().map(|alias_name| TableAliasRewriter {
1004 table_schema: plan.schema().as_arrow(),
1005 alias_name: alias_name.clone(),
1006 });
1007 projection
1008 .expr
1009 .iter()
1010 .cloned()
1011 .map(|expr| {
1012 if let Some(ref mut rewriter) = alias_rewriter {
1013 expr.rewrite(rewriter).data()
1014 } else {
1015 Ok(expr)
1016 }
1017 })
1018 .collect::<Result<Vec<_>>>()?
1019 } else {
1020 projection.expr.clone()
1021 };
1022 Ok(Some(
1023 LogicalPlanBuilder::from(plan).project(exprs)?.build()?,
1024 ))
1025 } else {
1026 Ok(None)
1027 }
1028 }
1029 _ => Ok(None),
1030 }
1031 }
1032
1033 fn select_item_to_sql(&self, expr: &Expr) -> Result<ast::SelectItem> {
1034 match expr {
1035 Expr::Alias(Alias { expr, name, .. }) => {
1036 let inner = self.expr_to_sql(expr)?;
1037
1038 Ok(ast::SelectItem::ExprWithAlias {
1039 expr: inner,
1040 alias: self.new_ident_quoted_if_needs(name.to_string()),
1041 })
1042 }
1043 _ => {
1044 let inner = self.expr_to_sql(expr)?;
1045
1046 Ok(ast::SelectItem::UnnamedExpr(inner))
1047 }
1048 }
1049 }
1050
1051 fn sorts_to_sql(&self, sort_exprs: &[SortExpr]) -> Result<Vec<ast::OrderByExpr>> {
1052 sort_exprs
1053 .iter()
1054 .map(|sort_expr| self.sort_to_sql(sort_expr))
1055 .collect::<Result<Vec<_>>>()
1056 }
1057
1058 fn join_operator_to_sql(
1059 &self,
1060 join_type: JoinType,
1061 constraint: ast::JoinConstraint,
1062 ) -> Result<ast::JoinOperator> {
1063 Ok(match join_type {
1064 JoinType::Inner => match &constraint {
1065 ast::JoinConstraint::On(_)
1066 | ast::JoinConstraint::Using(_)
1067 | ast::JoinConstraint::Natural => ast::JoinOperator::Inner(constraint),
1068 ast::JoinConstraint::None => {
1069 ast::JoinOperator::CrossJoin
1072 }
1073 },
1074 JoinType::Left => ast::JoinOperator::LeftOuter(constraint),
1075 JoinType::Right => ast::JoinOperator::RightOuter(constraint),
1076 JoinType::Full => ast::JoinOperator::FullOuter(constraint),
1077 JoinType::LeftAnti => ast::JoinOperator::LeftAnti(constraint),
1078 JoinType::LeftSemi => ast::JoinOperator::LeftSemi(constraint),
1079 JoinType::RightAnti => ast::JoinOperator::RightAnti(constraint),
1080 JoinType::RightSemi => ast::JoinOperator::RightSemi(constraint),
1081 JoinType::LeftMark => unimplemented!("Unparsing of Left Mark join type"),
1082 })
1083 }
1084
1085 fn join_using_to_sql(
1089 &self,
1090 join_conditions: &[(Expr, Expr)],
1091 ) -> Option<ast::JoinConstraint> {
1092 let mut object_names = Vec::with_capacity(join_conditions.len());
1093 for (left, right) in join_conditions {
1094 match (left, right) {
1095 (
1096 Expr::Column(Column {
1097 relation: _,
1098 name: left_name,
1099 spans: _,
1100 }),
1101 Expr::Column(Column {
1102 relation: _,
1103 name: right_name,
1104 spans: _,
1105 }),
1106 ) if left_name == right_name => {
1107 let ident = self.new_ident_quoted_if_needs(left_name.to_string());
1111 object_names.push(ast::ObjectName(vec![ident]));
1112 }
1113 _ => return None,
1116 }
1117 }
1118 Some(ast::JoinConstraint::Using(object_names))
1119 }
1120
1121 fn join_constraint_to_sql(
1123 &self,
1124 constraint: JoinConstraint,
1125 conditions: &[(Expr, Expr)],
1126 filter: Option<&Expr>,
1127 ) -> Result<ast::JoinConstraint> {
1128 match (constraint, conditions, filter) {
1129 (JoinConstraint::On | JoinConstraint::Using, [], None) => {
1131 Ok(ast::JoinConstraint::None)
1132 }
1133
1134 (JoinConstraint::Using, conditions, None) => {
1135 match self.join_using_to_sql(conditions) {
1136 Some(using) => Ok(using),
1137 None => self.join_conditions_to_sql_on(conditions, None),
1140 }
1141 }
1142
1143 (JoinConstraint::On | JoinConstraint::Using, conditions, filter) => {
1151 self.join_conditions_to_sql_on(conditions, filter)
1152 }
1153 }
1154 }
1155
1156 fn join_conditions_to_sql_on(
1160 &self,
1161 join_conditions: &[(Expr, Expr)],
1162 filter: Option<&Expr>,
1163 ) -> Result<ast::JoinConstraint> {
1164 let mut condition = None;
1165 for (left, right) in join_conditions {
1167 let l = self.expr_to_sql(left)?;
1169 let r = self.expr_to_sql(right)?;
1170 let e = self.binary_op_to_sql(l, r, ast::BinaryOperator::Eq);
1171 condition = match condition {
1172 Some(expr) => Some(self.and_op_to_sql(expr, e)),
1173 None => Some(e),
1174 };
1175 }
1176
1177 condition = match (condition, filter) {
1179 (Some(expr), Some(filter)) => {
1180 Some(self.and_op_to_sql(expr, self.expr_to_sql(filter)?))
1181 }
1182 (Some(expr), None) => Some(expr),
1183 (None, Some(filter)) => Some(self.expr_to_sql(filter)?),
1184 (None, None) => None,
1185 };
1186
1187 let constraint = match condition {
1188 Some(filter) => ast::JoinConstraint::On(filter),
1189 None => ast::JoinConstraint::None,
1190 };
1191
1192 Ok(constraint)
1193 }
1194
1195 fn and_op_to_sql(&self, lhs: ast::Expr, rhs: ast::Expr) -> ast::Expr {
1196 self.binary_op_to_sql(lhs, rhs, ast::BinaryOperator::And)
1197 }
1198
1199 fn new_table_alias(&self, alias: String, columns: Vec<Ident>) -> ast::TableAlias {
1200 let columns = columns
1201 .into_iter()
1202 .map(|ident| TableAliasColumnDef {
1203 name: ident,
1204 data_type: None,
1205 })
1206 .collect();
1207 ast::TableAlias {
1208 name: self.new_ident_quoted_if_needs(alias),
1209 columns,
1210 }
1211 }
1212
1213 fn dml_to_sql(&self, plan: &LogicalPlan) -> Result<ast::Statement> {
1214 not_impl_err!("Unsupported plan: {plan:?}")
1215 }
1216}
1217
1218impl From<BuilderError> for DataFusionError {
1219 fn from(e: BuilderError) -> Self {
1220 DataFusionError::External(Box::new(e))
1221 }
1222}
1223
1224#[derive(Debug)]
1226enum UnnestInputType {
1227 OuterReference,
1229 Scalar,
1231}