datafusion_expr/logical_plan/
tree_node.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//!  [`TreeNode`] based visiting and rewriting for [`LogicalPlan`]s
19//!
//! Visiting (read only) APIs:
21//! * [`LogicalPlan::visit`]: recursively visit the node and all of its inputs
22//! * [`LogicalPlan::visit_with_subqueries`]: recursively visit the node and all of its inputs, including subqueries
//! * [`LogicalPlan::apply_children`]: (non recursively) visit all inputs of this node
24//! * [`LogicalPlan::apply_expressions`]: (non recursively) visit all expressions of this node
25//! * [`LogicalPlan::apply_subqueries`]: (non recursively) visit all subqueries of this node
26//! * [`LogicalPlan::apply_with_subqueries`]: recursively visit all inputs and embedded subqueries.
27//!
28//! Rewriting (update) APIs:
29//! * [`LogicalPlan::exists`]: search for an expression in a plan
30//! * [`LogicalPlan::rewrite`]: recursively rewrite the node and all of its inputs
//! * [`LogicalPlan::map_children`]: (non recursively) rewrite all inputs of this node
//! * [`LogicalPlan::map_expressions`]: (non recursively) rewrite all expressions of this node
33//! * [`LogicalPlan::map_subqueries`]: (non recursively) rewrite all subqueries of this node
34//! * [`LogicalPlan::rewrite_with_subqueries`]: recursively rewrite the node and all of its inputs, including subqueries
35//!
36//! (Re)creation APIs (these require substantial cloning and thus are slow):
37//! * [`LogicalPlan::with_new_exprs`]: Create a new plan with different expressions
38//! * [`LogicalPlan::expressions`]: Return a copy of the plan's expressions
39
40use crate::{
41    dml::CopyTo, Aggregate, Analyze, CreateMemoryTable, CreateView, DdlStatement,
42    Distinct, DistinctOn, DmlStatement, Execute, Explain, Expr, Extension, Filter, Join,
43    Limit, LogicalPlan, Partitioning, Prepare, Projection, RecursiveQuery, Repartition,
44    Sort, Statement, Subquery, SubqueryAlias, TableScan, Union, Unnest,
45    UserDefinedLogicalNode, Values, Window,
46};
47use datafusion_common::tree_node::TreeNodeRefContainer;
48
49use crate::expr::{Exists, InSubquery};
50use datafusion_common::tree_node::{
51    Transformed, TreeNode, TreeNodeContainer, TreeNodeIterator, TreeNodeRecursion,
52    TreeNodeRewriter, TreeNodeVisitor,
53};
54use datafusion_common::{internal_err, Result};
55
// `TreeNode` implementation for `LogicalPlan`: the children of a plan node
// are the plans returned by `LogicalPlan::inputs`. Plans embedded in
// expressions (subqueries) are not handled by these two methods.
impl TreeNode for LogicalPlan {
    /// Applies `f` to each plan returned by [`LogicalPlan::inputs`] for this
    /// node, without modifying anything.
    ///
    /// The returned [`TreeNodeRecursion`] is whatever `apply_ref_elements`
    /// produces for the list of inputs.
    fn apply_children<'n, F: FnMut(&'n Self) -> Result<TreeNodeRecursion>>(
        &'n self,
        f: F,
    ) -> Result<TreeNodeRecursion> {
        self.inputs().apply_ref_elements(f)
    }

    /// Applies `f` to each child (input) of this plan node, rewriting them *in place.*
    ///
    /// Every arm below follows the same shape: destructure the node, rewrite
    /// only its input(s) with `f` via `map_elements`, then rebuild the same
    /// variant from the rewritten input(s) and the remaining fields, which are
    /// moved across unchanged (including any `schema`).
    ///
    /// # Notes
    ///
    /// Inputs include ONLY direct children, not embedded `LogicalPlan`s for
    /// subqueries, for example such as are in [`Expr::Exists`].
    ///
    /// [`Expr::Exists`]: crate::Expr::Exists
    fn map_children<F: FnMut(Self) -> Result<Transformed<Self>>>(
        self,
        f: F,
    ) -> Result<Transformed<Self>> {
        Ok(match self {
            LogicalPlan::Projection(Projection {
                expr,
                input,
                schema,
            }) => input.map_elements(f)?.update_data(|input| {
                LogicalPlan::Projection(Projection {
                    expr,
                    input,
                    schema,
                })
            }),
            LogicalPlan::Filter(Filter { predicate, input }) => input
                .map_elements(f)?
                .update_data(|input| LogicalPlan::Filter(Filter { predicate, input })),
            LogicalPlan::Repartition(Repartition {
                input,
                partitioning_scheme,
            }) => input.map_elements(f)?.update_data(|input| {
                LogicalPlan::Repartition(Repartition {
                    input,
                    partitioning_scheme,
                })
            }),
            LogicalPlan::Window(Window {
                input,
                window_expr,
                schema,
            }) => input.map_elements(f)?.update_data(|input| {
                LogicalPlan::Window(Window {
                    input,
                    window_expr,
                    schema,
                })
            }),
            LogicalPlan::Aggregate(Aggregate {
                input,
                group_expr,
                aggr_expr,
                schema,
            }) => input.map_elements(f)?.update_data(|input| {
                LogicalPlan::Aggregate(Aggregate {
                    input,
                    group_expr,
                    aggr_expr,
                    schema,
                })
            }),
            LogicalPlan::Sort(Sort { expr, input, fetch }) => input
                .map_elements(f)?
                .update_data(|input| LogicalPlan::Sort(Sort { expr, input, fetch })),
            // A join has two inputs; both are rewritten together as a tuple.
            LogicalPlan::Join(Join {
                left,
                right,
                on,
                filter,
                join_type,
                join_constraint,
                schema,
                null_equals_null,
            }) => (left, right).map_elements(f)?.update_data(|(left, right)| {
                LogicalPlan::Join(Join {
                    left,
                    right,
                    on,
                    filter,
                    join_type,
                    join_constraint,
                    schema,
                    null_equals_null,
                })
            }),
            LogicalPlan::Limit(Limit { skip, fetch, input }) => input
                .map_elements(f)?
                .update_data(|input| LogicalPlan::Limit(Limit { skip, fetch, input })),
            // For a `Subquery` node the wrapped `subquery` plan is the input.
            LogicalPlan::Subquery(Subquery {
                subquery,
                outer_ref_columns,
                spans,
            }) => subquery.map_elements(f)?.update_data(|subquery| {
                LogicalPlan::Subquery(Subquery {
                    subquery,
                    outer_ref_columns,
                    spans,
                })
            }),
            LogicalPlan::SubqueryAlias(SubqueryAlias {
                input,
                alias,
                schema,
            }) => input.map_elements(f)?.update_data(|input| {
                LogicalPlan::SubqueryAlias(SubqueryAlias {
                    input,
                    alias,
                    schema,
                })
            }),
            // User defined nodes are rebuilt by `rewrite_extension_inputs`.
            LogicalPlan::Extension(extension) => rewrite_extension_inputs(extension, f)?
                .update_data(LogicalPlan::Extension),
            LogicalPlan::Union(Union { inputs, schema }) => inputs
                .map_elements(f)?
                .update_data(|inputs| LogicalPlan::Union(Union { inputs, schema })),
            // Rewrite the inner `Distinct` first, then wrap it back into a plan.
            LogicalPlan::Distinct(distinct) => match distinct {
                Distinct::All(input) => input.map_elements(f)?.update_data(Distinct::All),
                Distinct::On(DistinctOn {
                    on_expr,
                    select_expr,
                    sort_expr,
                    input,
                    schema,
                }) => input.map_elements(f)?.update_data(|input| {
                    Distinct::On(DistinctOn {
                        on_expr,
                        select_expr,
                        sort_expr,
                        input,
                        schema,
                    })
                }),
            }
            .update_data(LogicalPlan::Distinct),
            LogicalPlan::Explain(Explain {
                verbose,
                explain_format: format,
                plan,
                stringified_plans,
                schema,
                logical_optimization_succeeded,
            }) => plan.map_elements(f)?.update_data(|plan| {
                LogicalPlan::Explain(Explain {
                    verbose,
                    explain_format: format,
                    plan,
                    stringified_plans,
                    schema,
                    logical_optimization_succeeded,
                })
            }),
            LogicalPlan::Analyze(Analyze {
                verbose,
                input,
                schema,
            }) => input.map_elements(f)?.update_data(|input| {
                LogicalPlan::Analyze(Analyze {
                    verbose,
                    input,
                    schema,
                })
            }),
            LogicalPlan::Dml(DmlStatement {
                table_name,
                target,
                op,
                input,
                output_schema,
            }) => input.map_elements(f)?.update_data(|input| {
                LogicalPlan::Dml(DmlStatement {
                    table_name,
                    target,
                    op,
                    input,
                    output_schema,
                })
            }),
            LogicalPlan::Copy(CopyTo {
                input,
                output_url,
                partition_by,
                file_type,
                options,
            }) => input.map_elements(f)?.update_data(|input| {
                LogicalPlan::Copy(CopyTo {
                    input,
                    output_url,
                    partition_by,
                    file_type,
                    options,
                })
            }),
            // Only `CreateMemoryTable` and `CreateView` carry an input plan;
            // all other DDL statements are returned untouched.
            LogicalPlan::Ddl(ddl) => {
                match ddl {
                    DdlStatement::CreateMemoryTable(CreateMemoryTable {
                        name,
                        constraints,
                        input,
                        if_not_exists,
                        or_replace,
                        column_defaults,
                        temporary,
                    }) => input.map_elements(f)?.update_data(|input| {
                        DdlStatement::CreateMemoryTable(CreateMemoryTable {
                            name,
                            constraints,
                            input,
                            if_not_exists,
                            or_replace,
                            column_defaults,
                            temporary,
                        })
                    }),
                    DdlStatement::CreateView(CreateView {
                        name,
                        input,
                        or_replace,
                        definition,
                        temporary,
                    }) => input.map_elements(f)?.update_data(|input| {
                        DdlStatement::CreateView(CreateView {
                            name,
                            input,
                            or_replace,
                            definition,
                            temporary,
                        })
                    }),
                    // no inputs in these statements
                    DdlStatement::CreateExternalTable(_)
                    | DdlStatement::CreateCatalogSchema(_)
                    | DdlStatement::CreateCatalog(_)
                    | DdlStatement::CreateIndex(_)
                    | DdlStatement::DropTable(_)
                    | DdlStatement::DropView(_)
                    | DdlStatement::DropCatalogSchema(_)
                    | DdlStatement::CreateFunction(_)
                    | DdlStatement::DropFunction(_) => Transformed::no(ddl),
                }
                .update_data(LogicalPlan::Ddl)
            }
            // The `exec_columns` field is bound to the local name
            // `input_columns` and written back to `exec_columns` unchanged.
            LogicalPlan::Unnest(Unnest {
                input,
                exec_columns: input_columns,
                list_type_columns,
                struct_type_columns,
                dependency_indices,
                schema,
                options,
            }) => input.map_elements(f)?.update_data(|input| {
                LogicalPlan::Unnest(Unnest {
                    input,
                    exec_columns: input_columns,
                    dependency_indices,
                    list_type_columns,
                    struct_type_columns,
                    schema,
                    options,
                })
            }),
            // Both terms of a recursive query are inputs, rewritten as a tuple.
            LogicalPlan::RecursiveQuery(RecursiveQuery {
                name,
                static_term,
                recursive_term,
                is_distinct,
            }) => (static_term, recursive_term).map_elements(f)?.update_data(
                |(static_term, recursive_term)| {
                    LogicalPlan::RecursiveQuery(RecursiveQuery {
                        name,
                        static_term,
                        recursive_term,
                        is_distinct,
                    })
                },
            ),
            // Only `Prepare` has an input here; `..p` carries over the fields
            // of `p` that were not moved out. Other statements are unchanged.
            LogicalPlan::Statement(stmt) => match stmt {
                Statement::Prepare(p) => p
                    .input
                    .map_elements(f)?
                    .update_data(|input| Statement::Prepare(Prepare { input, ..p })),
                _ => Transformed::no(stmt),
            }
            .update_data(LogicalPlan::Statement),
            // plans without inputs
            LogicalPlan::TableScan { .. }
            | LogicalPlan::EmptyRelation { .. }
            | LogicalPlan::Values { .. }
            | LogicalPlan::DescribeTable(_) => Transformed::no(self),
        })
    }
}
354
355/// Rewrites all inputs for an Extension node "in place"
356/// (it currently has to copy values because there are no APIs for in place modification)
357///
358/// Should be removed when we have an API for in place modifications of the
359/// extension to avoid these copies
360fn rewrite_extension_inputs<F: FnMut(LogicalPlan) -> Result<Transformed<LogicalPlan>>>(
361    extension: Extension,
362    f: F,
363) -> Result<Transformed<Extension>> {
364    let Extension { node } = extension;
365
366    node.inputs()
367        .into_iter()
368        .cloned()
369        .map_until_stop_and_collect(f)?
370        .map_data(|new_inputs| {
371            let exprs = node.expressions();
372            Ok(Extension {
373                node: node.with_exprs_and_inputs(exprs, new_inputs)?,
374            })
375        })
376}
377
/// Drives one step of a combined top-down / bottom-up rewrite and decides
/// whether the traversal continues.
///
/// * `$F_DOWN` is the (fallible) result of the top-down step for the node.
/// * `$F_CHILD` is the closure applied to each subquery and then to each
///   input of the node; it is expanded once for each of those two uses.
/// * `$F_UP` is the closure applied to the node on the way back up.
macro_rules! handle_transform_recursion {
    ($F_DOWN:expr, $F_CHILD:expr, $F_UP:expr) => {{
        let after_down = $F_DOWN?;
        let after_children = after_down.transform_children(|plan| {
            // Subqueries are rewritten first, then the direct inputs.
            let after_subqueries = plan.map_subqueries($F_CHILD)?;
            after_subqueries.transform_sibling(|plan| plan.map_children($F_CHILD))
        })?;
        after_children.transform_parent($F_UP)
    }};
}
390
391impl LogicalPlan {
392    /// Calls `f` on all expressions in the current `LogicalPlan` node.
393    ///
394    /// # Notes
395    /// * Similar to [`TreeNode::apply`] but for this node's expressions.
396    /// * Does not include expressions in input `LogicalPlan` nodes
397    /// * Visits only the top level expressions (Does not recurse into each expression)
398    pub fn apply_expressions<F: FnMut(&Expr) -> Result<TreeNodeRecursion>>(
399        &self,
400        mut f: F,
401    ) -> Result<TreeNodeRecursion> {
402        match self {
403            LogicalPlan::Projection(Projection { expr, .. }) => expr.apply_elements(f),
404            LogicalPlan::Values(Values { values, .. }) => values.apply_elements(f),
405            LogicalPlan::Filter(Filter { predicate, .. }) => f(predicate),
406            LogicalPlan::Repartition(Repartition {
407                partitioning_scheme,
408                ..
409            }) => match partitioning_scheme {
410                Partitioning::Hash(expr, _) | Partitioning::DistributeBy(expr) => {
411                    expr.apply_elements(f)
412                }
413                Partitioning::RoundRobinBatch(_) => Ok(TreeNodeRecursion::Continue),
414            },
415            LogicalPlan::Window(Window { window_expr, .. }) => {
416                window_expr.apply_elements(f)
417            }
418            LogicalPlan::Aggregate(Aggregate {
419                group_expr,
420                aggr_expr,
421                ..
422            }) => (group_expr, aggr_expr).apply_ref_elements(f),
423            // There are two part of expression for join, equijoin(on) and non-equijoin(filter).
424            // 1. the first part is `on.len()` equijoin expressions, and the struct of each expr is `left-on = right-on`.
425            // 2. the second part is non-equijoin(filter).
426            LogicalPlan::Join(Join { on, filter, .. }) => {
427                (on, filter).apply_ref_elements(f)
428            }
429            LogicalPlan::Sort(Sort { expr, .. }) => expr.apply_elements(f),
430            LogicalPlan::Extension(extension) => {
431                // would be nice to avoid this copy -- maybe can
432                // update extension to just observer Exprs
433                extension.node.expressions().apply_elements(f)
434            }
435            LogicalPlan::TableScan(TableScan { filters, .. }) => {
436                filters.apply_elements(f)
437            }
438            LogicalPlan::Unnest(unnest) => {
439                let columns = unnest.exec_columns.clone();
440
441                let exprs = columns
442                    .iter()
443                    .map(|c| Expr::Column(c.clone()))
444                    .collect::<Vec<_>>();
445                exprs.apply_elements(f)
446            }
447            LogicalPlan::Distinct(Distinct::On(DistinctOn {
448                on_expr,
449                select_expr,
450                sort_expr,
451                ..
452            })) => (on_expr, select_expr, sort_expr).apply_ref_elements(f),
453            LogicalPlan::Limit(Limit { skip, fetch, .. }) => {
454                (skip, fetch).apply_ref_elements(f)
455            }
456            LogicalPlan::Statement(stmt) => match stmt {
457                Statement::Execute(Execute { parameters, .. }) => {
458                    parameters.apply_elements(f)
459                }
460                _ => Ok(TreeNodeRecursion::Continue),
461            },
462            // plans without expressions
463            LogicalPlan::EmptyRelation(_)
464            | LogicalPlan::RecursiveQuery(_)
465            | LogicalPlan::Subquery(_)
466            | LogicalPlan::SubqueryAlias(_)
467            | LogicalPlan::Analyze(_)
468            | LogicalPlan::Explain(_)
469            | LogicalPlan::Union(_)
470            | LogicalPlan::Distinct(Distinct::All(_))
471            | LogicalPlan::Dml(_)
472            | LogicalPlan::Ddl(_)
473            | LogicalPlan::Copy(_)
474            | LogicalPlan::DescribeTable(_) => Ok(TreeNodeRecursion::Continue),
475        }
476    }
477
    /// Rewrites all expressions in the current `LogicalPlan` node using `f`.
    ///
    /// Returns the current node, rebuilt from the rewritten expressions and
    /// its other fields, which are moved across unchanged.
    ///
    /// # Notes
    /// * Similar to [`TreeNode::map_children`] but for this node's expressions.
    /// * Visits only the top level expressions (Does not recurse into each expression)
    /// * `Unnest` is listed with the plans without expressions below, so its
    ///   `exec_columns` are not passed to `f` here, even though
    ///   [`Self::apply_expressions`] exposes them as `Expr::Column`s.
    pub fn map_expressions<F: FnMut(Expr) -> Result<Transformed<Expr>>>(
        self,
        mut f: F,
    ) -> Result<Transformed<Self>> {
        Ok(match self {
            LogicalPlan::Projection(Projection {
                expr,
                input,
                schema,
            }) => expr.map_elements(f)?.update_data(|expr| {
                LogicalPlan::Projection(Projection {
                    expr,
                    input,
                    schema,
                })
            }),
            LogicalPlan::Values(Values { schema, values }) => values
                .map_elements(f)?
                .update_data(|values| LogicalPlan::Values(Values { schema, values })),
            // A filter has a single expression, so `f` is called on it directly.
            LogicalPlan::Filter(Filter { predicate, input }) => f(predicate)?
                .update_data(|predicate| {
                    LogicalPlan::Filter(Filter { predicate, input })
                }),
            // Rewrite the expressions inside the partitioning scheme (if it has
            // any), then wrap the scheme back into a `Repartition` node.
            LogicalPlan::Repartition(Repartition {
                input,
                partitioning_scheme,
            }) => match partitioning_scheme {
                Partitioning::Hash(expr, usize) => expr
                    .map_elements(f)?
                    .update_data(|expr| Partitioning::Hash(expr, usize)),
                Partitioning::DistributeBy(expr) => expr
                    .map_elements(f)?
                    .update_data(Partitioning::DistributeBy),
                Partitioning::RoundRobinBatch(_) => Transformed::no(partitioning_scheme),
            }
            .update_data(|partitioning_scheme| {
                LogicalPlan::Repartition(Repartition {
                    input,
                    partitioning_scheme,
                })
            }),
            LogicalPlan::Window(Window {
                input,
                window_expr,
                schema,
            }) => window_expr.map_elements(f)?.update_data(|window_expr| {
                LogicalPlan::Window(Window {
                    input,
                    window_expr,
                    schema,
                })
            }),
            LogicalPlan::Aggregate(Aggregate {
                input,
                group_expr,
                aggr_expr,
                schema,
            }) => (group_expr, aggr_expr).map_elements(f)?.update_data(
                |(group_expr, aggr_expr)| {
                    LogicalPlan::Aggregate(Aggregate {
                        input,
                        group_expr,
                        aggr_expr,
                        schema,
                    })
                },
            ),

            // A join has two groups of expressions: equijoin (`on`) and non-equijoin (`filter`).
            // 1. The first group is the `on.len()` equijoin expressions, each of the form `left-on = right-on`.
            // 2. The second group is the non-equijoin `filter`.
            LogicalPlan::Join(Join {
                left,
                right,
                on,
                filter,
                join_type,
                join_constraint,
                schema,
                null_equals_null,
            }) => (on, filter).map_elements(f)?.update_data(|(on, filter)| {
                LogicalPlan::Join(Join {
                    left,
                    right,
                    on,
                    filter,
                    join_type,
                    join_constraint,
                    schema,
                    null_equals_null,
                })
            }),
            LogicalPlan::Sort(Sort { expr, input, fetch }) => expr
                .map_elements(f)?
                .update_data(|expr| LogicalPlan::Sort(Sort { expr, input, fetch })),
            LogicalPlan::Extension(Extension { node }) => {
                // would be nice to avoid this copy -- maybe can
                // update extension to just observe Exprs
                let exprs = node.expressions().map_elements(f)?;
                // Recreate the node from the rewritten expressions and clones
                // of its current inputs.
                let plan = LogicalPlan::Extension(Extension {
                    node: UserDefinedLogicalNode::with_exprs_and_inputs(
                        node.as_ref(),
                        exprs.data,
                        node.inputs().into_iter().cloned().collect::<Vec<_>>(),
                    )?,
                });
                // Carry over the `transformed` flag and recursion state from
                // the expression rewrite.
                Transformed::new(plan, exprs.transformed, exprs.tnr)
            }
            // The expressions of a table scan are its `filters`.
            LogicalPlan::TableScan(TableScan {
                table_name,
                source,
                projection,
                projected_schema,
                filters,
                fetch,
            }) => filters.map_elements(f)?.update_data(|filters| {
                LogicalPlan::TableScan(TableScan {
                    table_name,
                    source,
                    projection,
                    projected_schema,
                    filters,
                    fetch,
                })
            }),
            LogicalPlan::Distinct(Distinct::On(DistinctOn {
                on_expr,
                select_expr,
                sort_expr,
                input,
                schema,
            })) => (on_expr, select_expr, sort_expr)
                .map_elements(f)?
                .update_data(|(on_expr, select_expr, sort_expr)| {
                    LogicalPlan::Distinct(Distinct::On(DistinctOn {
                        on_expr,
                        select_expr,
                        sort_expr,
                        input,
                        schema,
                    }))
                }),
            LogicalPlan::Limit(Limit { skip, fetch, input }) => {
                (skip, fetch).map_elements(f)?.update_data(|(skip, fetch)| {
                    LogicalPlan::Limit(Limit { skip, fetch, input })
                })
            }
            // Only `Execute` has expressions (its parameters); `..e` carries
            // over the fields of `e` that were not moved out.
            LogicalPlan::Statement(stmt) => match stmt {
                Statement::Execute(e) => {
                    e.parameters.map_elements(f)?.update_data(|parameters| {
                        Statement::Execute(Execute { parameters, ..e })
                    })
                }
                _ => Transformed::no(stmt),
            }
            .update_data(LogicalPlan::Statement),
            // plans without expressions
            LogicalPlan::EmptyRelation(_)
            | LogicalPlan::Unnest(_)
            | LogicalPlan::RecursiveQuery(_)
            | LogicalPlan::Subquery(_)
            | LogicalPlan::SubqueryAlias(_)
            | LogicalPlan::Analyze(_)
            | LogicalPlan::Explain(_)
            | LogicalPlan::Union(_)
            | LogicalPlan::Distinct(Distinct::All(_))
            | LogicalPlan::Dml(_)
            | LogicalPlan::Ddl(_)
            | LogicalPlan::Copy(_)
            | LogicalPlan::DescribeTable(_) => Transformed::no(self),
        })
    }
657
658    /// Visits a plan similarly to [`Self::visit`], including subqueries that
659    /// may appear in expressions such as `IN (SELECT ...)`.
660    #[cfg_attr(feature = "recursive_protection", recursive::recursive)]
661    pub fn visit_with_subqueries<V: for<'n> TreeNodeVisitor<'n, Node = Self>>(
662        &self,
663        visitor: &mut V,
664    ) -> Result<TreeNodeRecursion> {
665        visitor
666            .f_down(self)?
667            .visit_children(|| {
668                self.apply_subqueries(|c| c.visit_with_subqueries(visitor))?
669                    .visit_sibling(|| {
670                        self.apply_children(|c| c.visit_with_subqueries(visitor))
671                    })
672            })?
673            .visit_parent(|| visitor.f_up(self))
674    }
675
676    /// Similarly to [`Self::rewrite`], rewrites this node and its inputs using `f`,
677    /// including subqueries that may appear in expressions such as `IN (SELECT
678    /// ...)`.
679    #[cfg_attr(feature = "recursive_protection", recursive::recursive)]
680    pub fn rewrite_with_subqueries<R: TreeNodeRewriter<Node = Self>>(
681        self,
682        rewriter: &mut R,
683    ) -> Result<Transformed<Self>> {
684        handle_transform_recursion!(
685            rewriter.f_down(self),
686            |c| c.rewrite_with_subqueries(rewriter),
687            |n| rewriter.f_up(n)
688        )
689    }
690
691    /// Similarly to [`Self::apply`], calls `f` on this node and all its inputs,
692    /// including subqueries that may appear in expressions such as `IN (SELECT
693    /// ...)`.
694    pub fn apply_with_subqueries<F: FnMut(&Self) -> Result<TreeNodeRecursion>>(
695        &self,
696        mut f: F,
697    ) -> Result<TreeNodeRecursion> {
698        #[cfg_attr(feature = "recursive_protection", recursive::recursive)]
699        fn apply_with_subqueries_impl<
700            F: FnMut(&LogicalPlan) -> Result<TreeNodeRecursion>,
701        >(
702            node: &LogicalPlan,
703            f: &mut F,
704        ) -> Result<TreeNodeRecursion> {
705            f(node)?.visit_children(|| {
706                node.apply_subqueries(|c| apply_with_subqueries_impl(c, f))?
707                    .visit_sibling(|| {
708                        node.apply_children(|c| apply_with_subqueries_impl(c, f))
709                    })
710            })
711        }
712
713        apply_with_subqueries_impl(self, &mut f)
714    }
715
716    /// Similarly to [`Self::transform`], rewrites this node and its inputs using `f`,
717    /// including subqueries that may appear in expressions such as `IN (SELECT
718    /// ...)`.
719    pub fn transform_with_subqueries<F: FnMut(Self) -> Result<Transformed<Self>>>(
720        self,
721        f: F,
722    ) -> Result<Transformed<Self>> {
723        self.transform_up_with_subqueries(f)
724    }
725
726    /// Similarly to [`Self::transform_down`], rewrites this node and its inputs using `f`,
727    /// including subqueries that may appear in expressions such as `IN (SELECT
728    /// ...)`.
729    pub fn transform_down_with_subqueries<F: FnMut(Self) -> Result<Transformed<Self>>>(
730        self,
731        mut f: F,
732    ) -> Result<Transformed<Self>> {
733        #[cfg_attr(feature = "recursive_protection", recursive::recursive)]
734        fn transform_down_with_subqueries_impl<
735            F: FnMut(LogicalPlan) -> Result<Transformed<LogicalPlan>>,
736        >(
737            node: LogicalPlan,
738            f: &mut F,
739        ) -> Result<Transformed<LogicalPlan>> {
740            f(node)?.transform_children(|n| {
741                n.map_subqueries(|c| transform_down_with_subqueries_impl(c, f))?
742                    .transform_sibling(|n| {
743                        n.map_children(|c| transform_down_with_subqueries_impl(c, f))
744                    })
745            })
746        }
747
748        transform_down_with_subqueries_impl(self, &mut f)
749    }
750
751    /// Similarly to [`Self::transform_up`], rewrites this node and its inputs using `f`,
752    /// including subqueries that may appear in expressions such as `IN (SELECT
753    /// ...)`.
754    pub fn transform_up_with_subqueries<F: FnMut(Self) -> Result<Transformed<Self>>>(
755        self,
756        mut f: F,
757    ) -> Result<Transformed<Self>> {
758        #[cfg_attr(feature = "recursive_protection", recursive::recursive)]
759        fn transform_up_with_subqueries_impl<
760            F: FnMut(LogicalPlan) -> Result<Transformed<LogicalPlan>>,
761        >(
762            node: LogicalPlan,
763            f: &mut F,
764        ) -> Result<Transformed<LogicalPlan>> {
765            node.map_subqueries(|c| transform_up_with_subqueries_impl(c, f))?
766                .transform_sibling(|n| {
767                    n.map_children(|c| transform_up_with_subqueries_impl(c, f))
768                })?
769                .transform_parent(f)
770        }
771
772        transform_up_with_subqueries_impl(self, &mut f)
773    }
774
775    /// Similarly to [`Self::transform_down`], rewrites this node and its inputs using `f`,
776    /// including subqueries that may appear in expressions such as `IN (SELECT
777    /// ...)`.
778    pub fn transform_down_up_with_subqueries<
779        FD: FnMut(Self) -> Result<Transformed<Self>>,
780        FU: FnMut(Self) -> Result<Transformed<Self>>,
781    >(
782        self,
783        mut f_down: FD,
784        mut f_up: FU,
785    ) -> Result<Transformed<Self>> {
786        #[cfg_attr(feature = "recursive_protection", recursive::recursive)]
787        fn transform_down_up_with_subqueries_impl<
788            FD: FnMut(LogicalPlan) -> Result<Transformed<LogicalPlan>>,
789            FU: FnMut(LogicalPlan) -> Result<Transformed<LogicalPlan>>,
790        >(
791            node: LogicalPlan,
792            f_down: &mut FD,
793            f_up: &mut FU,
794        ) -> Result<Transformed<LogicalPlan>> {
795            handle_transform_recursion!(
796                f_down(node),
797                |c| transform_down_up_with_subqueries_impl(c, f_down, f_up),
798                f_up
799            )
800        }
801
802        transform_down_up_with_subqueries_impl(self, &mut f_down, &mut f_up)
803    }
804
805    /// Similarly to [`Self::apply`], calls `f` on  this node and its inputs
806    /// including subqueries that may appear in expressions such as `IN (SELECT
807    /// ...)`.
808    pub fn apply_subqueries<F: FnMut(&Self) -> Result<TreeNodeRecursion>>(
809        &self,
810        mut f: F,
811    ) -> Result<TreeNodeRecursion> {
812        self.apply_expressions(|expr| {
813            expr.apply(|expr| match expr {
814                Expr::Exists(Exists { subquery, .. })
815                | Expr::InSubquery(InSubquery { subquery, .. })
816                | Expr::ScalarSubquery(subquery) => {
817                    // use a synthetic plan so the collector sees a
818                    // LogicalPlan::Subquery (even though it is
819                    // actually a Subquery alias)
820                    f(&LogicalPlan::Subquery(subquery.clone()))
821                }
822                _ => Ok(TreeNodeRecursion::Continue),
823            })
824        })
825    }
826
827    /// Similarly to [`Self::map_children`], rewrites all subqueries that may
828    /// appear in expressions such as `IN (SELECT ...)` using `f`.
829    ///
830    /// Returns the current node.
831    pub fn map_subqueries<F: FnMut(Self) -> Result<Transformed<Self>>>(
832        self,
833        mut f: F,
834    ) -> Result<Transformed<Self>> {
835        self.map_expressions(|expr| {
836            expr.transform_down(|expr| match expr {
837                Expr::Exists(Exists { subquery, negated }) => {
838                    f(LogicalPlan::Subquery(subquery))?.map_data(|s| match s {
839                        LogicalPlan::Subquery(subquery) => {
840                            Ok(Expr::Exists(Exists { subquery, negated }))
841                        }
842                        _ => internal_err!("Transformation should return Subquery"),
843                    })
844                }
845                Expr::InSubquery(InSubquery {
846                    expr,
847                    subquery,
848                    negated,
849                }) => f(LogicalPlan::Subquery(subquery))?.map_data(|s| match s {
850                    LogicalPlan::Subquery(subquery) => Ok(Expr::InSubquery(InSubquery {
851                        expr,
852                        subquery,
853                        negated,
854                    })),
855                    _ => internal_err!("Transformation should return Subquery"),
856                }),
857                Expr::ScalarSubquery(subquery) => f(LogicalPlan::Subquery(subquery))?
858                    .map_data(|s| match s {
859                        LogicalPlan::Subquery(subquery) => {
860                            Ok(Expr::ScalarSubquery(subquery))
861                        }
862                        _ => internal_err!("Transformation should return Subquery"),
863                    }),
864                _ => Ok(Transformed::no(expr)),
865            })
866        })
867    }
868}