Skip to main content

datafusion_expr/logical_plan/
tree_node.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//!  [`TreeNode`] based visiting and rewriting for [`LogicalPlan`]s
19//!
20//! Visiting (read only) APIs
21//! * [`LogicalPlan::visit`]: recursively visit the node and all of its inputs
22//! * [`LogicalPlan::visit_with_subqueries`]: recursively visit the node and all of its inputs, including subqueries
23//! * [`LogicalPlan::apply_children`]: recursively visit all inputs of this node
24//! * [`LogicalPlan::apply_expressions`]: (non recursively) visit all expressions of this node
25//! * [`LogicalPlan::apply_subqueries`]: (non recursively) visit all subqueries of this node
26//! * [`LogicalPlan::apply_with_subqueries`]: recursively visit all inputs and embedded subqueries.
27//!
28//! Rewriting (update) APIs:
29//! * [`LogicalPlan::exists`]: search for an expression in a plan
30//! * [`LogicalPlan::rewrite`]: recursively rewrite the node and all of its inputs
31//! * [`LogicalPlan::map_children`]: recursively rewrite all inputs of this node
32//! * [`LogicalPlan::map_expressions`]: (non recursively) rewrite all expressions of this node
33//! * [`LogicalPlan::map_subqueries`]: (non recursively) rewrite all subqueries of this node
34//! * [`LogicalPlan::rewrite_with_subqueries`]: recursively rewrite the node and all of its inputs, including subqueries
35//!
36//! (Re)creation APIs (these require substantial cloning and thus are slow):
37//! * [`LogicalPlan::with_new_exprs`]: Create a new plan with different expressions
38//! * [`LogicalPlan::expressions`]: Return a copy of the plan's expressions
39
40use crate::{
41    Aggregate, Analyze, CreateMemoryTable, CreateView, DdlStatement, Distinct,
42    DistinctOn, DmlStatement, Execute, Explain, Expr, Extension, Filter, Join, Limit,
43    LogicalPlan, Partitioning, Prepare, Projection, RecursiveQuery, Repartition, Sort,
44    Statement, Subquery, SubqueryAlias, TableScan, Union, Unnest, UserDefinedLogicalNode,
45    Values, Window, dml::CopyTo,
46};
47use datafusion_common::tree_node::TreeNodeRefContainer;
48
49use crate::expr::{Exists, InSubquery, SetComparison};
50use datafusion_common::tree_node::{
51    Transformed, TreeNode, TreeNodeContainer, TreeNodeIterator, TreeNodeRecursion,
52    TreeNodeRewriter, TreeNodeVisitor,
53};
54use datafusion_common::{Result, internal_err};
55
56impl TreeNode for LogicalPlan {
57    fn apply_children<'n, F: FnMut(&'n Self) -> Result<TreeNodeRecursion>>(
58        &'n self,
59        f: F,
60    ) -> Result<TreeNodeRecursion> {
61        self.inputs().apply_ref_elements(f)
62    }
63
64    /// Applies `f` to each child (input) of this plan node, rewriting them *in place.*
65    ///
66    /// # Notes
67    ///
68    /// Inputs include ONLY direct children, not embedded `LogicalPlan`s for
69    /// subqueries, for example such as are in [`Expr::Exists`].
70    ///
71    /// [`Expr::Exists`]: crate::Expr::Exists
72    fn map_children<F: FnMut(Self) -> Result<Transformed<Self>>>(
73        self,
74        f: F,
75    ) -> Result<Transformed<Self>> {
76        Ok(match self {
77            LogicalPlan::Projection(Projection {
78                expr,
79                input,
80                schema,
81            }) => input.map_elements(f)?.update_data(|input| {
82                LogicalPlan::Projection(Projection {
83                    expr,
84                    input,
85                    schema,
86                })
87            }),
88            LogicalPlan::Filter(Filter { predicate, input }) => input
89                .map_elements(f)?
90                .update_data(|input| LogicalPlan::Filter(Filter { predicate, input })),
91            LogicalPlan::Repartition(Repartition {
92                input,
93                partitioning_scheme,
94            }) => input.map_elements(f)?.update_data(|input| {
95                LogicalPlan::Repartition(Repartition {
96                    input,
97                    partitioning_scheme,
98                })
99            }),
100            LogicalPlan::Window(Window {
101                input,
102                window_expr,
103                schema,
104            }) => input.map_elements(f)?.update_data(|input| {
105                LogicalPlan::Window(Window {
106                    input,
107                    window_expr,
108                    schema,
109                })
110            }),
111            LogicalPlan::Aggregate(Aggregate {
112                input,
113                group_expr,
114                aggr_expr,
115                schema,
116            }) => input.map_elements(f)?.update_data(|input| {
117                LogicalPlan::Aggregate(Aggregate {
118                    input,
119                    group_expr,
120                    aggr_expr,
121                    schema,
122                })
123            }),
124            LogicalPlan::Sort(Sort { expr, input, fetch }) => input
125                .map_elements(f)?
126                .update_data(|input| LogicalPlan::Sort(Sort { expr, input, fetch })),
127            LogicalPlan::Join(Join {
128                left,
129                right,
130                on,
131                filter,
132                join_type,
133                join_constraint,
134                schema,
135                null_equality,
136                null_aware,
137            }) => (left, right).map_elements(f)?.update_data(|(left, right)| {
138                LogicalPlan::Join(Join {
139                    left,
140                    right,
141                    on,
142                    filter,
143                    join_type,
144                    join_constraint,
145                    schema,
146                    null_equality,
147                    null_aware,
148                })
149            }),
150            LogicalPlan::Limit(Limit { skip, fetch, input }) => input
151                .map_elements(f)?
152                .update_data(|input| LogicalPlan::Limit(Limit { skip, fetch, input })),
153            LogicalPlan::Subquery(Subquery {
154                subquery,
155                outer_ref_columns,
156                spans,
157            }) => subquery.map_elements(f)?.update_data(|subquery| {
158                LogicalPlan::Subquery(Subquery {
159                    subquery,
160                    outer_ref_columns,
161                    spans,
162                })
163            }),
164            LogicalPlan::SubqueryAlias(SubqueryAlias {
165                input,
166                alias,
167                schema,
168            }) => input.map_elements(f)?.update_data(|input| {
169                LogicalPlan::SubqueryAlias(SubqueryAlias {
170                    input,
171                    alias,
172                    schema,
173                })
174            }),
175            LogicalPlan::Extension(extension) => rewrite_extension_inputs(extension, f)?
176                .update_data(LogicalPlan::Extension),
177            LogicalPlan::Union(Union { inputs, schema }) => inputs
178                .map_elements(f)?
179                .update_data(|inputs| LogicalPlan::Union(Union { inputs, schema })),
180            LogicalPlan::Distinct(distinct) => match distinct {
181                Distinct::All(input) => input.map_elements(f)?.update_data(Distinct::All),
182                Distinct::On(DistinctOn {
183                    on_expr,
184                    select_expr,
185                    sort_expr,
186                    input,
187                    schema,
188                }) => input.map_elements(f)?.update_data(|input| {
189                    Distinct::On(DistinctOn {
190                        on_expr,
191                        select_expr,
192                        sort_expr,
193                        input,
194                        schema,
195                    })
196                }),
197            }
198            .update_data(LogicalPlan::Distinct),
199            LogicalPlan::Explain(Explain {
200                verbose,
201                explain_format: format,
202                plan,
203                stringified_plans,
204                schema,
205                logical_optimization_succeeded,
206            }) => plan.map_elements(f)?.update_data(|plan| {
207                LogicalPlan::Explain(Explain {
208                    verbose,
209                    explain_format: format,
210                    plan,
211                    stringified_plans,
212                    schema,
213                    logical_optimization_succeeded,
214                })
215            }),
216            LogicalPlan::Analyze(Analyze {
217                verbose,
218                input,
219                schema,
220            }) => input.map_elements(f)?.update_data(|input| {
221                LogicalPlan::Analyze(Analyze {
222                    verbose,
223                    input,
224                    schema,
225                })
226            }),
227            LogicalPlan::Dml(DmlStatement {
228                table_name,
229                target,
230                op,
231                input,
232                output_schema,
233            }) => input.map_elements(f)?.update_data(|input| {
234                LogicalPlan::Dml(DmlStatement {
235                    table_name,
236                    target,
237                    op,
238                    input,
239                    output_schema,
240                })
241            }),
242            LogicalPlan::Copy(CopyTo {
243                input,
244                output_url,
245                partition_by,
246                file_type,
247                options,
248                output_schema,
249            }) => input.map_elements(f)?.update_data(|input| {
250                LogicalPlan::Copy(CopyTo {
251                    input,
252                    output_url,
253                    partition_by,
254                    file_type,
255                    options,
256                    output_schema,
257                })
258            }),
259            LogicalPlan::Ddl(ddl) => {
260                match ddl {
261                    DdlStatement::CreateMemoryTable(CreateMemoryTable {
262                        name,
263                        constraints,
264                        input,
265                        if_not_exists,
266                        or_replace,
267                        column_defaults,
268                        temporary,
269                    }) => input.map_elements(f)?.update_data(|input| {
270                        DdlStatement::CreateMemoryTable(CreateMemoryTable {
271                            name,
272                            constraints,
273                            input,
274                            if_not_exists,
275                            or_replace,
276                            column_defaults,
277                            temporary,
278                        })
279                    }),
280                    DdlStatement::CreateView(CreateView {
281                        name,
282                        input,
283                        or_replace,
284                        definition,
285                        temporary,
286                    }) => input.map_elements(f)?.update_data(|input| {
287                        DdlStatement::CreateView(CreateView {
288                            name,
289                            input,
290                            or_replace,
291                            definition,
292                            temporary,
293                        })
294                    }),
295                    // no inputs in these statements
296                    DdlStatement::CreateExternalTable(_)
297                    | DdlStatement::CreateCatalogSchema(_)
298                    | DdlStatement::CreateCatalog(_)
299                    | DdlStatement::CreateIndex(_)
300                    | DdlStatement::DropTable(_)
301                    | DdlStatement::DropView(_)
302                    | DdlStatement::DropCatalogSchema(_)
303                    | DdlStatement::CreateFunction(_)
304                    | DdlStatement::DropFunction(_) => Transformed::no(ddl),
305                }
306                .update_data(LogicalPlan::Ddl)
307            }
308            LogicalPlan::Unnest(Unnest {
309                input,
310                exec_columns: input_columns,
311                list_type_columns,
312                struct_type_columns,
313                dependency_indices,
314                schema,
315                options,
316            }) => input.map_elements(f)?.update_data(|input| {
317                LogicalPlan::Unnest(Unnest {
318                    input,
319                    exec_columns: input_columns,
320                    list_type_columns,
321                    struct_type_columns,
322                    dependency_indices,
323                    schema,
324                    options,
325                })
326            }),
327            LogicalPlan::RecursiveQuery(RecursiveQuery {
328                name,
329                static_term,
330                recursive_term,
331                is_distinct,
332            }) => (static_term, recursive_term).map_elements(f)?.update_data(
333                |(static_term, recursive_term)| {
334                    LogicalPlan::RecursiveQuery(RecursiveQuery {
335                        name,
336                        static_term,
337                        recursive_term,
338                        is_distinct,
339                    })
340                },
341            ),
342            LogicalPlan::Statement(stmt) => match stmt {
343                Statement::Prepare(p) => p
344                    .input
345                    .map_elements(f)?
346                    .update_data(|input| Statement::Prepare(Prepare { input, ..p })),
347                _ => Transformed::no(stmt),
348            }
349            .update_data(LogicalPlan::Statement),
350            // plans without inputs
351            LogicalPlan::TableScan { .. }
352            | LogicalPlan::EmptyRelation { .. }
353            | LogicalPlan::Values { .. }
354            | LogicalPlan::DescribeTable(_) => Transformed::no(self),
355        })
356    }
357}
358
359/// Rewrites all inputs for an Extension node "in place"
360/// (it currently has to copy values because there are no APIs for in place modification)
361///
362/// Should be removed when we have an API for in place modifications of the
363/// extension to avoid these copies
364fn rewrite_extension_inputs<F: FnMut(LogicalPlan) -> Result<Transformed<LogicalPlan>>>(
365    extension: Extension,
366    f: F,
367) -> Result<Transformed<Extension>> {
368    let Extension { node } = extension;
369
370    node.inputs()
371        .into_iter()
372        .cloned()
373        .map_until_stop_and_collect(f)?
374        .map_data(|new_inputs| {
375            let exprs = node.expressions();
376            Ok(Extension {
377                node: node.with_exprs_and_inputs(exprs, new_inputs)?,
378            })
379        })
380}
381
/// Drives one step of a combined (top-down and bottom-up) transforming
/// traversal that also descends into subqueries.
///
/// The macro expands to an expression that:
/// 1. evaluates `$f_down` and propagates its error with `?`,
/// 2. if the traversal is allowed to continue into children, rewrites the
///    node's subqueries and then its inputs with `$f_child`,
/// 3. finally applies `$f_up` to the resulting node.
///
/// `$f_child` is expanded twice (once for subqueries, once for inputs), so
/// callers pass an expression that can be written out twice, such as a
/// closure literal.
macro_rules! handle_transform_recursion {
    ($f_down:expr, $f_child:expr, $f_up:expr) => {{
        $f_down?
            .transform_children(|plan| {
                plan.map_subqueries($f_child)?
                    .transform_sibling(|plan| plan.map_children($f_child))
            })?
            .transform_parent($f_up)
    }};
}
394
395impl LogicalPlan {
396    /// Calls `f` on all expressions in the current `LogicalPlan` node.
397    ///
398    /// # Notes
399    /// * Similar to [`TreeNode::apply`] but for this node's expressions.
400    /// * Does not include expressions in input `LogicalPlan` nodes
401    /// * Visits only the top level expressions (Does not recurse into each expression)
402    pub fn apply_expressions<F: FnMut(&Expr) -> Result<TreeNodeRecursion>>(
403        &self,
404        mut f: F,
405    ) -> Result<TreeNodeRecursion> {
406        match self {
407            LogicalPlan::Projection(Projection { expr, .. }) => expr.apply_elements(f),
408            LogicalPlan::Values(Values { values, .. }) => values.apply_elements(f),
409            LogicalPlan::Filter(Filter { predicate, .. }) => f(predicate),
410            LogicalPlan::Repartition(Repartition {
411                partitioning_scheme,
412                ..
413            }) => match partitioning_scheme {
414                Partitioning::Hash(expr, _) | Partitioning::DistributeBy(expr) => {
415                    expr.apply_elements(f)
416                }
417                Partitioning::RoundRobinBatch(_) => Ok(TreeNodeRecursion::Continue),
418            },
419            LogicalPlan::Window(Window { window_expr, .. }) => {
420                window_expr.apply_elements(f)
421            }
422            LogicalPlan::Aggregate(Aggregate {
423                group_expr,
424                aggr_expr,
425                ..
426            }) => (group_expr, aggr_expr).apply_ref_elements(f),
427            // There are two part of expression for join, equijoin(on) and non-equijoin(filter).
428            // 1. the first part is `on.len()` equijoin expressions, and the struct of each expr is `left-on = right-on`.
429            // 2. the second part is non-equijoin(filter).
430            LogicalPlan::Join(Join { on, filter, .. }) => {
431                (on, filter).apply_ref_elements(f)
432            }
433            LogicalPlan::Sort(Sort { expr, .. }) => expr.apply_elements(f),
434            LogicalPlan::Extension(extension) => {
435                // would be nice to avoid this copy -- maybe can
436                // update extension to just observer Exprs
437                extension.node.expressions().apply_elements(f)
438            }
439            LogicalPlan::TableScan(TableScan { filters, .. }) => {
440                filters.apply_elements(f)
441            }
442            LogicalPlan::Unnest(unnest) => {
443                let exprs = unnest
444                    .exec_columns
445                    .iter()
446                    .cloned()
447                    .map(Expr::Column)
448                    .collect::<Vec<_>>();
449                exprs.apply_elements(f)
450            }
451            LogicalPlan::Distinct(Distinct::On(DistinctOn {
452                on_expr,
453                select_expr,
454                sort_expr,
455                ..
456            })) => (on_expr, select_expr, sort_expr).apply_ref_elements(f),
457            LogicalPlan::Limit(Limit { skip, fetch, .. }) => {
458                (skip, fetch).apply_ref_elements(f)
459            }
460            LogicalPlan::Statement(stmt) => match stmt {
461                Statement::Execute(Execute { parameters, .. }) => {
462                    parameters.apply_elements(f)
463                }
464                _ => Ok(TreeNodeRecursion::Continue),
465            },
466            // plans without expressions
467            LogicalPlan::EmptyRelation(_)
468            | LogicalPlan::RecursiveQuery(_)
469            | LogicalPlan::Subquery(_)
470            | LogicalPlan::SubqueryAlias(_)
471            | LogicalPlan::Analyze(_)
472            | LogicalPlan::Explain(_)
473            | LogicalPlan::Union(_)
474            | LogicalPlan::Distinct(Distinct::All(_))
475            | LogicalPlan::Dml(_)
476            | LogicalPlan::Ddl(_)
477            | LogicalPlan::Copy(_)
478            | LogicalPlan::DescribeTable(_) => Ok(TreeNodeRecursion::Continue),
479        }
480    }
481
482    /// Rewrites all expressions in the current `LogicalPlan` node using `f`.
483    ///
484    /// Returns the current node.
485    ///
486    /// # Notes
487    /// * Similar to [`TreeNode::map_children`] but for this node's expressions.
488    /// * Visits only the top level expressions (Does not recurse into each expression)
489    pub fn map_expressions<F: FnMut(Expr) -> Result<Transformed<Expr>>>(
490        self,
491        mut f: F,
492    ) -> Result<Transformed<Self>> {
493        Ok(match self {
494            LogicalPlan::Projection(Projection {
495                expr,
496                input,
497                schema,
498            }) => expr.map_elements(f)?.update_data(|expr| {
499                LogicalPlan::Projection(Projection {
500                    expr,
501                    input,
502                    schema,
503                })
504            }),
505            LogicalPlan::Values(Values { schema, values }) => values
506                .map_elements(f)?
507                .update_data(|values| LogicalPlan::Values(Values { schema, values })),
508            LogicalPlan::Filter(Filter { predicate, input }) => f(predicate)?
509                .update_data(|predicate| {
510                    LogicalPlan::Filter(Filter { predicate, input })
511                }),
512            LogicalPlan::Repartition(Repartition {
513                input,
514                partitioning_scheme,
515            }) => match partitioning_scheme {
516                Partitioning::Hash(expr, usize) => expr
517                    .map_elements(f)?
518                    .update_data(|expr| Partitioning::Hash(expr, usize)),
519                Partitioning::DistributeBy(expr) => expr
520                    .map_elements(f)?
521                    .update_data(Partitioning::DistributeBy),
522                Partitioning::RoundRobinBatch(_) => Transformed::no(partitioning_scheme),
523            }
524            .update_data(|partitioning_scheme| {
525                LogicalPlan::Repartition(Repartition {
526                    input,
527                    partitioning_scheme,
528                })
529            }),
530            LogicalPlan::Window(Window {
531                input,
532                window_expr,
533                schema,
534            }) => window_expr.map_elements(f)?.update_data(|window_expr| {
535                LogicalPlan::Window(Window {
536                    input,
537                    window_expr,
538                    schema,
539                })
540            }),
541            LogicalPlan::Aggregate(Aggregate {
542                input,
543                group_expr,
544                aggr_expr,
545                schema,
546            }) => (group_expr, aggr_expr).map_elements(f)?.update_data(
547                |(group_expr, aggr_expr)| {
548                    LogicalPlan::Aggregate(Aggregate {
549                        input,
550                        group_expr,
551                        aggr_expr,
552                        schema,
553                    })
554                },
555            ),
556
557            // There are two part of expression for join, equijoin(on) and non-equijoin(filter).
558            // 1. the first part is `on.len()` equijoin expressions, and the struct of each expr is `left-on = right-on`.
559            // 2. the second part is non-equijoin(filter).
560            LogicalPlan::Join(Join {
561                left,
562                right,
563                on,
564                filter,
565                join_type,
566                join_constraint,
567                schema,
568                null_equality,
569                null_aware,
570            }) => (on, filter).map_elements(f)?.update_data(|(on, filter)| {
571                LogicalPlan::Join(Join {
572                    left,
573                    right,
574                    on,
575                    filter,
576                    join_type,
577                    join_constraint,
578                    schema,
579                    null_equality,
580                    null_aware,
581                })
582            }),
583            LogicalPlan::Sort(Sort { expr, input, fetch }) => expr
584                .map_elements(f)?
585                .update_data(|expr| LogicalPlan::Sort(Sort { expr, input, fetch })),
586            LogicalPlan::Extension(Extension { node }) => {
587                let raw_exprs = node.expressions();
588                if raw_exprs.is_empty() {
589                    // No expressions to transform — skip expensive clone of
590                    // all inputs and reconstruction via with_exprs_and_inputs.
591                    Transformed::no(LogicalPlan::Extension(Extension { node }))
592                } else {
593                    // TODO: a more general optimization would be to change
594                    // `UserDefinedLogicalNode::expressions()` to return
595                    // references (`&[Expr]`) instead of cloned `Vec<Expr>`,
596                    // and only clone + rebuild when the transform actually
597                    // modifies an expression. This would avoid the clone +
598                    // `with_exprs_and_inputs` rebuild even for non-empty
599                    // expression lists when the transform is a no-op.
600                    let exprs = raw_exprs.map_elements(f)?;
601                    let plan = LogicalPlan::Extension(Extension {
602                        node: UserDefinedLogicalNode::with_exprs_and_inputs(
603                            node.as_ref(),
604                            exprs.data,
605                            node.inputs().into_iter().cloned().collect::<Vec<_>>(),
606                        )?,
607                    });
608                    Transformed::new(plan, exprs.transformed, exprs.tnr)
609                }
610            }
611            LogicalPlan::TableScan(TableScan {
612                table_name,
613                source,
614                projection,
615                projected_schema,
616                filters,
617                fetch,
618            }) => filters.map_elements(f)?.update_data(|filters| {
619                LogicalPlan::TableScan(TableScan {
620                    table_name,
621                    source,
622                    projection,
623                    projected_schema,
624                    filters,
625                    fetch,
626                })
627            }),
628            LogicalPlan::Distinct(Distinct::On(DistinctOn {
629                on_expr,
630                select_expr,
631                sort_expr,
632                input,
633                schema,
634            })) => (on_expr, select_expr, sort_expr)
635                .map_elements(f)?
636                .update_data(|(on_expr, select_expr, sort_expr)| {
637                    LogicalPlan::Distinct(Distinct::On(DistinctOn {
638                        on_expr,
639                        select_expr,
640                        sort_expr,
641                        input,
642                        schema,
643                    }))
644                }),
645            LogicalPlan::Limit(Limit { skip, fetch, input }) => {
646                (skip, fetch).map_elements(f)?.update_data(|(skip, fetch)| {
647                    LogicalPlan::Limit(Limit { skip, fetch, input })
648                })
649            }
650            LogicalPlan::Statement(stmt) => match stmt {
651                Statement::Execute(e) => {
652                    e.parameters.map_elements(f)?.update_data(|parameters| {
653                        Statement::Execute(Execute { parameters, ..e })
654                    })
655                }
656                _ => Transformed::no(stmt),
657            }
658            .update_data(LogicalPlan::Statement),
659            // plans without expressions
660            LogicalPlan::EmptyRelation(_)
661            | LogicalPlan::Unnest(_)
662            | LogicalPlan::RecursiveQuery(_)
663            | LogicalPlan::Subquery(_)
664            | LogicalPlan::SubqueryAlias(_)
665            | LogicalPlan::Analyze(_)
666            | LogicalPlan::Explain(_)
667            | LogicalPlan::Union(_)
668            | LogicalPlan::Distinct(Distinct::All(_))
669            | LogicalPlan::Dml(_)
670            | LogicalPlan::Ddl(_)
671            | LogicalPlan::Copy(_)
672            | LogicalPlan::DescribeTable(_) => Transformed::no(self),
673        })
674    }
675
676    /// Visits a plan similarly to [`Self::visit`], including subqueries that
677    /// may appear in expressions such as `IN (SELECT ...)`.
678    #[cfg_attr(feature = "recursive_protection", recursive::recursive)]
679    pub fn visit_with_subqueries<V: for<'n> TreeNodeVisitor<'n, Node = Self>>(
680        &self,
681        visitor: &mut V,
682    ) -> Result<TreeNodeRecursion> {
683        visitor
684            .f_down(self)?
685            .visit_children(|| {
686                self.apply_subqueries(|c| c.visit_with_subqueries(visitor))?
687                    .visit_sibling(|| {
688                        self.apply_children(|c| c.visit_with_subqueries(visitor))
689                    })
690            })?
691            .visit_parent(|| visitor.f_up(self))
692    }
693
694    /// Similarly to [`Self::rewrite`], rewrites this node and its inputs using `f`,
695    /// including subqueries that may appear in expressions such as `IN (SELECT
696    /// ...)`.
697    #[cfg_attr(feature = "recursive_protection", recursive::recursive)]
698    pub fn rewrite_with_subqueries<R: TreeNodeRewriter<Node = Self>>(
699        self,
700        rewriter: &mut R,
701    ) -> Result<Transformed<Self>> {
702        handle_transform_recursion!(
703            rewriter.f_down(self),
704            |c| c.rewrite_with_subqueries(rewriter),
705            |n| rewriter.f_up(n)
706        )
707    }
708
709    /// Similarly to [`Self::apply`], calls `f` on this node and all its inputs,
710    /// including subqueries that may appear in expressions such as `IN (SELECT
711    /// ...)`.
712    pub fn apply_with_subqueries<F: FnMut(&Self) -> Result<TreeNodeRecursion>>(
713        &self,
714        mut f: F,
715    ) -> Result<TreeNodeRecursion> {
716        #[cfg_attr(feature = "recursive_protection", recursive::recursive)]
717        fn apply_with_subqueries_impl<
718            F: FnMut(&LogicalPlan) -> Result<TreeNodeRecursion>,
719        >(
720            node: &LogicalPlan,
721            f: &mut F,
722        ) -> Result<TreeNodeRecursion> {
723            f(node)?.visit_children(|| {
724                node.apply_subqueries(|c| apply_with_subqueries_impl(c, f))?
725                    .visit_sibling(|| {
726                        node.apply_children(|c| apply_with_subqueries_impl(c, f))
727                    })
728            })
729        }
730
731        apply_with_subqueries_impl(self, &mut f)
732    }
733
734    /// Similarly to [`Self::transform`], rewrites this node and its inputs using `f`,
735    /// including subqueries that may appear in expressions such as `IN (SELECT
736    /// ...)`.
737    pub fn transform_with_subqueries<F: FnMut(Self) -> Result<Transformed<Self>>>(
738        self,
739        f: F,
740    ) -> Result<Transformed<Self>> {
741        self.transform_up_with_subqueries(f)
742    }
743
744    /// Similarly to [`Self::transform_down`], rewrites this node and its inputs using `f`,
745    /// including subqueries that may appear in expressions such as `IN (SELECT
746    /// ...)`.
747    pub fn transform_down_with_subqueries<F: FnMut(Self) -> Result<Transformed<Self>>>(
748        self,
749        mut f: F,
750    ) -> Result<Transformed<Self>> {
751        #[cfg_attr(feature = "recursive_protection", recursive::recursive)]
752        fn transform_down_with_subqueries_impl<
753            F: FnMut(LogicalPlan) -> Result<Transformed<LogicalPlan>>,
754        >(
755            node: LogicalPlan,
756            f: &mut F,
757        ) -> Result<Transformed<LogicalPlan>> {
758            f(node)?.transform_children(|n| {
759                n.map_subqueries(|c| transform_down_with_subqueries_impl(c, f))?
760                    .transform_sibling(|n| {
761                        n.map_children(|c| transform_down_with_subqueries_impl(c, f))
762                    })
763            })
764        }
765
766        transform_down_with_subqueries_impl(self, &mut f)
767    }
768
769    /// Similarly to [`Self::transform_up`], rewrites this node and its inputs using `f`,
770    /// including subqueries that may appear in expressions such as `IN (SELECT
771    /// ...)`.
772    pub fn transform_up_with_subqueries<F: FnMut(Self) -> Result<Transformed<Self>>>(
773        self,
774        mut f: F,
775    ) -> Result<Transformed<Self>> {
776        #[cfg_attr(feature = "recursive_protection", recursive::recursive)]
777        fn transform_up_with_subqueries_impl<
778            F: FnMut(LogicalPlan) -> Result<Transformed<LogicalPlan>>,
779        >(
780            node: LogicalPlan,
781            f: &mut F,
782        ) -> Result<Transformed<LogicalPlan>> {
783            node.map_subqueries(|c| transform_up_with_subqueries_impl(c, f))?
784                .transform_sibling(|n| {
785                    n.map_children(|c| transform_up_with_subqueries_impl(c, f))
786                })?
787                .transform_parent(f)
788        }
789
790        transform_up_with_subqueries_impl(self, &mut f)
791    }
792
793    /// Similarly to [`Self::transform_down`], rewrites this node and its inputs using `f`,
794    /// including subqueries that may appear in expressions such as `IN (SELECT
795    /// ...)`.
796    pub fn transform_down_up_with_subqueries<
797        FD: FnMut(Self) -> Result<Transformed<Self>>,
798        FU: FnMut(Self) -> Result<Transformed<Self>>,
799    >(
800        self,
801        mut f_down: FD,
802        mut f_up: FU,
803    ) -> Result<Transformed<Self>> {
804        #[cfg_attr(feature = "recursive_protection", recursive::recursive)]
805        fn transform_down_up_with_subqueries_impl<
806            FD: FnMut(LogicalPlan) -> Result<Transformed<LogicalPlan>>,
807            FU: FnMut(LogicalPlan) -> Result<Transformed<LogicalPlan>>,
808        >(
809            node: LogicalPlan,
810            f_down: &mut FD,
811            f_up: &mut FU,
812        ) -> Result<Transformed<LogicalPlan>> {
813            handle_transform_recursion!(
814                f_down(node),
815                |c| transform_down_up_with_subqueries_impl(c, f_down, f_up),
816                f_up
817            )
818        }
819
820        transform_down_up_with_subqueries_impl(self, &mut f_down, &mut f_up)
821    }
822
823    /// Similarly to [`Self::apply`], calls `f` on this node and its inputs,
824    /// including subqueries that may appear in expressions such as `IN (SELECT
825    /// ...)`.
826    pub fn apply_subqueries<F: FnMut(&Self) -> Result<TreeNodeRecursion>>(
827        &self,
828        mut f: F,
829    ) -> Result<TreeNodeRecursion> {
830        self.apply_expressions(|expr| {
831            expr.apply(|expr| match expr {
832                Expr::Exists(Exists { subquery, .. })
833                | Expr::InSubquery(InSubquery { subquery, .. })
834                | Expr::SetComparison(SetComparison { subquery, .. })
835                | Expr::ScalarSubquery(subquery) => {
836                    // Wrap in LogicalPlan::Subquery to match f's signature
837                    f(&LogicalPlan::Subquery(subquery.clone()))
838                }
839                _ => Ok(TreeNodeRecursion::Continue),
840            })
841        })
842    }
843
844    /// Similarly to [`Self::map_children`], rewrites all subqueries that may
845    /// appear in expressions such as `IN (SELECT ...)` using `f`.
846    ///
847    /// Returns the current node.
848    pub fn map_subqueries<F: FnMut(Self) -> Result<Transformed<Self>>>(
849        self,
850        mut f: F,
851    ) -> Result<Transformed<Self>> {
852        self.map_expressions(|expr| {
853            expr.transform_down(|expr| match expr {
854                Expr::Exists(Exists { subquery, negated }) => {
855                    f(LogicalPlan::Subquery(subquery))?.map_data(|s| match s {
856                        LogicalPlan::Subquery(subquery) => {
857                            Ok(Expr::Exists(Exists { subquery, negated }))
858                        }
859                        _ => internal_err!("Transformation should return Subquery"),
860                    })
861                }
862                Expr::InSubquery(InSubquery {
863                    expr,
864                    subquery,
865                    negated,
866                }) => f(LogicalPlan::Subquery(subquery))?.map_data(|s| match s {
867                    LogicalPlan::Subquery(subquery) => Ok(Expr::InSubquery(InSubquery {
868                        expr,
869                        subquery,
870                        negated,
871                    })),
872                    _ => internal_err!("Transformation should return Subquery"),
873                }),
874                Expr::SetComparison(SetComparison {
875                    expr,
876                    subquery,
877                    op,
878                    quantifier,
879                }) => f(LogicalPlan::Subquery(subquery))?.map_data(|s| match s {
880                    LogicalPlan::Subquery(subquery) => {
881                        Ok(Expr::SetComparison(SetComparison {
882                            expr,
883                            subquery,
884                            op,
885                            quantifier,
886                        }))
887                    }
888                    _ => internal_err!("Transformation should return Subquery"),
889                }),
890                Expr::ScalarSubquery(subquery) => f(LogicalPlan::Subquery(subquery))?
891                    .map_data(|s| match s {
892                        LogicalPlan::Subquery(subquery) => {
893                            Ok(Expr::ScalarSubquery(subquery))
894                        }
895                        _ => internal_err!("Transformation should return Subquery"),
896                    }),
897                _ => Ok(Transformed::no(expr)),
898            })
899        })
900    }
901
902    /// Similar to [`Self::map_subqueries`], but only applies `f` to
903    /// uncorrelated subqueries (those with no outer column references).
904    pub fn map_uncorrelated_subqueries<F: FnMut(Self) -> Result<Transformed<Self>>>(
905        self,
906        mut f: F,
907    ) -> Result<Transformed<Self>> {
908        self.map_subqueries(|subquery_plan| match &subquery_plan {
909            LogicalPlan::Subquery(sq) if sq.outer_ref_columns.is_empty() => {
910                f(subquery_plan)
911            }
912            _ => Ok(Transformed::no(subquery_plan)),
913        })
914    }
915}