datafusion_expr/logical_plan/
tree_node.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//!  [`TreeNode`] based visiting and rewriting for [`LogicalPlan`]s
19//!
20//! Visiting (read only) APIs
21//! * [`LogicalPlan::visit`]: recursively visit the node and all of its inputs
22//! * [`LogicalPlan::visit_with_subqueries`]: recursively visit the node and all of its inputs, including subqueries
//! * [`LogicalPlan::apply_children`]: (non recursively) visit all direct inputs of this node
24//! * [`LogicalPlan::apply_expressions`]: (non recursively) visit all expressions of this node
25//! * [`LogicalPlan::apply_subqueries`]: (non recursively) visit all subqueries of this node
26//! * [`LogicalPlan::apply_with_subqueries`]: recursively visit all inputs and embedded subqueries.
27//!
28//! Rewriting (update) APIs:
29//! * [`LogicalPlan::exists`]: search for an expression in a plan
30//! * [`LogicalPlan::rewrite`]: recursively rewrite the node and all of its inputs
//! * [`LogicalPlan::map_children`]: (non recursively) rewrite all direct inputs of this node
//! * [`LogicalPlan::map_expressions`]: (non recursively) rewrite all expressions of this node
33//! * [`LogicalPlan::map_subqueries`]: (non recursively) rewrite all subqueries of this node
34//! * [`LogicalPlan::rewrite_with_subqueries`]: recursively rewrite the node and all of its inputs, including subqueries
35//!
36//! (Re)creation APIs (these require substantial cloning and thus are slow):
37//! * [`LogicalPlan::with_new_exprs`]: Create a new plan with different expressions
38//! * [`LogicalPlan::expressions`]: Return a copy of the plan's expressions
39
40use crate::{
41    dml::CopyTo, Aggregate, Analyze, CreateMemoryTable, CreateView, DdlStatement,
42    Distinct, DistinctOn, DmlStatement, Execute, Explain, Expr, Extension, Filter, Join,
43    Limit, LogicalPlan, Partitioning, Prepare, Projection, RecursiveQuery, Repartition,
44    Sort, Statement, Subquery, SubqueryAlias, TableScan, Union, Unnest,
45    UserDefinedLogicalNode, Values, Window,
46};
47use datafusion_common::tree_node::TreeNodeRefContainer;
48
49use crate::expr::{Exists, InSubquery};
50use datafusion_common::tree_node::{
51    Transformed, TreeNode, TreeNodeContainer, TreeNodeIterator, TreeNodeRecursion,
52    TreeNodeRewriter, TreeNodeVisitor,
53};
54use datafusion_common::{internal_err, Result};
55
56impl TreeNode for LogicalPlan {
57    fn apply_children<'n, F: FnMut(&'n Self) -> Result<TreeNodeRecursion>>(
58        &'n self,
59        f: F,
60    ) -> Result<TreeNodeRecursion> {
61        self.inputs().apply_ref_elements(f)
62    }
63
64    /// Applies `f` to each child (input) of this plan node, rewriting them *in place.*
65    ///
66    /// # Notes
67    ///
68    /// Inputs include ONLY direct children, not embedded `LogicalPlan`s for
69    /// subqueries, for example such as are in [`Expr::Exists`].
70    ///
71    /// [`Expr::Exists`]: crate::Expr::Exists
72    fn map_children<F: FnMut(Self) -> Result<Transformed<Self>>>(
73        self,
74        f: F,
75    ) -> Result<Transformed<Self>> {
76        Ok(match self {
77            LogicalPlan::Projection(Projection {
78                expr,
79                input,
80                schema,
81            }) => input.map_elements(f)?.update_data(|input| {
82                LogicalPlan::Projection(Projection {
83                    expr,
84                    input,
85                    schema,
86                })
87            }),
88            LogicalPlan::Filter(Filter {
89                predicate,
90                input,
91                having,
92            }) => input.map_elements(f)?.update_data(|input| {
93                LogicalPlan::Filter(Filter {
94                    predicate,
95                    input,
96                    having,
97                })
98            }),
99            LogicalPlan::Repartition(Repartition {
100                input,
101                partitioning_scheme,
102            }) => input.map_elements(f)?.update_data(|input| {
103                LogicalPlan::Repartition(Repartition {
104                    input,
105                    partitioning_scheme,
106                })
107            }),
108            LogicalPlan::Window(Window {
109                input,
110                window_expr,
111                schema,
112            }) => input.map_elements(f)?.update_data(|input| {
113                LogicalPlan::Window(Window {
114                    input,
115                    window_expr,
116                    schema,
117                })
118            }),
119            LogicalPlan::Aggregate(Aggregate {
120                input,
121                group_expr,
122                aggr_expr,
123                schema,
124            }) => input.map_elements(f)?.update_data(|input| {
125                LogicalPlan::Aggregate(Aggregate {
126                    input,
127                    group_expr,
128                    aggr_expr,
129                    schema,
130                })
131            }),
132            LogicalPlan::Sort(Sort { expr, input, fetch }) => input
133                .map_elements(f)?
134                .update_data(|input| LogicalPlan::Sort(Sort { expr, input, fetch })),
135            LogicalPlan::Join(Join {
136                left,
137                right,
138                on,
139                filter,
140                join_type,
141                join_constraint,
142                schema,
143                null_equals_null,
144            }) => (left, right).map_elements(f)?.update_data(|(left, right)| {
145                LogicalPlan::Join(Join {
146                    left,
147                    right,
148                    on,
149                    filter,
150                    join_type,
151                    join_constraint,
152                    schema,
153                    null_equals_null,
154                })
155            }),
156            LogicalPlan::Limit(Limit { skip, fetch, input }) => input
157                .map_elements(f)?
158                .update_data(|input| LogicalPlan::Limit(Limit { skip, fetch, input })),
159            LogicalPlan::Subquery(Subquery {
160                subquery,
161                outer_ref_columns,
162                spans,
163            }) => subquery.map_elements(f)?.update_data(|subquery| {
164                LogicalPlan::Subquery(Subquery {
165                    subquery,
166                    outer_ref_columns,
167                    spans,
168                })
169            }),
170            LogicalPlan::SubqueryAlias(SubqueryAlias {
171                input,
172                alias,
173                schema,
174            }) => input.map_elements(f)?.update_data(|input| {
175                LogicalPlan::SubqueryAlias(SubqueryAlias {
176                    input,
177                    alias,
178                    schema,
179                })
180            }),
181            LogicalPlan::Extension(extension) => rewrite_extension_inputs(extension, f)?
182                .update_data(LogicalPlan::Extension),
183            LogicalPlan::Union(Union { inputs, schema }) => inputs
184                .map_elements(f)?
185                .update_data(|inputs| LogicalPlan::Union(Union { inputs, schema })),
186            LogicalPlan::Distinct(distinct) => match distinct {
187                Distinct::All(input) => input.map_elements(f)?.update_data(Distinct::All),
188                Distinct::On(DistinctOn {
189                    on_expr,
190                    select_expr,
191                    sort_expr,
192                    input,
193                    schema,
194                }) => input.map_elements(f)?.update_data(|input| {
195                    Distinct::On(DistinctOn {
196                        on_expr,
197                        select_expr,
198                        sort_expr,
199                        input,
200                        schema,
201                    })
202                }),
203            }
204            .update_data(LogicalPlan::Distinct),
205            LogicalPlan::Explain(Explain {
206                verbose,
207                explain_format: format,
208                plan,
209                stringified_plans,
210                schema,
211                logical_optimization_succeeded,
212            }) => plan.map_elements(f)?.update_data(|plan| {
213                LogicalPlan::Explain(Explain {
214                    verbose,
215                    explain_format: format,
216                    plan,
217                    stringified_plans,
218                    schema,
219                    logical_optimization_succeeded,
220                })
221            }),
222            LogicalPlan::Analyze(Analyze {
223                verbose,
224                input,
225                schema,
226            }) => input.map_elements(f)?.update_data(|input| {
227                LogicalPlan::Analyze(Analyze {
228                    verbose,
229                    input,
230                    schema,
231                })
232            }),
233            LogicalPlan::Dml(DmlStatement {
234                table_name,
235                target,
236                op,
237                input,
238                output_schema,
239            }) => input.map_elements(f)?.update_data(|input| {
240                LogicalPlan::Dml(DmlStatement {
241                    table_name,
242                    target,
243                    op,
244                    input,
245                    output_schema,
246                })
247            }),
248            LogicalPlan::Copy(CopyTo {
249                input,
250                output_url,
251                partition_by,
252                file_type,
253                options,
254            }) => input.map_elements(f)?.update_data(|input| {
255                LogicalPlan::Copy(CopyTo {
256                    input,
257                    output_url,
258                    partition_by,
259                    file_type,
260                    options,
261                })
262            }),
263            LogicalPlan::Ddl(ddl) => {
264                match ddl {
265                    DdlStatement::CreateMemoryTable(CreateMemoryTable {
266                        name,
267                        constraints,
268                        input,
269                        if_not_exists,
270                        or_replace,
271                        column_defaults,
272                        temporary,
273                    }) => input.map_elements(f)?.update_data(|input| {
274                        DdlStatement::CreateMemoryTable(CreateMemoryTable {
275                            name,
276                            constraints,
277                            input,
278                            if_not_exists,
279                            or_replace,
280                            column_defaults,
281                            temporary,
282                        })
283                    }),
284                    DdlStatement::CreateView(CreateView {
285                        name,
286                        input,
287                        or_replace,
288                        definition,
289                        temporary,
290                    }) => input.map_elements(f)?.update_data(|input| {
291                        DdlStatement::CreateView(CreateView {
292                            name,
293                            input,
294                            or_replace,
295                            definition,
296                            temporary,
297                        })
298                    }),
299                    // no inputs in these statements
300                    DdlStatement::CreateExternalTable(_)
301                    | DdlStatement::CreateCatalogSchema(_)
302                    | DdlStatement::CreateCatalog(_)
303                    | DdlStatement::CreateIndex(_)
304                    | DdlStatement::DropTable(_)
305                    | DdlStatement::DropView(_)
306                    | DdlStatement::DropCatalogSchema(_)
307                    | DdlStatement::CreateFunction(_)
308                    | DdlStatement::DropFunction(_) => Transformed::no(ddl),
309                }
310                .update_data(LogicalPlan::Ddl)
311            }
312            LogicalPlan::Unnest(Unnest {
313                input,
314                exec_columns: input_columns,
315                list_type_columns,
316                struct_type_columns,
317                dependency_indices,
318                schema,
319                options,
320            }) => input.map_elements(f)?.update_data(|input| {
321                LogicalPlan::Unnest(Unnest {
322                    input,
323                    exec_columns: input_columns,
324                    dependency_indices,
325                    list_type_columns,
326                    struct_type_columns,
327                    schema,
328                    options,
329                })
330            }),
331            LogicalPlan::RecursiveQuery(RecursiveQuery {
332                name,
333                static_term,
334                recursive_term,
335                is_distinct,
336            }) => (static_term, recursive_term).map_elements(f)?.update_data(
337                |(static_term, recursive_term)| {
338                    LogicalPlan::RecursiveQuery(RecursiveQuery {
339                        name,
340                        static_term,
341                        recursive_term,
342                        is_distinct,
343                    })
344                },
345            ),
346            LogicalPlan::Statement(stmt) => match stmt {
347                Statement::Prepare(p) => p
348                    .input
349                    .map_elements(f)?
350                    .update_data(|input| Statement::Prepare(Prepare { input, ..p })),
351                _ => Transformed::no(stmt),
352            }
353            .update_data(LogicalPlan::Statement),
354            // plans without inputs
355            LogicalPlan::TableScan { .. }
356            | LogicalPlan::EmptyRelation { .. }
357            | LogicalPlan::Values { .. }
358            | LogicalPlan::DescribeTable(_) => Transformed::no(self),
359        })
360    }
361}
362
363/// Rewrites all inputs for an Extension node "in place"
364/// (it currently has to copy values because there are no APIs for in place modification)
365///
366/// Should be removed when we have an API for in place modifications of the
367/// extension to avoid these copies
368fn rewrite_extension_inputs<F: FnMut(LogicalPlan) -> Result<Transformed<LogicalPlan>>>(
369    extension: Extension,
370    f: F,
371) -> Result<Transformed<Extension>> {
372    let Extension { node } = extension;
373
374    node.inputs()
375        .into_iter()
376        .cloned()
377        .map_until_stop_and_collect(f)?
378        .map_data(|new_inputs| {
379            let exprs = node.expressions();
380            Ok(Extension {
381                node: node.with_exprs_and_inputs(exprs, new_inputs)?,
382            })
383        })
384}
385
/// Chains the three steps of a combined (top-down and bottom-up) rewrite of a
/// plan node, deciding at each step whether the traversal continues.
///
/// * `$f_down`: expression yielding the result of the top-down step
/// * `$f_child`: closure applied first to the node's subqueries and then to
///   its inputs (the expression is expanded once for each use)
/// * `$f_up`: closure applied to the node once its children have been handled
macro_rules! handle_transform_recursion {
    ($f_down:expr, $f_child:expr, $f_up:expr) => {{
        $f_down?
            .transform_children(|plan| {
                plan.map_subqueries($f_child)?
                    .transform_sibling(|plan| plan.map_children($f_child))
            })?
            .transform_parent($f_up)
    }};
}
398
399impl LogicalPlan {
400    /// Calls `f` on all expressions in the current `LogicalPlan` node.
401    ///
402    /// # Notes
403    /// * Similar to [`TreeNode::apply`] but for this node's expressions.
404    /// * Does not include expressions in input `LogicalPlan` nodes
405    /// * Visits only the top level expressions (Does not recurse into each expression)
406    pub fn apply_expressions<F: FnMut(&Expr) -> Result<TreeNodeRecursion>>(
407        &self,
408        mut f: F,
409    ) -> Result<TreeNodeRecursion> {
410        match self {
411            LogicalPlan::Projection(Projection { expr, .. }) => expr.apply_elements(f),
412            LogicalPlan::Values(Values { values, .. }) => values.apply_elements(f),
413            LogicalPlan::Filter(Filter { predicate, .. }) => f(predicate),
414            LogicalPlan::Repartition(Repartition {
415                partitioning_scheme,
416                ..
417            }) => match partitioning_scheme {
418                Partitioning::Hash(expr, _) | Partitioning::DistributeBy(expr) => {
419                    expr.apply_elements(f)
420                }
421                Partitioning::RoundRobinBatch(_) => Ok(TreeNodeRecursion::Continue),
422            },
423            LogicalPlan::Window(Window { window_expr, .. }) => {
424                window_expr.apply_elements(f)
425            }
426            LogicalPlan::Aggregate(Aggregate {
427                group_expr,
428                aggr_expr,
429                ..
430            }) => (group_expr, aggr_expr).apply_ref_elements(f),
431            // There are two part of expression for join, equijoin(on) and non-equijoin(filter).
432            // 1. the first part is `on.len()` equijoin expressions, and the struct of each expr is `left-on = right-on`.
433            // 2. the second part is non-equijoin(filter).
434            LogicalPlan::Join(Join { on, filter, .. }) => {
435                (on, filter).apply_ref_elements(f)
436            }
437            LogicalPlan::Sort(Sort { expr, .. }) => expr.apply_elements(f),
438            LogicalPlan::Extension(extension) => {
439                // would be nice to avoid this copy -- maybe can
440                // update extension to just observer Exprs
441                extension.node.expressions().apply_elements(f)
442            }
443            LogicalPlan::TableScan(TableScan { filters, .. }) => {
444                filters.apply_elements(f)
445            }
446            LogicalPlan::Unnest(unnest) => {
447                let columns = unnest.exec_columns.clone();
448
449                let exprs = columns
450                    .iter()
451                    .map(|c| Expr::Column(c.clone()))
452                    .collect::<Vec<_>>();
453                exprs.apply_elements(f)
454            }
455            LogicalPlan::Distinct(Distinct::On(DistinctOn {
456                on_expr,
457                select_expr,
458                sort_expr,
459                ..
460            })) => (on_expr, select_expr, sort_expr).apply_ref_elements(f),
461            LogicalPlan::Limit(Limit { skip, fetch, .. }) => {
462                (skip, fetch).apply_ref_elements(f)
463            }
464            LogicalPlan::Statement(stmt) => match stmt {
465                Statement::Execute(Execute { parameters, .. }) => {
466                    parameters.apply_elements(f)
467                }
468                _ => Ok(TreeNodeRecursion::Continue),
469            },
470            // plans without expressions
471            LogicalPlan::EmptyRelation(_)
472            | LogicalPlan::RecursiveQuery(_)
473            | LogicalPlan::Subquery(_)
474            | LogicalPlan::SubqueryAlias(_)
475            | LogicalPlan::Analyze(_)
476            | LogicalPlan::Explain(_)
477            | LogicalPlan::Union(_)
478            | LogicalPlan::Distinct(Distinct::All(_))
479            | LogicalPlan::Dml(_)
480            | LogicalPlan::Ddl(_)
481            | LogicalPlan::Copy(_)
482            | LogicalPlan::DescribeTable(_) => Ok(TreeNodeRecursion::Continue),
483        }
484    }
485
486    /// Rewrites all expressions in the current `LogicalPlan` node using `f`.
487    ///
488    /// Returns the current node.
489    ///
490    /// # Notes
491    /// * Similar to [`TreeNode::map_children`] but for this node's expressions.
492    /// * Visits only the top level expressions (Does not recurse into each expression)
493    pub fn map_expressions<F: FnMut(Expr) -> Result<Transformed<Expr>>>(
494        self,
495        mut f: F,
496    ) -> Result<Transformed<Self>> {
497        Ok(match self {
498            LogicalPlan::Projection(Projection {
499                expr,
500                input,
501                schema,
502            }) => expr.map_elements(f)?.update_data(|expr| {
503                LogicalPlan::Projection(Projection {
504                    expr,
505                    input,
506                    schema,
507                })
508            }),
509            LogicalPlan::Values(Values { schema, values }) => values
510                .map_elements(f)?
511                .update_data(|values| LogicalPlan::Values(Values { schema, values })),
512            LogicalPlan::Filter(Filter {
513                predicate,
514                input,
515                having,
516            }) => f(predicate)?.update_data(|predicate| {
517                LogicalPlan::Filter(Filter {
518                    predicate,
519                    input,
520                    having,
521                })
522            }),
523            LogicalPlan::Repartition(Repartition {
524                input,
525                partitioning_scheme,
526            }) => match partitioning_scheme {
527                Partitioning::Hash(expr, usize) => expr
528                    .map_elements(f)?
529                    .update_data(|expr| Partitioning::Hash(expr, usize)),
530                Partitioning::DistributeBy(expr) => expr
531                    .map_elements(f)?
532                    .update_data(Partitioning::DistributeBy),
533                Partitioning::RoundRobinBatch(_) => Transformed::no(partitioning_scheme),
534            }
535            .update_data(|partitioning_scheme| {
536                LogicalPlan::Repartition(Repartition {
537                    input,
538                    partitioning_scheme,
539                })
540            }),
541            LogicalPlan::Window(Window {
542                input,
543                window_expr,
544                schema,
545            }) => window_expr.map_elements(f)?.update_data(|window_expr| {
546                LogicalPlan::Window(Window {
547                    input,
548                    window_expr,
549                    schema,
550                })
551            }),
552            LogicalPlan::Aggregate(Aggregate {
553                input,
554                group_expr,
555                aggr_expr,
556                schema,
557            }) => (group_expr, aggr_expr).map_elements(f)?.update_data(
558                |(group_expr, aggr_expr)| {
559                    LogicalPlan::Aggregate(Aggregate {
560                        input,
561                        group_expr,
562                        aggr_expr,
563                        schema,
564                    })
565                },
566            ),
567
568            // There are two part of expression for join, equijoin(on) and non-equijoin(filter).
569            // 1. the first part is `on.len()` equijoin expressions, and the struct of each expr is `left-on = right-on`.
570            // 2. the second part is non-equijoin(filter).
571            LogicalPlan::Join(Join {
572                left,
573                right,
574                on,
575                filter,
576                join_type,
577                join_constraint,
578                schema,
579                null_equals_null,
580            }) => (on, filter).map_elements(f)?.update_data(|(on, filter)| {
581                LogicalPlan::Join(Join {
582                    left,
583                    right,
584                    on,
585                    filter,
586                    join_type,
587                    join_constraint,
588                    schema,
589                    null_equals_null,
590                })
591            }),
592            LogicalPlan::Sort(Sort { expr, input, fetch }) => expr
593                .map_elements(f)?
594                .update_data(|expr| LogicalPlan::Sort(Sort { expr, input, fetch })),
595            LogicalPlan::Extension(Extension { node }) => {
596                // would be nice to avoid this copy -- maybe can
597                // update extension to just observer Exprs
598                let exprs = node.expressions().map_elements(f)?;
599                let plan = LogicalPlan::Extension(Extension {
600                    node: UserDefinedLogicalNode::with_exprs_and_inputs(
601                        node.as_ref(),
602                        exprs.data,
603                        node.inputs().into_iter().cloned().collect::<Vec<_>>(),
604                    )?,
605                });
606                Transformed::new(plan, exprs.transformed, exprs.tnr)
607            }
608            LogicalPlan::TableScan(TableScan {
609                table_name,
610                source,
611                projection,
612                projected_schema,
613                filters,
614                fetch,
615            }) => filters.map_elements(f)?.update_data(|filters| {
616                LogicalPlan::TableScan(TableScan {
617                    table_name,
618                    source,
619                    projection,
620                    projected_schema,
621                    filters,
622                    fetch,
623                })
624            }),
625            LogicalPlan::Distinct(Distinct::On(DistinctOn {
626                on_expr,
627                select_expr,
628                sort_expr,
629                input,
630                schema,
631            })) => (on_expr, select_expr, sort_expr)
632                .map_elements(f)?
633                .update_data(|(on_expr, select_expr, sort_expr)| {
634                    LogicalPlan::Distinct(Distinct::On(DistinctOn {
635                        on_expr,
636                        select_expr,
637                        sort_expr,
638                        input,
639                        schema,
640                    }))
641                }),
642            LogicalPlan::Limit(Limit { skip, fetch, input }) => {
643                (skip, fetch).map_elements(f)?.update_data(|(skip, fetch)| {
644                    LogicalPlan::Limit(Limit { skip, fetch, input })
645                })
646            }
647            LogicalPlan::Statement(stmt) => match stmt {
648                Statement::Execute(e) => {
649                    e.parameters.map_elements(f)?.update_data(|parameters| {
650                        Statement::Execute(Execute { parameters, ..e })
651                    })
652                }
653                _ => Transformed::no(stmt),
654            }
655            .update_data(LogicalPlan::Statement),
656            // plans without expressions
657            LogicalPlan::EmptyRelation(_)
658            | LogicalPlan::Unnest(_)
659            | LogicalPlan::RecursiveQuery(_)
660            | LogicalPlan::Subquery(_)
661            | LogicalPlan::SubqueryAlias(_)
662            | LogicalPlan::Analyze(_)
663            | LogicalPlan::Explain(_)
664            | LogicalPlan::Union(_)
665            | LogicalPlan::Distinct(Distinct::All(_))
666            | LogicalPlan::Dml(_)
667            | LogicalPlan::Ddl(_)
668            | LogicalPlan::Copy(_)
669            | LogicalPlan::DescribeTable(_) => Transformed::no(self),
670        })
671    }
672
673    /// Visits a plan similarly to [`Self::visit`], including subqueries that
674    /// may appear in expressions such as `IN (SELECT ...)`.
675    #[cfg_attr(feature = "recursive_protection", recursive::recursive)]
676    pub fn visit_with_subqueries<V: for<'n> TreeNodeVisitor<'n, Node = Self>>(
677        &self,
678        visitor: &mut V,
679    ) -> Result<TreeNodeRecursion> {
680        visitor
681            .f_down(self)?
682            .visit_children(|| {
683                self.apply_subqueries(|c| c.visit_with_subqueries(visitor))?
684                    .visit_sibling(|| {
685                        self.apply_children(|c| c.visit_with_subqueries(visitor))
686                    })
687            })?
688            .visit_parent(|| visitor.f_up(self))
689    }
690
691    /// Similarly to [`Self::rewrite`], rewrites this node and its inputs using `f`,
692    /// including subqueries that may appear in expressions such as `IN (SELECT
693    /// ...)`.
694    #[cfg_attr(feature = "recursive_protection", recursive::recursive)]
695    pub fn rewrite_with_subqueries<R: TreeNodeRewriter<Node = Self>>(
696        self,
697        rewriter: &mut R,
698    ) -> Result<Transformed<Self>> {
699        handle_transform_recursion!(
700            rewriter.f_down(self),
701            |c| c.rewrite_with_subqueries(rewriter),
702            |n| rewriter.f_up(n)
703        )
704    }
705
706    /// Similarly to [`Self::apply`], calls `f` on this node and all its inputs,
707    /// including subqueries that may appear in expressions such as `IN (SELECT
708    /// ...)`.
709    pub fn apply_with_subqueries<F: FnMut(&Self) -> Result<TreeNodeRecursion>>(
710        &self,
711        mut f: F,
712    ) -> Result<TreeNodeRecursion> {
713        #[cfg_attr(feature = "recursive_protection", recursive::recursive)]
714        fn apply_with_subqueries_impl<
715            F: FnMut(&LogicalPlan) -> Result<TreeNodeRecursion>,
716        >(
717            node: &LogicalPlan,
718            f: &mut F,
719        ) -> Result<TreeNodeRecursion> {
720            f(node)?.visit_children(|| {
721                node.apply_subqueries(|c| apply_with_subqueries_impl(c, f))?
722                    .visit_sibling(|| {
723                        node.apply_children(|c| apply_with_subqueries_impl(c, f))
724                    })
725            })
726        }
727
728        apply_with_subqueries_impl(self, &mut f)
729    }
730
731    /// Similarly to [`Self::transform`], rewrites this node and its inputs using `f`,
732    /// including subqueries that may appear in expressions such as `IN (SELECT
733    /// ...)`.
734    pub fn transform_with_subqueries<F: FnMut(Self) -> Result<Transformed<Self>>>(
735        self,
736        f: F,
737    ) -> Result<Transformed<Self>> {
738        self.transform_up_with_subqueries(f)
739    }
740
741    /// Similarly to [`Self::transform_down`], rewrites this node and its inputs using `f`,
742    /// including subqueries that may appear in expressions such as `IN (SELECT
743    /// ...)`.
744    pub fn transform_down_with_subqueries<F: FnMut(Self) -> Result<Transformed<Self>>>(
745        self,
746        mut f: F,
747    ) -> Result<Transformed<Self>> {
748        #[cfg_attr(feature = "recursive_protection", recursive::recursive)]
749        fn transform_down_with_subqueries_impl<
750            F: FnMut(LogicalPlan) -> Result<Transformed<LogicalPlan>>,
751        >(
752            node: LogicalPlan,
753            f: &mut F,
754        ) -> Result<Transformed<LogicalPlan>> {
755            f(node)?.transform_children(|n| {
756                n.map_subqueries(|c| transform_down_with_subqueries_impl(c, f))?
757                    .transform_sibling(|n| {
758                        n.map_children(|c| transform_down_with_subqueries_impl(c, f))
759                    })
760            })
761        }
762
763        transform_down_with_subqueries_impl(self, &mut f)
764    }
765
766    /// Similarly to [`Self::transform_up`], rewrites this node and its inputs using `f`,
767    /// including subqueries that may appear in expressions such as `IN (SELECT
768    /// ...)`.
769    pub fn transform_up_with_subqueries<F: FnMut(Self) -> Result<Transformed<Self>>>(
770        self,
771        mut f: F,
772    ) -> Result<Transformed<Self>> {
773        #[cfg_attr(feature = "recursive_protection", recursive::recursive)]
774        fn transform_up_with_subqueries_impl<
775            F: FnMut(LogicalPlan) -> Result<Transformed<LogicalPlan>>,
776        >(
777            node: LogicalPlan,
778            f: &mut F,
779        ) -> Result<Transformed<LogicalPlan>> {
780            node.map_subqueries(|c| transform_up_with_subqueries_impl(c, f))?
781                .transform_sibling(|n| {
782                    n.map_children(|c| transform_up_with_subqueries_impl(c, f))
783                })?
784                .transform_parent(f)
785        }
786
787        transform_up_with_subqueries_impl(self, &mut f)
788    }
789
790    /// Similarly to [`Self::transform_down`], rewrites this node and its inputs using `f`,
791    /// including subqueries that may appear in expressions such as `IN (SELECT
792    /// ...)`.
793    pub fn transform_down_up_with_subqueries<
794        FD: FnMut(Self) -> Result<Transformed<Self>>,
795        FU: FnMut(Self) -> Result<Transformed<Self>>,
796    >(
797        self,
798        mut f_down: FD,
799        mut f_up: FU,
800    ) -> Result<Transformed<Self>> {
801        #[cfg_attr(feature = "recursive_protection", recursive::recursive)]
802        fn transform_down_up_with_subqueries_impl<
803            FD: FnMut(LogicalPlan) -> Result<Transformed<LogicalPlan>>,
804            FU: FnMut(LogicalPlan) -> Result<Transformed<LogicalPlan>>,
805        >(
806            node: LogicalPlan,
807            f_down: &mut FD,
808            f_up: &mut FU,
809        ) -> Result<Transformed<LogicalPlan>> {
810            handle_transform_recursion!(
811                f_down(node),
812                |c| transform_down_up_with_subqueries_impl(c, f_down, f_up),
813                f_up
814            )
815        }
816
817        transform_down_up_with_subqueries_impl(self, &mut f_down, &mut f_up)
818    }
819
820    /// Similarly to [`Self::apply`], calls `f` on  this node and its inputs
821    /// including subqueries that may appear in expressions such as `IN (SELECT
822    /// ...)`.
823    pub fn apply_subqueries<F: FnMut(&Self) -> Result<TreeNodeRecursion>>(
824        &self,
825        mut f: F,
826    ) -> Result<TreeNodeRecursion> {
827        self.apply_expressions(|expr| {
828            expr.apply(|expr| match expr {
829                Expr::Exists(Exists { subquery, .. })
830                | Expr::InSubquery(InSubquery { subquery, .. })
831                | Expr::ScalarSubquery(subquery) => {
832                    // use a synthetic plan so the collector sees a
833                    // LogicalPlan::Subquery (even though it is
834                    // actually a Subquery alias)
835                    f(&LogicalPlan::Subquery(subquery.clone()))
836                }
837                _ => Ok(TreeNodeRecursion::Continue),
838            })
839        })
840    }
841
842    /// Similarly to [`Self::map_children`], rewrites all subqueries that may
843    /// appear in expressions such as `IN (SELECT ...)` using `f`.
844    ///
845    /// Returns the current node.
846    pub fn map_subqueries<F: FnMut(Self) -> Result<Transformed<Self>>>(
847        self,
848        mut f: F,
849    ) -> Result<Transformed<Self>> {
850        self.map_expressions(|expr| {
851            expr.transform_down(|expr| match expr {
852                Expr::Exists(Exists { subquery, negated }) => {
853                    f(LogicalPlan::Subquery(subquery))?.map_data(|s| match s {
854                        LogicalPlan::Subquery(subquery) => {
855                            Ok(Expr::Exists(Exists { subquery, negated }))
856                        }
857                        _ => internal_err!("Transformation should return Subquery"),
858                    })
859                }
860                Expr::InSubquery(InSubquery {
861                    expr,
862                    subquery,
863                    negated,
864                }) => f(LogicalPlan::Subquery(subquery))?.map_data(|s| match s {
865                    LogicalPlan::Subquery(subquery) => Ok(Expr::InSubquery(InSubquery {
866                        expr,
867                        subquery,
868                        negated,
869                    })),
870                    _ => internal_err!("Transformation should return Subquery"),
871                }),
872                Expr::ScalarSubquery(subquery) => f(LogicalPlan::Subquery(subquery))?
873                    .map_data(|s| match s {
874                        LogicalPlan::Subquery(subquery) => {
875                            Ok(Expr::ScalarSubquery(subquery))
876                        }
877                        _ => internal_err!("Transformation should return Subquery"),
878                    }),
879                _ => Ok(Transformed::no(expr)),
880            })
881        })
882    }
883}