1use crate::{
41 dml::CopyTo, Aggregate, Analyze, CreateMemoryTable, CreateView, DdlStatement,
42 Distinct, DistinctOn, DmlStatement, Execute, Explain, Expr, Extension, Filter, Join,
43 Limit, LogicalPlan, Partitioning, Prepare, Projection, RecursiveQuery, Repartition,
44 Sort, Statement, Subquery, SubqueryAlias, TableScan, Union, Unnest,
45 UserDefinedLogicalNode, Values, Window,
46};
47use datafusion_common::tree_node::TreeNodeRefContainer;
48
49use crate::expr::{Exists, InSubquery};
50use datafusion_common::tree_node::{
51 Transformed, TreeNode, TreeNodeContainer, TreeNodeIterator, TreeNodeRecursion,
52 TreeNodeRewriter, TreeNodeVisitor,
53};
54use datafusion_common::{internal_err, Result};
55
56impl TreeNode for LogicalPlan {
57 fn apply_children<'n, F: FnMut(&'n Self) -> Result<TreeNodeRecursion>>(
58 &'n self,
59 f: F,
60 ) -> Result<TreeNodeRecursion> {
61 self.inputs().apply_ref_elements(f)
62 }
63
64 fn map_children<F: FnMut(Self) -> Result<Transformed<Self>>>(
73 self,
74 f: F,
75 ) -> Result<Transformed<Self>> {
76 Ok(match self {
77 LogicalPlan::Projection(Projection {
78 expr,
79 input,
80 schema,
81 }) => input.map_elements(f)?.update_data(|input| {
82 LogicalPlan::Projection(Projection {
83 expr,
84 input,
85 schema,
86 })
87 }),
88 LogicalPlan::Filter(Filter { predicate, input }) => input
89 .map_elements(f)?
90 .update_data(|input| LogicalPlan::Filter(Filter { predicate, input })),
91 LogicalPlan::Repartition(Repartition {
92 input,
93 partitioning_scheme,
94 }) => input.map_elements(f)?.update_data(|input| {
95 LogicalPlan::Repartition(Repartition {
96 input,
97 partitioning_scheme,
98 })
99 }),
100 LogicalPlan::Window(Window {
101 input,
102 window_expr,
103 schema,
104 }) => input.map_elements(f)?.update_data(|input| {
105 LogicalPlan::Window(Window {
106 input,
107 window_expr,
108 schema,
109 })
110 }),
111 LogicalPlan::Aggregate(Aggregate {
112 input,
113 group_expr,
114 aggr_expr,
115 schema,
116 }) => input.map_elements(f)?.update_data(|input| {
117 LogicalPlan::Aggregate(Aggregate {
118 input,
119 group_expr,
120 aggr_expr,
121 schema,
122 })
123 }),
124 LogicalPlan::Sort(Sort { expr, input, fetch }) => input
125 .map_elements(f)?
126 .update_data(|input| LogicalPlan::Sort(Sort { expr, input, fetch })),
127 LogicalPlan::Join(Join {
128 left,
129 right,
130 on,
131 filter,
132 join_type,
133 join_constraint,
134 schema,
135 null_equals_null,
136 }) => (left, right).map_elements(f)?.update_data(|(left, right)| {
137 LogicalPlan::Join(Join {
138 left,
139 right,
140 on,
141 filter,
142 join_type,
143 join_constraint,
144 schema,
145 null_equals_null,
146 })
147 }),
148 LogicalPlan::Limit(Limit { skip, fetch, input }) => input
149 .map_elements(f)?
150 .update_data(|input| LogicalPlan::Limit(Limit { skip, fetch, input })),
151 LogicalPlan::Subquery(Subquery {
152 subquery,
153 outer_ref_columns,
154 spans,
155 }) => subquery.map_elements(f)?.update_data(|subquery| {
156 LogicalPlan::Subquery(Subquery {
157 subquery,
158 outer_ref_columns,
159 spans,
160 })
161 }),
162 LogicalPlan::SubqueryAlias(SubqueryAlias {
163 input,
164 alias,
165 schema,
166 }) => input.map_elements(f)?.update_data(|input| {
167 LogicalPlan::SubqueryAlias(SubqueryAlias {
168 input,
169 alias,
170 schema,
171 })
172 }),
173 LogicalPlan::Extension(extension) => rewrite_extension_inputs(extension, f)?
174 .update_data(LogicalPlan::Extension),
175 LogicalPlan::Union(Union { inputs, schema }) => inputs
176 .map_elements(f)?
177 .update_data(|inputs| LogicalPlan::Union(Union { inputs, schema })),
178 LogicalPlan::Distinct(distinct) => match distinct {
179 Distinct::All(input) => input.map_elements(f)?.update_data(Distinct::All),
180 Distinct::On(DistinctOn {
181 on_expr,
182 select_expr,
183 sort_expr,
184 input,
185 schema,
186 }) => input.map_elements(f)?.update_data(|input| {
187 Distinct::On(DistinctOn {
188 on_expr,
189 select_expr,
190 sort_expr,
191 input,
192 schema,
193 })
194 }),
195 }
196 .update_data(LogicalPlan::Distinct),
197 LogicalPlan::Explain(Explain {
198 verbose,
199 explain_format: format,
200 plan,
201 stringified_plans,
202 schema,
203 logical_optimization_succeeded,
204 }) => plan.map_elements(f)?.update_data(|plan| {
205 LogicalPlan::Explain(Explain {
206 verbose,
207 explain_format: format,
208 plan,
209 stringified_plans,
210 schema,
211 logical_optimization_succeeded,
212 })
213 }),
214 LogicalPlan::Analyze(Analyze {
215 verbose,
216 input,
217 schema,
218 }) => input.map_elements(f)?.update_data(|input| {
219 LogicalPlan::Analyze(Analyze {
220 verbose,
221 input,
222 schema,
223 })
224 }),
225 LogicalPlan::Dml(DmlStatement {
226 table_name,
227 target,
228 op,
229 input,
230 output_schema,
231 }) => input.map_elements(f)?.update_data(|input| {
232 LogicalPlan::Dml(DmlStatement {
233 table_name,
234 target,
235 op,
236 input,
237 output_schema,
238 })
239 }),
240 LogicalPlan::Copy(CopyTo {
241 input,
242 output_url,
243 partition_by,
244 file_type,
245 options,
246 }) => input.map_elements(f)?.update_data(|input| {
247 LogicalPlan::Copy(CopyTo {
248 input,
249 output_url,
250 partition_by,
251 file_type,
252 options,
253 })
254 }),
255 LogicalPlan::Ddl(ddl) => {
256 match ddl {
257 DdlStatement::CreateMemoryTable(CreateMemoryTable {
258 name,
259 constraints,
260 input,
261 if_not_exists,
262 or_replace,
263 column_defaults,
264 temporary,
265 }) => input.map_elements(f)?.update_data(|input| {
266 DdlStatement::CreateMemoryTable(CreateMemoryTable {
267 name,
268 constraints,
269 input,
270 if_not_exists,
271 or_replace,
272 column_defaults,
273 temporary,
274 })
275 }),
276 DdlStatement::CreateView(CreateView {
277 name,
278 input,
279 or_replace,
280 definition,
281 temporary,
282 }) => input.map_elements(f)?.update_data(|input| {
283 DdlStatement::CreateView(CreateView {
284 name,
285 input,
286 or_replace,
287 definition,
288 temporary,
289 })
290 }),
291 DdlStatement::CreateExternalTable(_)
293 | DdlStatement::CreateCatalogSchema(_)
294 | DdlStatement::CreateCatalog(_)
295 | DdlStatement::CreateIndex(_)
296 | DdlStatement::DropTable(_)
297 | DdlStatement::DropView(_)
298 | DdlStatement::DropCatalogSchema(_)
299 | DdlStatement::CreateFunction(_)
300 | DdlStatement::DropFunction(_) => Transformed::no(ddl),
301 }
302 .update_data(LogicalPlan::Ddl)
303 }
304 LogicalPlan::Unnest(Unnest {
305 input,
306 exec_columns: input_columns,
307 list_type_columns,
308 struct_type_columns,
309 dependency_indices,
310 schema,
311 options,
312 }) => input.map_elements(f)?.update_data(|input| {
313 LogicalPlan::Unnest(Unnest {
314 input,
315 exec_columns: input_columns,
316 dependency_indices,
317 list_type_columns,
318 struct_type_columns,
319 schema,
320 options,
321 })
322 }),
323 LogicalPlan::RecursiveQuery(RecursiveQuery {
324 name,
325 static_term,
326 recursive_term,
327 is_distinct,
328 }) => (static_term, recursive_term).map_elements(f)?.update_data(
329 |(static_term, recursive_term)| {
330 LogicalPlan::RecursiveQuery(RecursiveQuery {
331 name,
332 static_term,
333 recursive_term,
334 is_distinct,
335 })
336 },
337 ),
338 LogicalPlan::Statement(stmt) => match stmt {
339 Statement::Prepare(p) => p
340 .input
341 .map_elements(f)?
342 .update_data(|input| Statement::Prepare(Prepare { input, ..p })),
343 _ => Transformed::no(stmt),
344 }
345 .update_data(LogicalPlan::Statement),
346 LogicalPlan::TableScan { .. }
348 | LogicalPlan::EmptyRelation { .. }
349 | LogicalPlan::Values { .. }
350 | LogicalPlan::DescribeTable(_) => Transformed::no(self),
351 })
352 }
353}
354
355fn rewrite_extension_inputs<F: FnMut(LogicalPlan) -> Result<Transformed<LogicalPlan>>>(
361 extension: Extension,
362 f: F,
363) -> Result<Transformed<Extension>> {
364 let Extension { node } = extension;
365
366 node.inputs()
367 .into_iter()
368 .cloned()
369 .map_until_stop_and_collect(f)?
370 .map_data(|new_inputs| {
371 let exprs = node.expressions();
372 Ok(Extension {
373 node: node.with_exprs_and_inputs(exprs, new_inputs)?,
374 })
375 })
376}
377
/// Expands to the shared body of the "down, children, up" plan rewrites.
///
/// * `$down`  - expression yielding `Result<Transformed<LogicalPlan>>` for the
///   current node (the "down" step); its error is propagated with `?`.
/// * `$child` - closure expression applied to each subquery plan and each
///   child plan. It is expanded twice, once for `map_subqueries` and once
///   for `map_children`.
/// * `$up`    - closure expression handed to `transform_parent` (the "up"
///   step).
///
/// Subqueries are rewritten before children; `transform_children`,
/// `transform_sibling` and `transform_parent` decide whether each later step
/// runs.
macro_rules! handle_transform_recursion {
    ($down:expr, $child:expr, $up:expr) => {{
        let after_down = $down?;
        let after_children = after_down.transform_children(|node| {
            node.map_subqueries($child)?
                .transform_sibling(|node| node.map_children($child))
        })?;
        after_children.transform_parent($up)
    }};
}
390
391impl LogicalPlan {
392 pub fn apply_expressions<F: FnMut(&Expr) -> Result<TreeNodeRecursion>>(
399 &self,
400 mut f: F,
401 ) -> Result<TreeNodeRecursion> {
402 match self {
403 LogicalPlan::Projection(Projection { expr, .. }) => expr.apply_elements(f),
404 LogicalPlan::Values(Values { values, .. }) => values.apply_elements(f),
405 LogicalPlan::Filter(Filter { predicate, .. }) => f(predicate),
406 LogicalPlan::Repartition(Repartition {
407 partitioning_scheme,
408 ..
409 }) => match partitioning_scheme {
410 Partitioning::Hash(expr, _) | Partitioning::DistributeBy(expr) => {
411 expr.apply_elements(f)
412 }
413 Partitioning::RoundRobinBatch(_) => Ok(TreeNodeRecursion::Continue),
414 },
415 LogicalPlan::Window(Window { window_expr, .. }) => {
416 window_expr.apply_elements(f)
417 }
418 LogicalPlan::Aggregate(Aggregate {
419 group_expr,
420 aggr_expr,
421 ..
422 }) => (group_expr, aggr_expr).apply_ref_elements(f),
423 LogicalPlan::Join(Join { on, filter, .. }) => {
427 (on, filter).apply_ref_elements(f)
428 }
429 LogicalPlan::Sort(Sort { expr, .. }) => expr.apply_elements(f),
430 LogicalPlan::Extension(extension) => {
431 extension.node.expressions().apply_elements(f)
434 }
435 LogicalPlan::TableScan(TableScan { filters, .. }) => {
436 filters.apply_elements(f)
437 }
438 LogicalPlan::Unnest(unnest) => {
439 let columns = unnest.exec_columns.clone();
440
441 let exprs = columns
442 .iter()
443 .map(|c| Expr::Column(c.clone()))
444 .collect::<Vec<_>>();
445 exprs.apply_elements(f)
446 }
447 LogicalPlan::Distinct(Distinct::On(DistinctOn {
448 on_expr,
449 select_expr,
450 sort_expr,
451 ..
452 })) => (on_expr, select_expr, sort_expr).apply_ref_elements(f),
453 LogicalPlan::Limit(Limit { skip, fetch, .. }) => {
454 (skip, fetch).apply_ref_elements(f)
455 }
456 LogicalPlan::Statement(stmt) => match stmt {
457 Statement::Execute(Execute { parameters, .. }) => {
458 parameters.apply_elements(f)
459 }
460 _ => Ok(TreeNodeRecursion::Continue),
461 },
462 LogicalPlan::EmptyRelation(_)
464 | LogicalPlan::RecursiveQuery(_)
465 | LogicalPlan::Subquery(_)
466 | LogicalPlan::SubqueryAlias(_)
467 | LogicalPlan::Analyze(_)
468 | LogicalPlan::Explain(_)
469 | LogicalPlan::Union(_)
470 | LogicalPlan::Distinct(Distinct::All(_))
471 | LogicalPlan::Dml(_)
472 | LogicalPlan::Ddl(_)
473 | LogicalPlan::Copy(_)
474 | LogicalPlan::DescribeTable(_) => Ok(TreeNodeRecursion::Continue),
475 }
476 }
477
478 pub fn map_expressions<F: FnMut(Expr) -> Result<Transformed<Expr>>>(
486 self,
487 mut f: F,
488 ) -> Result<Transformed<Self>> {
489 Ok(match self {
490 LogicalPlan::Projection(Projection {
491 expr,
492 input,
493 schema,
494 }) => expr.map_elements(f)?.update_data(|expr| {
495 LogicalPlan::Projection(Projection {
496 expr,
497 input,
498 schema,
499 })
500 }),
501 LogicalPlan::Values(Values { schema, values }) => values
502 .map_elements(f)?
503 .update_data(|values| LogicalPlan::Values(Values { schema, values })),
504 LogicalPlan::Filter(Filter { predicate, input }) => f(predicate)?
505 .update_data(|predicate| {
506 LogicalPlan::Filter(Filter { predicate, input })
507 }),
508 LogicalPlan::Repartition(Repartition {
509 input,
510 partitioning_scheme,
511 }) => match partitioning_scheme {
512 Partitioning::Hash(expr, usize) => expr
513 .map_elements(f)?
514 .update_data(|expr| Partitioning::Hash(expr, usize)),
515 Partitioning::DistributeBy(expr) => expr
516 .map_elements(f)?
517 .update_data(Partitioning::DistributeBy),
518 Partitioning::RoundRobinBatch(_) => Transformed::no(partitioning_scheme),
519 }
520 .update_data(|partitioning_scheme| {
521 LogicalPlan::Repartition(Repartition {
522 input,
523 partitioning_scheme,
524 })
525 }),
526 LogicalPlan::Window(Window {
527 input,
528 window_expr,
529 schema,
530 }) => window_expr.map_elements(f)?.update_data(|window_expr| {
531 LogicalPlan::Window(Window {
532 input,
533 window_expr,
534 schema,
535 })
536 }),
537 LogicalPlan::Aggregate(Aggregate {
538 input,
539 group_expr,
540 aggr_expr,
541 schema,
542 }) => (group_expr, aggr_expr).map_elements(f)?.update_data(
543 |(group_expr, aggr_expr)| {
544 LogicalPlan::Aggregate(Aggregate {
545 input,
546 group_expr,
547 aggr_expr,
548 schema,
549 })
550 },
551 ),
552
553 LogicalPlan::Join(Join {
557 left,
558 right,
559 on,
560 filter,
561 join_type,
562 join_constraint,
563 schema,
564 null_equals_null,
565 }) => (on, filter).map_elements(f)?.update_data(|(on, filter)| {
566 LogicalPlan::Join(Join {
567 left,
568 right,
569 on,
570 filter,
571 join_type,
572 join_constraint,
573 schema,
574 null_equals_null,
575 })
576 }),
577 LogicalPlan::Sort(Sort { expr, input, fetch }) => expr
578 .map_elements(f)?
579 .update_data(|expr| LogicalPlan::Sort(Sort { expr, input, fetch })),
580 LogicalPlan::Extension(Extension { node }) => {
581 let exprs = node.expressions().map_elements(f)?;
584 let plan = LogicalPlan::Extension(Extension {
585 node: UserDefinedLogicalNode::with_exprs_and_inputs(
586 node.as_ref(),
587 exprs.data,
588 node.inputs().into_iter().cloned().collect::<Vec<_>>(),
589 )?,
590 });
591 Transformed::new(plan, exprs.transformed, exprs.tnr)
592 }
593 LogicalPlan::TableScan(TableScan {
594 table_name,
595 source,
596 projection,
597 projected_schema,
598 filters,
599 fetch,
600 }) => filters.map_elements(f)?.update_data(|filters| {
601 LogicalPlan::TableScan(TableScan {
602 table_name,
603 source,
604 projection,
605 projected_schema,
606 filters,
607 fetch,
608 })
609 }),
610 LogicalPlan::Distinct(Distinct::On(DistinctOn {
611 on_expr,
612 select_expr,
613 sort_expr,
614 input,
615 schema,
616 })) => (on_expr, select_expr, sort_expr)
617 .map_elements(f)?
618 .update_data(|(on_expr, select_expr, sort_expr)| {
619 LogicalPlan::Distinct(Distinct::On(DistinctOn {
620 on_expr,
621 select_expr,
622 sort_expr,
623 input,
624 schema,
625 }))
626 }),
627 LogicalPlan::Limit(Limit { skip, fetch, input }) => {
628 (skip, fetch).map_elements(f)?.update_data(|(skip, fetch)| {
629 LogicalPlan::Limit(Limit { skip, fetch, input })
630 })
631 }
632 LogicalPlan::Statement(stmt) => match stmt {
633 Statement::Execute(e) => {
634 e.parameters.map_elements(f)?.update_data(|parameters| {
635 Statement::Execute(Execute { parameters, ..e })
636 })
637 }
638 _ => Transformed::no(stmt),
639 }
640 .update_data(LogicalPlan::Statement),
641 LogicalPlan::EmptyRelation(_)
643 | LogicalPlan::Unnest(_)
644 | LogicalPlan::RecursiveQuery(_)
645 | LogicalPlan::Subquery(_)
646 | LogicalPlan::SubqueryAlias(_)
647 | LogicalPlan::Analyze(_)
648 | LogicalPlan::Explain(_)
649 | LogicalPlan::Union(_)
650 | LogicalPlan::Distinct(Distinct::All(_))
651 | LogicalPlan::Dml(_)
652 | LogicalPlan::Ddl(_)
653 | LogicalPlan::Copy(_)
654 | LogicalPlan::DescribeTable(_) => Transformed::no(self),
655 })
656 }
657
658 #[cfg_attr(feature = "recursive_protection", recursive::recursive)]
661 pub fn visit_with_subqueries<V: for<'n> TreeNodeVisitor<'n, Node = Self>>(
662 &self,
663 visitor: &mut V,
664 ) -> Result<TreeNodeRecursion> {
665 visitor
666 .f_down(self)?
667 .visit_children(|| {
668 self.apply_subqueries(|c| c.visit_with_subqueries(visitor))?
669 .visit_sibling(|| {
670 self.apply_children(|c| c.visit_with_subqueries(visitor))
671 })
672 })?
673 .visit_parent(|| visitor.f_up(self))
674 }
675
676 #[cfg_attr(feature = "recursive_protection", recursive::recursive)]
680 pub fn rewrite_with_subqueries<R: TreeNodeRewriter<Node = Self>>(
681 self,
682 rewriter: &mut R,
683 ) -> Result<Transformed<Self>> {
684 handle_transform_recursion!(
685 rewriter.f_down(self),
686 |c| c.rewrite_with_subqueries(rewriter),
687 |n| rewriter.f_up(n)
688 )
689 }
690
691 pub fn apply_with_subqueries<F: FnMut(&Self) -> Result<TreeNodeRecursion>>(
695 &self,
696 mut f: F,
697 ) -> Result<TreeNodeRecursion> {
698 #[cfg_attr(feature = "recursive_protection", recursive::recursive)]
699 fn apply_with_subqueries_impl<
700 F: FnMut(&LogicalPlan) -> Result<TreeNodeRecursion>,
701 >(
702 node: &LogicalPlan,
703 f: &mut F,
704 ) -> Result<TreeNodeRecursion> {
705 f(node)?.visit_children(|| {
706 node.apply_subqueries(|c| apply_with_subqueries_impl(c, f))?
707 .visit_sibling(|| {
708 node.apply_children(|c| apply_with_subqueries_impl(c, f))
709 })
710 })
711 }
712
713 apply_with_subqueries_impl(self, &mut f)
714 }
715
716 pub fn transform_with_subqueries<F: FnMut(Self) -> Result<Transformed<Self>>>(
720 self,
721 f: F,
722 ) -> Result<Transformed<Self>> {
723 self.transform_up_with_subqueries(f)
724 }
725
726 pub fn transform_down_with_subqueries<F: FnMut(Self) -> Result<Transformed<Self>>>(
730 self,
731 mut f: F,
732 ) -> Result<Transformed<Self>> {
733 #[cfg_attr(feature = "recursive_protection", recursive::recursive)]
734 fn transform_down_with_subqueries_impl<
735 F: FnMut(LogicalPlan) -> Result<Transformed<LogicalPlan>>,
736 >(
737 node: LogicalPlan,
738 f: &mut F,
739 ) -> Result<Transformed<LogicalPlan>> {
740 f(node)?.transform_children(|n| {
741 n.map_subqueries(|c| transform_down_with_subqueries_impl(c, f))?
742 .transform_sibling(|n| {
743 n.map_children(|c| transform_down_with_subqueries_impl(c, f))
744 })
745 })
746 }
747
748 transform_down_with_subqueries_impl(self, &mut f)
749 }
750
751 pub fn transform_up_with_subqueries<F: FnMut(Self) -> Result<Transformed<Self>>>(
755 self,
756 mut f: F,
757 ) -> Result<Transformed<Self>> {
758 #[cfg_attr(feature = "recursive_protection", recursive::recursive)]
759 fn transform_up_with_subqueries_impl<
760 F: FnMut(LogicalPlan) -> Result<Transformed<LogicalPlan>>,
761 >(
762 node: LogicalPlan,
763 f: &mut F,
764 ) -> Result<Transformed<LogicalPlan>> {
765 node.map_subqueries(|c| transform_up_with_subqueries_impl(c, f))?
766 .transform_sibling(|n| {
767 n.map_children(|c| transform_up_with_subqueries_impl(c, f))
768 })?
769 .transform_parent(f)
770 }
771
772 transform_up_with_subqueries_impl(self, &mut f)
773 }
774
775 pub fn transform_down_up_with_subqueries<
779 FD: FnMut(Self) -> Result<Transformed<Self>>,
780 FU: FnMut(Self) -> Result<Transformed<Self>>,
781 >(
782 self,
783 mut f_down: FD,
784 mut f_up: FU,
785 ) -> Result<Transformed<Self>> {
786 #[cfg_attr(feature = "recursive_protection", recursive::recursive)]
787 fn transform_down_up_with_subqueries_impl<
788 FD: FnMut(LogicalPlan) -> Result<Transformed<LogicalPlan>>,
789 FU: FnMut(LogicalPlan) -> Result<Transformed<LogicalPlan>>,
790 >(
791 node: LogicalPlan,
792 f_down: &mut FD,
793 f_up: &mut FU,
794 ) -> Result<Transformed<LogicalPlan>> {
795 handle_transform_recursion!(
796 f_down(node),
797 |c| transform_down_up_with_subqueries_impl(c, f_down, f_up),
798 f_up
799 )
800 }
801
802 transform_down_up_with_subqueries_impl(self, &mut f_down, &mut f_up)
803 }
804
805 pub fn apply_subqueries<F: FnMut(&Self) -> Result<TreeNodeRecursion>>(
809 &self,
810 mut f: F,
811 ) -> Result<TreeNodeRecursion> {
812 self.apply_expressions(|expr| {
813 expr.apply(|expr| match expr {
814 Expr::Exists(Exists { subquery, .. })
815 | Expr::InSubquery(InSubquery { subquery, .. })
816 | Expr::ScalarSubquery(subquery) => {
817 f(&LogicalPlan::Subquery(subquery.clone()))
821 }
822 _ => Ok(TreeNodeRecursion::Continue),
823 })
824 })
825 }
826
827 pub fn map_subqueries<F: FnMut(Self) -> Result<Transformed<Self>>>(
832 self,
833 mut f: F,
834 ) -> Result<Transformed<Self>> {
835 self.map_expressions(|expr| {
836 expr.transform_down(|expr| match expr {
837 Expr::Exists(Exists { subquery, negated }) => {
838 f(LogicalPlan::Subquery(subquery))?.map_data(|s| match s {
839 LogicalPlan::Subquery(subquery) => {
840 Ok(Expr::Exists(Exists { subquery, negated }))
841 }
842 _ => internal_err!("Transformation should return Subquery"),
843 })
844 }
845 Expr::InSubquery(InSubquery {
846 expr,
847 subquery,
848 negated,
849 }) => f(LogicalPlan::Subquery(subquery))?.map_data(|s| match s {
850 LogicalPlan::Subquery(subquery) => Ok(Expr::InSubquery(InSubquery {
851 expr,
852 subquery,
853 negated,
854 })),
855 _ => internal_err!("Transformation should return Subquery"),
856 }),
857 Expr::ScalarSubquery(subquery) => f(LogicalPlan::Subquery(subquery))?
858 .map_data(|s| match s {
859 LogicalPlan::Subquery(subquery) => {
860 Ok(Expr::ScalarSubquery(subquery))
861 }
862 _ => internal_err!("Transformation should return Subquery"),
863 }),
864 _ => Ok(Transformed::no(expr)),
865 })
866 })
867 }
868}