datafusion_expr/logical_plan/
tree_node.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//!  [`TreeNode`] based visiting and rewriting for [`LogicalPlan`]s
19//!
//! Visiting (read only) APIs
//! * [`LogicalPlan::visit`]: recursively visit the node and all of its inputs
//! * [`LogicalPlan::visit_with_subqueries`]: recursively visit the node and all of its inputs, including subqueries
//! * [`LogicalPlan::apply_children`]: (non recursively) visit all inputs of this node
//! * [`LogicalPlan::apply_expressions`]: (non recursively) visit all expressions of this node
//! * [`LogicalPlan::apply_subqueries`]: (non recursively) visit all subqueries of this node
//! * [`LogicalPlan::apply_with_subqueries`]: recursively visit all inputs and embedded subqueries.
27//!
//! Rewriting (update) APIs:
//! * [`LogicalPlan::exists`]: search for an expression in a plan
//! * [`LogicalPlan::rewrite`]: recursively rewrite the node and all of its inputs
//! * [`LogicalPlan::map_children`]: (non recursively) rewrite all inputs of this node
//! * [`LogicalPlan::map_expressions`]: (non recursively) rewrite all expressions of this node
//! * [`LogicalPlan::map_subqueries`]: (non recursively) rewrite all subqueries of this node
//! * [`LogicalPlan::rewrite_with_subqueries`]: recursively rewrite the node and all of its inputs, including subqueries
35//!
36//! (Re)creation APIs (these require substantial cloning and thus are slow):
37//! * [`LogicalPlan::with_new_exprs`]: Create a new plan with different expressions
38//! * [`LogicalPlan::expressions`]: Return a copy of the plan's expressions
39
40use crate::{
41    dml::CopyTo, Aggregate, Analyze, CreateMemoryTable, CreateView, DdlStatement,
42    Distinct, DistinctOn, DmlStatement, Execute, Explain, Expr, Extension, Filter, Join,
43    Limit, LogicalPlan, Partitioning, Prepare, Projection, RecursiveQuery, Repartition,
44    Sort, Statement, Subquery, SubqueryAlias, TableScan, Union, Unnest,
45    UserDefinedLogicalNode, Values, Window,
46};
47use datafusion_common::tree_node::TreeNodeRefContainer;
48
49use crate::expr::{Exists, InSubquery};
50use datafusion_common::tree_node::{
51    Transformed, TreeNode, TreeNodeContainer, TreeNodeIterator, TreeNodeRecursion,
52    TreeNodeRewriter, TreeNodeVisitor,
53};
54use datafusion_common::{internal_err, Result};
55
56impl TreeNode for LogicalPlan {
57    fn apply_children<'n, F: FnMut(&'n Self) -> Result<TreeNodeRecursion>>(
58        &'n self,
59        f: F,
60    ) -> Result<TreeNodeRecursion> {
61        self.inputs().apply_ref_elements(f)
62    }
63
64    /// Applies `f` to each child (input) of this plan node, rewriting them *in place.*
65    ///
66    /// # Notes
67    ///
68    /// Inputs include ONLY direct children, not embedded `LogicalPlan`s for
69    /// subqueries, for example such as are in [`Expr::Exists`].
70    ///
71    /// [`Expr::Exists`]: crate::Expr::Exists
72    fn map_children<F: FnMut(Self) -> Result<Transformed<Self>>>(
73        self,
74        f: F,
75    ) -> Result<Transformed<Self>> {
76        Ok(match self {
77            LogicalPlan::Projection(Projection {
78                expr,
79                input,
80                schema,
81            }) => input.map_elements(f)?.update_data(|input| {
82                LogicalPlan::Projection(Projection {
83                    expr,
84                    input,
85                    schema,
86                })
87            }),
88            LogicalPlan::Filter(Filter { predicate, input }) => input
89                .map_elements(f)?
90                .update_data(|input| LogicalPlan::Filter(Filter { predicate, input })),
91            LogicalPlan::Repartition(Repartition {
92                input,
93                partitioning_scheme,
94            }) => input.map_elements(f)?.update_data(|input| {
95                LogicalPlan::Repartition(Repartition {
96                    input,
97                    partitioning_scheme,
98                })
99            }),
100            LogicalPlan::Window(Window {
101                input,
102                window_expr,
103                schema,
104            }) => input.map_elements(f)?.update_data(|input| {
105                LogicalPlan::Window(Window {
106                    input,
107                    window_expr,
108                    schema,
109                })
110            }),
111            LogicalPlan::Aggregate(Aggregate {
112                input,
113                group_expr,
114                aggr_expr,
115                schema,
116            }) => input.map_elements(f)?.update_data(|input| {
117                LogicalPlan::Aggregate(Aggregate {
118                    input,
119                    group_expr,
120                    aggr_expr,
121                    schema,
122                })
123            }),
124            LogicalPlan::Sort(Sort { expr, input, fetch }) => input
125                .map_elements(f)?
126                .update_data(|input| LogicalPlan::Sort(Sort { expr, input, fetch })),
127            LogicalPlan::Join(Join {
128                left,
129                right,
130                on,
131                filter,
132                join_type,
133                join_constraint,
134                schema,
135                null_equality,
136            }) => (left, right).map_elements(f)?.update_data(|(left, right)| {
137                LogicalPlan::Join(Join {
138                    left,
139                    right,
140                    on,
141                    filter,
142                    join_type,
143                    join_constraint,
144                    schema,
145                    null_equality,
146                })
147            }),
148            LogicalPlan::Limit(Limit { skip, fetch, input }) => input
149                .map_elements(f)?
150                .update_data(|input| LogicalPlan::Limit(Limit { skip, fetch, input })),
151            LogicalPlan::Subquery(Subquery {
152                subquery,
153                outer_ref_columns,
154                spans,
155            }) => subquery.map_elements(f)?.update_data(|subquery| {
156                LogicalPlan::Subquery(Subquery {
157                    subquery,
158                    outer_ref_columns,
159                    spans,
160                })
161            }),
162            LogicalPlan::SubqueryAlias(SubqueryAlias {
163                input,
164                alias,
165                schema,
166            }) => input.map_elements(f)?.update_data(|input| {
167                LogicalPlan::SubqueryAlias(SubqueryAlias {
168                    input,
169                    alias,
170                    schema,
171                })
172            }),
173            LogicalPlan::Extension(extension) => rewrite_extension_inputs(extension, f)?
174                .update_data(LogicalPlan::Extension),
175            LogicalPlan::Union(Union { inputs, schema }) => inputs
176                .map_elements(f)?
177                .update_data(|inputs| LogicalPlan::Union(Union { inputs, schema })),
178            LogicalPlan::Distinct(distinct) => match distinct {
179                Distinct::All(input) => input.map_elements(f)?.update_data(Distinct::All),
180                Distinct::On(DistinctOn {
181                    on_expr,
182                    select_expr,
183                    sort_expr,
184                    input,
185                    schema,
186                }) => input.map_elements(f)?.update_data(|input| {
187                    Distinct::On(DistinctOn {
188                        on_expr,
189                        select_expr,
190                        sort_expr,
191                        input,
192                        schema,
193                    })
194                }),
195            }
196            .update_data(LogicalPlan::Distinct),
197            LogicalPlan::Explain(Explain {
198                verbose,
199                explain_format: format,
200                plan,
201                stringified_plans,
202                schema,
203                logical_optimization_succeeded,
204            }) => plan.map_elements(f)?.update_data(|plan| {
205                LogicalPlan::Explain(Explain {
206                    verbose,
207                    explain_format: format,
208                    plan,
209                    stringified_plans,
210                    schema,
211                    logical_optimization_succeeded,
212                })
213            }),
214            LogicalPlan::Analyze(Analyze {
215                verbose,
216                input,
217                schema,
218            }) => input.map_elements(f)?.update_data(|input| {
219                LogicalPlan::Analyze(Analyze {
220                    verbose,
221                    input,
222                    schema,
223                })
224            }),
225            LogicalPlan::Dml(DmlStatement {
226                table_name,
227                target,
228                op,
229                input,
230                output_schema,
231            }) => input.map_elements(f)?.update_data(|input| {
232                LogicalPlan::Dml(DmlStatement {
233                    table_name,
234                    target,
235                    op,
236                    input,
237                    output_schema,
238                })
239            }),
240            LogicalPlan::Copy(CopyTo {
241                input,
242                output_url,
243                partition_by,
244                file_type,
245                options,
246                output_schema,
247            }) => input.map_elements(f)?.update_data(|input| {
248                LogicalPlan::Copy(CopyTo {
249                    input,
250                    output_url,
251                    partition_by,
252                    file_type,
253                    options,
254                    output_schema,
255                })
256            }),
257            LogicalPlan::Ddl(ddl) => {
258                match ddl {
259                    DdlStatement::CreateMemoryTable(CreateMemoryTable {
260                        name,
261                        constraints,
262                        input,
263                        if_not_exists,
264                        or_replace,
265                        column_defaults,
266                        temporary,
267                    }) => input.map_elements(f)?.update_data(|input| {
268                        DdlStatement::CreateMemoryTable(CreateMemoryTable {
269                            name,
270                            constraints,
271                            input,
272                            if_not_exists,
273                            or_replace,
274                            column_defaults,
275                            temporary,
276                        })
277                    }),
278                    DdlStatement::CreateView(CreateView {
279                        name,
280                        input,
281                        or_replace,
282                        definition,
283                        temporary,
284                    }) => input.map_elements(f)?.update_data(|input| {
285                        DdlStatement::CreateView(CreateView {
286                            name,
287                            input,
288                            or_replace,
289                            definition,
290                            temporary,
291                        })
292                    }),
293                    // no inputs in these statements
294                    DdlStatement::CreateExternalTable(_)
295                    | DdlStatement::CreateCatalogSchema(_)
296                    | DdlStatement::CreateCatalog(_)
297                    | DdlStatement::CreateIndex(_)
298                    | DdlStatement::DropTable(_)
299                    | DdlStatement::DropView(_)
300                    | DdlStatement::DropCatalogSchema(_)
301                    | DdlStatement::CreateFunction(_)
302                    | DdlStatement::DropFunction(_) => Transformed::no(ddl),
303                }
304                .update_data(LogicalPlan::Ddl)
305            }
306            LogicalPlan::Unnest(Unnest {
307                input,
308                exec_columns: input_columns,
309                list_type_columns,
310                struct_type_columns,
311                dependency_indices,
312                schema,
313                options,
314            }) => input.map_elements(f)?.update_data(|input| {
315                LogicalPlan::Unnest(Unnest {
316                    input,
317                    exec_columns: input_columns,
318                    list_type_columns,
319                    struct_type_columns,
320                    dependency_indices,
321                    schema,
322                    options,
323                })
324            }),
325            LogicalPlan::RecursiveQuery(RecursiveQuery {
326                name,
327                static_term,
328                recursive_term,
329                is_distinct,
330            }) => (static_term, recursive_term).map_elements(f)?.update_data(
331                |(static_term, recursive_term)| {
332                    LogicalPlan::RecursiveQuery(RecursiveQuery {
333                        name,
334                        static_term,
335                        recursive_term,
336                        is_distinct,
337                    })
338                },
339            ),
340            LogicalPlan::Statement(stmt) => match stmt {
341                Statement::Prepare(p) => p
342                    .input
343                    .map_elements(f)?
344                    .update_data(|input| Statement::Prepare(Prepare { input, ..p })),
345                _ => Transformed::no(stmt),
346            }
347            .update_data(LogicalPlan::Statement),
348            // plans without inputs
349            LogicalPlan::TableScan { .. }
350            | LogicalPlan::EmptyRelation { .. }
351            | LogicalPlan::Values { .. }
352            | LogicalPlan::DescribeTable(_) => Transformed::no(self),
353        })
354    }
355}
356
357/// Rewrites all inputs for an Extension node "in place"
358/// (it currently has to copy values because there are no APIs for in place modification)
359///
360/// Should be removed when we have an API for in place modifications of the
361/// extension to avoid these copies
362fn rewrite_extension_inputs<F: FnMut(LogicalPlan) -> Result<Transformed<LogicalPlan>>>(
363    extension: Extension,
364    f: F,
365) -> Result<Transformed<Extension>> {
366    let Extension { node } = extension;
367
368    node.inputs()
369        .into_iter()
370        .cloned()
371        .map_until_stop_and_collect(f)?
372        .map_data(|new_inputs| {
373            let exprs = node.expressions();
374            Ok(Extension {
375                node: node.with_exprs_and_inputs(exprs, new_inputs)?,
376            })
377        })
378}
379
/// Chains the three stages of a combined down/up rewrite that also descends
/// into subqueries.
///
/// * `$down` is a `Result<Transformed<_>>` expression for the current node;
///   it is unwrapped with `?`.
/// * `$child` is passed to `map_subqueries` and then, through
///   `transform_sibling`, to `map_children`. It is expanded twice, so it must
///   be an expression that may appear twice (typically a closure literal).
/// * `$up` is passed to `transform_parent`.
///
/// Whether a later stage actually runs is decided by the `Transformed`
/// helper methods the stages are chained with.
macro_rules! handle_transform_recursion {
    ($down:expr, $child:expr, $up:expr) => {{
        $down?
            .transform_children(|node| {
                node.map_subqueries($child)?
                    .transform_sibling(|node| node.map_children($child))
            })?
            .transform_parent($up)
    }};
}
392
393impl LogicalPlan {
394    /// Calls `f` on all expressions in the current `LogicalPlan` node.
395    ///
396    /// # Notes
397    /// * Similar to [`TreeNode::apply`] but for this node's expressions.
398    /// * Does not include expressions in input `LogicalPlan` nodes
399    /// * Visits only the top level expressions (Does not recurse into each expression)
400    pub fn apply_expressions<F: FnMut(&Expr) -> Result<TreeNodeRecursion>>(
401        &self,
402        mut f: F,
403    ) -> Result<TreeNodeRecursion> {
404        match self {
405            LogicalPlan::Projection(Projection { expr, .. }) => expr.apply_elements(f),
406            LogicalPlan::Values(Values { values, .. }) => values.apply_elements(f),
407            LogicalPlan::Filter(Filter { predicate, .. }) => f(predicate),
408            LogicalPlan::Repartition(Repartition {
409                partitioning_scheme,
410                ..
411            }) => match partitioning_scheme {
412                Partitioning::Hash(expr, _) | Partitioning::DistributeBy(expr) => {
413                    expr.apply_elements(f)
414                }
415                Partitioning::RoundRobinBatch(_) => Ok(TreeNodeRecursion::Continue),
416            },
417            LogicalPlan::Window(Window { window_expr, .. }) => {
418                window_expr.apply_elements(f)
419            }
420            LogicalPlan::Aggregate(Aggregate {
421                group_expr,
422                aggr_expr,
423                ..
424            }) => (group_expr, aggr_expr).apply_ref_elements(f),
425            // There are two part of expression for join, equijoin(on) and non-equijoin(filter).
426            // 1. the first part is `on.len()` equijoin expressions, and the struct of each expr is `left-on = right-on`.
427            // 2. the second part is non-equijoin(filter).
428            LogicalPlan::Join(Join { on, filter, .. }) => {
429                (on, filter).apply_ref_elements(f)
430            }
431            LogicalPlan::Sort(Sort { expr, .. }) => expr.apply_elements(f),
432            LogicalPlan::Extension(extension) => {
433                // would be nice to avoid this copy -- maybe can
434                // update extension to just observer Exprs
435                extension.node.expressions().apply_elements(f)
436            }
437            LogicalPlan::TableScan(TableScan { filters, .. }) => {
438                filters.apply_elements(f)
439            }
440            LogicalPlan::Unnest(unnest) => {
441                let exprs = unnest
442                    .exec_columns
443                    .iter()
444                    .cloned()
445                    .map(Expr::Column)
446                    .collect::<Vec<_>>();
447                exprs.apply_elements(f)
448            }
449            LogicalPlan::Distinct(Distinct::On(DistinctOn {
450                on_expr,
451                select_expr,
452                sort_expr,
453                ..
454            })) => (on_expr, select_expr, sort_expr).apply_ref_elements(f),
455            LogicalPlan::Limit(Limit { skip, fetch, .. }) => {
456                (skip, fetch).apply_ref_elements(f)
457            }
458            LogicalPlan::Statement(stmt) => match stmt {
459                Statement::Execute(Execute { parameters, .. }) => {
460                    parameters.apply_elements(f)
461                }
462                _ => Ok(TreeNodeRecursion::Continue),
463            },
464            // plans without expressions
465            LogicalPlan::EmptyRelation(_)
466            | LogicalPlan::RecursiveQuery(_)
467            | LogicalPlan::Subquery(_)
468            | LogicalPlan::SubqueryAlias(_)
469            | LogicalPlan::Analyze(_)
470            | LogicalPlan::Explain(_)
471            | LogicalPlan::Union(_)
472            | LogicalPlan::Distinct(Distinct::All(_))
473            | LogicalPlan::Dml(_)
474            | LogicalPlan::Ddl(_)
475            | LogicalPlan::Copy(_)
476            | LogicalPlan::DescribeTable(_) => Ok(TreeNodeRecursion::Continue),
477        }
478    }
479
480    /// Rewrites all expressions in the current `LogicalPlan` node using `f`.
481    ///
482    /// Returns the current node.
483    ///
484    /// # Notes
485    /// * Similar to [`TreeNode::map_children`] but for this node's expressions.
486    /// * Visits only the top level expressions (Does not recurse into each expression)
487    pub fn map_expressions<F: FnMut(Expr) -> Result<Transformed<Expr>>>(
488        self,
489        mut f: F,
490    ) -> Result<Transformed<Self>> {
491        Ok(match self {
492            LogicalPlan::Projection(Projection {
493                expr,
494                input,
495                schema,
496            }) => expr.map_elements(f)?.update_data(|expr| {
497                LogicalPlan::Projection(Projection {
498                    expr,
499                    input,
500                    schema,
501                })
502            }),
503            LogicalPlan::Values(Values { schema, values }) => values
504                .map_elements(f)?
505                .update_data(|values| LogicalPlan::Values(Values { schema, values })),
506            LogicalPlan::Filter(Filter { predicate, input }) => f(predicate)?
507                .update_data(|predicate| {
508                    LogicalPlan::Filter(Filter { predicate, input })
509                }),
510            LogicalPlan::Repartition(Repartition {
511                input,
512                partitioning_scheme,
513            }) => match partitioning_scheme {
514                Partitioning::Hash(expr, usize) => expr
515                    .map_elements(f)?
516                    .update_data(|expr| Partitioning::Hash(expr, usize)),
517                Partitioning::DistributeBy(expr) => expr
518                    .map_elements(f)?
519                    .update_data(Partitioning::DistributeBy),
520                Partitioning::RoundRobinBatch(_) => Transformed::no(partitioning_scheme),
521            }
522            .update_data(|partitioning_scheme| {
523                LogicalPlan::Repartition(Repartition {
524                    input,
525                    partitioning_scheme,
526                })
527            }),
528            LogicalPlan::Window(Window {
529                input,
530                window_expr,
531                schema,
532            }) => window_expr.map_elements(f)?.update_data(|window_expr| {
533                LogicalPlan::Window(Window {
534                    input,
535                    window_expr,
536                    schema,
537                })
538            }),
539            LogicalPlan::Aggregate(Aggregate {
540                input,
541                group_expr,
542                aggr_expr,
543                schema,
544            }) => (group_expr, aggr_expr).map_elements(f)?.update_data(
545                |(group_expr, aggr_expr)| {
546                    LogicalPlan::Aggregate(Aggregate {
547                        input,
548                        group_expr,
549                        aggr_expr,
550                        schema,
551                    })
552                },
553            ),
554
555            // There are two part of expression for join, equijoin(on) and non-equijoin(filter).
556            // 1. the first part is `on.len()` equijoin expressions, and the struct of each expr is `left-on = right-on`.
557            // 2. the second part is non-equijoin(filter).
558            LogicalPlan::Join(Join {
559                left,
560                right,
561                on,
562                filter,
563                join_type,
564                join_constraint,
565                schema,
566                null_equality,
567            }) => (on, filter).map_elements(f)?.update_data(|(on, filter)| {
568                LogicalPlan::Join(Join {
569                    left,
570                    right,
571                    on,
572                    filter,
573                    join_type,
574                    join_constraint,
575                    schema,
576                    null_equality,
577                })
578            }),
579            LogicalPlan::Sort(Sort { expr, input, fetch }) => expr
580                .map_elements(f)?
581                .update_data(|expr| LogicalPlan::Sort(Sort { expr, input, fetch })),
582            LogicalPlan::Extension(Extension { node }) => {
583                // would be nice to avoid this copy -- maybe can
584                // update extension to just observer Exprs
585                let exprs = node.expressions().map_elements(f)?;
586                let plan = LogicalPlan::Extension(Extension {
587                    node: UserDefinedLogicalNode::with_exprs_and_inputs(
588                        node.as_ref(),
589                        exprs.data,
590                        node.inputs().into_iter().cloned().collect::<Vec<_>>(),
591                    )?,
592                });
593                Transformed::new(plan, exprs.transformed, exprs.tnr)
594            }
595            LogicalPlan::TableScan(TableScan {
596                table_name,
597                source,
598                projection,
599                projected_schema,
600                filters,
601                fetch,
602            }) => filters.map_elements(f)?.update_data(|filters| {
603                LogicalPlan::TableScan(TableScan {
604                    table_name,
605                    source,
606                    projection,
607                    projected_schema,
608                    filters,
609                    fetch,
610                })
611            }),
612            LogicalPlan::Distinct(Distinct::On(DistinctOn {
613                on_expr,
614                select_expr,
615                sort_expr,
616                input,
617                schema,
618            })) => (on_expr, select_expr, sort_expr)
619                .map_elements(f)?
620                .update_data(|(on_expr, select_expr, sort_expr)| {
621                    LogicalPlan::Distinct(Distinct::On(DistinctOn {
622                        on_expr,
623                        select_expr,
624                        sort_expr,
625                        input,
626                        schema,
627                    }))
628                }),
629            LogicalPlan::Limit(Limit { skip, fetch, input }) => {
630                (skip, fetch).map_elements(f)?.update_data(|(skip, fetch)| {
631                    LogicalPlan::Limit(Limit { skip, fetch, input })
632                })
633            }
634            LogicalPlan::Statement(stmt) => match stmt {
635                Statement::Execute(e) => {
636                    e.parameters.map_elements(f)?.update_data(|parameters| {
637                        Statement::Execute(Execute { parameters, ..e })
638                    })
639                }
640                _ => Transformed::no(stmt),
641            }
642            .update_data(LogicalPlan::Statement),
643            // plans without expressions
644            LogicalPlan::EmptyRelation(_)
645            | LogicalPlan::Unnest(_)
646            | LogicalPlan::RecursiveQuery(_)
647            | LogicalPlan::Subquery(_)
648            | LogicalPlan::SubqueryAlias(_)
649            | LogicalPlan::Analyze(_)
650            | LogicalPlan::Explain(_)
651            | LogicalPlan::Union(_)
652            | LogicalPlan::Distinct(Distinct::All(_))
653            | LogicalPlan::Dml(_)
654            | LogicalPlan::Ddl(_)
655            | LogicalPlan::Copy(_)
656            | LogicalPlan::DescribeTable(_) => Transformed::no(self),
657        })
658    }
659
660    /// Visits a plan similarly to [`Self::visit`], including subqueries that
661    /// may appear in expressions such as `IN (SELECT ...)`.
662    #[cfg_attr(feature = "recursive_protection", recursive::recursive)]
663    pub fn visit_with_subqueries<V: for<'n> TreeNodeVisitor<'n, Node = Self>>(
664        &self,
665        visitor: &mut V,
666    ) -> Result<TreeNodeRecursion> {
667        visitor
668            .f_down(self)?
669            .visit_children(|| {
670                self.apply_subqueries(|c| c.visit_with_subqueries(visitor))?
671                    .visit_sibling(|| {
672                        self.apply_children(|c| c.visit_with_subqueries(visitor))
673                    })
674            })?
675            .visit_parent(|| visitor.f_up(self))
676    }
677
678    /// Similarly to [`Self::rewrite`], rewrites this node and its inputs using `f`,
679    /// including subqueries that may appear in expressions such as `IN (SELECT
680    /// ...)`.
681    #[cfg_attr(feature = "recursive_protection", recursive::recursive)]
682    pub fn rewrite_with_subqueries<R: TreeNodeRewriter<Node = Self>>(
683        self,
684        rewriter: &mut R,
685    ) -> Result<Transformed<Self>> {
686        handle_transform_recursion!(
687            rewriter.f_down(self),
688            |c| c.rewrite_with_subqueries(rewriter),
689            |n| rewriter.f_up(n)
690        )
691    }
692
693    /// Similarly to [`Self::apply`], calls `f` on this node and all its inputs,
694    /// including subqueries that may appear in expressions such as `IN (SELECT
695    /// ...)`.
696    pub fn apply_with_subqueries<F: FnMut(&Self) -> Result<TreeNodeRecursion>>(
697        &self,
698        mut f: F,
699    ) -> Result<TreeNodeRecursion> {
700        #[cfg_attr(feature = "recursive_protection", recursive::recursive)]
701        fn apply_with_subqueries_impl<
702            F: FnMut(&LogicalPlan) -> Result<TreeNodeRecursion>,
703        >(
704            node: &LogicalPlan,
705            f: &mut F,
706        ) -> Result<TreeNodeRecursion> {
707            f(node)?.visit_children(|| {
708                node.apply_subqueries(|c| apply_with_subqueries_impl(c, f))?
709                    .visit_sibling(|| {
710                        node.apply_children(|c| apply_with_subqueries_impl(c, f))
711                    })
712            })
713        }
714
715        apply_with_subqueries_impl(self, &mut f)
716    }
717
718    /// Similarly to [`Self::transform`], rewrites this node and its inputs using `f`,
719    /// including subqueries that may appear in expressions such as `IN (SELECT
720    /// ...)`.
721    pub fn transform_with_subqueries<F: FnMut(Self) -> Result<Transformed<Self>>>(
722        self,
723        f: F,
724    ) -> Result<Transformed<Self>> {
725        self.transform_up_with_subqueries(f)
726    }
727
728    /// Similarly to [`Self::transform_down`], rewrites this node and its inputs using `f`,
729    /// including subqueries that may appear in expressions such as `IN (SELECT
730    /// ...)`.
731    pub fn transform_down_with_subqueries<F: FnMut(Self) -> Result<Transformed<Self>>>(
732        self,
733        mut f: F,
734    ) -> Result<Transformed<Self>> {
735        #[cfg_attr(feature = "recursive_protection", recursive::recursive)]
736        fn transform_down_with_subqueries_impl<
737            F: FnMut(LogicalPlan) -> Result<Transformed<LogicalPlan>>,
738        >(
739            node: LogicalPlan,
740            f: &mut F,
741        ) -> Result<Transformed<LogicalPlan>> {
742            f(node)?.transform_children(|n| {
743                n.map_subqueries(|c| transform_down_with_subqueries_impl(c, f))?
744                    .transform_sibling(|n| {
745                        n.map_children(|c| transform_down_with_subqueries_impl(c, f))
746                    })
747            })
748        }
749
750        transform_down_with_subqueries_impl(self, &mut f)
751    }
752
753    /// Similarly to [`Self::transform_up`], rewrites this node and its inputs using `f`,
754    /// including subqueries that may appear in expressions such as `IN (SELECT
755    /// ...)`.
756    pub fn transform_up_with_subqueries<F: FnMut(Self) -> Result<Transformed<Self>>>(
757        self,
758        mut f: F,
759    ) -> Result<Transformed<Self>> {
760        #[cfg_attr(feature = "recursive_protection", recursive::recursive)]
761        fn transform_up_with_subqueries_impl<
762            F: FnMut(LogicalPlan) -> Result<Transformed<LogicalPlan>>,
763        >(
764            node: LogicalPlan,
765            f: &mut F,
766        ) -> Result<Transformed<LogicalPlan>> {
767            node.map_subqueries(|c| transform_up_with_subqueries_impl(c, f))?
768                .transform_sibling(|n| {
769                    n.map_children(|c| transform_up_with_subqueries_impl(c, f))
770                })?
771                .transform_parent(f)
772        }
773
774        transform_up_with_subqueries_impl(self, &mut f)
775    }
776
777    /// Similarly to [`Self::transform_down`], rewrites this node and its inputs using `f`,
778    /// including subqueries that may appear in expressions such as `IN (SELECT
779    /// ...)`.
780    pub fn transform_down_up_with_subqueries<
781        FD: FnMut(Self) -> Result<Transformed<Self>>,
782        FU: FnMut(Self) -> Result<Transformed<Self>>,
783    >(
784        self,
785        mut f_down: FD,
786        mut f_up: FU,
787    ) -> Result<Transformed<Self>> {
788        #[cfg_attr(feature = "recursive_protection", recursive::recursive)]
789        fn transform_down_up_with_subqueries_impl<
790            FD: FnMut(LogicalPlan) -> Result<Transformed<LogicalPlan>>,
791            FU: FnMut(LogicalPlan) -> Result<Transformed<LogicalPlan>>,
792        >(
793            node: LogicalPlan,
794            f_down: &mut FD,
795            f_up: &mut FU,
796        ) -> Result<Transformed<LogicalPlan>> {
797            handle_transform_recursion!(
798                f_down(node),
799                |c| transform_down_up_with_subqueries_impl(c, f_down, f_up),
800                f_up
801            )
802        }
803
804        transform_down_up_with_subqueries_impl(self, &mut f_down, &mut f_up)
805    }
806
807    /// Similarly to [`Self::apply`], calls `f` on  this node and its inputs
808    /// including subqueries that may appear in expressions such as `IN (SELECT
809    /// ...)`.
810    pub fn apply_subqueries<F: FnMut(&Self) -> Result<TreeNodeRecursion>>(
811        &self,
812        mut f: F,
813    ) -> Result<TreeNodeRecursion> {
814        self.apply_expressions(|expr| {
815            expr.apply(|expr| match expr {
816                Expr::Exists(Exists { subquery, .. })
817                | Expr::InSubquery(InSubquery { subquery, .. })
818                | Expr::ScalarSubquery(subquery) => {
819                    // use a synthetic plan so the collector sees a
820                    // LogicalPlan::Subquery (even though it is
821                    // actually a Subquery alias)
822                    f(&LogicalPlan::Subquery(subquery.clone()))
823                }
824                _ => Ok(TreeNodeRecursion::Continue),
825            })
826        })
827    }
828
829    /// Similarly to [`Self::map_children`], rewrites all subqueries that may
830    /// appear in expressions such as `IN (SELECT ...)` using `f`.
831    ///
832    /// Returns the current node.
833    pub fn map_subqueries<F: FnMut(Self) -> Result<Transformed<Self>>>(
834        self,
835        mut f: F,
836    ) -> Result<Transformed<Self>> {
837        self.map_expressions(|expr| {
838            expr.transform_down(|expr| match expr {
839                Expr::Exists(Exists { subquery, negated }) => {
840                    f(LogicalPlan::Subquery(subquery))?.map_data(|s| match s {
841                        LogicalPlan::Subquery(subquery) => {
842                            Ok(Expr::Exists(Exists { subquery, negated }))
843                        }
844                        _ => internal_err!("Transformation should return Subquery"),
845                    })
846                }
847                Expr::InSubquery(InSubquery {
848                    expr,
849                    subquery,
850                    negated,
851                }) => f(LogicalPlan::Subquery(subquery))?.map_data(|s| match s {
852                    LogicalPlan::Subquery(subquery) => Ok(Expr::InSubquery(InSubquery {
853                        expr,
854                        subquery,
855                        negated,
856                    })),
857                    _ => internal_err!("Transformation should return Subquery"),
858                }),
859                Expr::ScalarSubquery(subquery) => f(LogicalPlan::Subquery(subquery))?
860                    .map_data(|s| match s {
861                        LogicalPlan::Subquery(subquery) => {
862                            Ok(Expr::ScalarSubquery(subquery))
863                        }
864                        _ => internal_err!("Transformation should return Subquery"),
865                    }),
866                _ => Ok(Transformed::no(expr)),
867            })
868        })
869    }
870}