1use crate::{
41 dml::CopyTo, Aggregate, Analyze, CreateMemoryTable, CreateView, DdlStatement,
42 Distinct, DistinctOn, DmlStatement, Execute, Explain, Expr, Extension, Filter, Join,
43 Limit, LogicalPlan, Partitioning, Prepare, Projection, RecursiveQuery, Repartition,
44 Sort, Statement, Subquery, SubqueryAlias, TableScan, Union, Unnest,
45 UserDefinedLogicalNode, Values, Window,
46};
47use datafusion_common::tree_node::TreeNodeRefContainer;
48
49use crate::expr::{Exists, InSubquery};
50use datafusion_common::tree_node::{
51 Transformed, TreeNode, TreeNodeContainer, TreeNodeIterator, TreeNodeRecursion,
52 TreeNodeRewriter, TreeNodeVisitor,
53};
54use datafusion_common::{internal_err, Result};
55
56impl TreeNode for LogicalPlan {
57 fn apply_children<'n, F: FnMut(&'n Self) -> Result<TreeNodeRecursion>>(
58 &'n self,
59 f: F,
60 ) -> Result<TreeNodeRecursion> {
61 self.inputs().apply_ref_elements(f)
62 }
63
64 fn map_children<F: FnMut(Self) -> Result<Transformed<Self>>>(
73 self,
74 f: F,
75 ) -> Result<Transformed<Self>> {
76 Ok(match self {
77 LogicalPlan::Projection(Projection {
78 expr,
79 input,
80 schema,
81 }) => input.map_elements(f)?.update_data(|input| {
82 LogicalPlan::Projection(Projection {
83 expr,
84 input,
85 schema,
86 })
87 }),
88 LogicalPlan::Filter(Filter {
89 predicate,
90 input,
91 having,
92 }) => input.map_elements(f)?.update_data(|input| {
93 LogicalPlan::Filter(Filter {
94 predicate,
95 input,
96 having,
97 })
98 }),
99 LogicalPlan::Repartition(Repartition {
100 input,
101 partitioning_scheme,
102 }) => input.map_elements(f)?.update_data(|input| {
103 LogicalPlan::Repartition(Repartition {
104 input,
105 partitioning_scheme,
106 })
107 }),
108 LogicalPlan::Window(Window {
109 input,
110 window_expr,
111 schema,
112 }) => input.map_elements(f)?.update_data(|input| {
113 LogicalPlan::Window(Window {
114 input,
115 window_expr,
116 schema,
117 })
118 }),
119 LogicalPlan::Aggregate(Aggregate {
120 input,
121 group_expr,
122 aggr_expr,
123 schema,
124 }) => input.map_elements(f)?.update_data(|input| {
125 LogicalPlan::Aggregate(Aggregate {
126 input,
127 group_expr,
128 aggr_expr,
129 schema,
130 })
131 }),
132 LogicalPlan::Sort(Sort { expr, input, fetch }) => input
133 .map_elements(f)?
134 .update_data(|input| LogicalPlan::Sort(Sort { expr, input, fetch })),
135 LogicalPlan::Join(Join {
136 left,
137 right,
138 on,
139 filter,
140 join_type,
141 join_constraint,
142 schema,
143 null_equals_null,
144 }) => (left, right).map_elements(f)?.update_data(|(left, right)| {
145 LogicalPlan::Join(Join {
146 left,
147 right,
148 on,
149 filter,
150 join_type,
151 join_constraint,
152 schema,
153 null_equals_null,
154 })
155 }),
156 LogicalPlan::Limit(Limit { skip, fetch, input }) => input
157 .map_elements(f)?
158 .update_data(|input| LogicalPlan::Limit(Limit { skip, fetch, input })),
159 LogicalPlan::Subquery(Subquery {
160 subquery,
161 outer_ref_columns,
162 spans,
163 }) => subquery.map_elements(f)?.update_data(|subquery| {
164 LogicalPlan::Subquery(Subquery {
165 subquery,
166 outer_ref_columns,
167 spans,
168 })
169 }),
170 LogicalPlan::SubqueryAlias(SubqueryAlias {
171 input,
172 alias,
173 schema,
174 }) => input.map_elements(f)?.update_data(|input| {
175 LogicalPlan::SubqueryAlias(SubqueryAlias {
176 input,
177 alias,
178 schema,
179 })
180 }),
181 LogicalPlan::Extension(extension) => rewrite_extension_inputs(extension, f)?
182 .update_data(LogicalPlan::Extension),
183 LogicalPlan::Union(Union { inputs, schema }) => inputs
184 .map_elements(f)?
185 .update_data(|inputs| LogicalPlan::Union(Union { inputs, schema })),
186 LogicalPlan::Distinct(distinct) => match distinct {
187 Distinct::All(input) => input.map_elements(f)?.update_data(Distinct::All),
188 Distinct::On(DistinctOn {
189 on_expr,
190 select_expr,
191 sort_expr,
192 input,
193 schema,
194 }) => input.map_elements(f)?.update_data(|input| {
195 Distinct::On(DistinctOn {
196 on_expr,
197 select_expr,
198 sort_expr,
199 input,
200 schema,
201 })
202 }),
203 }
204 .update_data(LogicalPlan::Distinct),
205 LogicalPlan::Explain(Explain {
206 verbose,
207 explain_format: format,
208 plan,
209 stringified_plans,
210 schema,
211 logical_optimization_succeeded,
212 }) => plan.map_elements(f)?.update_data(|plan| {
213 LogicalPlan::Explain(Explain {
214 verbose,
215 explain_format: format,
216 plan,
217 stringified_plans,
218 schema,
219 logical_optimization_succeeded,
220 })
221 }),
222 LogicalPlan::Analyze(Analyze {
223 verbose,
224 input,
225 schema,
226 }) => input.map_elements(f)?.update_data(|input| {
227 LogicalPlan::Analyze(Analyze {
228 verbose,
229 input,
230 schema,
231 })
232 }),
233 LogicalPlan::Dml(DmlStatement {
234 table_name,
235 target,
236 op,
237 input,
238 output_schema,
239 }) => input.map_elements(f)?.update_data(|input| {
240 LogicalPlan::Dml(DmlStatement {
241 table_name,
242 target,
243 op,
244 input,
245 output_schema,
246 })
247 }),
248 LogicalPlan::Copy(CopyTo {
249 input,
250 output_url,
251 partition_by,
252 file_type,
253 options,
254 }) => input.map_elements(f)?.update_data(|input| {
255 LogicalPlan::Copy(CopyTo {
256 input,
257 output_url,
258 partition_by,
259 file_type,
260 options,
261 })
262 }),
263 LogicalPlan::Ddl(ddl) => {
264 match ddl {
265 DdlStatement::CreateMemoryTable(CreateMemoryTable {
266 name,
267 constraints,
268 input,
269 if_not_exists,
270 or_replace,
271 column_defaults,
272 temporary,
273 }) => input.map_elements(f)?.update_data(|input| {
274 DdlStatement::CreateMemoryTable(CreateMemoryTable {
275 name,
276 constraints,
277 input,
278 if_not_exists,
279 or_replace,
280 column_defaults,
281 temporary,
282 })
283 }),
284 DdlStatement::CreateView(CreateView {
285 name,
286 input,
287 or_replace,
288 definition,
289 temporary,
290 }) => input.map_elements(f)?.update_data(|input| {
291 DdlStatement::CreateView(CreateView {
292 name,
293 input,
294 or_replace,
295 definition,
296 temporary,
297 })
298 }),
299 DdlStatement::CreateExternalTable(_)
301 | DdlStatement::CreateCatalogSchema(_)
302 | DdlStatement::CreateCatalog(_)
303 | DdlStatement::CreateIndex(_)
304 | DdlStatement::DropTable(_)
305 | DdlStatement::DropView(_)
306 | DdlStatement::DropCatalogSchema(_)
307 | DdlStatement::CreateFunction(_)
308 | DdlStatement::DropFunction(_) => Transformed::no(ddl),
309 }
310 .update_data(LogicalPlan::Ddl)
311 }
312 LogicalPlan::Unnest(Unnest {
313 input,
314 exec_columns: input_columns,
315 list_type_columns,
316 struct_type_columns,
317 dependency_indices,
318 schema,
319 options,
320 }) => input.map_elements(f)?.update_data(|input| {
321 LogicalPlan::Unnest(Unnest {
322 input,
323 exec_columns: input_columns,
324 dependency_indices,
325 list_type_columns,
326 struct_type_columns,
327 schema,
328 options,
329 })
330 }),
331 LogicalPlan::RecursiveQuery(RecursiveQuery {
332 name,
333 static_term,
334 recursive_term,
335 is_distinct,
336 }) => (static_term, recursive_term).map_elements(f)?.update_data(
337 |(static_term, recursive_term)| {
338 LogicalPlan::RecursiveQuery(RecursiveQuery {
339 name,
340 static_term,
341 recursive_term,
342 is_distinct,
343 })
344 },
345 ),
346 LogicalPlan::Statement(stmt) => match stmt {
347 Statement::Prepare(p) => p
348 .input
349 .map_elements(f)?
350 .update_data(|input| Statement::Prepare(Prepare { input, ..p })),
351 _ => Transformed::no(stmt),
352 }
353 .update_data(LogicalPlan::Statement),
354 LogicalPlan::TableScan { .. }
356 | LogicalPlan::EmptyRelation { .. }
357 | LogicalPlan::Values { .. }
358 | LogicalPlan::DescribeTable(_) => Transformed::no(self),
359 })
360 }
361}
362
363fn rewrite_extension_inputs<F: FnMut(LogicalPlan) -> Result<Transformed<LogicalPlan>>>(
369 extension: Extension,
370 f: F,
371) -> Result<Transformed<Extension>> {
372 let Extension { node } = extension;
373
374 node.inputs()
375 .into_iter()
376 .cloned()
377 .map_until_stop_and_collect(f)?
378 .map_data(|new_inputs| {
379 let exprs = node.expressions();
380 Ok(Extension {
381 node: node.with_exprs_and_inputs(exprs, new_inputs)?,
382 })
383 })
384}
385
386macro_rules! handle_transform_recursion {
389 ($F_DOWN:expr, $F_CHILD:expr, $F_UP:expr) => {{
390 $F_DOWN?
391 .transform_children(|n| {
392 n.map_subqueries($F_CHILD)?
393 .transform_sibling(|n| n.map_children($F_CHILD))
394 })?
395 .transform_parent($F_UP)
396 }};
397}
398
399impl LogicalPlan {
400 pub fn apply_expressions<F: FnMut(&Expr) -> Result<TreeNodeRecursion>>(
407 &self,
408 mut f: F,
409 ) -> Result<TreeNodeRecursion> {
410 match self {
411 LogicalPlan::Projection(Projection { expr, .. }) => expr.apply_elements(f),
412 LogicalPlan::Values(Values { values, .. }) => values.apply_elements(f),
413 LogicalPlan::Filter(Filter { predicate, .. }) => f(predicate),
414 LogicalPlan::Repartition(Repartition {
415 partitioning_scheme,
416 ..
417 }) => match partitioning_scheme {
418 Partitioning::Hash(expr, _) | Partitioning::DistributeBy(expr) => {
419 expr.apply_elements(f)
420 }
421 Partitioning::RoundRobinBatch(_) => Ok(TreeNodeRecursion::Continue),
422 },
423 LogicalPlan::Window(Window { window_expr, .. }) => {
424 window_expr.apply_elements(f)
425 }
426 LogicalPlan::Aggregate(Aggregate {
427 group_expr,
428 aggr_expr,
429 ..
430 }) => (group_expr, aggr_expr).apply_ref_elements(f),
431 LogicalPlan::Join(Join { on, filter, .. }) => {
435 (on, filter).apply_ref_elements(f)
436 }
437 LogicalPlan::Sort(Sort { expr, .. }) => expr.apply_elements(f),
438 LogicalPlan::Extension(extension) => {
439 extension.node.expressions().apply_elements(f)
442 }
443 LogicalPlan::TableScan(TableScan { filters, .. }) => {
444 filters.apply_elements(f)
445 }
446 LogicalPlan::Unnest(unnest) => {
447 let columns = unnest.exec_columns.clone();
448
449 let exprs = columns
450 .iter()
451 .map(|c| Expr::Column(c.clone()))
452 .collect::<Vec<_>>();
453 exprs.apply_elements(f)
454 }
455 LogicalPlan::Distinct(Distinct::On(DistinctOn {
456 on_expr,
457 select_expr,
458 sort_expr,
459 ..
460 })) => (on_expr, select_expr, sort_expr).apply_ref_elements(f),
461 LogicalPlan::Limit(Limit { skip, fetch, .. }) => {
462 (skip, fetch).apply_ref_elements(f)
463 }
464 LogicalPlan::Statement(stmt) => match stmt {
465 Statement::Execute(Execute { parameters, .. }) => {
466 parameters.apply_elements(f)
467 }
468 _ => Ok(TreeNodeRecursion::Continue),
469 },
470 LogicalPlan::EmptyRelation(_)
472 | LogicalPlan::RecursiveQuery(_)
473 | LogicalPlan::Subquery(_)
474 | LogicalPlan::SubqueryAlias(_)
475 | LogicalPlan::Analyze(_)
476 | LogicalPlan::Explain(_)
477 | LogicalPlan::Union(_)
478 | LogicalPlan::Distinct(Distinct::All(_))
479 | LogicalPlan::Dml(_)
480 | LogicalPlan::Ddl(_)
481 | LogicalPlan::Copy(_)
482 | LogicalPlan::DescribeTable(_) => Ok(TreeNodeRecursion::Continue),
483 }
484 }
485
486 pub fn map_expressions<F: FnMut(Expr) -> Result<Transformed<Expr>>>(
494 self,
495 mut f: F,
496 ) -> Result<Transformed<Self>> {
497 Ok(match self {
498 LogicalPlan::Projection(Projection {
499 expr,
500 input,
501 schema,
502 }) => expr.map_elements(f)?.update_data(|expr| {
503 LogicalPlan::Projection(Projection {
504 expr,
505 input,
506 schema,
507 })
508 }),
509 LogicalPlan::Values(Values { schema, values }) => values
510 .map_elements(f)?
511 .update_data(|values| LogicalPlan::Values(Values { schema, values })),
512 LogicalPlan::Filter(Filter {
513 predicate,
514 input,
515 having,
516 }) => f(predicate)?.update_data(|predicate| {
517 LogicalPlan::Filter(Filter {
518 predicate,
519 input,
520 having,
521 })
522 }),
523 LogicalPlan::Repartition(Repartition {
524 input,
525 partitioning_scheme,
526 }) => match partitioning_scheme {
527 Partitioning::Hash(expr, usize) => expr
528 .map_elements(f)?
529 .update_data(|expr| Partitioning::Hash(expr, usize)),
530 Partitioning::DistributeBy(expr) => expr
531 .map_elements(f)?
532 .update_data(Partitioning::DistributeBy),
533 Partitioning::RoundRobinBatch(_) => Transformed::no(partitioning_scheme),
534 }
535 .update_data(|partitioning_scheme| {
536 LogicalPlan::Repartition(Repartition {
537 input,
538 partitioning_scheme,
539 })
540 }),
541 LogicalPlan::Window(Window {
542 input,
543 window_expr,
544 schema,
545 }) => window_expr.map_elements(f)?.update_data(|window_expr| {
546 LogicalPlan::Window(Window {
547 input,
548 window_expr,
549 schema,
550 })
551 }),
552 LogicalPlan::Aggregate(Aggregate {
553 input,
554 group_expr,
555 aggr_expr,
556 schema,
557 }) => (group_expr, aggr_expr).map_elements(f)?.update_data(
558 |(group_expr, aggr_expr)| {
559 LogicalPlan::Aggregate(Aggregate {
560 input,
561 group_expr,
562 aggr_expr,
563 schema,
564 })
565 },
566 ),
567
568 LogicalPlan::Join(Join {
572 left,
573 right,
574 on,
575 filter,
576 join_type,
577 join_constraint,
578 schema,
579 null_equals_null,
580 }) => (on, filter).map_elements(f)?.update_data(|(on, filter)| {
581 LogicalPlan::Join(Join {
582 left,
583 right,
584 on,
585 filter,
586 join_type,
587 join_constraint,
588 schema,
589 null_equals_null,
590 })
591 }),
592 LogicalPlan::Sort(Sort { expr, input, fetch }) => expr
593 .map_elements(f)?
594 .update_data(|expr| LogicalPlan::Sort(Sort { expr, input, fetch })),
595 LogicalPlan::Extension(Extension { node }) => {
596 let exprs = node.expressions().map_elements(f)?;
599 let plan = LogicalPlan::Extension(Extension {
600 node: UserDefinedLogicalNode::with_exprs_and_inputs(
601 node.as_ref(),
602 exprs.data,
603 node.inputs().into_iter().cloned().collect::<Vec<_>>(),
604 )?,
605 });
606 Transformed::new(plan, exprs.transformed, exprs.tnr)
607 }
608 LogicalPlan::TableScan(TableScan {
609 table_name,
610 source,
611 projection,
612 projected_schema,
613 filters,
614 fetch,
615 }) => filters.map_elements(f)?.update_data(|filters| {
616 LogicalPlan::TableScan(TableScan {
617 table_name,
618 source,
619 projection,
620 projected_schema,
621 filters,
622 fetch,
623 })
624 }),
625 LogicalPlan::Distinct(Distinct::On(DistinctOn {
626 on_expr,
627 select_expr,
628 sort_expr,
629 input,
630 schema,
631 })) => (on_expr, select_expr, sort_expr)
632 .map_elements(f)?
633 .update_data(|(on_expr, select_expr, sort_expr)| {
634 LogicalPlan::Distinct(Distinct::On(DistinctOn {
635 on_expr,
636 select_expr,
637 sort_expr,
638 input,
639 schema,
640 }))
641 }),
642 LogicalPlan::Limit(Limit { skip, fetch, input }) => {
643 (skip, fetch).map_elements(f)?.update_data(|(skip, fetch)| {
644 LogicalPlan::Limit(Limit { skip, fetch, input })
645 })
646 }
647 LogicalPlan::Statement(stmt) => match stmt {
648 Statement::Execute(e) => {
649 e.parameters.map_elements(f)?.update_data(|parameters| {
650 Statement::Execute(Execute { parameters, ..e })
651 })
652 }
653 _ => Transformed::no(stmt),
654 }
655 .update_data(LogicalPlan::Statement),
656 LogicalPlan::EmptyRelation(_)
658 | LogicalPlan::Unnest(_)
659 | LogicalPlan::RecursiveQuery(_)
660 | LogicalPlan::Subquery(_)
661 | LogicalPlan::SubqueryAlias(_)
662 | LogicalPlan::Analyze(_)
663 | LogicalPlan::Explain(_)
664 | LogicalPlan::Union(_)
665 | LogicalPlan::Distinct(Distinct::All(_))
666 | LogicalPlan::Dml(_)
667 | LogicalPlan::Ddl(_)
668 | LogicalPlan::Copy(_)
669 | LogicalPlan::DescribeTable(_) => Transformed::no(self),
670 })
671 }
672
673 #[cfg_attr(feature = "recursive_protection", recursive::recursive)]
676 pub fn visit_with_subqueries<V: for<'n> TreeNodeVisitor<'n, Node = Self>>(
677 &self,
678 visitor: &mut V,
679 ) -> Result<TreeNodeRecursion> {
680 visitor
681 .f_down(self)?
682 .visit_children(|| {
683 self.apply_subqueries(|c| c.visit_with_subqueries(visitor))?
684 .visit_sibling(|| {
685 self.apply_children(|c| c.visit_with_subqueries(visitor))
686 })
687 })?
688 .visit_parent(|| visitor.f_up(self))
689 }
690
691 #[cfg_attr(feature = "recursive_protection", recursive::recursive)]
695 pub fn rewrite_with_subqueries<R: TreeNodeRewriter<Node = Self>>(
696 self,
697 rewriter: &mut R,
698 ) -> Result<Transformed<Self>> {
699 handle_transform_recursion!(
700 rewriter.f_down(self),
701 |c| c.rewrite_with_subqueries(rewriter),
702 |n| rewriter.f_up(n)
703 )
704 }
705
706 pub fn apply_with_subqueries<F: FnMut(&Self) -> Result<TreeNodeRecursion>>(
710 &self,
711 mut f: F,
712 ) -> Result<TreeNodeRecursion> {
713 #[cfg_attr(feature = "recursive_protection", recursive::recursive)]
714 fn apply_with_subqueries_impl<
715 F: FnMut(&LogicalPlan) -> Result<TreeNodeRecursion>,
716 >(
717 node: &LogicalPlan,
718 f: &mut F,
719 ) -> Result<TreeNodeRecursion> {
720 f(node)?.visit_children(|| {
721 node.apply_subqueries(|c| apply_with_subqueries_impl(c, f))?
722 .visit_sibling(|| {
723 node.apply_children(|c| apply_with_subqueries_impl(c, f))
724 })
725 })
726 }
727
728 apply_with_subqueries_impl(self, &mut f)
729 }
730
731 pub fn transform_with_subqueries<F: FnMut(Self) -> Result<Transformed<Self>>>(
735 self,
736 f: F,
737 ) -> Result<Transformed<Self>> {
738 self.transform_up_with_subqueries(f)
739 }
740
741 pub fn transform_down_with_subqueries<F: FnMut(Self) -> Result<Transformed<Self>>>(
745 self,
746 mut f: F,
747 ) -> Result<Transformed<Self>> {
748 #[cfg_attr(feature = "recursive_protection", recursive::recursive)]
749 fn transform_down_with_subqueries_impl<
750 F: FnMut(LogicalPlan) -> Result<Transformed<LogicalPlan>>,
751 >(
752 node: LogicalPlan,
753 f: &mut F,
754 ) -> Result<Transformed<LogicalPlan>> {
755 f(node)?.transform_children(|n| {
756 n.map_subqueries(|c| transform_down_with_subqueries_impl(c, f))?
757 .transform_sibling(|n| {
758 n.map_children(|c| transform_down_with_subqueries_impl(c, f))
759 })
760 })
761 }
762
763 transform_down_with_subqueries_impl(self, &mut f)
764 }
765
766 pub fn transform_up_with_subqueries<F: FnMut(Self) -> Result<Transformed<Self>>>(
770 self,
771 mut f: F,
772 ) -> Result<Transformed<Self>> {
773 #[cfg_attr(feature = "recursive_protection", recursive::recursive)]
774 fn transform_up_with_subqueries_impl<
775 F: FnMut(LogicalPlan) -> Result<Transformed<LogicalPlan>>,
776 >(
777 node: LogicalPlan,
778 f: &mut F,
779 ) -> Result<Transformed<LogicalPlan>> {
780 node.map_subqueries(|c| transform_up_with_subqueries_impl(c, f))?
781 .transform_sibling(|n| {
782 n.map_children(|c| transform_up_with_subqueries_impl(c, f))
783 })?
784 .transform_parent(f)
785 }
786
787 transform_up_with_subqueries_impl(self, &mut f)
788 }
789
790 pub fn transform_down_up_with_subqueries<
794 FD: FnMut(Self) -> Result<Transformed<Self>>,
795 FU: FnMut(Self) -> Result<Transformed<Self>>,
796 >(
797 self,
798 mut f_down: FD,
799 mut f_up: FU,
800 ) -> Result<Transformed<Self>> {
801 #[cfg_attr(feature = "recursive_protection", recursive::recursive)]
802 fn transform_down_up_with_subqueries_impl<
803 FD: FnMut(LogicalPlan) -> Result<Transformed<LogicalPlan>>,
804 FU: FnMut(LogicalPlan) -> Result<Transformed<LogicalPlan>>,
805 >(
806 node: LogicalPlan,
807 f_down: &mut FD,
808 f_up: &mut FU,
809 ) -> Result<Transformed<LogicalPlan>> {
810 handle_transform_recursion!(
811 f_down(node),
812 |c| transform_down_up_with_subqueries_impl(c, f_down, f_up),
813 f_up
814 )
815 }
816
817 transform_down_up_with_subqueries_impl(self, &mut f_down, &mut f_up)
818 }
819
820 pub fn apply_subqueries<F: FnMut(&Self) -> Result<TreeNodeRecursion>>(
824 &self,
825 mut f: F,
826 ) -> Result<TreeNodeRecursion> {
827 self.apply_expressions(|expr| {
828 expr.apply(|expr| match expr {
829 Expr::Exists(Exists { subquery, .. })
830 | Expr::InSubquery(InSubquery { subquery, .. })
831 | Expr::ScalarSubquery(subquery) => {
832 f(&LogicalPlan::Subquery(subquery.clone()))
836 }
837 _ => Ok(TreeNodeRecursion::Continue),
838 })
839 })
840 }
841
842 pub fn map_subqueries<F: FnMut(Self) -> Result<Transformed<Self>>>(
847 self,
848 mut f: F,
849 ) -> Result<Transformed<Self>> {
850 self.map_expressions(|expr| {
851 expr.transform_down(|expr| match expr {
852 Expr::Exists(Exists { subquery, negated }) => {
853 f(LogicalPlan::Subquery(subquery))?.map_data(|s| match s {
854 LogicalPlan::Subquery(subquery) => {
855 Ok(Expr::Exists(Exists { subquery, negated }))
856 }
857 _ => internal_err!("Transformation should return Subquery"),
858 })
859 }
860 Expr::InSubquery(InSubquery {
861 expr,
862 subquery,
863 negated,
864 }) => f(LogicalPlan::Subquery(subquery))?.map_data(|s| match s {
865 LogicalPlan::Subquery(subquery) => Ok(Expr::InSubquery(InSubquery {
866 expr,
867 subquery,
868 negated,
869 })),
870 _ => internal_err!("Transformation should return Subquery"),
871 }),
872 Expr::ScalarSubquery(subquery) => f(LogicalPlan::Subquery(subquery))?
873 .map_data(|s| match s {
874 LogicalPlan::Subquery(subquery) => {
875 Ok(Expr::ScalarSubquery(subquery))
876 }
877 _ => internal_err!("Transformation should return Subquery"),
878 }),
879 _ => Ok(Transformed::no(expr)),
880 })
881 })
882 }
883}