1use crate::{
41 dml::CopyTo, Aggregate, Analyze, CreateMemoryTable, CreateView, DdlStatement,
42 Distinct, DistinctOn, DmlStatement, Execute, Explain, Expr, Extension, Filter, Join,
43 Limit, LogicalPlan, Partitioning, Prepare, Projection, RecursiveQuery, Repartition,
44 Sort, Statement, Subquery, SubqueryAlias, TableScan, Union, Unnest,
45 UserDefinedLogicalNode, Values, Window,
46};
47use datafusion_common::tree_node::TreeNodeRefContainer;
48
49use crate::expr::{Exists, InSubquery};
50use datafusion_common::tree_node::{
51 Transformed, TreeNode, TreeNodeContainer, TreeNodeIterator, TreeNodeRecursion,
52 TreeNodeRewriter, TreeNodeVisitor,
53};
54use datafusion_common::{internal_err, Result};
55
56impl TreeNode for LogicalPlan {
57 fn apply_children<'n, F: FnMut(&'n Self) -> Result<TreeNodeRecursion>>(
58 &'n self,
59 f: F,
60 ) -> Result<TreeNodeRecursion> {
61 self.inputs().apply_ref_elements(f)
62 }
63
64 fn map_children<F: FnMut(Self) -> Result<Transformed<Self>>>(
73 self,
74 f: F,
75 ) -> Result<Transformed<Self>> {
76 Ok(match self {
77 LogicalPlan::Projection(Projection {
78 expr,
79 input,
80 schema,
81 }) => input.map_elements(f)?.update_data(|input| {
82 LogicalPlan::Projection(Projection {
83 expr,
84 input,
85 schema,
86 })
87 }),
88 LogicalPlan::Filter(Filter { predicate, input }) => input
89 .map_elements(f)?
90 .update_data(|input| LogicalPlan::Filter(Filter { predicate, input })),
91 LogicalPlan::Repartition(Repartition {
92 input,
93 partitioning_scheme,
94 }) => input.map_elements(f)?.update_data(|input| {
95 LogicalPlan::Repartition(Repartition {
96 input,
97 partitioning_scheme,
98 })
99 }),
100 LogicalPlan::Window(Window {
101 input,
102 window_expr,
103 schema,
104 }) => input.map_elements(f)?.update_data(|input| {
105 LogicalPlan::Window(Window {
106 input,
107 window_expr,
108 schema,
109 })
110 }),
111 LogicalPlan::Aggregate(Aggregate {
112 input,
113 group_expr,
114 aggr_expr,
115 schema,
116 }) => input.map_elements(f)?.update_data(|input| {
117 LogicalPlan::Aggregate(Aggregate {
118 input,
119 group_expr,
120 aggr_expr,
121 schema,
122 })
123 }),
124 LogicalPlan::Sort(Sort { expr, input, fetch }) => input
125 .map_elements(f)?
126 .update_data(|input| LogicalPlan::Sort(Sort { expr, input, fetch })),
127 LogicalPlan::Join(Join {
128 left,
129 right,
130 on,
131 filter,
132 join_type,
133 join_constraint,
134 schema,
135 null_equality,
136 }) => (left, right).map_elements(f)?.update_data(|(left, right)| {
137 LogicalPlan::Join(Join {
138 left,
139 right,
140 on,
141 filter,
142 join_type,
143 join_constraint,
144 schema,
145 null_equality,
146 })
147 }),
148 LogicalPlan::Limit(Limit { skip, fetch, input }) => input
149 .map_elements(f)?
150 .update_data(|input| LogicalPlan::Limit(Limit { skip, fetch, input })),
151 LogicalPlan::Subquery(Subquery {
152 subquery,
153 outer_ref_columns,
154 spans,
155 }) => subquery.map_elements(f)?.update_data(|subquery| {
156 LogicalPlan::Subquery(Subquery {
157 subquery,
158 outer_ref_columns,
159 spans,
160 })
161 }),
162 LogicalPlan::SubqueryAlias(SubqueryAlias {
163 input,
164 alias,
165 schema,
166 }) => input.map_elements(f)?.update_data(|input| {
167 LogicalPlan::SubqueryAlias(SubqueryAlias {
168 input,
169 alias,
170 schema,
171 })
172 }),
173 LogicalPlan::Extension(extension) => rewrite_extension_inputs(extension, f)?
174 .update_data(LogicalPlan::Extension),
175 LogicalPlan::Union(Union { inputs, schema }) => inputs
176 .map_elements(f)?
177 .update_data(|inputs| LogicalPlan::Union(Union { inputs, schema })),
178 LogicalPlan::Distinct(distinct) => match distinct {
179 Distinct::All(input) => input.map_elements(f)?.update_data(Distinct::All),
180 Distinct::On(DistinctOn {
181 on_expr,
182 select_expr,
183 sort_expr,
184 input,
185 schema,
186 }) => input.map_elements(f)?.update_data(|input| {
187 Distinct::On(DistinctOn {
188 on_expr,
189 select_expr,
190 sort_expr,
191 input,
192 schema,
193 })
194 }),
195 }
196 .update_data(LogicalPlan::Distinct),
197 LogicalPlan::Explain(Explain {
198 verbose,
199 explain_format: format,
200 plan,
201 stringified_plans,
202 schema,
203 logical_optimization_succeeded,
204 }) => plan.map_elements(f)?.update_data(|plan| {
205 LogicalPlan::Explain(Explain {
206 verbose,
207 explain_format: format,
208 plan,
209 stringified_plans,
210 schema,
211 logical_optimization_succeeded,
212 })
213 }),
214 LogicalPlan::Analyze(Analyze {
215 verbose,
216 input,
217 schema,
218 }) => input.map_elements(f)?.update_data(|input| {
219 LogicalPlan::Analyze(Analyze {
220 verbose,
221 input,
222 schema,
223 })
224 }),
225 LogicalPlan::Dml(DmlStatement {
226 table_name,
227 target,
228 op,
229 input,
230 output_schema,
231 }) => input.map_elements(f)?.update_data(|input| {
232 LogicalPlan::Dml(DmlStatement {
233 table_name,
234 target,
235 op,
236 input,
237 output_schema,
238 })
239 }),
240 LogicalPlan::Copy(CopyTo {
241 input,
242 output_url,
243 partition_by,
244 file_type,
245 options,
246 output_schema,
247 }) => input.map_elements(f)?.update_data(|input| {
248 LogicalPlan::Copy(CopyTo {
249 input,
250 output_url,
251 partition_by,
252 file_type,
253 options,
254 output_schema,
255 })
256 }),
257 LogicalPlan::Ddl(ddl) => {
258 match ddl {
259 DdlStatement::CreateMemoryTable(CreateMemoryTable {
260 name,
261 constraints,
262 input,
263 if_not_exists,
264 or_replace,
265 column_defaults,
266 temporary,
267 }) => input.map_elements(f)?.update_data(|input| {
268 DdlStatement::CreateMemoryTable(CreateMemoryTable {
269 name,
270 constraints,
271 input,
272 if_not_exists,
273 or_replace,
274 column_defaults,
275 temporary,
276 })
277 }),
278 DdlStatement::CreateView(CreateView {
279 name,
280 input,
281 or_replace,
282 definition,
283 temporary,
284 }) => input.map_elements(f)?.update_data(|input| {
285 DdlStatement::CreateView(CreateView {
286 name,
287 input,
288 or_replace,
289 definition,
290 temporary,
291 })
292 }),
293 DdlStatement::CreateExternalTable(_)
295 | DdlStatement::CreateCatalogSchema(_)
296 | DdlStatement::CreateCatalog(_)
297 | DdlStatement::CreateIndex(_)
298 | DdlStatement::DropTable(_)
299 | DdlStatement::DropView(_)
300 | DdlStatement::DropCatalogSchema(_)
301 | DdlStatement::CreateFunction(_)
302 | DdlStatement::DropFunction(_) => Transformed::no(ddl),
303 }
304 .update_data(LogicalPlan::Ddl)
305 }
306 LogicalPlan::Unnest(Unnest {
307 input,
308 exec_columns: input_columns,
309 list_type_columns,
310 struct_type_columns,
311 dependency_indices,
312 schema,
313 options,
314 }) => input.map_elements(f)?.update_data(|input| {
315 LogicalPlan::Unnest(Unnest {
316 input,
317 exec_columns: input_columns,
318 list_type_columns,
319 struct_type_columns,
320 dependency_indices,
321 schema,
322 options,
323 })
324 }),
325 LogicalPlan::RecursiveQuery(RecursiveQuery {
326 name,
327 static_term,
328 recursive_term,
329 is_distinct,
330 }) => (static_term, recursive_term).map_elements(f)?.update_data(
331 |(static_term, recursive_term)| {
332 LogicalPlan::RecursiveQuery(RecursiveQuery {
333 name,
334 static_term,
335 recursive_term,
336 is_distinct,
337 })
338 },
339 ),
340 LogicalPlan::Statement(stmt) => match stmt {
341 Statement::Prepare(p) => p
342 .input
343 .map_elements(f)?
344 .update_data(|input| Statement::Prepare(Prepare { input, ..p })),
345 _ => Transformed::no(stmt),
346 }
347 .update_data(LogicalPlan::Statement),
348 LogicalPlan::TableScan { .. }
350 | LogicalPlan::EmptyRelation { .. }
351 | LogicalPlan::Values { .. }
352 | LogicalPlan::DescribeTable(_) => Transformed::no(self),
353 })
354 }
355}
356
357fn rewrite_extension_inputs<F: FnMut(LogicalPlan) -> Result<Transformed<LogicalPlan>>>(
363 extension: Extension,
364 f: F,
365) -> Result<Transformed<Extension>> {
366 let Extension { node } = extension;
367
368 node.inputs()
369 .into_iter()
370 .cloned()
371 .map_until_stop_and_collect(f)?
372 .map_data(|new_inputs| {
373 let exprs = node.expressions();
374 Ok(Extension {
375 node: node.with_exprs_and_inputs(exprs, new_inputs)?,
376 })
377 })
378}
379
380macro_rules! handle_transform_recursion {
383 ($F_DOWN:expr, $F_CHILD:expr, $F_UP:expr) => {{
384 $F_DOWN?
385 .transform_children(|n| {
386 n.map_subqueries($F_CHILD)?
387 .transform_sibling(|n| n.map_children($F_CHILD))
388 })?
389 .transform_parent($F_UP)
390 }};
391}
392
393impl LogicalPlan {
394 pub fn apply_expressions<F: FnMut(&Expr) -> Result<TreeNodeRecursion>>(
401 &self,
402 mut f: F,
403 ) -> Result<TreeNodeRecursion> {
404 match self {
405 LogicalPlan::Projection(Projection { expr, .. }) => expr.apply_elements(f),
406 LogicalPlan::Values(Values { values, .. }) => values.apply_elements(f),
407 LogicalPlan::Filter(Filter { predicate, .. }) => f(predicate),
408 LogicalPlan::Repartition(Repartition {
409 partitioning_scheme,
410 ..
411 }) => match partitioning_scheme {
412 Partitioning::Hash(expr, _) | Partitioning::DistributeBy(expr) => {
413 expr.apply_elements(f)
414 }
415 Partitioning::RoundRobinBatch(_) => Ok(TreeNodeRecursion::Continue),
416 },
417 LogicalPlan::Window(Window { window_expr, .. }) => {
418 window_expr.apply_elements(f)
419 }
420 LogicalPlan::Aggregate(Aggregate {
421 group_expr,
422 aggr_expr,
423 ..
424 }) => (group_expr, aggr_expr).apply_ref_elements(f),
425 LogicalPlan::Join(Join { on, filter, .. }) => {
429 (on, filter).apply_ref_elements(f)
430 }
431 LogicalPlan::Sort(Sort { expr, .. }) => expr.apply_elements(f),
432 LogicalPlan::Extension(extension) => {
433 extension.node.expressions().apply_elements(f)
436 }
437 LogicalPlan::TableScan(TableScan { filters, .. }) => {
438 filters.apply_elements(f)
439 }
440 LogicalPlan::Unnest(unnest) => {
441 let exprs = unnest
442 .exec_columns
443 .iter()
444 .cloned()
445 .map(Expr::Column)
446 .collect::<Vec<_>>();
447 exprs.apply_elements(f)
448 }
449 LogicalPlan::Distinct(Distinct::On(DistinctOn {
450 on_expr,
451 select_expr,
452 sort_expr,
453 ..
454 })) => (on_expr, select_expr, sort_expr).apply_ref_elements(f),
455 LogicalPlan::Limit(Limit { skip, fetch, .. }) => {
456 (skip, fetch).apply_ref_elements(f)
457 }
458 LogicalPlan::Statement(stmt) => match stmt {
459 Statement::Execute(Execute { parameters, .. }) => {
460 parameters.apply_elements(f)
461 }
462 _ => Ok(TreeNodeRecursion::Continue),
463 },
464 LogicalPlan::EmptyRelation(_)
466 | LogicalPlan::RecursiveQuery(_)
467 | LogicalPlan::Subquery(_)
468 | LogicalPlan::SubqueryAlias(_)
469 | LogicalPlan::Analyze(_)
470 | LogicalPlan::Explain(_)
471 | LogicalPlan::Union(_)
472 | LogicalPlan::Distinct(Distinct::All(_))
473 | LogicalPlan::Dml(_)
474 | LogicalPlan::Ddl(_)
475 | LogicalPlan::Copy(_)
476 | LogicalPlan::DescribeTable(_) => Ok(TreeNodeRecursion::Continue),
477 }
478 }
479
480 pub fn map_expressions<F: FnMut(Expr) -> Result<Transformed<Expr>>>(
488 self,
489 mut f: F,
490 ) -> Result<Transformed<Self>> {
491 Ok(match self {
492 LogicalPlan::Projection(Projection {
493 expr,
494 input,
495 schema,
496 }) => expr.map_elements(f)?.update_data(|expr| {
497 LogicalPlan::Projection(Projection {
498 expr,
499 input,
500 schema,
501 })
502 }),
503 LogicalPlan::Values(Values { schema, values }) => values
504 .map_elements(f)?
505 .update_data(|values| LogicalPlan::Values(Values { schema, values })),
506 LogicalPlan::Filter(Filter { predicate, input }) => f(predicate)?
507 .update_data(|predicate| {
508 LogicalPlan::Filter(Filter { predicate, input })
509 }),
510 LogicalPlan::Repartition(Repartition {
511 input,
512 partitioning_scheme,
513 }) => match partitioning_scheme {
514 Partitioning::Hash(expr, usize) => expr
515 .map_elements(f)?
516 .update_data(|expr| Partitioning::Hash(expr, usize)),
517 Partitioning::DistributeBy(expr) => expr
518 .map_elements(f)?
519 .update_data(Partitioning::DistributeBy),
520 Partitioning::RoundRobinBatch(_) => Transformed::no(partitioning_scheme),
521 }
522 .update_data(|partitioning_scheme| {
523 LogicalPlan::Repartition(Repartition {
524 input,
525 partitioning_scheme,
526 })
527 }),
528 LogicalPlan::Window(Window {
529 input,
530 window_expr,
531 schema,
532 }) => window_expr.map_elements(f)?.update_data(|window_expr| {
533 LogicalPlan::Window(Window {
534 input,
535 window_expr,
536 schema,
537 })
538 }),
539 LogicalPlan::Aggregate(Aggregate {
540 input,
541 group_expr,
542 aggr_expr,
543 schema,
544 }) => (group_expr, aggr_expr).map_elements(f)?.update_data(
545 |(group_expr, aggr_expr)| {
546 LogicalPlan::Aggregate(Aggregate {
547 input,
548 group_expr,
549 aggr_expr,
550 schema,
551 })
552 },
553 ),
554
555 LogicalPlan::Join(Join {
559 left,
560 right,
561 on,
562 filter,
563 join_type,
564 join_constraint,
565 schema,
566 null_equality,
567 }) => (on, filter).map_elements(f)?.update_data(|(on, filter)| {
568 LogicalPlan::Join(Join {
569 left,
570 right,
571 on,
572 filter,
573 join_type,
574 join_constraint,
575 schema,
576 null_equality,
577 })
578 }),
579 LogicalPlan::Sort(Sort { expr, input, fetch }) => expr
580 .map_elements(f)?
581 .update_data(|expr| LogicalPlan::Sort(Sort { expr, input, fetch })),
582 LogicalPlan::Extension(Extension { node }) => {
583 let exprs = node.expressions().map_elements(f)?;
586 let plan = LogicalPlan::Extension(Extension {
587 node: UserDefinedLogicalNode::with_exprs_and_inputs(
588 node.as_ref(),
589 exprs.data,
590 node.inputs().into_iter().cloned().collect::<Vec<_>>(),
591 )?,
592 });
593 Transformed::new(plan, exprs.transformed, exprs.tnr)
594 }
595 LogicalPlan::TableScan(TableScan {
596 table_name,
597 source,
598 projection,
599 projected_schema,
600 filters,
601 fetch,
602 }) => filters.map_elements(f)?.update_data(|filters| {
603 LogicalPlan::TableScan(TableScan {
604 table_name,
605 source,
606 projection,
607 projected_schema,
608 filters,
609 fetch,
610 })
611 }),
612 LogicalPlan::Distinct(Distinct::On(DistinctOn {
613 on_expr,
614 select_expr,
615 sort_expr,
616 input,
617 schema,
618 })) => (on_expr, select_expr, sort_expr)
619 .map_elements(f)?
620 .update_data(|(on_expr, select_expr, sort_expr)| {
621 LogicalPlan::Distinct(Distinct::On(DistinctOn {
622 on_expr,
623 select_expr,
624 sort_expr,
625 input,
626 schema,
627 }))
628 }),
629 LogicalPlan::Limit(Limit { skip, fetch, input }) => {
630 (skip, fetch).map_elements(f)?.update_data(|(skip, fetch)| {
631 LogicalPlan::Limit(Limit { skip, fetch, input })
632 })
633 }
634 LogicalPlan::Statement(stmt) => match stmt {
635 Statement::Execute(e) => {
636 e.parameters.map_elements(f)?.update_data(|parameters| {
637 Statement::Execute(Execute { parameters, ..e })
638 })
639 }
640 _ => Transformed::no(stmt),
641 }
642 .update_data(LogicalPlan::Statement),
643 LogicalPlan::EmptyRelation(_)
645 | LogicalPlan::Unnest(_)
646 | LogicalPlan::RecursiveQuery(_)
647 | LogicalPlan::Subquery(_)
648 | LogicalPlan::SubqueryAlias(_)
649 | LogicalPlan::Analyze(_)
650 | LogicalPlan::Explain(_)
651 | LogicalPlan::Union(_)
652 | LogicalPlan::Distinct(Distinct::All(_))
653 | LogicalPlan::Dml(_)
654 | LogicalPlan::Ddl(_)
655 | LogicalPlan::Copy(_)
656 | LogicalPlan::DescribeTable(_) => Transformed::no(self),
657 })
658 }
659
660 #[cfg_attr(feature = "recursive_protection", recursive::recursive)]
663 pub fn visit_with_subqueries<V: for<'n> TreeNodeVisitor<'n, Node = Self>>(
664 &self,
665 visitor: &mut V,
666 ) -> Result<TreeNodeRecursion> {
667 visitor
668 .f_down(self)?
669 .visit_children(|| {
670 self.apply_subqueries(|c| c.visit_with_subqueries(visitor))?
671 .visit_sibling(|| {
672 self.apply_children(|c| c.visit_with_subqueries(visitor))
673 })
674 })?
675 .visit_parent(|| visitor.f_up(self))
676 }
677
678 #[cfg_attr(feature = "recursive_protection", recursive::recursive)]
682 pub fn rewrite_with_subqueries<R: TreeNodeRewriter<Node = Self>>(
683 self,
684 rewriter: &mut R,
685 ) -> Result<Transformed<Self>> {
686 handle_transform_recursion!(
687 rewriter.f_down(self),
688 |c| c.rewrite_with_subqueries(rewriter),
689 |n| rewriter.f_up(n)
690 )
691 }
692
693 pub fn apply_with_subqueries<F: FnMut(&Self) -> Result<TreeNodeRecursion>>(
697 &self,
698 mut f: F,
699 ) -> Result<TreeNodeRecursion> {
700 #[cfg_attr(feature = "recursive_protection", recursive::recursive)]
701 fn apply_with_subqueries_impl<
702 F: FnMut(&LogicalPlan) -> Result<TreeNodeRecursion>,
703 >(
704 node: &LogicalPlan,
705 f: &mut F,
706 ) -> Result<TreeNodeRecursion> {
707 f(node)?.visit_children(|| {
708 node.apply_subqueries(|c| apply_with_subqueries_impl(c, f))?
709 .visit_sibling(|| {
710 node.apply_children(|c| apply_with_subqueries_impl(c, f))
711 })
712 })
713 }
714
715 apply_with_subqueries_impl(self, &mut f)
716 }
717
718 pub fn transform_with_subqueries<F: FnMut(Self) -> Result<Transformed<Self>>>(
722 self,
723 f: F,
724 ) -> Result<Transformed<Self>> {
725 self.transform_up_with_subqueries(f)
726 }
727
728 pub fn transform_down_with_subqueries<F: FnMut(Self) -> Result<Transformed<Self>>>(
732 self,
733 mut f: F,
734 ) -> Result<Transformed<Self>> {
735 #[cfg_attr(feature = "recursive_protection", recursive::recursive)]
736 fn transform_down_with_subqueries_impl<
737 F: FnMut(LogicalPlan) -> Result<Transformed<LogicalPlan>>,
738 >(
739 node: LogicalPlan,
740 f: &mut F,
741 ) -> Result<Transformed<LogicalPlan>> {
742 f(node)?.transform_children(|n| {
743 n.map_subqueries(|c| transform_down_with_subqueries_impl(c, f))?
744 .transform_sibling(|n| {
745 n.map_children(|c| transform_down_with_subqueries_impl(c, f))
746 })
747 })
748 }
749
750 transform_down_with_subqueries_impl(self, &mut f)
751 }
752
753 pub fn transform_up_with_subqueries<F: FnMut(Self) -> Result<Transformed<Self>>>(
757 self,
758 mut f: F,
759 ) -> Result<Transformed<Self>> {
760 #[cfg_attr(feature = "recursive_protection", recursive::recursive)]
761 fn transform_up_with_subqueries_impl<
762 F: FnMut(LogicalPlan) -> Result<Transformed<LogicalPlan>>,
763 >(
764 node: LogicalPlan,
765 f: &mut F,
766 ) -> Result<Transformed<LogicalPlan>> {
767 node.map_subqueries(|c| transform_up_with_subqueries_impl(c, f))?
768 .transform_sibling(|n| {
769 n.map_children(|c| transform_up_with_subqueries_impl(c, f))
770 })?
771 .transform_parent(f)
772 }
773
774 transform_up_with_subqueries_impl(self, &mut f)
775 }
776
777 pub fn transform_down_up_with_subqueries<
781 FD: FnMut(Self) -> Result<Transformed<Self>>,
782 FU: FnMut(Self) -> Result<Transformed<Self>>,
783 >(
784 self,
785 mut f_down: FD,
786 mut f_up: FU,
787 ) -> Result<Transformed<Self>> {
788 #[cfg_attr(feature = "recursive_protection", recursive::recursive)]
789 fn transform_down_up_with_subqueries_impl<
790 FD: FnMut(LogicalPlan) -> Result<Transformed<LogicalPlan>>,
791 FU: FnMut(LogicalPlan) -> Result<Transformed<LogicalPlan>>,
792 >(
793 node: LogicalPlan,
794 f_down: &mut FD,
795 f_up: &mut FU,
796 ) -> Result<Transformed<LogicalPlan>> {
797 handle_transform_recursion!(
798 f_down(node),
799 |c| transform_down_up_with_subqueries_impl(c, f_down, f_up),
800 f_up
801 )
802 }
803
804 transform_down_up_with_subqueries_impl(self, &mut f_down, &mut f_up)
805 }
806
807 pub fn apply_subqueries<F: FnMut(&Self) -> Result<TreeNodeRecursion>>(
811 &self,
812 mut f: F,
813 ) -> Result<TreeNodeRecursion> {
814 self.apply_expressions(|expr| {
815 expr.apply(|expr| match expr {
816 Expr::Exists(Exists { subquery, .. })
817 | Expr::InSubquery(InSubquery { subquery, .. })
818 | Expr::ScalarSubquery(subquery) => {
819 f(&LogicalPlan::Subquery(subquery.clone()))
823 }
824 _ => Ok(TreeNodeRecursion::Continue),
825 })
826 })
827 }
828
829 pub fn map_subqueries<F: FnMut(Self) -> Result<Transformed<Self>>>(
834 self,
835 mut f: F,
836 ) -> Result<Transformed<Self>> {
837 self.map_expressions(|expr| {
838 expr.transform_down(|expr| match expr {
839 Expr::Exists(Exists { subquery, negated }) => {
840 f(LogicalPlan::Subquery(subquery))?.map_data(|s| match s {
841 LogicalPlan::Subquery(subquery) => {
842 Ok(Expr::Exists(Exists { subquery, negated }))
843 }
844 _ => internal_err!("Transformation should return Subquery"),
845 })
846 }
847 Expr::InSubquery(InSubquery {
848 expr,
849 subquery,
850 negated,
851 }) => f(LogicalPlan::Subquery(subquery))?.map_data(|s| match s {
852 LogicalPlan::Subquery(subquery) => Ok(Expr::InSubquery(InSubquery {
853 expr,
854 subquery,
855 negated,
856 })),
857 _ => internal_err!("Transformation should return Subquery"),
858 }),
859 Expr::ScalarSubquery(subquery) => f(LogicalPlan::Subquery(subquery))?
860 .map_data(|s| match s {
861 LogicalPlan::Subquery(subquery) => {
862 Ok(Expr::ScalarSubquery(subquery))
863 }
864 _ => internal_err!("Transformation should return Subquery"),
865 }),
866 _ => Ok(Transformed::no(expr)),
867 })
868 })
869 }
870}