1use crate::{
41 Aggregate, Analyze, CreateMemoryTable, CreateView, DdlStatement, Distinct,
42 DistinctOn, DmlStatement, Execute, Explain, Expr, Extension, Filter, Join, Limit,
43 LogicalPlan, Partitioning, Prepare, Projection, RecursiveQuery, Repartition, Sort,
44 Statement, Subquery, SubqueryAlias, TableScan, Union, Unnest, UserDefinedLogicalNode,
45 Values, Window, dml::CopyTo,
46};
47use datafusion_common::tree_node::TreeNodeRefContainer;
48
49use crate::expr::{Exists, InSubquery, SetComparison};
50use datafusion_common::tree_node::{
51 Transformed, TreeNode, TreeNodeContainer, TreeNodeIterator, TreeNodeRecursion,
52 TreeNodeRewriter, TreeNodeVisitor,
53};
54use datafusion_common::{Result, internal_err};
55
56impl TreeNode for LogicalPlan {
57 fn apply_children<'n, F: FnMut(&'n Self) -> Result<TreeNodeRecursion>>(
58 &'n self,
59 f: F,
60 ) -> Result<TreeNodeRecursion> {
61 self.inputs().apply_ref_elements(f)
62 }
63
64 fn map_children<F: FnMut(Self) -> Result<Transformed<Self>>>(
73 self,
74 f: F,
75 ) -> Result<Transformed<Self>> {
76 Ok(match self {
77 LogicalPlan::Projection(Projection {
78 expr,
79 input,
80 schema,
81 }) => input.map_elements(f)?.update_data(|input| {
82 LogicalPlan::Projection(Projection {
83 expr,
84 input,
85 schema,
86 })
87 }),
88 LogicalPlan::Filter(Filter { predicate, input }) => input
89 .map_elements(f)?
90 .update_data(|input| LogicalPlan::Filter(Filter { predicate, input })),
91 LogicalPlan::Repartition(Repartition {
92 input,
93 partitioning_scheme,
94 }) => input.map_elements(f)?.update_data(|input| {
95 LogicalPlan::Repartition(Repartition {
96 input,
97 partitioning_scheme,
98 })
99 }),
100 LogicalPlan::Window(Window {
101 input,
102 window_expr,
103 schema,
104 }) => input.map_elements(f)?.update_data(|input| {
105 LogicalPlan::Window(Window {
106 input,
107 window_expr,
108 schema,
109 })
110 }),
111 LogicalPlan::Aggregate(Aggregate {
112 input,
113 group_expr,
114 aggr_expr,
115 schema,
116 }) => input.map_elements(f)?.update_data(|input| {
117 LogicalPlan::Aggregate(Aggregate {
118 input,
119 group_expr,
120 aggr_expr,
121 schema,
122 })
123 }),
124 LogicalPlan::Sort(Sort { expr, input, fetch }) => input
125 .map_elements(f)?
126 .update_data(|input| LogicalPlan::Sort(Sort { expr, input, fetch })),
127 LogicalPlan::Join(Join {
128 left,
129 right,
130 on,
131 filter,
132 join_type,
133 join_constraint,
134 schema,
135 null_equality,
136 null_aware,
137 }) => (left, right).map_elements(f)?.update_data(|(left, right)| {
138 LogicalPlan::Join(Join {
139 left,
140 right,
141 on,
142 filter,
143 join_type,
144 join_constraint,
145 schema,
146 null_equality,
147 null_aware,
148 })
149 }),
150 LogicalPlan::Limit(Limit { skip, fetch, input }) => input
151 .map_elements(f)?
152 .update_data(|input| LogicalPlan::Limit(Limit { skip, fetch, input })),
153 LogicalPlan::Subquery(Subquery {
154 subquery,
155 outer_ref_columns,
156 spans,
157 }) => subquery.map_elements(f)?.update_data(|subquery| {
158 LogicalPlan::Subquery(Subquery {
159 subquery,
160 outer_ref_columns,
161 spans,
162 })
163 }),
164 LogicalPlan::SubqueryAlias(SubqueryAlias {
165 input,
166 alias,
167 schema,
168 }) => input.map_elements(f)?.update_data(|input| {
169 LogicalPlan::SubqueryAlias(SubqueryAlias {
170 input,
171 alias,
172 schema,
173 })
174 }),
175 LogicalPlan::Extension(extension) => rewrite_extension_inputs(extension, f)?
176 .update_data(LogicalPlan::Extension),
177 LogicalPlan::Union(Union { inputs, schema }) => inputs
178 .map_elements(f)?
179 .update_data(|inputs| LogicalPlan::Union(Union { inputs, schema })),
180 LogicalPlan::Distinct(distinct) => match distinct {
181 Distinct::All(input) => input.map_elements(f)?.update_data(Distinct::All),
182 Distinct::On(DistinctOn {
183 on_expr,
184 select_expr,
185 sort_expr,
186 input,
187 schema,
188 }) => input.map_elements(f)?.update_data(|input| {
189 Distinct::On(DistinctOn {
190 on_expr,
191 select_expr,
192 sort_expr,
193 input,
194 schema,
195 })
196 }),
197 }
198 .update_data(LogicalPlan::Distinct),
199 LogicalPlan::Explain(Explain {
200 verbose,
201 explain_format: format,
202 plan,
203 stringified_plans,
204 schema,
205 logical_optimization_succeeded,
206 }) => plan.map_elements(f)?.update_data(|plan| {
207 LogicalPlan::Explain(Explain {
208 verbose,
209 explain_format: format,
210 plan,
211 stringified_plans,
212 schema,
213 logical_optimization_succeeded,
214 })
215 }),
216 LogicalPlan::Analyze(Analyze {
217 verbose,
218 input,
219 schema,
220 }) => input.map_elements(f)?.update_data(|input| {
221 LogicalPlan::Analyze(Analyze {
222 verbose,
223 input,
224 schema,
225 })
226 }),
227 LogicalPlan::Dml(DmlStatement {
228 table_name,
229 target,
230 op,
231 input,
232 output_schema,
233 }) => input.map_elements(f)?.update_data(|input| {
234 LogicalPlan::Dml(DmlStatement {
235 table_name,
236 target,
237 op,
238 input,
239 output_schema,
240 })
241 }),
242 LogicalPlan::Copy(CopyTo {
243 input,
244 output_url,
245 partition_by,
246 file_type,
247 options,
248 output_schema,
249 }) => input.map_elements(f)?.update_data(|input| {
250 LogicalPlan::Copy(CopyTo {
251 input,
252 output_url,
253 partition_by,
254 file_type,
255 options,
256 output_schema,
257 })
258 }),
259 LogicalPlan::Ddl(ddl) => {
260 match ddl {
261 DdlStatement::CreateMemoryTable(CreateMemoryTable {
262 name,
263 constraints,
264 input,
265 if_not_exists,
266 or_replace,
267 column_defaults,
268 temporary,
269 }) => input.map_elements(f)?.update_data(|input| {
270 DdlStatement::CreateMemoryTable(CreateMemoryTable {
271 name,
272 constraints,
273 input,
274 if_not_exists,
275 or_replace,
276 column_defaults,
277 temporary,
278 })
279 }),
280 DdlStatement::CreateView(CreateView {
281 name,
282 input,
283 or_replace,
284 definition,
285 temporary,
286 }) => input.map_elements(f)?.update_data(|input| {
287 DdlStatement::CreateView(CreateView {
288 name,
289 input,
290 or_replace,
291 definition,
292 temporary,
293 })
294 }),
295 DdlStatement::CreateExternalTable(_)
297 | DdlStatement::CreateCatalogSchema(_)
298 | DdlStatement::CreateCatalog(_)
299 | DdlStatement::CreateIndex(_)
300 | DdlStatement::DropTable(_)
301 | DdlStatement::DropView(_)
302 | DdlStatement::DropCatalogSchema(_)
303 | DdlStatement::CreateFunction(_)
304 | DdlStatement::DropFunction(_) => Transformed::no(ddl),
305 }
306 .update_data(LogicalPlan::Ddl)
307 }
308 LogicalPlan::Unnest(Unnest {
309 input,
310 exec_columns: input_columns,
311 list_type_columns,
312 struct_type_columns,
313 dependency_indices,
314 schema,
315 options,
316 }) => input.map_elements(f)?.update_data(|input| {
317 LogicalPlan::Unnest(Unnest {
318 input,
319 exec_columns: input_columns,
320 list_type_columns,
321 struct_type_columns,
322 dependency_indices,
323 schema,
324 options,
325 })
326 }),
327 LogicalPlan::RecursiveQuery(RecursiveQuery {
328 name,
329 static_term,
330 recursive_term,
331 is_distinct,
332 }) => (static_term, recursive_term).map_elements(f)?.update_data(
333 |(static_term, recursive_term)| {
334 LogicalPlan::RecursiveQuery(RecursiveQuery {
335 name,
336 static_term,
337 recursive_term,
338 is_distinct,
339 })
340 },
341 ),
342 LogicalPlan::Statement(stmt) => match stmt {
343 Statement::Prepare(p) => p
344 .input
345 .map_elements(f)?
346 .update_data(|input| Statement::Prepare(Prepare { input, ..p })),
347 _ => Transformed::no(stmt),
348 }
349 .update_data(LogicalPlan::Statement),
350 LogicalPlan::TableScan { .. }
352 | LogicalPlan::EmptyRelation { .. }
353 | LogicalPlan::Values { .. }
354 | LogicalPlan::DescribeTable(_) => Transformed::no(self),
355 })
356 }
357}
358
359fn rewrite_extension_inputs<F: FnMut(LogicalPlan) -> Result<Transformed<LogicalPlan>>>(
365 extension: Extension,
366 f: F,
367) -> Result<Transformed<Extension>> {
368 let Extension { node } = extension;
369
370 node.inputs()
371 .into_iter()
372 .cloned()
373 .map_until_stop_and_collect(f)?
374 .map_data(|new_inputs| {
375 let exprs = node.expressions();
376 Ok(Extension {
377 node: node.with_exprs_and_inputs(exprs, new_inputs)?,
378 })
379 })
380}
381
/// Shared skeleton for the combined top-down / bottom-up rewrites that also
/// descend into subqueries.
///
/// * `$F_DOWN`  - expression producing the `Result<Transformed<_>>` of the
///   top-down step for the current node.
/// * `$F_CHILD` - closure expression used to recurse; it is expanded twice,
///   once for the node's subqueries and once for its direct children.
/// * `$F_UP`    - closure (or callable) applied as the bottom-up step.
///
/// The steps are chained through `transform_children`, `transform_sibling`
/// and `transform_parent` on `Transformed`; those helpers (defined outside
/// this file) decide from the carried `TreeNodeRecursion` state whether the
/// next step runs.
macro_rules! handle_transform_recursion {
    ($F_DOWN:expr, $F_CHILD:expr, $F_UP:expr) => {{
        let after_down = $F_DOWN?;
        let after_descent = after_down.transform_children(|plan| {
            // Subqueries are rewritten first, then the direct inputs.
            let after_subqueries = plan.map_subqueries($F_CHILD)?;
            after_subqueries.transform_sibling(|plan| plan.map_children($F_CHILD))
        })?;
        after_descent.transform_parent($F_UP)
    }};
}
394
395impl LogicalPlan {
396 pub fn apply_expressions<F: FnMut(&Expr) -> Result<TreeNodeRecursion>>(
403 &self,
404 mut f: F,
405 ) -> Result<TreeNodeRecursion> {
406 match self {
407 LogicalPlan::Projection(Projection { expr, .. }) => expr.apply_elements(f),
408 LogicalPlan::Values(Values { values, .. }) => values.apply_elements(f),
409 LogicalPlan::Filter(Filter { predicate, .. }) => f(predicate),
410 LogicalPlan::Repartition(Repartition {
411 partitioning_scheme,
412 ..
413 }) => match partitioning_scheme {
414 Partitioning::Hash(expr, _) | Partitioning::DistributeBy(expr) => {
415 expr.apply_elements(f)
416 }
417 Partitioning::RoundRobinBatch(_) => Ok(TreeNodeRecursion::Continue),
418 },
419 LogicalPlan::Window(Window { window_expr, .. }) => {
420 window_expr.apply_elements(f)
421 }
422 LogicalPlan::Aggregate(Aggregate {
423 group_expr,
424 aggr_expr,
425 ..
426 }) => (group_expr, aggr_expr).apply_ref_elements(f),
427 LogicalPlan::Join(Join { on, filter, .. }) => {
431 (on, filter).apply_ref_elements(f)
432 }
433 LogicalPlan::Sort(Sort { expr, .. }) => expr.apply_elements(f),
434 LogicalPlan::Extension(extension) => {
435 extension.node.expressions().apply_elements(f)
438 }
439 LogicalPlan::TableScan(TableScan { filters, .. }) => {
440 filters.apply_elements(f)
441 }
442 LogicalPlan::Unnest(unnest) => {
443 let exprs = unnest
444 .exec_columns
445 .iter()
446 .cloned()
447 .map(Expr::Column)
448 .collect::<Vec<_>>();
449 exprs.apply_elements(f)
450 }
451 LogicalPlan::Distinct(Distinct::On(DistinctOn {
452 on_expr,
453 select_expr,
454 sort_expr,
455 ..
456 })) => (on_expr, select_expr, sort_expr).apply_ref_elements(f),
457 LogicalPlan::Limit(Limit { skip, fetch, .. }) => {
458 (skip, fetch).apply_ref_elements(f)
459 }
460 LogicalPlan::Statement(stmt) => match stmt {
461 Statement::Execute(Execute { parameters, .. }) => {
462 parameters.apply_elements(f)
463 }
464 _ => Ok(TreeNodeRecursion::Continue),
465 },
466 LogicalPlan::EmptyRelation(_)
468 | LogicalPlan::RecursiveQuery(_)
469 | LogicalPlan::Subquery(_)
470 | LogicalPlan::SubqueryAlias(_)
471 | LogicalPlan::Analyze(_)
472 | LogicalPlan::Explain(_)
473 | LogicalPlan::Union(_)
474 | LogicalPlan::Distinct(Distinct::All(_))
475 | LogicalPlan::Dml(_)
476 | LogicalPlan::Ddl(_)
477 | LogicalPlan::Copy(_)
478 | LogicalPlan::DescribeTable(_) => Ok(TreeNodeRecursion::Continue),
479 }
480 }
481
482 pub fn map_expressions<F: FnMut(Expr) -> Result<Transformed<Expr>>>(
490 self,
491 mut f: F,
492 ) -> Result<Transformed<Self>> {
493 Ok(match self {
494 LogicalPlan::Projection(Projection {
495 expr,
496 input,
497 schema,
498 }) => expr.map_elements(f)?.update_data(|expr| {
499 LogicalPlan::Projection(Projection {
500 expr,
501 input,
502 schema,
503 })
504 }),
505 LogicalPlan::Values(Values { schema, values }) => values
506 .map_elements(f)?
507 .update_data(|values| LogicalPlan::Values(Values { schema, values })),
508 LogicalPlan::Filter(Filter { predicate, input }) => f(predicate)?
509 .update_data(|predicate| {
510 LogicalPlan::Filter(Filter { predicate, input })
511 }),
512 LogicalPlan::Repartition(Repartition {
513 input,
514 partitioning_scheme,
515 }) => match partitioning_scheme {
516 Partitioning::Hash(expr, usize) => expr
517 .map_elements(f)?
518 .update_data(|expr| Partitioning::Hash(expr, usize)),
519 Partitioning::DistributeBy(expr) => expr
520 .map_elements(f)?
521 .update_data(Partitioning::DistributeBy),
522 Partitioning::RoundRobinBatch(_) => Transformed::no(partitioning_scheme),
523 }
524 .update_data(|partitioning_scheme| {
525 LogicalPlan::Repartition(Repartition {
526 input,
527 partitioning_scheme,
528 })
529 }),
530 LogicalPlan::Window(Window {
531 input,
532 window_expr,
533 schema,
534 }) => window_expr.map_elements(f)?.update_data(|window_expr| {
535 LogicalPlan::Window(Window {
536 input,
537 window_expr,
538 schema,
539 })
540 }),
541 LogicalPlan::Aggregate(Aggregate {
542 input,
543 group_expr,
544 aggr_expr,
545 schema,
546 }) => (group_expr, aggr_expr).map_elements(f)?.update_data(
547 |(group_expr, aggr_expr)| {
548 LogicalPlan::Aggregate(Aggregate {
549 input,
550 group_expr,
551 aggr_expr,
552 schema,
553 })
554 },
555 ),
556
557 LogicalPlan::Join(Join {
561 left,
562 right,
563 on,
564 filter,
565 join_type,
566 join_constraint,
567 schema,
568 null_equality,
569 null_aware,
570 }) => (on, filter).map_elements(f)?.update_data(|(on, filter)| {
571 LogicalPlan::Join(Join {
572 left,
573 right,
574 on,
575 filter,
576 join_type,
577 join_constraint,
578 schema,
579 null_equality,
580 null_aware,
581 })
582 }),
583 LogicalPlan::Sort(Sort { expr, input, fetch }) => expr
584 .map_elements(f)?
585 .update_data(|expr| LogicalPlan::Sort(Sort { expr, input, fetch })),
586 LogicalPlan::Extension(Extension { node }) => {
587 let exprs = node.expressions().map_elements(f)?;
590 let plan = LogicalPlan::Extension(Extension {
591 node: UserDefinedLogicalNode::with_exprs_and_inputs(
592 node.as_ref(),
593 exprs.data,
594 node.inputs().into_iter().cloned().collect::<Vec<_>>(),
595 )?,
596 });
597 Transformed::new(plan, exprs.transformed, exprs.tnr)
598 }
599 LogicalPlan::TableScan(TableScan {
600 table_name,
601 source,
602 projection,
603 projected_schema,
604 filters,
605 fetch,
606 }) => filters.map_elements(f)?.update_data(|filters| {
607 LogicalPlan::TableScan(TableScan {
608 table_name,
609 source,
610 projection,
611 projected_schema,
612 filters,
613 fetch,
614 })
615 }),
616 LogicalPlan::Distinct(Distinct::On(DistinctOn {
617 on_expr,
618 select_expr,
619 sort_expr,
620 input,
621 schema,
622 })) => (on_expr, select_expr, sort_expr)
623 .map_elements(f)?
624 .update_data(|(on_expr, select_expr, sort_expr)| {
625 LogicalPlan::Distinct(Distinct::On(DistinctOn {
626 on_expr,
627 select_expr,
628 sort_expr,
629 input,
630 schema,
631 }))
632 }),
633 LogicalPlan::Limit(Limit { skip, fetch, input }) => {
634 (skip, fetch).map_elements(f)?.update_data(|(skip, fetch)| {
635 LogicalPlan::Limit(Limit { skip, fetch, input })
636 })
637 }
638 LogicalPlan::Statement(stmt) => match stmt {
639 Statement::Execute(e) => {
640 e.parameters.map_elements(f)?.update_data(|parameters| {
641 Statement::Execute(Execute { parameters, ..e })
642 })
643 }
644 _ => Transformed::no(stmt),
645 }
646 .update_data(LogicalPlan::Statement),
647 LogicalPlan::EmptyRelation(_)
649 | LogicalPlan::Unnest(_)
650 | LogicalPlan::RecursiveQuery(_)
651 | LogicalPlan::Subquery(_)
652 | LogicalPlan::SubqueryAlias(_)
653 | LogicalPlan::Analyze(_)
654 | LogicalPlan::Explain(_)
655 | LogicalPlan::Union(_)
656 | LogicalPlan::Distinct(Distinct::All(_))
657 | LogicalPlan::Dml(_)
658 | LogicalPlan::Ddl(_)
659 | LogicalPlan::Copy(_)
660 | LogicalPlan::DescribeTable(_) => Transformed::no(self),
661 })
662 }
663
664 #[cfg_attr(feature = "recursive_protection", recursive::recursive)]
667 pub fn visit_with_subqueries<V: for<'n> TreeNodeVisitor<'n, Node = Self>>(
668 &self,
669 visitor: &mut V,
670 ) -> Result<TreeNodeRecursion> {
671 visitor
672 .f_down(self)?
673 .visit_children(|| {
674 self.apply_subqueries(|c| c.visit_with_subqueries(visitor))?
675 .visit_sibling(|| {
676 self.apply_children(|c| c.visit_with_subqueries(visitor))
677 })
678 })?
679 .visit_parent(|| visitor.f_up(self))
680 }
681
682 #[cfg_attr(feature = "recursive_protection", recursive::recursive)]
686 pub fn rewrite_with_subqueries<R: TreeNodeRewriter<Node = Self>>(
687 self,
688 rewriter: &mut R,
689 ) -> Result<Transformed<Self>> {
690 handle_transform_recursion!(
691 rewriter.f_down(self),
692 |c| c.rewrite_with_subqueries(rewriter),
693 |n| rewriter.f_up(n)
694 )
695 }
696
697 pub fn apply_with_subqueries<F: FnMut(&Self) -> Result<TreeNodeRecursion>>(
701 &self,
702 mut f: F,
703 ) -> Result<TreeNodeRecursion> {
704 #[cfg_attr(feature = "recursive_protection", recursive::recursive)]
705 fn apply_with_subqueries_impl<
706 F: FnMut(&LogicalPlan) -> Result<TreeNodeRecursion>,
707 >(
708 node: &LogicalPlan,
709 f: &mut F,
710 ) -> Result<TreeNodeRecursion> {
711 f(node)?.visit_children(|| {
712 node.apply_subqueries(|c| apply_with_subqueries_impl(c, f))?
713 .visit_sibling(|| {
714 node.apply_children(|c| apply_with_subqueries_impl(c, f))
715 })
716 })
717 }
718
719 apply_with_subqueries_impl(self, &mut f)
720 }
721
722 pub fn transform_with_subqueries<F: FnMut(Self) -> Result<Transformed<Self>>>(
726 self,
727 f: F,
728 ) -> Result<Transformed<Self>> {
729 self.transform_up_with_subqueries(f)
730 }
731
732 pub fn transform_down_with_subqueries<F: FnMut(Self) -> Result<Transformed<Self>>>(
736 self,
737 mut f: F,
738 ) -> Result<Transformed<Self>> {
739 #[cfg_attr(feature = "recursive_protection", recursive::recursive)]
740 fn transform_down_with_subqueries_impl<
741 F: FnMut(LogicalPlan) -> Result<Transformed<LogicalPlan>>,
742 >(
743 node: LogicalPlan,
744 f: &mut F,
745 ) -> Result<Transformed<LogicalPlan>> {
746 f(node)?.transform_children(|n| {
747 n.map_subqueries(|c| transform_down_with_subqueries_impl(c, f))?
748 .transform_sibling(|n| {
749 n.map_children(|c| transform_down_with_subqueries_impl(c, f))
750 })
751 })
752 }
753
754 transform_down_with_subqueries_impl(self, &mut f)
755 }
756
757 pub fn transform_up_with_subqueries<F: FnMut(Self) -> Result<Transformed<Self>>>(
761 self,
762 mut f: F,
763 ) -> Result<Transformed<Self>> {
764 #[cfg_attr(feature = "recursive_protection", recursive::recursive)]
765 fn transform_up_with_subqueries_impl<
766 F: FnMut(LogicalPlan) -> Result<Transformed<LogicalPlan>>,
767 >(
768 node: LogicalPlan,
769 f: &mut F,
770 ) -> Result<Transformed<LogicalPlan>> {
771 node.map_subqueries(|c| transform_up_with_subqueries_impl(c, f))?
772 .transform_sibling(|n| {
773 n.map_children(|c| transform_up_with_subqueries_impl(c, f))
774 })?
775 .transform_parent(f)
776 }
777
778 transform_up_with_subqueries_impl(self, &mut f)
779 }
780
781 pub fn transform_down_up_with_subqueries<
785 FD: FnMut(Self) -> Result<Transformed<Self>>,
786 FU: FnMut(Self) -> Result<Transformed<Self>>,
787 >(
788 self,
789 mut f_down: FD,
790 mut f_up: FU,
791 ) -> Result<Transformed<Self>> {
792 #[cfg_attr(feature = "recursive_protection", recursive::recursive)]
793 fn transform_down_up_with_subqueries_impl<
794 FD: FnMut(LogicalPlan) -> Result<Transformed<LogicalPlan>>,
795 FU: FnMut(LogicalPlan) -> Result<Transformed<LogicalPlan>>,
796 >(
797 node: LogicalPlan,
798 f_down: &mut FD,
799 f_up: &mut FU,
800 ) -> Result<Transformed<LogicalPlan>> {
801 handle_transform_recursion!(
802 f_down(node),
803 |c| transform_down_up_with_subqueries_impl(c, f_down, f_up),
804 f_up
805 )
806 }
807
808 transform_down_up_with_subqueries_impl(self, &mut f_down, &mut f_up)
809 }
810
811 pub fn apply_subqueries<F: FnMut(&Self) -> Result<TreeNodeRecursion>>(
815 &self,
816 mut f: F,
817 ) -> Result<TreeNodeRecursion> {
818 self.apply_expressions(|expr| {
819 expr.apply(|expr| match expr {
820 Expr::Exists(Exists { subquery, .. })
821 | Expr::InSubquery(InSubquery { subquery, .. })
822 | Expr::SetComparison(SetComparison { subquery, .. })
823 | Expr::ScalarSubquery(subquery) => {
824 f(&LogicalPlan::Subquery(subquery.clone()))
828 }
829 _ => Ok(TreeNodeRecursion::Continue),
830 })
831 })
832 }
833
834 pub fn map_subqueries<F: FnMut(Self) -> Result<Transformed<Self>>>(
839 self,
840 mut f: F,
841 ) -> Result<Transformed<Self>> {
842 self.map_expressions(|expr| {
843 expr.transform_down(|expr| match expr {
844 Expr::Exists(Exists { subquery, negated }) => {
845 f(LogicalPlan::Subquery(subquery))?.map_data(|s| match s {
846 LogicalPlan::Subquery(subquery) => {
847 Ok(Expr::Exists(Exists { subquery, negated }))
848 }
849 _ => internal_err!("Transformation should return Subquery"),
850 })
851 }
852 Expr::InSubquery(InSubquery {
853 expr,
854 subquery,
855 negated,
856 }) => f(LogicalPlan::Subquery(subquery))?.map_data(|s| match s {
857 LogicalPlan::Subquery(subquery) => Ok(Expr::InSubquery(InSubquery {
858 expr,
859 subquery,
860 negated,
861 })),
862 _ => internal_err!("Transformation should return Subquery"),
863 }),
864 Expr::SetComparison(SetComparison {
865 expr,
866 subquery,
867 op,
868 quantifier,
869 }) => f(LogicalPlan::Subquery(subquery))?.map_data(|s| match s {
870 LogicalPlan::Subquery(subquery) => {
871 Ok(Expr::SetComparison(SetComparison {
872 expr,
873 subquery,
874 op,
875 quantifier,
876 }))
877 }
878 _ => internal_err!("Transformation should return Subquery"),
879 }),
880 Expr::ScalarSubquery(subquery) => f(LogicalPlan::Subquery(subquery))?
881 .map_data(|s| match s {
882 LogicalPlan::Subquery(subquery) => {
883 Ok(Expr::ScalarSubquery(subquery))
884 }
885 _ => internal_err!("Transformation should return Subquery"),
886 }),
887 _ => Ok(Transformed::no(expr)),
888 })
889 })
890 }
891}