Skip to main content

datafusion_expr/logical_plan/
tree_node.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//!  [`TreeNode`] based visiting and rewriting for [`LogicalPlan`]s
19//!
20//! Visiting (read only) APIs
21//! * [`LogicalPlan::visit`]: recursively visit the node and all of its inputs
22//! * [`LogicalPlan::visit_with_subqueries`]: recursively visit the node and all of its inputs, including subqueries
23//! * [`LogicalPlan::apply_children`]: recursively visit all inputs of this node
24//! * [`LogicalPlan::apply_expressions`]: (non recursively) visit all expressions of this node
25//! * [`LogicalPlan::apply_subqueries`]: (non recursively) visit all subqueries of this node
26//! * [`LogicalPlan::apply_with_subqueries`]: recursively visit all inputs and embedded subqueries.
27//!
28//! Rewriting (update) APIs:
29//! * [`LogicalPlan::exists`]: search for an expression in a plan
30//! * [`LogicalPlan::rewrite`]: recursively rewrite the node and all of its inputs
31//! * [`LogicalPlan::map_children`]: recursively rewrite all inputs of this node
//! * [`LogicalPlan::map_expressions`]: (non recursively) rewrite all expressions of this node
33//! * [`LogicalPlan::map_subqueries`]: (non recursively) rewrite all subqueries of this node
34//! * [`LogicalPlan::rewrite_with_subqueries`]: recursively rewrite the node and all of its inputs, including subqueries
35//!
36//! (Re)creation APIs (these require substantial cloning and thus are slow):
37//! * [`LogicalPlan::with_new_exprs`]: Create a new plan with different expressions
38//! * [`LogicalPlan::expressions`]: Return a copy of the plan's expressions
39
40use crate::{
41    Aggregate, Analyze, CreateMemoryTable, CreateView, DdlStatement, Distinct,
42    DistinctOn, DmlStatement, Execute, Explain, Expr, Extension, Filter, Join, Limit,
43    LogicalPlan, Partitioning, Prepare, Projection, RecursiveQuery, Repartition, Sort,
44    Statement, Subquery, SubqueryAlias, TableScan, Union, Unnest, UserDefinedLogicalNode,
45    Values, Window, dml::CopyTo,
46};
47use datafusion_common::tree_node::TreeNodeRefContainer;
48
49use crate::expr::{Exists, InSubquery, SetComparison};
50use datafusion_common::tree_node::{
51    Transformed, TreeNode, TreeNodeContainer, TreeNodeIterator, TreeNodeRecursion,
52    TreeNodeRewriter, TreeNodeVisitor,
53};
54use datafusion_common::{Result, internal_err};
55
impl TreeNode for LogicalPlan {
    /// Calls `f` on each input of this plan node, as returned by
    /// [`LogicalPlan::inputs`].
    ///
    /// # Notes
    ///
    /// Only the plans returned by `inputs()` are visited; `LogicalPlan`s
    /// embedded in expressions are not reached through this method.
    fn apply_children<'n, F: FnMut(&'n Self) -> Result<TreeNodeRecursion>>(
        &'n self,
        f: F,
    ) -> Result<TreeNodeRecursion> {
        self.inputs().apply_ref_elements(f)
    }

    /// Applies `f` to each child (input) of this plan node, rewriting them *in place.*
    ///
    /// Every arm follows the same shape: the node is destructured, `f` is
    /// mapped over its input plan(s), and the same variant is rebuilt from the
    /// rewritten input(s) together with the remaining fields, which are moved
    /// across unchanged.
    ///
    /// # Notes
    ///
    /// Inputs include ONLY direct children, not embedded `LogicalPlan`s for
    /// subqueries, for example such as are in [`Expr::Exists`].
    ///
    /// [`Expr::Exists`]: crate::Expr::Exists
    fn map_children<F: FnMut(Self) -> Result<Transformed<Self>>>(
        self,
        f: F,
    ) -> Result<Transformed<Self>> {
        Ok(match self {
            LogicalPlan::Projection(Projection {
                expr,
                input,
                schema,
            }) => input.map_elements(f)?.update_data(|input| {
                LogicalPlan::Projection(Projection {
                    expr,
                    input,
                    schema,
                })
            }),
            LogicalPlan::Filter(Filter { predicate, input }) => input
                .map_elements(f)?
                .update_data(|input| LogicalPlan::Filter(Filter { predicate, input })),
            LogicalPlan::Repartition(Repartition {
                input,
                partitioning_scheme,
            }) => input.map_elements(f)?.update_data(|input| {
                LogicalPlan::Repartition(Repartition {
                    input,
                    partitioning_scheme,
                })
            }),
            LogicalPlan::Window(Window {
                input,
                window_expr,
                schema,
            }) => input.map_elements(f)?.update_data(|input| {
                LogicalPlan::Window(Window {
                    input,
                    window_expr,
                    schema,
                })
            }),
            LogicalPlan::Aggregate(Aggregate {
                input,
                group_expr,
                aggr_expr,
                schema,
            }) => input.map_elements(f)?.update_data(|input| {
                LogicalPlan::Aggregate(Aggregate {
                    input,
                    group_expr,
                    aggr_expr,
                    schema,
                })
            }),
            LogicalPlan::Sort(Sort { expr, input, fetch }) => input
                .map_elements(f)?
                .update_data(|input| LogicalPlan::Sort(Sort { expr, input, fetch })),
            // Two inputs: `left` and `right` are mapped together as a tuple.
            LogicalPlan::Join(Join {
                left,
                right,
                on,
                filter,
                join_type,
                join_constraint,
                schema,
                null_equality,
                null_aware,
            }) => (left, right).map_elements(f)?.update_data(|(left, right)| {
                LogicalPlan::Join(Join {
                    left,
                    right,
                    on,
                    filter,
                    join_type,
                    join_constraint,
                    schema,
                    null_equality,
                    null_aware,
                })
            }),
            LogicalPlan::Limit(Limit { skip, fetch, input }) => input
                .map_elements(f)?
                .update_data(|input| LogicalPlan::Limit(Limit { skip, fetch, input })),
            // For a `Subquery` node the wrapped `subquery` plan is the child
            // that gets rewritten.
            LogicalPlan::Subquery(Subquery {
                subquery,
                outer_ref_columns,
                spans,
            }) => subquery.map_elements(f)?.update_data(|subquery| {
                LogicalPlan::Subquery(Subquery {
                    subquery,
                    outer_ref_columns,
                    spans,
                })
            }),
            LogicalPlan::SubqueryAlias(SubqueryAlias {
                input,
                alias,
                schema,
            }) => input.map_elements(f)?.update_data(|input| {
                LogicalPlan::SubqueryAlias(SubqueryAlias {
                    input,
                    alias,
                    schema,
                })
            }),
            // User defined nodes are rebuilt through their own API; see
            // `rewrite_extension_inputs`.
            LogicalPlan::Extension(extension) => rewrite_extension_inputs(extension, f)?
                .update_data(LogicalPlan::Extension),
            LogicalPlan::Union(Union { inputs, schema }) => inputs
                .map_elements(f)?
                .update_data(|inputs| LogicalPlan::Union(Union { inputs, schema })),
            // Rewrite the input inside the `Distinct` first, then re-wrap the
            // result as a `LogicalPlan::Distinct`.
            LogicalPlan::Distinct(distinct) => match distinct {
                Distinct::All(input) => input.map_elements(f)?.update_data(Distinct::All),
                Distinct::On(DistinctOn {
                    on_expr,
                    select_expr,
                    sort_expr,
                    input,
                    schema,
                }) => input.map_elements(f)?.update_data(|input| {
                    Distinct::On(DistinctOn {
                        on_expr,
                        select_expr,
                        sort_expr,
                        input,
                        schema,
                    })
                }),
            }
            .update_data(LogicalPlan::Distinct),
            LogicalPlan::Explain(Explain {
                verbose,
                explain_format: format,
                plan,
                stringified_plans,
                schema,
                logical_optimization_succeeded,
            }) => plan.map_elements(f)?.update_data(|plan| {
                LogicalPlan::Explain(Explain {
                    verbose,
                    explain_format: format,
                    plan,
                    stringified_plans,
                    schema,
                    logical_optimization_succeeded,
                })
            }),
            LogicalPlan::Analyze(Analyze {
                verbose,
                input,
                schema,
            }) => input.map_elements(f)?.update_data(|input| {
                LogicalPlan::Analyze(Analyze {
                    verbose,
                    input,
                    schema,
                })
            }),
            LogicalPlan::Dml(DmlStatement {
                table_name,
                target,
                op,
                input,
                output_schema,
            }) => input.map_elements(f)?.update_data(|input| {
                LogicalPlan::Dml(DmlStatement {
                    table_name,
                    target,
                    op,
                    input,
                    output_schema,
                })
            }),
            LogicalPlan::Copy(CopyTo {
                input,
                output_url,
                partition_by,
                file_type,
                options,
                output_schema,
            }) => input.map_elements(f)?.update_data(|input| {
                LogicalPlan::Copy(CopyTo {
                    input,
                    output_url,
                    partition_by,
                    file_type,
                    options,
                    output_schema,
                })
            }),
            // Only `CreateMemoryTable` and `CreateView` carry an input plan;
            // the rewritten statement is re-wrapped as `LogicalPlan::Ddl`.
            LogicalPlan::Ddl(ddl) => {
                match ddl {
                    DdlStatement::CreateMemoryTable(CreateMemoryTable {
                        name,
                        constraints,
                        input,
                        if_not_exists,
                        or_replace,
                        column_defaults,
                        temporary,
                    }) => input.map_elements(f)?.update_data(|input| {
                        DdlStatement::CreateMemoryTable(CreateMemoryTable {
                            name,
                            constraints,
                            input,
                            if_not_exists,
                            or_replace,
                            column_defaults,
                            temporary,
                        })
                    }),
                    DdlStatement::CreateView(CreateView {
                        name,
                        input,
                        or_replace,
                        definition,
                        temporary,
                    }) => input.map_elements(f)?.update_data(|input| {
                        DdlStatement::CreateView(CreateView {
                            name,
                            input,
                            or_replace,
                            definition,
                            temporary,
                        })
                    }),
                    // no inputs in these statements
                    DdlStatement::CreateExternalTable(_)
                    | DdlStatement::CreateCatalogSchema(_)
                    | DdlStatement::CreateCatalog(_)
                    | DdlStatement::CreateIndex(_)
                    | DdlStatement::DropTable(_)
                    | DdlStatement::DropView(_)
                    | DdlStatement::DropCatalogSchema(_)
                    | DdlStatement::CreateFunction(_)
                    | DdlStatement::DropFunction(_) => Transformed::no(ddl),
                }
                .update_data(LogicalPlan::Ddl)
            }
            LogicalPlan::Unnest(Unnest {
                input,
                exec_columns: input_columns,
                list_type_columns,
                struct_type_columns,
                dependency_indices,
                schema,
                options,
            }) => input.map_elements(f)?.update_data(|input| {
                LogicalPlan::Unnest(Unnest {
                    input,
                    exec_columns: input_columns,
                    list_type_columns,
                    struct_type_columns,
                    dependency_indices,
                    schema,
                    options,
                })
            }),
            // Two inputs: `static_term` and `recursive_term` are mapped
            // together as a tuple.
            LogicalPlan::RecursiveQuery(RecursiveQuery {
                name,
                static_term,
                recursive_term,
                is_distinct,
            }) => (static_term, recursive_term).map_elements(f)?.update_data(
                |(static_term, recursive_term)| {
                    LogicalPlan::RecursiveQuery(RecursiveQuery {
                        name,
                        static_term,
                        recursive_term,
                        is_distinct,
                    })
                },
            ),
            // Only `Statement::Prepare` has its input rewritten here; every
            // other statement is returned unchanged.
            LogicalPlan::Statement(stmt) => match stmt {
                Statement::Prepare(p) => p
                    .input
                    .map_elements(f)?
                    .update_data(|input| Statement::Prepare(Prepare { input, ..p })),
                _ => Transformed::no(stmt),
            }
            .update_data(LogicalPlan::Statement),
            // plans without inputs
            LogicalPlan::TableScan { .. }
            | LogicalPlan::EmptyRelation { .. }
            | LogicalPlan::Values { .. }
            | LogicalPlan::DescribeTable(_) => Transformed::no(self),
        })
    }
}
358
359/// Rewrites all inputs for an Extension node "in place"
360/// (it currently has to copy values because there are no APIs for in place modification)
361///
362/// Should be removed when we have an API for in place modifications of the
363/// extension to avoid these copies
364fn rewrite_extension_inputs<F: FnMut(LogicalPlan) -> Result<Transformed<LogicalPlan>>>(
365    extension: Extension,
366    f: F,
367) -> Result<Transformed<Extension>> {
368    let Extension { node } = extension;
369
370    node.inputs()
371        .into_iter()
372        .cloned()
373        .map_until_stop_and_collect(f)?
374        .map_data(|new_inputs| {
375            let exprs = node.expressions();
376            Ok(Extension {
377                node: node.with_exprs_and_inputs(exprs, new_inputs)?,
378            })
379        })
380}
381
/// Drives one step of a combined top-down / bottom-up rewrite.
///
/// Evaluates `$F_DOWN` (propagating any error), then rewrites the resulting
/// node's subqueries and afterwards its children with `$F_CHILD`, and finally
/// hands the result to `$F_UP`. Whether each later step actually runs is
/// decided by the `transform_children` / `transform_sibling` /
/// `transform_parent` combinators.
///
/// Note that `$F_CHILD` is expanded twice (once for subqueries, once for
/// children), so it must be an expression that can be written out twice.
macro_rules! handle_transform_recursion {
    ($F_DOWN:expr, $F_CHILD:expr, $F_UP:expr) => {{
        let after_down = $F_DOWN?;
        let after_children = after_down.transform_children(|plan| {
            let after_subqueries = plan.map_subqueries($F_CHILD)?;
            after_subqueries.transform_sibling(|plan| plan.map_children($F_CHILD))
        })?;
        after_children.transform_parent($F_UP)
    }};
}
394
395impl LogicalPlan {
    /// Calls `f` on all expressions in the current `LogicalPlan` node.
    ///
    /// # Notes
    /// * Similar to [`TreeNode::apply`] but for this node's expressions.
    /// * Does not include expressions in input `LogicalPlan` nodes
    /// * Visits only the top level expressions (Does not recurse into each expression)
    /// * Plan variants listed under "plans without expressions" below return
    ///   [`TreeNodeRecursion::Continue`] without calling `f`.
    pub fn apply_expressions<F: FnMut(&Expr) -> Result<TreeNodeRecursion>>(
        &self,
        mut f: F,
    ) -> Result<TreeNodeRecursion> {
        match self {
            LogicalPlan::Projection(Projection { expr, .. }) => expr.apply_elements(f),
            LogicalPlan::Values(Values { values, .. }) => values.apply_elements(f),
            // A filter has exactly one expression, so `f` is called directly.
            LogicalPlan::Filter(Filter { predicate, .. }) => f(predicate),
            LogicalPlan::Repartition(Repartition {
                partitioning_scheme,
                ..
            }) => match partitioning_scheme {
                Partitioning::Hash(expr, _) | Partitioning::DistributeBy(expr) => {
                    expr.apply_elements(f)
                }
                Partitioning::RoundRobinBatch(_) => Ok(TreeNodeRecursion::Continue),
            },
            LogicalPlan::Window(Window { window_expr, .. }) => {
                window_expr.apply_elements(f)
            }
            LogicalPlan::Aggregate(Aggregate {
                group_expr,
                aggr_expr,
                ..
            }) => (group_expr, aggr_expr).apply_ref_elements(f),
            // A join has two kinds of expressions: equijoin (`on`) and non-equijoin (`filter`).
            // 1. The first part is the `on.len()` equijoin expressions, each of the form `left-on = right-on`.
            // 2. The second part is the non-equijoin `filter`.
            LogicalPlan::Join(Join { on, filter, .. }) => {
                (on, filter).apply_ref_elements(f)
            }
            LogicalPlan::Sort(Sort { expr, .. }) => expr.apply_elements(f),
            LogicalPlan::Extension(extension) => {
                // would be nice to avoid this copy -- maybe can
                // update extension to just observe Exprs
                extension.node.expressions().apply_elements(f)
            }
            LogicalPlan::TableScan(TableScan { filters, .. }) => {
                filters.apply_elements(f)
            }
            LogicalPlan::Unnest(unnest) => {
                // `exec_columns` are wrapped in `Expr::Column` so they can be
                // presented to `f`; this builds a temporary `Vec` of clones.
                //
                // NOTE(review): `map_expressions` lists `Unnest` among the
                // plans without expressions, so these columns are visited here
                // but not rewritten there -- confirm the asymmetry is intended.
                let exprs = unnest
                    .exec_columns
                    .iter()
                    .cloned()
                    .map(Expr::Column)
                    .collect::<Vec<_>>();
                exprs.apply_elements(f)
            }
            LogicalPlan::Distinct(Distinct::On(DistinctOn {
                on_expr,
                select_expr,
                sort_expr,
                ..
            })) => (on_expr, select_expr, sort_expr).apply_ref_elements(f),
            LogicalPlan::Limit(Limit { skip, fetch, .. }) => {
                (skip, fetch).apply_ref_elements(f)
            }
            // Only `Statement::Execute` exposes expressions (its `parameters`).
            LogicalPlan::Statement(stmt) => match stmt {
                Statement::Execute(Execute { parameters, .. }) => {
                    parameters.apply_elements(f)
                }
                _ => Ok(TreeNodeRecursion::Continue),
            },
            // plans without expressions
            LogicalPlan::EmptyRelation(_)
            | LogicalPlan::RecursiveQuery(_)
            | LogicalPlan::Subquery(_)
            | LogicalPlan::SubqueryAlias(_)
            | LogicalPlan::Analyze(_)
            | LogicalPlan::Explain(_)
            | LogicalPlan::Union(_)
            | LogicalPlan::Distinct(Distinct::All(_))
            | LogicalPlan::Dml(_)
            | LogicalPlan::Ddl(_)
            | LogicalPlan::Copy(_)
            | LogicalPlan::DescribeTable(_) => Ok(TreeNodeRecursion::Continue),
        }
    }
481
    /// Rewrites all expressions in the current `LogicalPlan` node using `f`.
    ///
    /// Returns the current node, rebuilt from the rewritten expressions with
    /// all other fields moved across unchanged.
    ///
    /// # Notes
    /// * Similar to [`TreeNode::map_children`] but for this node's expressions.
    /// * Visits only the top level expressions (Does not recurse into each expression)
    /// * Plan variants listed under "plans without expressions" below are
    ///   returned via [`Transformed::no`] without calling `f`.
    pub fn map_expressions<F: FnMut(Expr) -> Result<Transformed<Expr>>>(
        self,
        mut f: F,
    ) -> Result<Transformed<Self>> {
        Ok(match self {
            LogicalPlan::Projection(Projection {
                expr,
                input,
                schema,
            }) => expr.map_elements(f)?.update_data(|expr| {
                LogicalPlan::Projection(Projection {
                    expr,
                    input,
                    schema,
                })
            }),
            LogicalPlan::Values(Values { schema, values }) => values
                .map_elements(f)?
                .update_data(|values| LogicalPlan::Values(Values { schema, values })),
            // A filter has exactly one expression, so `f` is called directly.
            LogicalPlan::Filter(Filter { predicate, input }) => f(predicate)?
                .update_data(|predicate| {
                    LogicalPlan::Filter(Filter { predicate, input })
                }),
            // Rewrite the expressions inside the partitioning scheme first,
            // then rebuild the `Repartition` around the result.
            LogicalPlan::Repartition(Repartition {
                input,
                partitioning_scheme,
            }) => match partitioning_scheme {
                // `usize` below is a binding for the second field of
                // `Partitioning::Hash`, not the type name.
                Partitioning::Hash(expr, usize) => expr
                    .map_elements(f)?
                    .update_data(|expr| Partitioning::Hash(expr, usize)),
                Partitioning::DistributeBy(expr) => expr
                    .map_elements(f)?
                    .update_data(Partitioning::DistributeBy),
                Partitioning::RoundRobinBatch(_) => Transformed::no(partitioning_scheme),
            }
            .update_data(|partitioning_scheme| {
                LogicalPlan::Repartition(Repartition {
                    input,
                    partitioning_scheme,
                })
            }),
            LogicalPlan::Window(Window {
                input,
                window_expr,
                schema,
            }) => window_expr.map_elements(f)?.update_data(|window_expr| {
                LogicalPlan::Window(Window {
                    input,
                    window_expr,
                    schema,
                })
            }),
            LogicalPlan::Aggregate(Aggregate {
                input,
                group_expr,
                aggr_expr,
                schema,
            }) => (group_expr, aggr_expr).map_elements(f)?.update_data(
                |(group_expr, aggr_expr)| {
                    LogicalPlan::Aggregate(Aggregate {
                        input,
                        group_expr,
                        aggr_expr,
                        schema,
                    })
                },
            ),

            // A join has two kinds of expressions: equijoin (`on`) and non-equijoin (`filter`).
            // 1. The first part is the `on.len()` equijoin expressions, each of the form `left-on = right-on`.
            // 2. The second part is the non-equijoin `filter`.
            LogicalPlan::Join(Join {
                left,
                right,
                on,
                filter,
                join_type,
                join_constraint,
                schema,
                null_equality,
                null_aware,
            }) => (on, filter).map_elements(f)?.update_data(|(on, filter)| {
                LogicalPlan::Join(Join {
                    left,
                    right,
                    on,
                    filter,
                    join_type,
                    join_constraint,
                    schema,
                    null_equality,
                    null_aware,
                })
            }),
            LogicalPlan::Sort(Sort { expr, input, fetch }) => expr
                .map_elements(f)?
                .update_data(|expr| LogicalPlan::Sort(Sort { expr, input, fetch })),
            LogicalPlan::Extension(Extension { node }) => {
                // would be nice to avoid this copy -- maybe can
                // update extension to just observe Exprs
                let exprs = node.expressions().map_elements(f)?;
                // Rebuild the node from the rewritten expressions and clones
                // of its current inputs, carrying over the `transformed` flag
                // and recursion state produced by the expression rewrite.
                let plan = LogicalPlan::Extension(Extension {
                    node: UserDefinedLogicalNode::with_exprs_and_inputs(
                        node.as_ref(),
                        exprs.data,
                        node.inputs().into_iter().cloned().collect::<Vec<_>>(),
                    )?,
                });
                Transformed::new(plan, exprs.transformed, exprs.tnr)
            }
            LogicalPlan::TableScan(TableScan {
                table_name,
                source,
                projection,
                projected_schema,
                filters,
                fetch,
            }) => filters.map_elements(f)?.update_data(|filters| {
                LogicalPlan::TableScan(TableScan {
                    table_name,
                    source,
                    projection,
                    projected_schema,
                    filters,
                    fetch,
                })
            }),
            LogicalPlan::Distinct(Distinct::On(DistinctOn {
                on_expr,
                select_expr,
                sort_expr,
                input,
                schema,
            })) => (on_expr, select_expr, sort_expr)
                .map_elements(f)?
                .update_data(|(on_expr, select_expr, sort_expr)| {
                    LogicalPlan::Distinct(Distinct::On(DistinctOn {
                        on_expr,
                        select_expr,
                        sort_expr,
                        input,
                        schema,
                    }))
                }),
            LogicalPlan::Limit(Limit { skip, fetch, input }) => {
                (skip, fetch).map_elements(f)?.update_data(|(skip, fetch)| {
                    LogicalPlan::Limit(Limit { skip, fetch, input })
                })
            }
            // Only `Statement::Execute` has expressions (its `parameters`);
            // every other statement is returned unchanged.
            LogicalPlan::Statement(stmt) => match stmt {
                Statement::Execute(e) => {
                    e.parameters.map_elements(f)?.update_data(|parameters| {
                        Statement::Execute(Execute { parameters, ..e })
                    })
                }
                _ => Transformed::no(stmt),
            }
            .update_data(LogicalPlan::Statement),
            // plans without expressions
            LogicalPlan::EmptyRelation(_)
            | LogicalPlan::Unnest(_)
            | LogicalPlan::RecursiveQuery(_)
            | LogicalPlan::Subquery(_)
            | LogicalPlan::SubqueryAlias(_)
            | LogicalPlan::Analyze(_)
            | LogicalPlan::Explain(_)
            | LogicalPlan::Union(_)
            | LogicalPlan::Distinct(Distinct::All(_))
            | LogicalPlan::Dml(_)
            | LogicalPlan::Ddl(_)
            | LogicalPlan::Copy(_)
            | LogicalPlan::DescribeTable(_) => Transformed::no(self),
        })
    }
663
664    /// Visits a plan similarly to [`Self::visit`], including subqueries that
665    /// may appear in expressions such as `IN (SELECT ...)`.
666    #[cfg_attr(feature = "recursive_protection", recursive::recursive)]
667    pub fn visit_with_subqueries<V: for<'n> TreeNodeVisitor<'n, Node = Self>>(
668        &self,
669        visitor: &mut V,
670    ) -> Result<TreeNodeRecursion> {
671        visitor
672            .f_down(self)?
673            .visit_children(|| {
674                self.apply_subqueries(|c| c.visit_with_subqueries(visitor))?
675                    .visit_sibling(|| {
676                        self.apply_children(|c| c.visit_with_subqueries(visitor))
677                    })
678            })?
679            .visit_parent(|| visitor.f_up(self))
680    }
681
682    /// Similarly to [`Self::rewrite`], rewrites this node and its inputs using `f`,
683    /// including subqueries that may appear in expressions such as `IN (SELECT
684    /// ...)`.
685    #[cfg_attr(feature = "recursive_protection", recursive::recursive)]
686    pub fn rewrite_with_subqueries<R: TreeNodeRewriter<Node = Self>>(
687        self,
688        rewriter: &mut R,
689    ) -> Result<Transformed<Self>> {
690        handle_transform_recursion!(
691            rewriter.f_down(self),
692            |c| c.rewrite_with_subqueries(rewriter),
693            |n| rewriter.f_up(n)
694        )
695    }
696
697    /// Similarly to [`Self::apply`], calls `f` on this node and all its inputs,
698    /// including subqueries that may appear in expressions such as `IN (SELECT
699    /// ...)`.
700    pub fn apply_with_subqueries<F: FnMut(&Self) -> Result<TreeNodeRecursion>>(
701        &self,
702        mut f: F,
703    ) -> Result<TreeNodeRecursion> {
704        #[cfg_attr(feature = "recursive_protection", recursive::recursive)]
705        fn apply_with_subqueries_impl<
706            F: FnMut(&LogicalPlan) -> Result<TreeNodeRecursion>,
707        >(
708            node: &LogicalPlan,
709            f: &mut F,
710        ) -> Result<TreeNodeRecursion> {
711            f(node)?.visit_children(|| {
712                node.apply_subqueries(|c| apply_with_subqueries_impl(c, f))?
713                    .visit_sibling(|| {
714                        node.apply_children(|c| apply_with_subqueries_impl(c, f))
715                    })
716            })
717        }
718
719        apply_with_subqueries_impl(self, &mut f)
720    }
721
722    /// Similarly to [`Self::transform`], rewrites this node and its inputs using `f`,
723    /// including subqueries that may appear in expressions such as `IN (SELECT
724    /// ...)`.
725    pub fn transform_with_subqueries<F: FnMut(Self) -> Result<Transformed<Self>>>(
726        self,
727        f: F,
728    ) -> Result<Transformed<Self>> {
729        self.transform_up_with_subqueries(f)
730    }
731
732    /// Similarly to [`Self::transform_down`], rewrites this node and its inputs using `f`,
733    /// including subqueries that may appear in expressions such as `IN (SELECT
734    /// ...)`.
735    pub fn transform_down_with_subqueries<F: FnMut(Self) -> Result<Transformed<Self>>>(
736        self,
737        mut f: F,
738    ) -> Result<Transformed<Self>> {
739        #[cfg_attr(feature = "recursive_protection", recursive::recursive)]
740        fn transform_down_with_subqueries_impl<
741            F: FnMut(LogicalPlan) -> Result<Transformed<LogicalPlan>>,
742        >(
743            node: LogicalPlan,
744            f: &mut F,
745        ) -> Result<Transformed<LogicalPlan>> {
746            f(node)?.transform_children(|n| {
747                n.map_subqueries(|c| transform_down_with_subqueries_impl(c, f))?
748                    .transform_sibling(|n| {
749                        n.map_children(|c| transform_down_with_subqueries_impl(c, f))
750                    })
751            })
752        }
753
754        transform_down_with_subqueries_impl(self, &mut f)
755    }
756
757    /// Similarly to [`Self::transform_up`], rewrites this node and its inputs using `f`,
758    /// including subqueries that may appear in expressions such as `IN (SELECT
759    /// ...)`.
760    pub fn transform_up_with_subqueries<F: FnMut(Self) -> Result<Transformed<Self>>>(
761        self,
762        mut f: F,
763    ) -> Result<Transformed<Self>> {
764        #[cfg_attr(feature = "recursive_protection", recursive::recursive)]
765        fn transform_up_with_subqueries_impl<
766            F: FnMut(LogicalPlan) -> Result<Transformed<LogicalPlan>>,
767        >(
768            node: LogicalPlan,
769            f: &mut F,
770        ) -> Result<Transformed<LogicalPlan>> {
771            node.map_subqueries(|c| transform_up_with_subqueries_impl(c, f))?
772                .transform_sibling(|n| {
773                    n.map_children(|c| transform_up_with_subqueries_impl(c, f))
774                })?
775                .transform_parent(f)
776        }
777
778        transform_up_with_subqueries_impl(self, &mut f)
779    }
780
781    /// Similarly to [`Self::transform_down`], rewrites this node and its inputs using `f`,
782    /// including subqueries that may appear in expressions such as `IN (SELECT
783    /// ...)`.
784    pub fn transform_down_up_with_subqueries<
785        FD: FnMut(Self) -> Result<Transformed<Self>>,
786        FU: FnMut(Self) -> Result<Transformed<Self>>,
787    >(
788        self,
789        mut f_down: FD,
790        mut f_up: FU,
791    ) -> Result<Transformed<Self>> {
792        #[cfg_attr(feature = "recursive_protection", recursive::recursive)]
793        fn transform_down_up_with_subqueries_impl<
794            FD: FnMut(LogicalPlan) -> Result<Transformed<LogicalPlan>>,
795            FU: FnMut(LogicalPlan) -> Result<Transformed<LogicalPlan>>,
796        >(
797            node: LogicalPlan,
798            f_down: &mut FD,
799            f_up: &mut FU,
800        ) -> Result<Transformed<LogicalPlan>> {
801            handle_transform_recursion!(
802                f_down(node),
803                |c| transform_down_up_with_subqueries_impl(c, f_down, f_up),
804                f_up
805            )
806        }
807
808        transform_down_up_with_subqueries_impl(self, &mut f_down, &mut f_up)
809    }
810
811    /// Similarly to [`Self::apply`], calls `f` on  this node and its inputs
812    /// including subqueries that may appear in expressions such as `IN (SELECT
813    /// ...)`.
814    pub fn apply_subqueries<F: FnMut(&Self) -> Result<TreeNodeRecursion>>(
815        &self,
816        mut f: F,
817    ) -> Result<TreeNodeRecursion> {
818        self.apply_expressions(|expr| {
819            expr.apply(|expr| match expr {
820                Expr::Exists(Exists { subquery, .. })
821                | Expr::InSubquery(InSubquery { subquery, .. })
822                | Expr::SetComparison(SetComparison { subquery, .. })
823                | Expr::ScalarSubquery(subquery) => {
824                    // use a synthetic plan so the collector sees a
825                    // LogicalPlan::Subquery (even though it is
826                    // actually a Subquery alias)
827                    f(&LogicalPlan::Subquery(subquery.clone()))
828                }
829                _ => Ok(TreeNodeRecursion::Continue),
830            })
831        })
832    }
833
834    /// Similarly to [`Self::map_children`], rewrites all subqueries that may
835    /// appear in expressions such as `IN (SELECT ...)` using `f`.
836    ///
837    /// Returns the current node.
838    pub fn map_subqueries<F: FnMut(Self) -> Result<Transformed<Self>>>(
839        self,
840        mut f: F,
841    ) -> Result<Transformed<Self>> {
842        self.map_expressions(|expr| {
843            expr.transform_down(|expr| match expr {
844                Expr::Exists(Exists { subquery, negated }) => {
845                    f(LogicalPlan::Subquery(subquery))?.map_data(|s| match s {
846                        LogicalPlan::Subquery(subquery) => {
847                            Ok(Expr::Exists(Exists { subquery, negated }))
848                        }
849                        _ => internal_err!("Transformation should return Subquery"),
850                    })
851                }
852                Expr::InSubquery(InSubquery {
853                    expr,
854                    subquery,
855                    negated,
856                }) => f(LogicalPlan::Subquery(subquery))?.map_data(|s| match s {
857                    LogicalPlan::Subquery(subquery) => Ok(Expr::InSubquery(InSubquery {
858                        expr,
859                        subquery,
860                        negated,
861                    })),
862                    _ => internal_err!("Transformation should return Subquery"),
863                }),
864                Expr::SetComparison(SetComparison {
865                    expr,
866                    subquery,
867                    op,
868                    quantifier,
869                }) => f(LogicalPlan::Subquery(subquery))?.map_data(|s| match s {
870                    LogicalPlan::Subquery(subquery) => {
871                        Ok(Expr::SetComparison(SetComparison {
872                            expr,
873                            subquery,
874                            op,
875                            quantifier,
876                        }))
877                    }
878                    _ => internal_err!("Transformation should return Subquery"),
879                }),
880                Expr::ScalarSubquery(subquery) => f(LogicalPlan::Subquery(subquery))?
881                    .map_data(|s| match s {
882                        LogicalPlan::Subquery(subquery) => {
883                            Ok(Expr::ScalarSubquery(subquery))
884                        }
885                        _ => internal_err!("Transformation should return Subquery"),
886                    }),
887                _ => Ok(Transformed::no(expr)),
888            })
889        })
890    }
891}