Skip to main content

datafusion_optimizer/
extract_leaf_expressions.rs

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.
17
//! Two-pass optimizer pipeline that pushes cheap expressions (like struct field
//! access `user['status']`) closer to data sources, enabling early data reduction
//! and source-level optimizations (e.g., Parquet column pruning). See
//! [`ExtractLeafExpressions`] (pass 1) and [`PushDownLeafProjections`] (pass 2).
22
23use indexmap::{IndexMap, IndexSet};
24use std::collections::HashMap;
25use std::sync::Arc;
26
27use datafusion_common::alias::AliasGenerator;
28use datafusion_common::tree_node::{Transformed, TreeNode, TreeNodeRecursion};
29use datafusion_common::{Column, DFSchema, Result, qualified_name};
30use datafusion_expr::logical_plan::LogicalPlan;
31use datafusion_expr::{Expr, ExpressionPlacement, Projection};
32
33use crate::optimizer::ApplyOrder;
34use crate::push_down_filter::replace_cols_by_name;
35use crate::utils::{ColumnReference, has_all_column_refs, schema_columns};
36use crate::{OptimizerConfig, OptimizerRule};
37
/// Name prefix shared by every alias the extraction passes generate
/// (`__datafusion_extracted_1`, `__datafusion_extracted_2`, ...).
///
/// **Reserved for internal optimizer use.** Both passes treat any alias that
/// begins with this prefix as one they produced themselves. A user-written
/// alias with the same prefix can therefore be mistaken for an extraction
/// alias and lead to surprising plans, so user queries should not use it.
const EXTRACTED_EXPR_PREFIX: &str = "__datafusion_extracted";
45
46/// Returns `true` if any sub-expression in `exprs` has
47/// [`ExpressionPlacement::MoveTowardsLeafNodes`] placement.
48///
49/// This is a lightweight pre-check that short-circuits as soon as one
50/// extractable expression is found, avoiding the expensive allocations
51/// (column HashSets, extractors, expression rewrites) that the full
52/// extraction pipeline requires.
53fn has_extractable_expr(exprs: &[Expr]) -> bool {
54    exprs.iter().any(|expr| {
55        expr.exists(|e| Ok(e.placement() == ExpressionPlacement::MoveTowardsLeafNodes))
56            .unwrap_or(false)
57    })
58}
59
/// Pass 1 of 2: pulls `MoveTowardsLeafNodes` sub-expressions out of
/// non-projection nodes and into **extraction projections**.
///
/// Filter, Sort, Limit, Aggregate and Join nodes are handled here. Projection
/// nodes are left to [`PushDownLeafProjections`], which performs both the
/// extraction and the pushdown for them.
///
/// # Key Concepts
///
/// **Extraction projection**: a projection inserted *below* a node. It
/// computes a cheap expression once and exposes the result under a generated
/// alias (`__datafusion_extracted_N`). The node above then refers to that alias
/// instead of the original expression.
///
/// **Recovery projection**: a projection inserted *above* a node to restore
/// that node's original output schema when extraction has changed it.
/// Schema-preserving nodes (Filter, Sort, Limit) pass the extra columns of the
/// extraction projection through to their output. The recovery projection
/// selects only the original columns so the extras stay hidden.
///
/// # Example
///
/// Starting from a filter on a struct field:
///
/// ```text
/// Filter: user['status'] = 'active'
///   TableScan: t [id, user]
/// ```
///
/// the rule places an **extraction projection** under the filter and a
/// **recovery projection** on top of it to hide the added column:
///
/// ```text
/// Projection: id, user                                                        <-- recovery projection
///   Filter: __datafusion_extracted_1 = 'active'
///     Projection: user['status'] AS __datafusion_extracted_1, id, user         <-- extraction projection
///       TableScan: t [id, user]
/// ```
///
/// **Important:** the `PushDownFilter` rule knows about the projections this
/// rule creates and does not push filters through them. It relies on
/// `ExpressionPlacement` to spot `MoveTowardsLeafNodes` expressions and
/// refuses to push a filter past them.
#[derive(Default, Debug)]
pub struct ExtractLeafExpressions {}

impl ExtractLeafExpressions {
    /// Create a new [`ExtractLeafExpressions`]
    pub fn new() -> Self {
        Self::default()
    }
}
111
112impl OptimizerRule for ExtractLeafExpressions {
113    fn name(&self) -> &str {
114        "extract_leaf_expressions"
115    }
116
117    fn rewrite(
118        &self,
119        plan: LogicalPlan,
120        config: &dyn OptimizerConfig,
121    ) -> Result<Transformed<LogicalPlan>> {
122        if !config.options().optimizer.enable_leaf_expression_pushdown {
123            return Ok(Transformed::no(plan));
124        }
125        let alias_generator = config.alias_generator();
126
127        // Advance the alias generator past any user-provided __datafusion_extracted_N
128        // aliases to prevent collisions when generating new extraction aliases.
129        advance_generator_past_existing(&plan, alias_generator)?;
130
131        plan.transform_down_with_subqueries(|plan| {
132            extract_from_plan(plan, alias_generator)
133        })
134    }
135}
136
137/// Scans the current plan node's expressions for pre-existing
138/// `__datafusion_extracted_N` aliases and advances the generator
139/// counter past them to avoid collisions with user-provided aliases.
140fn advance_generator_past_existing(
141    plan: &LogicalPlan,
142    alias_generator: &AliasGenerator,
143) -> Result<()> {
144    plan.apply(|plan| {
145        plan.expressions().iter().try_for_each(|expr| {
146            expr.apply(|e| {
147                if let Expr::Alias(alias) = e
148                    && let Some(id) = alias
149                        .name
150                        .strip_prefix(EXTRACTED_EXPR_PREFIX)
151                        .and_then(|s| s.strip_prefix('_'))
152                        .and_then(|s| s.parse().ok())
153                {
154                    alias_generator.update_min_id(id);
155                }
156                Ok(TreeNodeRecursion::Continue)
157            })?;
158            Ok::<(), datafusion_common::error::DataFusionError>(())
159        })?;
160        Ok(TreeNodeRecursion::Continue)
161    })
162    .map(|_| ())
163}
164
165/// Extracts `MoveTowardsLeafNodes` sub-expressions from a plan node.
166///
167/// Works for any number of inputs (0, 1, 2, …N). For multi-input nodes
168/// like Join, each extracted sub-expression is routed to the correct input
169/// by checking which input's schema contains all of the expression's column
170/// references.
171fn extract_from_plan(
172    plan: LogicalPlan,
173    alias_generator: &Arc<AliasGenerator>,
174) -> Result<Transformed<LogicalPlan>> {
175    // Only extract from plan types whose output schema is predictable after
176    // expression rewriting.  Nodes like Window derive column names from
177    // their expressions, so rewriting `get_field` inside a window function
178    // changes the output schema and breaks the recovery projection.
179    if !matches!(
180        &plan,
181        LogicalPlan::Aggregate(_)
182            | LogicalPlan::Filter(_)
183            | LogicalPlan::Sort(_)
184            | LogicalPlan::Limit(_)
185            | LogicalPlan::Join(_)
186    ) {
187        return Ok(Transformed::no(plan));
188    }
189
190    let inputs = plan.inputs();
191    if inputs.is_empty() {
192        return Ok(Transformed::no(plan));
193    }
194
195    // Fast pre-check: skip all allocations if no extractable expressions exist
196    if !has_extractable_expr(&plan.expressions()) {
197        return Ok(Transformed::no(plan));
198    }
199
200    // Save original output schema before any transformation
201    let original_schema = Arc::clone(plan.schema());
202
203    // Build per-input schemas from borrowed inputs (before plan is consumed
204    // by map_expressions). We only need schemas and column sets for routing;
205    // the actual inputs are cloned later only if extraction succeeds.
206    let input_schemas: Vec<Arc<DFSchema>> =
207        inputs.iter().map(|i| Arc::clone(i.schema())).collect();
208
209    // Build per-input extractors
210    let mut extractors: Vec<LeafExpressionExtractor> = input_schemas
211        .iter()
212        .map(|schema| LeafExpressionExtractor::new(schema.as_ref(), alias_generator))
213        .collect();
214
215    // Build per-input column sets for routing expressions to the correct input
216    let input_column_sets: Vec<std::collections::HashSet<ColumnReference>> =
217        input_schemas
218            .iter()
219            .map(|schema| schema_columns(schema.as_ref()))
220            .collect();
221
222    // Transform expressions via map_expressions with routing
223    let transformed = plan.map_expressions(|expr| {
224        routing_extract(expr, &mut extractors, &input_column_sets)
225    })?;
226
227    // If no expressions were rewritten, nothing was extracted
228    if !transformed.transformed {
229        return Ok(transformed);
230    }
231
232    // Clone inputs now that we know extraction succeeded. Wrap in Arc
233    // upfront since build_extraction_projection expects &Arc<LogicalPlan>.
234    let owned_inputs: Vec<Arc<LogicalPlan>> = transformed
235        .data
236        .inputs()
237        .into_iter()
238        .map(|i| Arc::new(i.clone()))
239        .collect();
240
241    // Build per-input extraction projections (None means no extractions for that input)
242    let new_inputs: Vec<LogicalPlan> = owned_inputs
243        .into_iter()
244        .zip(extractors.iter())
245        .map(|(input_arc, extractor)| {
246            match extractor.build_extraction_projection(&input_arc)? {
247                Some(plan) => Ok(plan),
248                // No extractions for this input — recover the LogicalPlan
249                // without cloning (refcount is 1 since build returned None).
250                None => Ok(Arc::unwrap_or_clone(input_arc)),
251            }
252        })
253        .collect::<Result<Vec<_>>>()?;
254
255    // Rebuild the plan keeping its rewritten expressions but replacing
256    // inputs with the new extraction projections.
257    let new_plan = transformed
258        .data
259        .with_new_exprs(transformed.data.expressions(), new_inputs)?;
260
261    // Add recovery projection if the output schema changed
262    let recovered = build_recovery_projection(original_schema.as_ref(), new_plan)?;
263
264    Ok(Transformed::yes(recovered))
265}
266
267/// Given an expression, returns the index of the input whose columns fully
268/// cover the expression's column references.
269/// Returns `None` if the expression references columns from multiple inputs
270/// or if multiple inputs match (ambiguous, e.g. unqualified columns present
271/// in both sides of a join).
272fn find_owning_input(
273    expr: &Expr,
274    input_column_sets: &[std::collections::HashSet<ColumnReference>],
275) -> Option<usize> {
276    let mut found = None;
277    for (idx, cols) in input_column_sets.iter().enumerate() {
278        if has_all_column_refs(expr, cols) {
279            if found.is_some() {
280                // Ambiguous — multiple inputs match
281                return None;
282            }
283            found = Some(idx);
284        }
285    }
286    found
287}
288
289/// Walks an expression tree top-down, extracting `MoveTowardsLeafNodes`
290/// sub-expressions and routing each to the correct per-input extractor.
291fn routing_extract(
292    expr: Expr,
293    extractors: &mut [LeafExpressionExtractor],
294    input_column_sets: &[std::collections::HashSet<ColumnReference>],
295) -> Result<Transformed<Expr>> {
296    expr.transform_down(|e| {
297        // Skip expressions already aliased with extracted expression pattern
298        if let Expr::Alias(alias) = &e
299            && alias.name.starts_with(EXTRACTED_EXPR_PREFIX)
300        {
301            return Ok(Transformed {
302                data: e,
303                transformed: false,
304                tnr: TreeNodeRecursion::Jump,
305            });
306        }
307
308        // Don't extract Alias nodes directly — preserve the alias and let
309        // transform_down recurse into the inner expression
310        if matches!(&e, Expr::Alias(_)) {
311            return Ok(Transformed::no(e));
312        }
313
314        match e.placement() {
315            ExpressionPlacement::MoveTowardsLeafNodes => {
316                if let Some(idx) = find_owning_input(&e, input_column_sets) {
317                    let col_ref = extractors[idx].add_extracted(e)?;
318                    Ok(Transformed::yes(col_ref))
319                } else {
320                    // References columns from multiple inputs — cannot extract
321                    Ok(Transformed::no(e))
322                }
323            }
324            ExpressionPlacement::Column => {
325                // Track columns that the parent node references so the
326                // extraction projection includes them as pass-through.
327                // Without this, the extraction projection would only
328                // contain __datafusion_extracted_N aliases, and the parent couldn't
329                // resolve its other column references.
330                if let Expr::Column(col) = &e
331                    && let Some(idx) = find_owning_input(&e, input_column_sets)
332                {
333                    extractors[idx].columns_needed.insert(col.clone());
334                }
335                Ok(Transformed::no(e))
336            }
337            _ => Ok(Transformed::no(e)),
338        }
339    })
340}
341
342/// Rewrites extraction pairs and column references from one qualifier
343/// space to another.
344///
345/// Builds a replacement map by zipping `from_schema` (whose qualifiers
346/// currently appear in `pairs` / `columns`) with `to_schema` (the
347/// qualifiers we want), then applies `replace_cols_by_name`.
348///
349/// Used for SubqueryAlias (alias-space -> input-space) and Union
350/// (union output-space -> per-branch input-space).
351fn remap_pairs_and_columns(
352    pairs: &[(Expr, String)],
353    columns: &IndexSet<Column>,
354    from_schema: &DFSchema,
355    to_schema: &DFSchema,
356) -> Result<ExtractionTarget> {
357    let mut replace_map = HashMap::new();
358    for ((from_q, from_f), (to_q, to_f)) in from_schema.iter().zip(to_schema.iter()) {
359        replace_map.insert(
360            qualified_name(from_q, from_f.name()),
361            Expr::Column(Column::new(to_q.cloned(), to_f.name())),
362        );
363    }
364    let remapped_pairs: Vec<(Expr, String)> = pairs
365        .iter()
366        .map(|(expr, alias)| {
367            Ok((
368                replace_cols_by_name(expr.clone(), &replace_map)?,
369                alias.clone(),
370            ))
371        })
372        .collect::<Result<_>>()?;
373    let remapped_columns: IndexSet<Column> = columns
374        .iter()
375        .filter_map(|col| {
376            let rewritten =
377                replace_cols_by_name(Expr::Column(col.clone()), &replace_map).ok()?;
378            if let Expr::Column(c) = rewritten {
379                Some(c)
380            } else {
381                Some(col.clone())
382            }
383        })
384        .collect();
385    Ok(ExtractionTarget {
386        pairs: remapped_pairs,
387        columns: remapped_columns,
388    })
389}
390
// =============================================================================
// Helper Types & Functions for Extraction Targeting
// =============================================================================
394
395/// A bundle of extraction pairs (expression + alias) and standalone columns
396/// that need to be pushed through a plan node.
397struct ExtractionTarget {
398    /// Extracted expressions paired with their generated aliases.
399    pairs: Vec<(Expr, String)>,
400    /// Standalone column references needed by the parent node.
401    columns: IndexSet<Column>,
402}
403
404/// Build a replacement map from a projection: output_column_name -> underlying_expr.
405///
406/// This is used to resolve column references through a renaming projection.
407/// For example, if a projection has `user AS x`, this maps `x` -> `col("user")`.
408fn build_projection_replace_map(projection: &Projection) -> HashMap<String, Expr> {
409    projection
410        .schema
411        .iter()
412        .zip(projection.expr.iter())
413        .map(|((qualifier, field), expr)| {
414            let key = Column::from((qualifier, field)).flat_name();
415            (key, expr.clone().unalias())
416        })
417        .collect()
418}
419
420/// Build a recovery projection to restore the original output schema.
421///
422/// After extraction, a node's output schema may differ from the original:
423///
424/// - **Schema-preserving nodes** (Filter/Sort/Limit): the extraction projection
425///   below adds extra `__datafusion_extracted_N` columns that bubble up through
426///   the node. Recovery selects only the original columns to hide the extras.
427///   ```text
428///   Original schema: [id, user]
429///   After extraction: [__datafusion_extracted_1, id, user]   ← extra column leaked through
430///   Recovery: SELECT id, user FROM ...                       ← hides __datafusion_extracted_1
431///   ```
432///
433/// - **Schema-defining nodes** (Aggregate): same number of columns but names
434///   may differ because extracted aliases replaced the original expressions.
435///   Recovery maps positionally, aliasing where names changed.
436///   ```text
437///   Original: [SUM(user['balance'])]
438///   After:    [SUM(__datafusion_extracted_1)]                ← name changed
439///   Recovery: SUM(__datafusion_extracted_1) AS "SUM(user['balance'])"
440///   ```
441///
442/// - **Schemas identical** → no recovery projection needed.
443fn build_recovery_projection(
444    original_schema: &DFSchema,
445    input: LogicalPlan,
446) -> Result<LogicalPlan> {
447    let new_schema = input.schema();
448    let orig_len = original_schema.fields().len();
449    let new_len = new_schema.fields().len();
450
451    if orig_len == new_len {
452        // Same number of fields — check if schemas are identical
453        let schemas_match = original_schema.iter().zip(new_schema.iter()).all(
454            |((orig_q, orig_f), (new_q, new_f))| {
455                orig_f.name() == new_f.name() && orig_q == new_q
456            },
457        );
458        if schemas_match {
459            return Ok(input);
460        }
461
462        // Schema-defining nodes (Aggregate, Join): names may differ at some
463        // positions because extracted aliases replaced the original expressions.
464        // Map positionally, aliasing where the name changed.
465        //
466        // Invariant: `with_new_exprs` on all supported node types (Aggregate,
467        // Filter, Sort, Limit, Join) preserves column order, so positional
468        // mapping is safe here.
469        debug_assert!(
470            orig_len == new_len,
471            "build_recovery_projection: positional mapping requires same field count, \
472             got original={orig_len} vs new={new_len}"
473        );
474        let mut proj_exprs = Vec::with_capacity(orig_len);
475        for (i, (orig_qualifier, orig_field)) in original_schema.iter().enumerate() {
476            let (new_qualifier, new_field) = new_schema.qualified_field(i);
477            if orig_field.name() == new_field.name() && orig_qualifier == new_qualifier {
478                proj_exprs.push(Expr::from((orig_qualifier, orig_field)));
479            } else {
480                let new_col = Expr::Column(Column::from((new_qualifier, new_field)));
481                proj_exprs.push(
482                    new_col.alias_qualified(orig_qualifier.cloned(), orig_field.name()),
483                );
484            }
485        }
486        let projection = Projection::try_new(proj_exprs, Arc::new(input))?;
487        Ok(LogicalPlan::Projection(projection))
488    } else {
489        // Schema-preserving nodes: new schema has extra extraction columns.
490        // Original columns still exist by name; select them to hide extras.
491        let col_exprs: Vec<Expr> = original_schema.iter().map(Expr::from).collect();
492        let projection = Projection::try_new(col_exprs, Arc::new(input))?;
493        Ok(LogicalPlan::Projection(projection))
494    }
495}
496
497/// Collects `MoveTowardsLeafNodes` sub-expressions found during expression
498/// tree traversal and can build an extraction projection from them.
499///
500/// # Example
501///
502/// Given `Filter: user['status'] = 'active' AND user['name'] IS NOT NULL`:
503/// - `add_extracted(user['status'])` → stores it, returns `col("__datafusion_extracted_1")`
504/// - `add_extracted(user['name'])`   → stores it, returns `col("__datafusion_extracted_2")`
505/// - `build_extraction_projection()` produces:
506///   `Projection: user['status'] AS __datafusion_extracted_1, user['name'] AS __datafusion_extracted_2, <all input columns>`
507struct LeafExpressionExtractor<'a> {
508    /// Extracted expressions: maps expression -> alias
509    extracted: IndexMap<Expr, String>,
510    /// Columns referenced by extracted expressions or the parent node,
511    /// included as pass-through in the extraction projection.
512    columns_needed: IndexSet<Column>,
513    /// Input schema
514    input_schema: &'a DFSchema,
515    /// Alias generator
516    alias_generator: &'a Arc<AliasGenerator>,
517}
518
519impl<'a> LeafExpressionExtractor<'a> {
520    fn new(input_schema: &'a DFSchema, alias_generator: &'a Arc<AliasGenerator>) -> Self {
521        Self {
522            extracted: IndexMap::new(),
523            columns_needed: IndexSet::new(),
524            input_schema,
525            alias_generator,
526        }
527    }
528
529    /// Adds an expression to extracted set, returns column reference.
530    fn add_extracted(&mut self, expr: Expr) -> Result<Expr> {
531        // Deduplication: reuse existing alias if same expression
532        if let Some(alias) = self.extracted.get(&expr) {
533            return Ok(Expr::Column(Column::new_unqualified(alias)));
534        }
535
536        // Track columns referenced by this expression
537        for col in expr.column_refs() {
538            self.columns_needed.insert(col.clone());
539        }
540
541        // Generate unique alias
542        let alias = self.alias_generator.next(EXTRACTED_EXPR_PREFIX);
543        self.extracted.insert(expr, alias.clone());
544
545        Ok(Expr::Column(Column::new_unqualified(&alias)))
546    }
547
548    /// Builds an extraction projection above the given input, or merges into
549    /// it if the input is already a projection. Delegates to
550    /// [`build_extraction_projection_impl`].
551    ///
552    /// Returns `None` if there are no extractions.
553    fn build_extraction_projection(
554        &self,
555        input: &Arc<LogicalPlan>,
556    ) -> Result<Option<LogicalPlan>> {
557        if self.extracted.is_empty() {
558            return Ok(None);
559        }
560        let pairs: Vec<(Expr, String)> = self
561            .extracted
562            .iter()
563            .map(|(e, a)| (e.clone(), a.clone()))
564            .collect();
565        let proj = build_extraction_projection_impl(
566            &pairs,
567            &self.columns_needed,
568            input,
569            self.input_schema,
570        )?;
571        Ok(Some(LogicalPlan::Projection(proj)))
572    }
573}
574
575/// Build an extraction projection above the target node (shared by both passes).
576///
577/// If the target is an existing projection, merges into it. This requires
578/// resolving column references through the projection's rename mapping:
579/// if the projection has `user AS u`, and an extracted expression references
580/// `u['name']`, we must rewrite it to `user['name']` since the merged
581/// projection reads from the same input as the original.
582///
583/// Deduplicates by resolved expression equality and adds pass-through
584/// columns as needed. Otherwise builds a fresh projection with extracted
585/// expressions + ALL input schema columns.
586fn build_extraction_projection_impl(
587    extracted_exprs: &[(Expr, String)],
588    columns_needed: &IndexSet<Column>,
589    target: &Arc<LogicalPlan>,
590    target_schema: &DFSchema,
591) -> Result<Projection> {
592    if let LogicalPlan::Projection(existing) = target.as_ref() {
593        // Merge into existing projection
594        let mut proj_exprs = existing.expr.clone();
595
596        // Build a map of existing expressions (by Expr equality) to their aliases
597        let existing_extractions: IndexMap<Expr, String> = existing
598            .expr
599            .iter()
600            .filter_map(|e| {
601                if let Expr::Alias(alias) = e
602                    && alias.name.starts_with(EXTRACTED_EXPR_PREFIX)
603                {
604                    return Some((*alias.expr.clone(), alias.name.clone()));
605                }
606                None
607            })
608            .collect();
609
610        // Resolve column references through the projection's rename mapping
611        let replace_map = build_projection_replace_map(existing);
612
613        // Add new extracted expressions, resolving column refs through the projection
614        for (expr, alias) in extracted_exprs {
615            let resolved = replace_cols_by_name(expr.clone().alias(alias), &replace_map)?;
616            let resolved_inner = if let Expr::Alias(a) = &resolved {
617                a.expr.as_ref()
618            } else {
619                &resolved
620            };
621            if let Some(existing_alias) = existing_extractions.get(resolved_inner) {
622                // Same expression already extracted under a different alias —
623                // add the expression with the new alias so both names are
624                // available in the output. We can't reference the existing alias
625                // as a column within the same projection, so we duplicate the
626                // computation.
627                if existing_alias != alias {
628                    proj_exprs.push(resolved);
629                }
630            } else {
631                proj_exprs.push(resolved);
632            }
633        }
634
635        // Add any new pass-through columns that aren't already in the projection.
636        // We check against existing.input.schema() (the projection's source) rather
637        // than target_schema (the projection's output) because columns produced
638        // by alias expressions (e.g., CSE's __common_expr_N) exist in the output but
639        // not the input, and cannot be added as pass-through Column references.
640        let existing_cols: IndexSet<Column> = existing
641            .expr
642            .iter()
643            .filter_map(|e| {
644                if let Expr::Column(c) = e {
645                    Some(c.clone())
646                } else {
647                    None
648                }
649            })
650            .collect();
651
652        let input_schema = existing.input.schema();
653        for col in columns_needed {
654            let col_expr = Expr::Column(col.clone());
655            let resolved = replace_cols_by_name(col_expr, &replace_map)?;
656            if let Expr::Column(resolved_col) = &resolved
657                && !existing_cols.contains(resolved_col)
658                && input_schema.has_column(resolved_col)
659            {
660                proj_exprs.push(Expr::Column(resolved_col.clone()));
661            }
662            // If resolved to non-column expr, it's already computed by existing projection
663        }
664
665        Projection::try_new(proj_exprs, Arc::clone(&existing.input))
666    } else {
667        // Build new projection with extracted expressions + all input columns
668        let mut proj_exprs = Vec::new();
669        for (expr, alias) in extracted_exprs {
670            proj_exprs.push(expr.clone().alias(alias));
671        }
672        for (qualifier, field) in target_schema.iter() {
673            proj_exprs.push(Expr::from((qualifier, field)));
674        }
675        Projection::try_new(proj_exprs, Arc::clone(target))
676    }
677}
678
// =============================================================================
// Pass 2: PushDownLeafProjections
// =============================================================================
682
/// Pass 2 of 2 (runs after [`ExtractLeafExpressions`]): moves extraction
/// projections down through schema-preserving nodes, towards the leaves.
///
/// Two kinds of projection are handled:
/// - **Pure extraction projections** (only `__datafusion_extracted` aliases
///   plus columns). They are pushed through Filter/Sort/Limit, merged into
///   existing projections, or routed into the inputs of multi-input nodes
///   (Join, SubqueryAlias, etc.).
/// - **Mixed projections** (user projections that contain
///   `MoveTowardsLeafNodes` sub-expressions). They are split into a recovery
///   projection plus an extraction projection, and the extraction projection
///   is then pushed down.
///
/// # Example: Pushing through a Filter
///
/// After pass 1 the extraction projection sits right below the filter:
/// ```text
/// Projection: id, user                                                              <-- recovery
///   Filter: __datafusion_extracted_1 = 'active'
///     Projection: user['status'] AS __datafusion_extracted_1, id, user               <-- extraction
///       TableScan: t [id, user]
/// ```
///
/// Pass 2 pushes the extraction projection through the recovery projection
/// and the filter. A later `OptimizeProjections` pass then drops the
/// now-redundant recovery projection:
/// ```text
/// Filter: __datafusion_extracted_1 = 'active'
///   Projection: user['status'] AS __datafusion_extracted_1, id, user                 <-- extraction (pushed down)
///     TableScan: t [id, user]
/// ```
#[derive(Default, Debug)]
pub struct PushDownLeafProjections {}

impl PushDownLeafProjections {
    /// Create a new [`PushDownLeafProjections`]
    pub fn new() -> Self {
        Self::default()
    }
}
720
721impl OptimizerRule for PushDownLeafProjections {
722    fn name(&self) -> &str {
723        "push_down_leaf_projections"
724    }
725
726    fn apply_order(&self) -> Option<ApplyOrder> {
727        Some(ApplyOrder::TopDown)
728    }
729
730    fn rewrite(
731        &self,
732        plan: LogicalPlan,
733        config: &dyn OptimizerConfig,
734    ) -> Result<Transformed<LogicalPlan>> {
735        if !config.options().optimizer.enable_leaf_expression_pushdown {
736            return Ok(Transformed::no(plan));
737        }
738        let alias_generator = config.alias_generator();
739        match try_push_input(&plan, alias_generator)? {
740            Some(new_plan) => Ok(Transformed::yes(new_plan)),
741            None => Ok(Transformed::no(plan)),
742        }
743    }
744}
745
746/// Attempts to push a projection's extractable expressions further down.
747///
748/// Returns `Some(new_subtree)` if the projection was pushed down or merged,
749/// `None` if there is nothing to push or the projection sits above a barrier.
750fn try_push_input(
751    input: &LogicalPlan,
752    alias_generator: &Arc<AliasGenerator>,
753) -> Result<Option<LogicalPlan>> {
754    let LogicalPlan::Projection(proj) = input else {
755        return Ok(None);
756    };
757    split_and_push_projection(proj, alias_generator)
758}
759
760/// Splits a projection into extractable pieces, pushes them towards leaf
761/// nodes, and adds a recovery projection if needed.
762///
763/// Handles both:
764/// - **Pure extraction projections** (all `__datafusion_extracted` aliases + columns)
765/// - **Mixed projections** (containing `MoveTowardsLeafNodes` sub-expressions)
766///
767/// Returns `Some(new_subtree)` if extractions were pushed down,
768/// `None` if there is nothing to extract or push.
769///
770/// # Example: Mixed Projection
771///
772/// ```text
773/// Input plan:
774///   Projection: user['name'] IS NOT NULL AS has_name, id
775///     Filter: ...
776///       TableScan
777///
778/// Phase 1 (Split):
779///   extraction_pairs: [(user['name'], "__datafusion_extracted_1")]
780///   recovery_exprs:   [__datafusion_extracted_1 IS NOT NULL AS has_name, id]
781///
782/// Phase 2 (Push):
783///   Push extraction projection through Filter toward TableScan
784///
785/// Phase 3 (Recovery):
786///   Projection: __datafusion_extracted_1 IS NOT NULL AS has_name, id       <-- recovery
787///     Filter: ...
788///       Projection: user['name'] AS __datafusion_extracted_1, id           <-- extraction (pushed)
789///         TableScan
790/// ```
791fn split_and_push_projection(
792    proj: &Projection,
793    alias_generator: &Arc<AliasGenerator>,
794) -> Result<Option<LogicalPlan>> {
795    // Fast pre-check: skip if there are no pre-existing extracted aliases
796    // and no new extractable expressions.
797    let has_existing_extracted = proj.expr.iter().any(|e| {
798        matches!(e, Expr::Alias(alias) if alias.name.starts_with(EXTRACTED_EXPR_PREFIX))
799    });
800    if !has_existing_extracted && !has_extractable_expr(&proj.expr) {
801        return Ok(None);
802    }
803
804    let input = &proj.input;
805    let input_schema = input.schema();
806
807    // ── Phase 1: Split ──────────────────────────────────────────────────
808    // For each projection expression, collect extraction pairs and build
809    // recovery expressions.
810    //
811    // Pre-existing `__datafusion_extracted` aliases are inserted into the
812    // extractor's `IndexMap` with the **full** `Expr::Alias(…)` as the key,
813    // so the alias name participates in equality. This prevents collisions
814    // when CSE rewrites produce the same inner expression under different
815    // alias names (e.g. `__common_expr_4 AS __datafusion_extracted_1` and
816    // `__common_expr_4 AS __datafusion_extracted_3`). New extractions from
817    // `routing_extract` use bare (non-Alias) keys and get normal dedup.
818    //
819    // When building the final `extraction_pairs`, the Alias wrapper is
820    // stripped so consumers see the usual `(inner_expr, alias_name)` tuples.
821
822    let mut extractors = vec![LeafExpressionExtractor::new(
823        input_schema.as_ref(),
824        alias_generator,
825    )];
826    let input_column_sets = vec![schema_columns(input_schema.as_ref())];
827
828    let original_schema = proj.schema.as_ref();
829    let mut recovery_exprs: Vec<Expr> = Vec::with_capacity(proj.expr.len());
830    let mut needs_recovery = false;
831    let mut has_new_extractions = false;
832    let mut proj_exprs_captured: usize = 0;
833    // Track standalone column expressions (Case B) to detect column refs
834    // from extracted aliases (Case A) that aren't also standalone expressions.
835    let mut standalone_columns: IndexSet<Column> = IndexSet::new();
836
837    for (expr, (qualifier, field)) in proj.expr.iter().zip(original_schema.iter()) {
838        if let Expr::Alias(alias) = expr
839            && alias.name.starts_with(EXTRACTED_EXPR_PREFIX)
840        {
841            // Insert the full Alias expression as the key so that
842            // distinct alias names don't collide in the IndexMap.
843            let alias_name = alias.name.clone();
844
845            for col_ref in alias.expr.column_refs() {
846                extractors[0].columns_needed.insert(col_ref.clone());
847            }
848
849            extractors[0]
850                .extracted
851                .insert(expr.clone(), alias_name.clone());
852            recovery_exprs.push(Expr::Column(Column::new_unqualified(&alias_name)));
853            proj_exprs_captured += 1;
854        } else if let Expr::Column(col) = expr {
855            // Plain column pass-through — track it in the extractor
856            extractors[0].columns_needed.insert(col.clone());
857            standalone_columns.insert(col.clone());
858            recovery_exprs.push(expr.clone());
859            proj_exprs_captured += 1;
860        } else {
861            // Everything else: run through routing_extract
862            let transformed =
863                routing_extract(expr.clone(), &mut extractors, &input_column_sets)?;
864            if transformed.transformed {
865                has_new_extractions = true;
866            }
867            let transformed_expr = transformed.data;
868
869            // Build recovery expression, aliasing back to original name if needed
870            let original_name = field.name();
871            let needs_alias = if let Expr::Column(col) = &transformed_expr {
872                col.name.as_str() != original_name
873            } else {
874                let expr_name = transformed_expr.schema_name().to_string();
875                original_name != &expr_name
876            };
877            let recovery_expr = if needs_alias {
878                needs_recovery = true;
879                transformed_expr
880                    .clone()
881                    .alias_qualified(qualifier.cloned(), original_name)
882            } else {
883                transformed_expr.clone()
884            };
885
886            // If the expression was transformed (i.e., has extracted sub-parts),
887            // it differs from what the pushed projection outputs → needs recovery.
888            // Also, any non-column, non-__datafusion_extracted expression needs recovery
889            // because the pushed extraction projection won't output it directly.
890            if transformed.transformed || !matches!(expr, Expr::Column(_)) {
891                needs_recovery = true;
892            }
893
894            recovery_exprs.push(recovery_expr);
895        }
896    }
897
898    // Build extraction_pairs, stripping the Alias wrapper from pre-existing
899    // entries (they used the full Alias as the map key to avoid dedup).
900    let extractor = &extractors[0];
901    let extraction_pairs: Vec<(Expr, String)> = extractor
902        .extracted
903        .iter()
904        .map(|(e, a)| match e {
905            Expr::Alias(alias) => (*alias.expr.clone(), a.clone()),
906            _ => (e.clone(), a.clone()),
907        })
908        .collect();
909    let columns_needed = &extractor.columns_needed;
910
911    // If no extractions found, nothing to do
912    if extraction_pairs.is_empty() {
913        return Ok(None);
914    }
915
916    // If columns_needed has entries that aren't standalone projection columns
917    // (i.e., they came from column refs inside extracted aliases), a merge
918    // into an inner projection will widen the schema with those extra columns,
919    // requiring a recovery projection to restore the original schema.
920    if columns_needed
921        .iter()
922        .any(|c| !standalone_columns.contains(c))
923    {
924        needs_recovery = true;
925    }
926
927    // ── Phase 2: Push down ──────────────────────────────────────────────
928    let proj_input = Arc::clone(&proj.input);
929    let pushed = push_extraction_pairs(
930        &extraction_pairs,
931        columns_needed,
932        proj,
933        &proj_input,
934        alias_generator,
935        proj_exprs_captured,
936    )?;
937
938    // ── Phase 3: Recovery ───────────────────────────────────────────────
939    // Determine the base plan: either the pushed result or an in-place extraction.
940    let base_plan = match pushed {
941        Some(plan) => plan,
942        None => {
943            if !has_new_extractions {
944                // Only pre-existing __datafusion_extracted aliases and columns, no new
945                // extractions from routing_extract. The original projection is
946                // already an extraction projection that couldn't be pushed
947                // further. Return None.
948                return Ok(None);
949            }
950            // Build extraction projection in-place (couldn't push down)
951            let input_arc = Arc::clone(input);
952            let extraction = build_extraction_projection_impl(
953                &extraction_pairs,
954                columns_needed,
955                &input_arc,
956                input_schema.as_ref(),
957            )?;
958            LogicalPlan::Projection(extraction)
959        }
960    };
961
962    // Wrap with recovery projection if the output schema changed
963    if needs_recovery {
964        let recovery = LogicalPlan::Projection(Projection::try_new(
965            recovery_exprs,
966            Arc::new(base_plan),
967        )?);
968        Ok(Some(recovery))
969    } else {
970        Ok(Some(base_plan))
971    }
972}
973
974/// Returns true if the plan is a Projection where ALL expressions are either
975/// `Alias(EXTRACTED_EXPR_PREFIX, ...)` or `Column`, with at least one extraction.
976/// Such projections can safely be pushed further without re-extraction.
977fn is_pure_extraction_projection(plan: &LogicalPlan) -> bool {
978    let LogicalPlan::Projection(proj) = plan else {
979        return false;
980    };
981    let mut has_extraction = false;
982    for expr in &proj.expr {
983        match expr {
984            Expr::Alias(alias) if alias.name.starts_with(EXTRACTED_EXPR_PREFIX) => {
985                has_extraction = true;
986            }
987            Expr::Column(_) => {}
988            _ => return false,
989        }
990    }
991    has_extraction
992}
993
994/// Pushes extraction pairs down through the projection's input node,
995/// dispatching to the appropriate handler based on the input node type.
996fn push_extraction_pairs(
997    pairs: &[(Expr, String)],
998    columns_needed: &IndexSet<Column>,
999    proj: &Projection,
1000    proj_input: &Arc<LogicalPlan>,
1001    alias_generator: &Arc<AliasGenerator>,
1002    proj_exprs_captured: usize,
1003) -> Result<Option<LogicalPlan>> {
1004    match proj_input.as_ref() {
1005        // Merge into existing projection, then try to push the result further down.
1006        // Only merge when every expression in the outer projection is fully
1007        // captured as either an extraction pair (Case A: __datafusion_extracted
1008        // alias) or a plain column (Case B). Uncaptured expressions (e.g.
1009        // `col AS __common_expr_1` from CSE, or complex expressions with
1010        // extracted sub-parts) would be lost during the merge.
1011        LogicalPlan::Projection(_) if proj_exprs_captured == proj.expr.len() => {
1012            let target_schema = Arc::clone(proj_input.schema());
1013            let merged = build_extraction_projection_impl(
1014                pairs,
1015                columns_needed,
1016                proj_input,
1017                target_schema.as_ref(),
1018            )?;
1019            let merged_plan = LogicalPlan::Projection(merged);
1020
1021            // After merging, try to push the result further down, but ONLY
1022            // if the merged result is still a pure extraction projection
1023            // (all __datafusion_extracted aliases + columns). If the merge inherited
1024            // bare MoveTowardsLeafNodes expressions from the inner projection,
1025            // pushing would re-extract them into new aliases and fail when
1026            // the (None, true) fallback can't find the original aliases.
1027            // This handles: Extraction → Recovery(cols) → Filter → ... → TableScan
1028            // by pushing through the recovery projection AND the filter in one pass.
1029            if is_pure_extraction_projection(&merged_plan)
1030                && let Some(pushed) = try_push_input(&merged_plan, alias_generator)?
1031            {
1032                return Ok(Some(pushed));
1033            }
1034            Ok(Some(merged_plan))
1035        }
1036        // Generic: handles Filter/Sort/Limit (via recursion),
1037        // SubqueryAlias (with qualifier remap in try_push_into_inputs),
1038        // Join, and anything else.
1039        // Safely bails out for nodes that don't pass through extracted
1040        // columns (Aggregate, Window) via the output schema check.
1041        _ => try_push_into_inputs(
1042            pairs,
1043            columns_needed,
1044            proj_input.as_ref(),
1045            alias_generator,
1046        ),
1047    }
1048}
1049
1050/// Routes extraction pairs and columns to the appropriate inputs.
1051///
1052/// - **Union**: broadcasts to every input via [`remap_pairs_and_columns`].
1053/// - **Other nodes**: routes each expression to the one input that owns
1054///   all of its column references (via [`find_owning_input`]).
1055///
1056/// Returns `None` if any expression can't be routed or no input has pairs.
1057fn route_to_inputs(
1058    pairs: &[(Expr, String)],
1059    columns: &IndexSet<Column>,
1060    node: &LogicalPlan,
1061    input_column_sets: &[std::collections::HashSet<ColumnReference>],
1062    input_schemas: &[Arc<DFSchema>],
1063) -> Result<Option<Vec<ExtractionTarget>>> {
1064    let num_inputs = input_schemas.len();
1065    let mut per_input: Vec<ExtractionTarget> = (0..num_inputs)
1066        .map(|_| ExtractionTarget {
1067            pairs: vec![],
1068            columns: IndexSet::new(),
1069        })
1070        .collect();
1071
1072    if matches!(node, LogicalPlan::Union(_)) {
1073        // Union output schema and each input schema have the same fields by
1074        // index but may differ in qualifiers (e.g. output `s` vs input
1075        // `simple_struct.s`). Remap pairs/columns to each input's space.
1076        let union_schema = node.schema();
1077        for (idx, input_schema) in input_schemas.iter().enumerate() {
1078            per_input[idx] =
1079                remap_pairs_and_columns(pairs, columns, union_schema, input_schema)?;
1080        }
1081    } else {
1082        for (expr, alias) in pairs {
1083            match find_owning_input(expr, input_column_sets) {
1084                Some(idx) => per_input[idx].pairs.push((expr.clone(), alias.clone())),
1085                None => return Ok(None), // Cross-input expression — bail out
1086            }
1087        }
1088        for col in columns {
1089            let col_expr = Expr::Column(col.clone());
1090            match find_owning_input(&col_expr, input_column_sets) {
1091                Some(idx) => {
1092                    per_input[idx].columns.insert(col.clone());
1093                }
1094                None => return Ok(None), // Ambiguous column — bail out
1095            }
1096        }
1097    }
1098
1099    // Check at least one input has extractions to push
1100    if per_input.iter().all(|t| t.pairs.is_empty()) {
1101        return Ok(None);
1102    }
1103
1104    Ok(Some(per_input))
1105}
1106
1107/// Pushes extraction expressions into a node's inputs by routing each
1108/// expression to the input that owns all of its column references.
1109///
1110/// Works for any number of inputs (1, 2, …N). For single-input nodes,
1111/// all expressions trivially route to that input. For multi-input nodes
1112/// (Join, etc.), each expression is routed to the side that owns its columns.
1113///
1114/// Returns `Some(new_node)` if all expressions could be routed AND the
1115/// rebuilt node's output schema contains all extracted aliases.
1116/// Returns `None` if any expression references columns from multiple inputs
1117/// or the node doesn't pass through the extracted columns.
1118///
1119/// # Example: Join with expressions from both sides
1120///
1121/// ```text
1122/// Extraction projection above a Join:
1123///   Projection: left.user['name'] AS __datafusion_extracted_1, right.order['total'] AS __datafusion_extracted_2, ...
1124///     Join: left.id = right.user_id
1125///       TableScan: left [id, user]
1126///       TableScan: right [user_id, order]
1127///
1128/// After routing each expression to its owning input:
1129///   Join: left.id = right.user_id
1130///     Projection: user['name'] AS __datafusion_extracted_1, id, user              <-- left-side extraction
1131///       TableScan: left [id, user]
1132///     Projection: order['total'] AS __datafusion_extracted_2, user_id, order      <-- right-side extraction
1133///       TableScan: right [user_id, order]
1134/// ```
1135fn try_push_into_inputs(
1136    pairs: &[(Expr, String)],
1137    columns_needed: &IndexSet<Column>,
1138    node: &LogicalPlan,
1139    alias_generator: &Arc<AliasGenerator>,
1140) -> Result<Option<LogicalPlan>> {
1141    let inputs = node.inputs();
1142    if inputs.is_empty() {
1143        return Ok(None);
1144    }
1145
1146    // SubqueryAlias remaps qualifiers between input and output.
1147    // Rewrite pairs/columns from alias-space to input-space before routing.
1148    let remapped = if let LogicalPlan::SubqueryAlias(sa) = node {
1149        remap_pairs_and_columns(pairs, columns_needed, &sa.schema, sa.input.schema())?
1150    } else {
1151        ExtractionTarget {
1152            pairs: pairs.to_vec(),
1153            columns: columns_needed.clone(),
1154        }
1155    };
1156    let pairs = &remapped.pairs[..];
1157    let columns_needed = &remapped.columns;
1158
1159    // Build per-input schemas and column sets for routing
1160    let input_schemas: Vec<Arc<DFSchema>> =
1161        inputs.iter().map(|i| Arc::clone(i.schema())).collect();
1162    let input_column_sets: Vec<std::collections::HashSet<ColumnReference>> =
1163        input_schemas.iter().map(|s| schema_columns(s)).collect();
1164
1165    // Route pairs and columns to the appropriate inputs
1166    let per_input = match route_to_inputs(
1167        pairs,
1168        columns_needed,
1169        node,
1170        &input_column_sets,
1171        &input_schemas,
1172    )? {
1173        Some(routed) => routed,
1174        None => return Ok(None),
1175    };
1176
1177    let num_inputs = inputs.len();
1178
1179    // Build per-input extraction projections and push them as far as possible
1180    // immediately. This is critical because map_children preserves cached schemas,
1181    // so if the TopDown pass later pushes a child further (changing its output
1182    // schema), the parent node's schema becomes stale.
1183    let mut new_inputs: Vec<LogicalPlan> = Vec::with_capacity(num_inputs);
1184    for (idx, input) in inputs.into_iter().enumerate() {
1185        if per_input[idx].pairs.is_empty() {
1186            new_inputs.push(input.clone());
1187        } else {
1188            let input_arc = Arc::new(input.clone());
1189            let target_schema = Arc::clone(input.schema());
1190            let proj = build_extraction_projection_impl(
1191                &per_input[idx].pairs,
1192                &per_input[idx].columns,
1193                &input_arc,
1194                target_schema.as_ref(),
1195            )?;
1196            // Verify all requested aliases appear in the projection's output.
1197            // A merge may deduplicate if the same expression already exists
1198            // under a different alias, leaving the requested alias missing.
1199            let proj_schema = proj.schema.as_ref();
1200            for (_expr, alias) in &per_input[idx].pairs {
1201                if !proj_schema.fields().iter().any(|f| f.name() == alias) {
1202                    return Ok(None);
1203                }
1204            }
1205            let proj_plan = LogicalPlan::Projection(proj);
1206            // Try to push the extraction projection further down within
1207            // this input (e.g., through Filter → existing extraction projection).
1208            // This ensures the input's output schema is stable and won't change
1209            // when the TopDown pass later visits children.
1210            match try_push_input(&proj_plan, alias_generator)? {
1211                Some(pushed) => new_inputs.push(pushed),
1212                None => new_inputs.push(proj_plan),
1213            }
1214        }
1215    }
1216
1217    // Rebuild the node with new inputs
1218    let new_node = node.with_new_exprs(node.expressions(), new_inputs)?;
1219
1220    // Safety check: verify all extracted aliases appear in the rebuilt
1221    // node's output schema. Nodes like Aggregate define their own output
1222    // and won't pass through extracted columns — bail out for those.
1223    let output_schema = new_node.schema();
1224    for (_expr, alias) in pairs {
1225        if !output_schema.fields().iter().any(|f| f.name() == alias) {
1226            return Ok(None);
1227        }
1228    }
1229
1230    Ok(Some(new_node))
1231}
1232
1233#[cfg(test)]
1234mod tests {
1235
1236    use super::*;
1237    use crate::optimize_projections::OptimizeProjections;
1238    use crate::test::udfs::PlacementTestUDF;
1239    use crate::test::*;
1240    use crate::{Optimizer, OptimizerContext};
1241    use datafusion_expr::expr::ScalarFunction;
1242    use datafusion_expr::{
1243        ScalarUDF, col, lit, logical_plan::builder::LogicalPlanBuilder,
1244    };
1245
1246    fn leaf_udf(expr: Expr, name: &str) -> Expr {
1247        Expr::ScalarFunction(ScalarFunction::new_udf(
1248            Arc::new(ScalarUDF::new_from_impl(
1249                PlacementTestUDF::new()
1250                    .with_placement(ExpressionPlacement::MoveTowardsLeafNodes),
1251            )),
1252            vec![expr, lit(name)],
1253        ))
1254    }
1255
1256    // =========================================================================
1257    // Combined optimization stage formatter
1258    // =========================================================================
1259
1260    /// Runs all 4 optimization stages and returns a single formatted string.
1261    /// Stages that produce the same plan as the previous stage show
1262    /// "(same as <previous>)" to reduce noise.
1263    ///
1264    /// Stages:
1265    /// 1. **Original** - OptimizeProjections only (baseline)
1266    /// 2. **After Extraction** - + ExtractLeafExpressions
1267    /// 3. **After Pushdown** - + PushDownLeafProjections
1268    /// 4. **Optimized** - + final OptimizeProjections
1269    fn format_optimization_stages(plan: &LogicalPlan) -> Result<String> {
1270        let run = |rules: Vec<Arc<dyn OptimizerRule + Send + Sync>>| -> Result<String> {
1271            let ctx = OptimizerContext::new().with_max_passes(1);
1272            let optimizer = Optimizer::with_rules(rules);
1273            let optimized = optimizer.optimize(plan.clone(), &ctx, |_, _| {})?;
1274            Ok(format!("{optimized}"))
1275        };
1276
1277        let original = run(vec![Arc::new(OptimizeProjections::new())])?;
1278
1279        let after_extract = run(vec![
1280            Arc::new(OptimizeProjections::new()),
1281            Arc::new(ExtractLeafExpressions::new()),
1282        ])?;
1283
1284        let after_pushdown = run(vec![
1285            Arc::new(OptimizeProjections::new()),
1286            Arc::new(ExtractLeafExpressions::new()),
1287            Arc::new(PushDownLeafProjections::new()),
1288        ])?;
1289
1290        let optimized = run(vec![
1291            Arc::new(OptimizeProjections::new()),
1292            Arc::new(ExtractLeafExpressions::new()),
1293            Arc::new(PushDownLeafProjections::new()),
1294            Arc::new(OptimizeProjections::new()),
1295        ])?;
1296
1297        let mut out = format!("## Original Plan\n{original}");
1298
1299        out.push_str("\n\n## After Extraction\n");
1300        if after_extract == original {
1301            out.push_str("(same as original)");
1302        } else {
1303            out.push_str(&after_extract);
1304        }
1305
1306        out.push_str("\n\n## After Pushdown\n");
1307        if after_pushdown == after_extract {
1308            out.push_str("(same as after extraction)");
1309        } else {
1310            out.push_str(&after_pushdown);
1311        }
1312
1313        out.push_str("\n\n## Optimized\n");
1314        if optimized == after_pushdown {
1315            out.push_str("(same as after pushdown)");
1316        } else {
1317            out.push_str(&optimized);
1318        }
1319
1320        Ok(out)
1321    }
1322
1323    /// Assert all optimization stages for a plan in a single insta snapshot.
1324    macro_rules! assert_stages {
1325        ($plan:expr, @ $expected:literal $(,)?) => {{
1326            let result = format_optimization_stages(&$plan)?;
1327            insta::assert_snapshot!(result, @ $expected);
1328            Ok::<(), datafusion_common::DataFusionError>(())
1329        }};
1330    }
1331
1332    #[test]
1333    fn test_extract_from_filter() -> Result<()> {
1334        let table_scan = test_table_scan_with_struct()?;
1335        let plan = LogicalPlanBuilder::from(table_scan.clone())
1336            .filter(leaf_udf(col("user"), "status").eq(lit("active")))?
1337            .select(vec![
1338                table_scan
1339                    .schema()
1340                    .index_of_column_by_name(None, "id")
1341                    .unwrap(),
1342            ])?
1343            .build()?;
1344
1345        assert_stages!(plan, @r#"
1346        ## Original Plan
1347        Projection: test.id
1348          Filter: leaf_udf(test.user, Utf8("status")) = Utf8("active")
1349            TableScan: test projection=[id, user]
1350
1351        ## After Extraction
1352        Projection: test.id
1353          Projection: test.id, test.user
1354            Filter: __datafusion_extracted_1 = Utf8("active")
1355              Projection: leaf_udf(test.user, Utf8("status")) AS __datafusion_extracted_1, test.id, test.user
1356                TableScan: test projection=[id, user]
1357
1358        ## After Pushdown
1359        (same as after extraction)
1360
1361        ## Optimized
1362        Projection: test.id
1363          Filter: __datafusion_extracted_1 = Utf8("active")
1364            Projection: leaf_udf(test.user, Utf8("status")) AS __datafusion_extracted_1, test.id
1365              TableScan: test projection=[id, user]
1366        "#)
1367    }
1368
1369    #[test]
1370    fn test_no_extraction_for_column() -> Result<()> {
1371        let table_scan = test_table_scan()?;
1372        let plan = LogicalPlanBuilder::from(table_scan)
1373            .filter(col("a").eq(lit(1)))?
1374            .build()?;
1375
1376        assert_stages!(plan, @"
1377        ## Original Plan
1378        Filter: test.a = Int32(1)
1379          TableScan: test projection=[a, b, c]
1380
1381        ## After Extraction
1382        (same as original)
1383
1384        ## After Pushdown
1385        (same as after extraction)
1386
1387        ## Optimized
1388        (same as after pushdown)
1389        ")
1390    }
1391
1392    #[test]
1393    fn test_extract_from_projection() -> Result<()> {
1394        let table_scan = test_table_scan_with_struct()?;
1395        let plan = LogicalPlanBuilder::from(table_scan)
1396            .project(vec![leaf_udf(col("user"), "name")])?
1397            .build()?;
1398
1399        assert_stages!(plan, @r#"
1400        ## Original Plan
1401        Projection: leaf_udf(test.user, Utf8("name"))
1402          TableScan: test projection=[user]
1403
1404        ## After Extraction
1405        (same as original)
1406
1407        ## After Pushdown
1408        Projection: __datafusion_extracted_1 AS leaf_udf(test.user,Utf8("name"))
1409          Projection: leaf_udf(test.user, Utf8("name")) AS __datafusion_extracted_1, test.user
1410            TableScan: test projection=[user]
1411
1412        ## Optimized
1413        Projection: leaf_udf(test.user, Utf8("name"))
1414          TableScan: test projection=[user]
1415        "#)
1416    }
1417
1418    #[test]
1419    fn test_extract_from_projection_with_subexpression() -> Result<()> {
1420        let table_scan = test_table_scan_with_struct()?;
1421        let plan = LogicalPlanBuilder::from(table_scan)
1422            .project(vec![
1423                leaf_udf(col("user"), "name")
1424                    .is_not_null()
1425                    .alias("has_name"),
1426            ])?
1427            .build()?;
1428
1429        assert_stages!(plan, @r#"
1430        ## Original Plan
1431        Projection: leaf_udf(test.user, Utf8("name")) IS NOT NULL AS has_name
1432          TableScan: test projection=[user]
1433
1434        ## After Extraction
1435        (same as original)
1436
1437        ## After Pushdown
1438        Projection: __datafusion_extracted_1 IS NOT NULL AS has_name
1439          Projection: leaf_udf(test.user, Utf8("name")) AS __datafusion_extracted_1, test.user
1440            TableScan: test projection=[user]
1441
1442        ## Optimized
1443        Projection: leaf_udf(test.user, Utf8("name")) IS NOT NULL AS has_name
1444          TableScan: test projection=[user]
1445        "#)
1446    }
1447
1448    #[test]
1449    fn test_projection_no_extraction_for_column() -> Result<()> {
1450        let table_scan = test_table_scan()?;
1451        let plan = LogicalPlanBuilder::from(table_scan)
1452            .project(vec![col("a"), col("b")])?
1453            .build()?;
1454
1455        assert_stages!(plan, @"
1456        ## Original Plan
1457        TableScan: test projection=[a, b]
1458
1459        ## After Extraction
1460        (same as original)
1461
1462        ## After Pushdown
1463        (same as after extraction)
1464
1465        ## Optimized
1466        (same as after pushdown)
1467        ")
1468    }
1469
1470    #[test]
1471    fn test_filter_with_deduplication() -> Result<()> {
1472        let table_scan = test_table_scan_with_struct()?;
1473        let field_access = leaf_udf(col("user"), "name");
1474        // Filter with the same expression used twice
1475        let plan = LogicalPlanBuilder::from(table_scan)
1476            .filter(
1477                field_access
1478                    .clone()
1479                    .is_not_null()
1480                    .and(field_access.is_null()),
1481            )?
1482            .build()?;
1483
1484        assert_stages!(plan, @r#"
1485        ## Original Plan
1486        Filter: leaf_udf(test.user, Utf8("name")) IS NOT NULL AND leaf_udf(test.user, Utf8("name")) IS NULL
1487          TableScan: test projection=[id, user]
1488
1489        ## After Extraction
1490        Projection: test.id, test.user
1491          Filter: __datafusion_extracted_1 IS NOT NULL AND __datafusion_extracted_1 IS NULL
1492            Projection: leaf_udf(test.user, Utf8("name")) AS __datafusion_extracted_1, test.id, test.user
1493              TableScan: test projection=[id, user]
1494
1495        ## After Pushdown
1496        (same as after extraction)
1497
1498        ## Optimized
1499        (same as after pushdown)
1500        "#)
1501    }
1502
1503    #[test]
1504    fn test_already_leaf_expression_in_filter() -> Result<()> {
1505        let table_scan = test_table_scan_with_struct()?;
1506        let plan = LogicalPlanBuilder::from(table_scan)
1507            .filter(leaf_udf(col("user"), "name").eq(lit("test")))?
1508            .build()?;
1509
1510        assert_stages!(plan, @r#"
1511        ## Original Plan
1512        Filter: leaf_udf(test.user, Utf8("name")) = Utf8("test")
1513          TableScan: test projection=[id, user]
1514
1515        ## After Extraction
1516        Projection: test.id, test.user
1517          Filter: __datafusion_extracted_1 = Utf8("test")
1518            Projection: leaf_udf(test.user, Utf8("name")) AS __datafusion_extracted_1, test.id, test.user
1519              TableScan: test projection=[id, user]
1520
1521        ## After Pushdown
1522        (same as after extraction)
1523
1524        ## Optimized
1525        (same as after pushdown)
1526        "#)
1527    }
1528
1529    #[test]
1530    fn test_extract_from_aggregate_group_by() -> Result<()> {
1531        use datafusion_expr::test::function_stub::count;
1532
1533        let table_scan = test_table_scan_with_struct()?;
1534        let plan = LogicalPlanBuilder::from(table_scan)
1535            .aggregate(vec![leaf_udf(col("user"), "status")], vec![count(lit(1))])?
1536            .build()?;
1537
1538        assert_stages!(plan, @r#"
1539        ## Original Plan
1540        Aggregate: groupBy=[[leaf_udf(test.user, Utf8("status"))]], aggr=[[COUNT(Int32(1))]]
1541          TableScan: test projection=[user]
1542
1543        ## After Extraction
1544        Projection: __datafusion_extracted_1 AS leaf_udf(test.user,Utf8("status")), COUNT(Int32(1))
1545          Aggregate: groupBy=[[__datafusion_extracted_1]], aggr=[[COUNT(Int32(1))]]
1546            Projection: leaf_udf(test.user, Utf8("status")) AS __datafusion_extracted_1, test.user
1547              TableScan: test projection=[user]
1548
1549        ## After Pushdown
1550        (same as after extraction)
1551
1552        ## Optimized
1553        Projection: __datafusion_extracted_1 AS leaf_udf(test.user,Utf8("status")), COUNT(Int32(1))
1554          Aggregate: groupBy=[[__datafusion_extracted_1]], aggr=[[COUNT(Int32(1))]]
1555            Projection: leaf_udf(test.user, Utf8("status")) AS __datafusion_extracted_1
1556              TableScan: test projection=[user]
1557        "#)
1558    }
1559
1560    #[test]
1561    fn test_extract_from_aggregate_args() -> Result<()> {
1562        use datafusion_expr::test::function_stub::count;
1563
1564        let table_scan = test_table_scan_with_struct()?;
1565        let plan = LogicalPlanBuilder::from(table_scan)
1566            .aggregate(
1567                vec![col("user")],
1568                vec![count(leaf_udf(col("user"), "value"))],
1569            )?
1570            .build()?;
1571
1572        assert_stages!(plan, @r#"
1573        ## Original Plan
1574        Aggregate: groupBy=[[test.user]], aggr=[[COUNT(leaf_udf(test.user, Utf8("value")))]]
1575          TableScan: test projection=[user]
1576
1577        ## After Extraction
1578        Projection: test.user, COUNT(__datafusion_extracted_1) AS COUNT(leaf_udf(test.user,Utf8("value")))
1579          Aggregate: groupBy=[[test.user]], aggr=[[COUNT(__datafusion_extracted_1)]]
1580            Projection: leaf_udf(test.user, Utf8("value")) AS __datafusion_extracted_1, test.user
1581              TableScan: test projection=[user]
1582
1583        ## After Pushdown
1584        (same as after extraction)
1585
1586        ## Optimized
1587        (same as after pushdown)
1588        "#)
1589    }
1590
1591    #[test]
1592    fn test_projection_with_filter_combined() -> Result<()> {
1593        let table_scan = test_table_scan_with_struct()?;
1594        let plan = LogicalPlanBuilder::from(table_scan)
1595            .filter(leaf_udf(col("user"), "status").eq(lit("active")))?
1596            .project(vec![leaf_udf(col("user"), "name")])?
1597            .build()?;
1598
1599        assert_stages!(plan, @r#"
1600        ## Original Plan
1601        Projection: leaf_udf(test.user, Utf8("name"))
1602          Filter: leaf_udf(test.user, Utf8("status")) = Utf8("active")
1603            TableScan: test projection=[user]
1604
1605        ## After Extraction
1606        Projection: leaf_udf(test.user, Utf8("name"))
1607          Projection: test.user
1608            Filter: __datafusion_extracted_1 = Utf8("active")
1609              Projection: leaf_udf(test.user, Utf8("status")) AS __datafusion_extracted_1, test.user
1610                TableScan: test projection=[user]
1611
1612        ## After Pushdown
1613        Projection: __datafusion_extracted_2 AS leaf_udf(test.user,Utf8("name"))
1614          Filter: __datafusion_extracted_1 = Utf8("active")
1615            Projection: leaf_udf(test.user, Utf8("status")) AS __datafusion_extracted_1, test.user, leaf_udf(test.user, Utf8("name")) AS __datafusion_extracted_2
1616              TableScan: test projection=[user]
1617
1618        ## Optimized
1619        Projection: __datafusion_extracted_2 AS leaf_udf(test.user,Utf8("name"))
1620          Filter: __datafusion_extracted_1 = Utf8("active")
1621            Projection: leaf_udf(test.user, Utf8("status")) AS __datafusion_extracted_1, leaf_udf(test.user, Utf8("name")) AS __datafusion_extracted_2
1622              TableScan: test projection=[user]
1623        "#)
1624    }
1625
1626    #[test]
1627    fn test_projection_preserves_alias() -> Result<()> {
1628        let table_scan = test_table_scan_with_struct()?;
1629        let plan = LogicalPlanBuilder::from(table_scan)
1630            .project(vec![leaf_udf(col("user"), "name").alias("username")])?
1631            .build()?;
1632
1633        assert_stages!(plan, @r#"
1634        ## Original Plan
1635        Projection: leaf_udf(test.user, Utf8("name")) AS username
1636          TableScan: test projection=[user]
1637
1638        ## After Extraction
1639        (same as original)
1640
1641        ## After Pushdown
1642        Projection: __datafusion_extracted_1 AS username
1643          Projection: leaf_udf(test.user, Utf8("name")) AS __datafusion_extracted_1, test.user
1644            TableScan: test projection=[user]
1645
1646        ## Optimized
1647        Projection: leaf_udf(test.user, Utf8("name")) AS username
1648          TableScan: test projection=[user]
1649        "#)
1650    }
1651
1652    /// Test: Projection with different field than Filter
1653    /// SELECT id, s['label'] FROM t WHERE s['value'] > 150
1654    /// Both s['label'] and s['value'] should be in a single extraction projection.
1655    #[test]
1656    fn test_projection_different_field_from_filter() -> Result<()> {
1657        let table_scan = test_table_scan_with_struct()?;
1658        let plan = LogicalPlanBuilder::from(table_scan)
1659            .filter(leaf_udf(col("user"), "value").gt(lit(150)))?
1660            .project(vec![col("user"), leaf_udf(col("user"), "label")])?
1661            .build()?;
1662
1663        assert_stages!(plan, @r#"
1664        ## Original Plan
1665        Projection: test.user, leaf_udf(test.user, Utf8("label"))
1666          Filter: leaf_udf(test.user, Utf8("value")) > Int32(150)
1667            TableScan: test projection=[user]
1668
1669        ## After Extraction
1670        Projection: test.user, leaf_udf(test.user, Utf8("label"))
1671          Projection: test.user
1672            Filter: __datafusion_extracted_1 > Int32(150)
1673              Projection: leaf_udf(test.user, Utf8("value")) AS __datafusion_extracted_1, test.user
1674                TableScan: test projection=[user]
1675
1676        ## After Pushdown
1677        Projection: test.user, __datafusion_extracted_2 AS leaf_udf(test.user,Utf8("label"))
1678          Filter: __datafusion_extracted_1 > Int32(150)
1679            Projection: leaf_udf(test.user, Utf8("value")) AS __datafusion_extracted_1, test.user, leaf_udf(test.user, Utf8("label")) AS __datafusion_extracted_2
1680              TableScan: test projection=[user]
1681
1682        ## Optimized
1683        (same as after pushdown)
1684        "#)
1685    }
1686
1687    #[test]
1688    fn test_projection_deduplication() -> Result<()> {
1689        let table_scan = test_table_scan_with_struct()?;
1690        let field = leaf_udf(col("user"), "name");
1691        let plan = LogicalPlanBuilder::from(table_scan)
1692            .project(vec![field.clone(), field.clone().alias("name2")])?
1693            .build()?;
1694
1695        assert_stages!(plan, @r#"
1696        ## Original Plan
1697        Projection: leaf_udf(test.user, Utf8("name")), leaf_udf(test.user, Utf8("name")) AS name2
1698          TableScan: test projection=[user]
1699
1700        ## After Extraction
1701        (same as original)
1702
1703        ## After Pushdown
1704        Projection: __datafusion_extracted_1 AS leaf_udf(test.user,Utf8("name")), __datafusion_extracted_1 AS name2
1705          Projection: leaf_udf(test.user, Utf8("name")) AS __datafusion_extracted_1, test.user
1706            TableScan: test projection=[user]
1707
1708        ## Optimized
1709        Projection: leaf_udf(test.user, Utf8("name")), leaf_udf(test.user, Utf8("name")) AS name2
1710          TableScan: test projection=[user]
1711        "#)
1712    }
1713
1714    // =========================================================================
1715    // Additional tests for code coverage
1716    // =========================================================================
1717
1718    /// Extractions push through Sort nodes to reach the TableScan.
1719    #[test]
1720    fn test_extract_through_sort() -> Result<()> {
1721        let table_scan = test_table_scan_with_struct()?;
1722        let plan = LogicalPlanBuilder::from(table_scan)
1723            .sort(vec![col("user").sort(true, true)])?
1724            .project(vec![leaf_udf(col("user"), "name")])?
1725            .build()?;
1726
1727        assert_stages!(plan, @r#"
1728        ## Original Plan
1729        Projection: leaf_udf(test.user, Utf8("name"))
1730          Sort: test.user ASC NULLS FIRST
1731            TableScan: test projection=[user]
1732
1733        ## After Extraction
1734        (same as original)
1735
1736        ## After Pushdown
1737        Projection: __datafusion_extracted_1 AS leaf_udf(test.user,Utf8("name"))
1738          Sort: test.user ASC NULLS FIRST
1739            Projection: leaf_udf(test.user, Utf8("name")) AS __datafusion_extracted_1, test.user
1740              TableScan: test projection=[user]
1741
1742        ## Optimized
1743        (same as after pushdown)
1744        "#)
1745    }
1746
1747    /// Extractions push through Limit nodes to reach the TableScan.
1748    #[test]
1749    fn test_extract_through_limit() -> Result<()> {
1750        let table_scan = test_table_scan_with_struct()?;
1751        let plan = LogicalPlanBuilder::from(table_scan)
1752            .limit(0, Some(10))?
1753            .project(vec![leaf_udf(col("user"), "name")])?
1754            .build()?;
1755
1756        assert_stages!(plan, @r#"
1757        ## Original Plan
1758        Projection: leaf_udf(test.user, Utf8("name"))
1759          Limit: skip=0, fetch=10
1760            TableScan: test projection=[user]
1761
1762        ## After Extraction
1763        (same as original)
1764
1765        ## After Pushdown
1766        Projection: __datafusion_extracted_1 AS leaf_udf(test.user,Utf8("name"))
1767          Limit: skip=0, fetch=10
1768            Projection: leaf_udf(test.user, Utf8("name")) AS __datafusion_extracted_1, test.user
1769              TableScan: test projection=[user]
1770
1771        ## Optimized
1772        Projection: __datafusion_extracted_1 AS leaf_udf(test.user,Utf8("name"))
1773          Limit: skip=0, fetch=10
1774            Projection: leaf_udf(test.user, Utf8("name")) AS __datafusion_extracted_1
1775              TableScan: test projection=[user]
1776        "#)
1777    }
1778
1779    /// Aliased aggregate functions like count(...).alias("cnt") are handled.
1780    #[test]
1781    fn test_extract_from_aliased_aggregate() -> Result<()> {
1782        use datafusion_expr::test::function_stub::count;
1783
1784        let table_scan = test_table_scan_with_struct()?;
1785        let plan = LogicalPlanBuilder::from(table_scan)
1786            .aggregate(
1787                vec![col("user")],
1788                vec![count(leaf_udf(col("user"), "value")).alias("cnt")],
1789            )?
1790            .build()?;
1791
1792        assert_stages!(plan, @r#"
1793        ## Original Plan
1794        Aggregate: groupBy=[[test.user]], aggr=[[COUNT(leaf_udf(test.user, Utf8("value"))) AS cnt]]
1795          TableScan: test projection=[user]
1796
1797        ## After Extraction
1798        Aggregate: groupBy=[[test.user]], aggr=[[COUNT(__datafusion_extracted_1) AS cnt]]
1799          Projection: leaf_udf(test.user, Utf8("value")) AS __datafusion_extracted_1, test.user
1800            TableScan: test projection=[user]
1801
1802        ## After Pushdown
1803        (same as after extraction)
1804
1805        ## Optimized
1806        (same as after pushdown)
1807        "#)
1808    }
1809
1810    /// Aggregates with no MoveTowardsLeafNodes expressions return unchanged.
1811    #[test]
1812    fn test_aggregate_no_extraction() -> Result<()> {
1813        use datafusion_expr::test::function_stub::count;
1814
1815        let table_scan = test_table_scan()?;
1816        let plan = LogicalPlanBuilder::from(table_scan)
1817            .aggregate(vec![col("a")], vec![count(col("b"))])?
1818            .build()?;
1819
1820        assert_stages!(plan, @"
1821        ## Original Plan
1822        Aggregate: groupBy=[[test.a]], aggr=[[COUNT(test.b)]]
1823          TableScan: test projection=[a, b]
1824
1825        ## After Extraction
1826        (same as original)
1827
1828        ## After Pushdown
1829        (same as after extraction)
1830
1831        ## Optimized
1832        (same as after pushdown)
1833        ")
1834    }
1835
1836    /// Projections containing extracted expression aliases are skipped (already extracted).
1837    #[test]
1838    fn test_skip_extracted_projection() -> Result<()> {
1839        let table_scan = test_table_scan_with_struct()?;
1840        let plan = LogicalPlanBuilder::from(table_scan)
1841            .project(vec![
1842                leaf_udf(col("user"), "name").alias("__datafusion_extracted_manual"),
1843                col("user"),
1844            ])?
1845            .build()?;
1846
1847        assert_stages!(plan, @r#"
1848        ## Original Plan
1849        Projection: leaf_udf(test.user, Utf8("name")) AS __datafusion_extracted_manual, test.user
1850          TableScan: test projection=[user]
1851
1852        ## After Extraction
1853        (same as original)
1854
1855        ## After Pushdown
1856        (same as after extraction)
1857
1858        ## Optimized
1859        (same as after pushdown)
1860        "#)
1861    }
1862
1863    /// Multiple extractions merge into a single extracted expression projection.
1864    #[test]
1865    fn test_merge_into_existing_extracted_projection() -> Result<()> {
1866        let table_scan = test_table_scan_with_struct()?;
1867        let plan = LogicalPlanBuilder::from(table_scan)
1868            .filter(leaf_udf(col("user"), "status").eq(lit("active")))?
1869            .filter(leaf_udf(col("user"), "name").is_not_null())?
1870            .build()?;
1871
1872        assert_stages!(plan, @r#"
1873        ## Original Plan
1874        Filter: leaf_udf(test.user, Utf8("name")) IS NOT NULL
1875          Filter: leaf_udf(test.user, Utf8("status")) = Utf8("active")
1876            TableScan: test projection=[id, user]
1877
1878        ## After Extraction
1879        Projection: test.id, test.user
1880          Filter: __datafusion_extracted_1 IS NOT NULL
1881            Projection: leaf_udf(test.user, Utf8("name")) AS __datafusion_extracted_1, test.id, test.user
1882              Projection: test.id, test.user
1883                Filter: __datafusion_extracted_2 = Utf8("active")
1884                  Projection: leaf_udf(test.user, Utf8("status")) AS __datafusion_extracted_2, test.id, test.user
1885                    TableScan: test projection=[id, user]
1886
1887        ## After Pushdown
1888        Projection: test.id, test.user
1889          Filter: __datafusion_extracted_1 IS NOT NULL
1890            Filter: __datafusion_extracted_2 = Utf8("active")
1891              Projection: leaf_udf(test.user, Utf8("status")) AS __datafusion_extracted_2, test.id, test.user, leaf_udf(test.user, Utf8("name")) AS __datafusion_extracted_1
1892                TableScan: test projection=[id, user]
1893
1894        ## Optimized
1895        Projection: test.id, test.user
1896          Filter: __datafusion_extracted_1 IS NOT NULL
1897            Projection: test.id, test.user, __datafusion_extracted_1
1898              Filter: __datafusion_extracted_2 = Utf8("active")
1899                Projection: leaf_udf(test.user, Utf8("status")) AS __datafusion_extracted_2, test.id, test.user, leaf_udf(test.user, Utf8("name")) AS __datafusion_extracted_1
1900                  TableScan: test projection=[id, user]
1901        "#)
1902    }
1903
1904    /// Extractions push through passthrough projections (columns only).
1905    #[test]
1906    fn test_extract_through_passthrough_projection() -> Result<()> {
1907        let table_scan = test_table_scan_with_struct()?;
1908        let plan = LogicalPlanBuilder::from(table_scan)
1909            .project(vec![col("user")])?
1910            .project(vec![leaf_udf(col("user"), "name")])?
1911            .build()?;
1912
1913        assert_stages!(plan, @r#"
1914        ## Original Plan
1915        Projection: leaf_udf(test.user, Utf8("name"))
1916          TableScan: test projection=[user]
1917
1918        ## After Extraction
1919        (same as original)
1920
1921        ## After Pushdown
1922        Projection: __datafusion_extracted_1 AS leaf_udf(test.user,Utf8("name"))
1923          Projection: leaf_udf(test.user, Utf8("name")) AS __datafusion_extracted_1, test.user
1924            TableScan: test projection=[user]
1925
1926        ## Optimized
1927        Projection: leaf_udf(test.user, Utf8("name"))
1928          TableScan: test projection=[user]
1929        "#)
1930    }
1931
1932    /// Projections with aliased columns (nothing to extract) return unchanged.
1933    #[test]
1934    fn test_projection_early_return_no_extraction() -> Result<()> {
1935        let table_scan = test_table_scan()?;
1936        let plan = LogicalPlanBuilder::from(table_scan)
1937            .project(vec![col("a").alias("x"), col("b")])?
1938            .build()?;
1939
1940        assert_stages!(plan, @"
1941        ## Original Plan
1942        Projection: test.a AS x, test.b
1943          TableScan: test projection=[a, b]
1944
1945        ## After Extraction
1946        (same as original)
1947
1948        ## After Pushdown
1949        (same as after extraction)
1950
1951        ## Optimized
1952        (same as after pushdown)
1953        ")
1954    }
1955
1956    /// Projections with arithmetic expressions but no MoveTowardsLeafNodes return unchanged.
1957    #[test]
1958    fn test_projection_with_arithmetic_no_extraction() -> Result<()> {
1959        let table_scan = test_table_scan()?;
1960        let plan = LogicalPlanBuilder::from(table_scan)
1961            .project(vec![(col("a") + col("b")).alias("sum")])?
1962            .build()?;
1963
1964        assert_stages!(plan, @"
1965        ## Original Plan
1966        Projection: test.a + test.b AS sum
1967          TableScan: test projection=[a, b]
1968
1969        ## After Extraction
1970        (same as original)
1971
1972        ## After Pushdown
1973        (same as after extraction)
1974
1975        ## Optimized
1976        (same as after pushdown)
1977        ")
1978    }
1979
1980    /// Aggregate extractions merge into existing extracted projection created by Filter.
1981    #[test]
1982    fn test_aggregate_merge_into_extracted_projection() -> Result<()> {
1983        use datafusion_expr::test::function_stub::count;
1984
1985        let table_scan = test_table_scan_with_struct()?;
1986        let plan = LogicalPlanBuilder::from(table_scan)
1987            .filter(leaf_udf(col("user"), "status").eq(lit("active")))?
1988            .aggregate(vec![leaf_udf(col("user"), "name")], vec![count(lit(1))])?
1989            .build()?;
1990
1991        assert_stages!(plan, @r#"
1992        ## Original Plan
1993        Aggregate: groupBy=[[leaf_udf(test.user, Utf8("name"))]], aggr=[[COUNT(Int32(1))]]
1994          Filter: leaf_udf(test.user, Utf8("status")) = Utf8("active")
1995            TableScan: test projection=[user]
1996
1997        ## After Extraction
1998        Projection: __datafusion_extracted_1 AS leaf_udf(test.user,Utf8("name")), COUNT(Int32(1))
1999          Aggregate: groupBy=[[__datafusion_extracted_1]], aggr=[[COUNT(Int32(1))]]
2000            Projection: leaf_udf(test.user, Utf8("name")) AS __datafusion_extracted_1, test.user
2001              Projection: test.user
2002                Filter: __datafusion_extracted_2 = Utf8("active")
2003                  Projection: leaf_udf(test.user, Utf8("status")) AS __datafusion_extracted_2, test.user
2004                    TableScan: test projection=[user]
2005
2006        ## After Pushdown
2007        Projection: __datafusion_extracted_1 AS leaf_udf(test.user,Utf8("name")), COUNT(Int32(1))
2008          Aggregate: groupBy=[[__datafusion_extracted_1]], aggr=[[COUNT(Int32(1))]]
2009            Filter: __datafusion_extracted_2 = Utf8("active")
2010              Projection: leaf_udf(test.user, Utf8("status")) AS __datafusion_extracted_2, test.user, leaf_udf(test.user, Utf8("name")) AS __datafusion_extracted_1
2011                TableScan: test projection=[user]
2012
2013        ## Optimized
2014        Projection: __datafusion_extracted_1 AS leaf_udf(test.user,Utf8("name")), COUNT(Int32(1))
2015          Aggregate: groupBy=[[__datafusion_extracted_1]], aggr=[[COUNT(Int32(1))]]
2016            Projection: __datafusion_extracted_1
2017              Filter: __datafusion_extracted_2 = Utf8("active")
2018                Projection: leaf_udf(test.user, Utf8("status")) AS __datafusion_extracted_2, leaf_udf(test.user, Utf8("name")) AS __datafusion_extracted_1
2019                  TableScan: test projection=[user]
2020        "#)
2021    }
2022
2023    /// Projection containing a MoveTowardsLeafNodes sub-expression above an
2024    /// Aggregate. Aggregate blocks pushdown, so the (None, true) recovery
2025    /// fallback path fires: in-place extraction + recovery projection.
2026    #[test]
2027    fn test_projection_with_leaf_expr_above_aggregate() -> Result<()> {
2028        use datafusion_expr::test::function_stub::count;
2029
2030        let table_scan = test_table_scan_with_struct()?;
2031        let plan = LogicalPlanBuilder::from(table_scan)
2032            .aggregate(vec![col("user")], vec![count(lit(1))])?
2033            .project(vec![
2034                leaf_udf(col("user"), "name")
2035                    .is_not_null()
2036                    .alias("has_name"),
2037                col("COUNT(Int32(1))"),
2038            ])?
2039            .build()?;
2040
2041        assert_stages!(plan, @r#"
2042        ## Original Plan
2043        Projection: leaf_udf(test.user, Utf8("name")) IS NOT NULL AS has_name, COUNT(Int32(1))
2044          Aggregate: groupBy=[[test.user]], aggr=[[COUNT(Int32(1))]]
2045            TableScan: test projection=[user]
2046
2047        ## After Extraction
2048        (same as original)
2049
2050        ## After Pushdown
2051        Projection: __datafusion_extracted_1 IS NOT NULL AS has_name, COUNT(Int32(1))
2052          Projection: leaf_udf(test.user, Utf8("name")) AS __datafusion_extracted_1, test.user, COUNT(Int32(1))
2053            Aggregate: groupBy=[[test.user]], aggr=[[COUNT(Int32(1))]]
2054              TableScan: test projection=[user]
2055
2056        ## Optimized
2057        Projection: leaf_udf(test.user, Utf8("name")) IS NOT NULL AS has_name, COUNT(Int32(1))
2058          Aggregate: groupBy=[[test.user]], aggr=[[COUNT(Int32(1))]]
2059            TableScan: test projection=[user]
2060        "#)
2061    }
2062
2063    /// Merging adds new pass-through columns not in the existing extracted projection.
2064    #[test]
2065    fn test_merge_with_new_columns() -> Result<()> {
2066        let table_scan = test_table_scan()?;
2067        let plan = LogicalPlanBuilder::from(table_scan)
2068            .filter(leaf_udf(col("a"), "x").eq(lit(1)))?
2069            .filter(leaf_udf(col("b"), "y").eq(lit(2)))?
2070            .build()?;
2071
2072        assert_stages!(plan, @r#"
2073        ## Original Plan
2074        Filter: leaf_udf(test.b, Utf8("y")) = Int32(2)
2075          Filter: leaf_udf(test.a, Utf8("x")) = Int32(1)
2076            TableScan: test projection=[a, b, c]
2077
2078        ## After Extraction
2079        Projection: test.a, test.b, test.c
2080          Filter: __datafusion_extracted_1 = Int32(2)
2081            Projection: leaf_udf(test.b, Utf8("y")) AS __datafusion_extracted_1, test.a, test.b, test.c
2082              Projection: test.a, test.b, test.c
2083                Filter: __datafusion_extracted_2 = Int32(1)
2084                  Projection: leaf_udf(test.a, Utf8("x")) AS __datafusion_extracted_2, test.a, test.b, test.c
2085                    TableScan: test projection=[a, b, c]
2086
2087        ## After Pushdown
2088        Projection: test.a, test.b, test.c
2089          Filter: __datafusion_extracted_1 = Int32(2)
2090            Filter: __datafusion_extracted_2 = Int32(1)
2091              Projection: leaf_udf(test.a, Utf8("x")) AS __datafusion_extracted_2, test.a, test.b, test.c, leaf_udf(test.b, Utf8("y")) AS __datafusion_extracted_1
2092                TableScan: test projection=[a, b, c]
2093
2094        ## Optimized
2095        Projection: test.a, test.b, test.c
2096          Filter: __datafusion_extracted_1 = Int32(2)
2097            Projection: test.a, test.b, test.c, __datafusion_extracted_1
2098              Filter: __datafusion_extracted_2 = Int32(1)
2099                Projection: leaf_udf(test.a, Utf8("x")) AS __datafusion_extracted_2, test.a, test.b, test.c, leaf_udf(test.b, Utf8("y")) AS __datafusion_extracted_1
2100                  TableScan: test projection=[a, b, c]
2101        "#)
2102    }
2103
2104    // =========================================================================
2105    // Join extraction tests
2106    // =========================================================================
2107
2108    /// Create a second table scan with struct field for join tests
2109    fn test_table_scan_with_struct_named(name: &str) -> Result<LogicalPlan> {
2110        use arrow::datatypes::Schema;
2111        let schema = Schema::new(test_table_scan_with_struct_fields());
2112        datafusion_expr::logical_plan::table_scan(Some(name), &schema, None)?.build()
2113    }
2114
2115    /// Extraction from equijoin keys (`on` expressions).
2116    #[test]
2117    fn test_extract_from_join_on() -> Result<()> {
2118        use datafusion_expr::JoinType;
2119
2120        let left = test_table_scan_with_struct()?;
2121        let right = test_table_scan_with_struct_named("right")?;
2122
2123        let plan = LogicalPlanBuilder::from(left)
2124            .join_with_expr_keys(
2125                right,
2126                JoinType::Inner,
2127                (
2128                    vec![leaf_udf(col("user"), "id")],
2129                    vec![leaf_udf(col("user"), "id")],
2130                ),
2131                None,
2132            )?
2133            .build()?;
2134
2135        assert_stages!(plan, @r#"
2136        ## Original Plan
2137        Inner Join: leaf_udf(test.user, Utf8("id")) = leaf_udf(right.user, Utf8("id"))
2138          TableScan: test projection=[id, user]
2139          TableScan: right projection=[id, user]
2140
2141        ## After Extraction
2142        Projection: test.id, test.user, right.id, right.user
2143          Inner Join: __datafusion_extracted_1 = __datafusion_extracted_2
2144            Projection: leaf_udf(test.user, Utf8("id")) AS __datafusion_extracted_1, test.id, test.user
2145              TableScan: test projection=[id, user]
2146            Projection: leaf_udf(right.user, Utf8("id")) AS __datafusion_extracted_2, right.id, right.user
2147              TableScan: right projection=[id, user]
2148
2149        ## After Pushdown
2150        (same as after extraction)
2151
2152        ## Optimized
2153        (same as after pushdown)
2154        "#)
2155    }
2156
2157    /// Extraction from non-equi join filter.
2158    #[test]
2159    fn test_extract_from_join_filter() -> Result<()> {
2160        use datafusion_expr::JoinType;
2161
2162        let left = test_table_scan_with_struct()?;
2163        let right = test_table_scan_with_struct_named("right")?;
2164
2165        let plan = LogicalPlanBuilder::from(left)
2166            .join_on(
2167                right,
2168                JoinType::Inner,
2169                vec![
2170                    col("test.user").eq(col("right.user")),
2171                    leaf_udf(col("test.user"), "status").eq(lit("active")),
2172                ],
2173            )?
2174            .build()?;
2175
2176        assert_stages!(plan, @r#"
2177        ## Original Plan
2178        Inner Join:  Filter: test.user = right.user AND leaf_udf(test.user, Utf8("status")) = Utf8("active")
2179          TableScan: test projection=[id, user]
2180          TableScan: right projection=[id, user]
2181
2182        ## After Extraction
2183        Projection: test.id, test.user, right.id, right.user
2184          Inner Join:  Filter: test.user = right.user AND __datafusion_extracted_1 = Utf8("active")
2185            Projection: leaf_udf(test.user, Utf8("status")) AS __datafusion_extracted_1, test.id, test.user
2186              TableScan: test projection=[id, user]
2187            TableScan: right projection=[id, user]
2188
2189        ## After Pushdown
2190        (same as after extraction)
2191
2192        ## Optimized
2193        (same as after pushdown)
2194        "#)
2195    }
2196
2197    /// Extraction from both left and right sides of a join.
2198    #[test]
2199    fn test_extract_from_join_both_sides() -> Result<()> {
2200        use datafusion_expr::JoinType;
2201
2202        let left = test_table_scan_with_struct()?;
2203        let right = test_table_scan_with_struct_named("right")?;
2204
2205        let plan = LogicalPlanBuilder::from(left)
2206            .join_on(
2207                right,
2208                JoinType::Inner,
2209                vec![
2210                    col("test.user").eq(col("right.user")),
2211                    leaf_udf(col("test.user"), "status").eq(lit("active")),
2212                    leaf_udf(col("right.user"), "role").eq(lit("admin")),
2213                ],
2214            )?
2215            .build()?;
2216
2217        assert_stages!(plan, @r#"
2218        ## Original Plan
2219        Inner Join:  Filter: test.user = right.user AND leaf_udf(test.user, Utf8("status")) = Utf8("active") AND leaf_udf(right.user, Utf8("role")) = Utf8("admin")
2220          TableScan: test projection=[id, user]
2221          TableScan: right projection=[id, user]
2222
2223        ## After Extraction
2224        Projection: test.id, test.user, right.id, right.user
2225          Inner Join:  Filter: test.user = right.user AND __datafusion_extracted_1 = Utf8("active") AND __datafusion_extracted_2 = Utf8("admin")
2226            Projection: leaf_udf(test.user, Utf8("status")) AS __datafusion_extracted_1, test.id, test.user
2227              TableScan: test projection=[id, user]
2228            Projection: leaf_udf(right.user, Utf8("role")) AS __datafusion_extracted_2, right.id, right.user
2229              TableScan: right projection=[id, user]
2230
2231        ## After Pushdown
2232        (same as after extraction)
2233
2234        ## Optimized
2235        (same as after pushdown)
2236        "#)
2237    }
2238
2239    /// Join with no MoveTowardsLeafNodes expressions returns unchanged.
2240    #[test]
2241    fn test_extract_from_join_no_extraction() -> Result<()> {
2242        use datafusion_expr::JoinType;
2243
2244        let left = test_table_scan()?;
2245        let right = test_table_scan_with_name("right")?;
2246
2247        let plan = LogicalPlanBuilder::from(left)
2248            .join(right, JoinType::Inner, (vec!["a"], vec!["a"]), None)?
2249            .build()?;
2250
2251        assert_stages!(plan, @"
2252        ## Original Plan
2253        Inner Join: test.a = right.a
2254          TableScan: test projection=[a, b, c]
2255          TableScan: right projection=[a, b, c]
2256
2257        ## After Extraction
2258        (same as original)
2259
2260        ## After Pushdown
2261        (same as after extraction)
2262
2263        ## Optimized
2264        (same as after pushdown)
2265        ")
2266    }
2267
2268    /// Join followed by filter with extraction.
2269    #[test]
2270    fn test_extract_from_filter_above_join() -> Result<()> {
2271        use datafusion_expr::JoinType;
2272
2273        let left = test_table_scan_with_struct()?;
2274        let right = test_table_scan_with_struct_named("right")?;
2275
2276        let plan = LogicalPlanBuilder::from(left)
2277            .join_with_expr_keys(
2278                right,
2279                JoinType::Inner,
2280                (
2281                    vec![leaf_udf(col("user"), "id")],
2282                    vec![leaf_udf(col("user"), "id")],
2283                ),
2284                None,
2285            )?
2286            .filter(leaf_udf(col("test.user"), "status").eq(lit("active")))?
2287            .build()?;
2288
2289        assert_stages!(plan, @r#"
2290        ## Original Plan
2291        Filter: leaf_udf(test.user, Utf8("status")) = Utf8("active")
2292          Inner Join: leaf_udf(test.user, Utf8("id")) = leaf_udf(right.user, Utf8("id"))
2293            TableScan: test projection=[id, user]
2294            TableScan: right projection=[id, user]
2295
2296        ## After Extraction
2297        Projection: test.id, test.user, right.id, right.user
2298          Filter: __datafusion_extracted_1 = Utf8("active")
2299            Projection: leaf_udf(test.user, Utf8("status")) AS __datafusion_extracted_1, test.id, test.user, right.id, right.user
2300              Projection: test.id, test.user, right.id, right.user
2301                Inner Join: __datafusion_extracted_2 = __datafusion_extracted_3
2302                  Projection: leaf_udf(test.user, Utf8("id")) AS __datafusion_extracted_2, test.id, test.user
2303                    TableScan: test projection=[id, user]
2304                  Projection: leaf_udf(right.user, Utf8("id")) AS __datafusion_extracted_3, right.id, right.user
2305                    TableScan: right projection=[id, user]
2306
2307        ## After Pushdown
2308        Projection: test.id, test.user, right.id, right.user
2309          Filter: __datafusion_extracted_1 = Utf8("active")
2310            Inner Join: __datafusion_extracted_2 = __datafusion_extracted_3
2311              Projection: leaf_udf(test.user, Utf8("id")) AS __datafusion_extracted_2, test.id, test.user, leaf_udf(test.user, Utf8("status")) AS __datafusion_extracted_1
2312                TableScan: test projection=[id, user]
2313              Projection: leaf_udf(right.user, Utf8("id")) AS __datafusion_extracted_3, right.id, right.user
2314                TableScan: right projection=[id, user]
2315
2316        ## Optimized
2317        Projection: test.id, test.user, right.id, right.user
2318          Filter: __datafusion_extracted_1 = Utf8("active")
2319            Projection: test.id, test.user, __datafusion_extracted_1, right.id, right.user
2320              Inner Join: __datafusion_extracted_2 = __datafusion_extracted_3
2321                Projection: leaf_udf(test.user, Utf8("id")) AS __datafusion_extracted_2, test.id, test.user, leaf_udf(test.user, Utf8("status")) AS __datafusion_extracted_1
2322                  TableScan: test projection=[id, user]
2323                Projection: leaf_udf(right.user, Utf8("id")) AS __datafusion_extracted_3, right.id, right.user
2324                  TableScan: right projection=[id, user]
2325        "#)
2326    }
2327
2328    /// Extraction projection (get_field in SELECT) above a Join pushes into
2329    /// the correct input side.
2330    #[test]
2331    fn test_extract_projection_above_join() -> Result<()> {
2332        use datafusion_expr::JoinType;
2333
2334        let left = test_table_scan_with_struct()?;
2335        let right = test_table_scan_with_struct_named("right")?;
2336
2337        let plan = LogicalPlanBuilder::from(left)
2338            .join(right, JoinType::Inner, (vec!["id"], vec!["id"]), None)?
2339            .project(vec![
2340                leaf_udf(col("test.user"), "status"),
2341                leaf_udf(col("right.user"), "role"),
2342            ])?
2343            .build()?;
2344
2345        assert_stages!(plan, @r#"
2346        ## Original Plan
2347        Projection: leaf_udf(test.user, Utf8("status")), leaf_udf(right.user, Utf8("role"))
2348          Inner Join: test.id = right.id
2349            TableScan: test projection=[id, user]
2350            TableScan: right projection=[id, user]
2351
2352        ## After Extraction
2353        (same as original)
2354
2355        ## After Pushdown
2356        Projection: __datafusion_extracted_1 AS leaf_udf(test.user,Utf8("status")), __datafusion_extracted_2 AS leaf_udf(right.user,Utf8("role"))
2357          Inner Join: test.id = right.id
2358            Projection: leaf_udf(test.user, Utf8("status")) AS __datafusion_extracted_1, test.id, test.user
2359              TableScan: test projection=[id, user]
2360            Projection: leaf_udf(right.user, Utf8("role")) AS __datafusion_extracted_2, right.id, right.user
2361              TableScan: right projection=[id, user]
2362
2363        ## Optimized
2364        Projection: __datafusion_extracted_1 AS leaf_udf(test.user,Utf8("status")), __datafusion_extracted_2 AS leaf_udf(right.user,Utf8("role"))
2365          Inner Join: test.id = right.id
2366            Projection: leaf_udf(test.user, Utf8("status")) AS __datafusion_extracted_1, test.id
2367              TableScan: test projection=[id, user]
2368            Projection: leaf_udf(right.user, Utf8("role")) AS __datafusion_extracted_2, right.id
2369              TableScan: right projection=[id, user]
2370        "#)
2371    }
2372
2373    /// Join where both sides have same-named columns: a qualified reference
2374    /// to the right side must be routed to the right input, not the left.
2375    #[test]
2376    fn test_extract_from_join_qualified_right_side() -> Result<()> {
2377        use datafusion_expr::JoinType;
2378
2379        let left = test_table_scan_with_struct()?;
2380        let right = test_table_scan_with_struct_named("right")?;
2381
2382        // Filter references right.user explicitly — must route to right side
2383        let plan = LogicalPlanBuilder::from(left)
2384            .join_on(
2385                right,
2386                JoinType::Inner,
2387                vec![
2388                    col("test.id").eq(col("right.id")),
2389                    leaf_udf(col("right.user"), "status").eq(lit("active")),
2390                ],
2391            )?
2392            .build()?;
2393
2394        assert_stages!(plan, @r#"
2395        ## Original Plan
2396        Inner Join:  Filter: test.id = right.id AND leaf_udf(right.user, Utf8("status")) = Utf8("active")
2397          TableScan: test projection=[id, user]
2398          TableScan: right projection=[id, user]
2399
2400        ## After Extraction
2401        Projection: test.id, test.user, right.id, right.user
2402          Inner Join:  Filter: test.id = right.id AND __datafusion_extracted_1 = Utf8("active")
2403            TableScan: test projection=[id, user]
2404            Projection: leaf_udf(right.user, Utf8("status")) AS __datafusion_extracted_1, right.id, right.user
2405              TableScan: right projection=[id, user]
2406
2407        ## After Pushdown
2408        (same as after extraction)
2409
2410        ## Optimized
2411        (same as after pushdown)
2412        "#)
2413    }
2414
2415    /// When both inputs contain the same unqualified column, an unqualified
2416    /// column reference is ambiguous and `find_owning_input` must return
2417    /// `None` rather than always returning 0 (the left side).
2418    #[test]
2419    fn test_find_owning_input_ambiguous_unqualified_column() {
2420        use std::collections::HashSet;
2421
2422        // Simulate schema_columns output for two sides of a join where both
2423        // have a "user" column — each set contains the qualified and
2424        // unqualified form.
2425        let relation = "test".into();
2426        let left_cols: HashSet<ColumnReference> = [
2427            ColumnReference::new(Some(&relation), "user"),
2428            ColumnReference::new_unqualified("user"),
2429        ]
2430        .into_iter()
2431        .collect();
2432
2433        let relation = "right".into();
2434        let right_cols: HashSet<ColumnReference> = [
2435            ColumnReference::new(Some(&relation), "user"),
2436            ColumnReference::new_unqualified("user"),
2437        ]
2438        .into_iter()
2439        .collect();
2440
2441        let input_column_sets = vec![left_cols, right_cols];
2442
2443        // Unqualified "user" matches both sets — must return None (ambiguous)
2444        let unqualified = Expr::Column(Column::new_unqualified("user"));
2445        assert_eq!(find_owning_input(&unqualified, &input_column_sets), None);
2446
2447        // Qualified "right.user" matches only the right set — must return Some(1)
2448        let qualified_right = Expr::Column(Column::new(Some("right"), "user"));
2449        assert_eq!(
2450            find_owning_input(&qualified_right, &input_column_sets),
2451            Some(1)
2452        );
2453
2454        // Qualified "test.user" matches only the left set — must return Some(0)
2455        let qualified_left = Expr::Column(Column::new(Some("test"), "user"));
2456        assert_eq!(
2457            find_owning_input(&qualified_left, &input_column_sets),
2458            Some(0)
2459        );
2460    }
2461
2462    /// Two leaf_udf expressions from different sides of a Join in a Filter.
2463    /// Each is routed to its respective input side independently.
2464    #[test]
2465    fn test_extract_from_join_cross_input_expression() -> Result<()> {
2466        let left = test_table_scan_with_struct()?;
2467        let right = test_table_scan_with_struct_named("right")?;
2468
2469        let plan = LogicalPlanBuilder::from(left)
2470            .join_on(
2471                right,
2472                datafusion_expr::JoinType::Inner,
2473                vec![col("test.id").eq(col("right.id"))],
2474            )?
2475            .filter(
2476                leaf_udf(col("test.user"), "status")
2477                    .eq(leaf_udf(col("right.user"), "status")),
2478            )?
2479            .build()?;
2480
2481        assert_stages!(plan, @r#"
2482        ## Original Plan
2483        Filter: leaf_udf(test.user, Utf8("status")) = leaf_udf(right.user, Utf8("status"))
2484          Inner Join:  Filter: test.id = right.id
2485            TableScan: test projection=[id, user]
2486            TableScan: right projection=[id, user]
2487
2488        ## After Extraction
2489        Projection: test.id, test.user, right.id, right.user
2490          Filter: __datafusion_extracted_1 = __datafusion_extracted_2
2491            Projection: leaf_udf(test.user, Utf8("status")) AS __datafusion_extracted_1, leaf_udf(right.user, Utf8("status")) AS __datafusion_extracted_2, test.id, test.user, right.id, right.user
2492              Inner Join:  Filter: test.id = right.id
2493                TableScan: test projection=[id, user]
2494                TableScan: right projection=[id, user]
2495
2496        ## After Pushdown
2497        Projection: test.id, test.user, right.id, right.user
2498          Filter: __datafusion_extracted_1 = __datafusion_extracted_2
2499            Inner Join:  Filter: test.id = right.id
2500              Projection: leaf_udf(test.user, Utf8("status")) AS __datafusion_extracted_1, test.id, test.user
2501                TableScan: test projection=[id, user]
2502              Projection: leaf_udf(right.user, Utf8("status")) AS __datafusion_extracted_2, right.id, right.user
2503                TableScan: right projection=[id, user]
2504
2505        ## Optimized
2506        (same as after pushdown)
2507        "#)
2508    }
2509
2510    // =========================================================================
2511    // Column-rename through intermediate node tests
2512    // =========================================================================
2513
2514    /// Projection with leaf expr above Filter above renaming Projection.
2515    #[test]
2516    fn test_extract_through_filter_with_column_rename() -> Result<()> {
2517        let table_scan = test_table_scan_with_struct()?;
2518        let plan = LogicalPlanBuilder::from(table_scan)
2519            .project(vec![col("user").alias("x")])?
2520            .filter(col("x").is_not_null())?
2521            .project(vec![leaf_udf(col("x"), "a")])?
2522            .build()?;
2523
2524        assert_stages!(plan, @r#"
2525        ## Original Plan
2526        Projection: leaf_udf(x, Utf8("a"))
2527          Filter: x IS NOT NULL
2528            Projection: test.user AS x
2529              TableScan: test projection=[user]
2530
2531        ## After Extraction
2532        (same as original)
2533
2534        ## After Pushdown
2535        Projection: __datafusion_extracted_1 AS leaf_udf(x,Utf8("a"))
2536          Filter: x IS NOT NULL
2537            Projection: test.user AS x, leaf_udf(test.user, Utf8("a")) AS __datafusion_extracted_1, test.user
2538              TableScan: test projection=[user]
2539
2540        ## Optimized
2541        Projection: __datafusion_extracted_1 AS leaf_udf(x,Utf8("a"))
2542          Filter: x IS NOT NULL
2543            Projection: test.user AS x, leaf_udf(test.user, Utf8("a")) AS __datafusion_extracted_1
2544              TableScan: test projection=[user]
2545        "#)
2546    }
2547
2548    /// Same as above but with a partial extraction (leaf + arithmetic).
2549    #[test]
2550    fn test_extract_partial_through_filter_with_column_rename() -> Result<()> {
2551        let table_scan = test_table_scan_with_struct()?;
2552        let plan = LogicalPlanBuilder::from(table_scan)
2553            .project(vec![col("user").alias("x")])?
2554            .filter(col("x").is_not_null())?
2555            .project(vec![leaf_udf(col("x"), "a").is_not_null()])?
2556            .build()?;
2557
2558        assert_stages!(plan, @r#"
2559        ## Original Plan
2560        Projection: leaf_udf(x, Utf8("a")) IS NOT NULL
2561          Filter: x IS NOT NULL
2562            Projection: test.user AS x
2563              TableScan: test projection=[user]
2564
2565        ## After Extraction
2566        (same as original)
2567
2568        ## After Pushdown
2569        Projection: __datafusion_extracted_1 IS NOT NULL AS leaf_udf(x,Utf8("a")) IS NOT NULL
2570          Filter: x IS NOT NULL
2571            Projection: test.user AS x, leaf_udf(test.user, Utf8("a")) AS __datafusion_extracted_1, test.user
2572              TableScan: test projection=[user]
2573
2574        ## Optimized
2575        Projection: __datafusion_extracted_1 IS NOT NULL AS leaf_udf(x,Utf8("a")) IS NOT NULL
2576          Filter: x IS NOT NULL
2577            Projection: test.user AS x, leaf_udf(test.user, Utf8("a")) AS __datafusion_extracted_1
2578              TableScan: test projection=[user]
2579        "#)
2580    }
2581
2582    /// Tests merge_into_extracted_projection path through a renaming projection.
2583    #[test]
2584    fn test_extract_from_filter_above_renaming_projection() -> Result<()> {
2585        let table_scan = test_table_scan_with_struct()?;
2586        let plan = LogicalPlanBuilder::from(table_scan)
2587            .project(vec![col("user").alias("x")])?
2588            .filter(leaf_udf(col("x"), "a").eq(lit("active")))?
2589            .build()?;
2590
2591        assert_stages!(plan, @r#"
2592        ## Original Plan
2593        Filter: leaf_udf(x, Utf8("a")) = Utf8("active")
2594          Projection: test.user AS x
2595            TableScan: test projection=[user]
2596
2597        ## After Extraction
2598        Projection: x
2599          Filter: __datafusion_extracted_1 = Utf8("active")
2600            Projection: test.user AS x, leaf_udf(test.user, Utf8("a")) AS __datafusion_extracted_1, test.user
2601              TableScan: test projection=[user]
2602
2603        ## After Pushdown
2604        (same as after extraction)
2605
2606        ## Optimized
2607        Projection: x
2608          Filter: __datafusion_extracted_1 = Utf8("active")
2609            Projection: test.user AS x, leaf_udf(test.user, Utf8("a")) AS __datafusion_extracted_1
2610              TableScan: test projection=[user]
2611        "#)
2612    }
2613
2614    // =========================================================================
2615    // SubqueryAlias extraction tests
2616    // =========================================================================
2617
2618    /// Extraction projection pushes through SubqueryAlias.
2619    #[test]
2620    fn test_extract_through_subquery_alias() -> Result<()> {
2621        let table_scan = test_table_scan_with_struct()?;
2622        let plan = LogicalPlanBuilder::from(table_scan)
2623            .alias("sub")?
2624            .project(vec![leaf_udf(col("sub.user"), "name")])?
2625            .build()?;
2626
2627        assert_stages!(plan, @r#"
2628        ## Original Plan
2629        Projection: leaf_udf(sub.user, Utf8("name"))
2630          SubqueryAlias: sub
2631            TableScan: test projection=[user]
2632
2633        ## After Extraction
2634        (same as original)
2635
2636        ## After Pushdown
2637        Projection: __datafusion_extracted_1 AS leaf_udf(sub.user,Utf8("name"))
2638          SubqueryAlias: sub
2639            Projection: leaf_udf(test.user, Utf8("name")) AS __datafusion_extracted_1, test.user
2640              TableScan: test projection=[user]
2641
2642        ## Optimized
2643        Projection: __datafusion_extracted_1 AS leaf_udf(sub.user,Utf8("name"))
2644          SubqueryAlias: sub
2645            Projection: leaf_udf(test.user, Utf8("name")) AS __datafusion_extracted_1
2646              TableScan: test projection=[user]
2647        "#)
2648    }
2649
2650    /// Extraction projection pushes through SubqueryAlias + Filter.
2651    #[test]
2652    fn test_extract_through_subquery_alias_with_filter() -> Result<()> {
2653        let table_scan = test_table_scan_with_struct()?;
2654        let plan = LogicalPlanBuilder::from(table_scan)
2655            .alias("sub")?
2656            .filter(leaf_udf(col("sub.user"), "status").eq(lit("active")))?
2657            .project(vec![leaf_udf(col("sub.user"), "name")])?
2658            .build()?;
2659
2660        assert_stages!(plan, @r#"
2661        ## Original Plan
2662        Projection: leaf_udf(sub.user, Utf8("name"))
2663          Filter: leaf_udf(sub.user, Utf8("status")) = Utf8("active")
2664            SubqueryAlias: sub
2665              TableScan: test projection=[user]
2666
2667        ## After Extraction
2668        Projection: leaf_udf(sub.user, Utf8("name"))
2669          Projection: sub.user
2670            Filter: __datafusion_extracted_1 = Utf8("active")
2671              Projection: leaf_udf(sub.user, Utf8("status")) AS __datafusion_extracted_1, sub.user
2672                SubqueryAlias: sub
2673                  TableScan: test projection=[user]
2674
2675        ## After Pushdown
2676        Projection: __datafusion_extracted_2 AS leaf_udf(sub.user,Utf8("name"))
2677          Filter: __datafusion_extracted_1 = Utf8("active")
2678            SubqueryAlias: sub
2679              Projection: leaf_udf(test.user, Utf8("status")) AS __datafusion_extracted_1, leaf_udf(test.user, Utf8("name")) AS __datafusion_extracted_2, test.user
2680                TableScan: test projection=[user]
2681
2682        ## Optimized
2683        Projection: __datafusion_extracted_2 AS leaf_udf(sub.user,Utf8("name"))
2684          Filter: __datafusion_extracted_1 = Utf8("active")
2685            SubqueryAlias: sub
2686              Projection: leaf_udf(test.user, Utf8("status")) AS __datafusion_extracted_1, leaf_udf(test.user, Utf8("name")) AS __datafusion_extracted_2
2687                TableScan: test projection=[user]
2688        "#)
2689    }
2690
2691    /// Two layers of SubqueryAlias: extraction pushes through both.
2692    #[test]
2693    fn test_extract_through_nested_subquery_alias() -> Result<()> {
2694        let table_scan = test_table_scan_with_struct()?;
2695        let plan = LogicalPlanBuilder::from(table_scan)
2696            .alias("inner_sub")?
2697            .alias("outer_sub")?
2698            .project(vec![leaf_udf(col("outer_sub.user"), "name")])?
2699            .build()?;
2700
2701        assert_stages!(plan, @r#"
2702        ## Original Plan
2703        Projection: leaf_udf(outer_sub.user, Utf8("name"))
2704          SubqueryAlias: outer_sub
2705            SubqueryAlias: inner_sub
2706              TableScan: test projection=[user]
2707
2708        ## After Extraction
2709        (same as original)
2710
2711        ## After Pushdown
2712        Projection: __datafusion_extracted_1 AS leaf_udf(outer_sub.user,Utf8("name"))
2713          SubqueryAlias: outer_sub
2714            SubqueryAlias: inner_sub
2715              Projection: leaf_udf(test.user, Utf8("name")) AS __datafusion_extracted_1, test.user
2716                TableScan: test projection=[user]
2717
2718        ## Optimized
2719        Projection: __datafusion_extracted_1 AS leaf_udf(outer_sub.user,Utf8("name"))
2720          SubqueryAlias: outer_sub
2721            SubqueryAlias: inner_sub
2722              Projection: leaf_udf(test.user, Utf8("name")) AS __datafusion_extracted_1
2723                TableScan: test projection=[user]
2724        "#)
2725    }
2726
2727    /// Plain columns through SubqueryAlias -- no extraction needed.
2728    #[test]
2729    fn test_subquery_alias_no_extraction() -> Result<()> {
2730        let table_scan = test_table_scan()?;
2731        let plan = LogicalPlanBuilder::from(table_scan)
2732            .alias("sub")?
2733            .project(vec![col("sub.a"), col("sub.b")])?
2734            .build()?;
2735
2736        assert_stages!(plan, @"
2737        ## Original Plan
2738        SubqueryAlias: sub
2739          TableScan: test projection=[a, b]
2740
2741        ## After Extraction
2742        (same as original)
2743
2744        ## After Pushdown
2745        (same as after extraction)
2746
2747        ## Optimized
2748        (same as after pushdown)
2749        ")
2750    }
2751
2752    /// Two UDFs with the same `name()` but different concrete types should NOT be
2753    /// deduplicated -- they are semantically different expressions that happen to
2754    /// collide on `schema_name()`.
2755    #[test]
2756    fn test_different_udfs_same_schema_name_not_deduplicated() -> Result<()> {
2757        let udf_a = Arc::new(ScalarUDF::new_from_impl(
2758            PlacementTestUDF::new()
2759                .with_placement(ExpressionPlacement::MoveTowardsLeafNodes)
2760                .with_id(1),
2761        ));
2762        let udf_b = Arc::new(ScalarUDF::new_from_impl(
2763            PlacementTestUDF::new()
2764                .with_placement(ExpressionPlacement::MoveTowardsLeafNodes)
2765                .with_id(2),
2766        ));
2767
2768        let expr_a = Expr::ScalarFunction(ScalarFunction::new_udf(
2769            udf_a,
2770            vec![col("user"), lit("field")],
2771        ));
2772        let expr_b = Expr::ScalarFunction(ScalarFunction::new_udf(
2773            udf_b,
2774            vec![col("user"), lit("field")],
2775        ));
2776
2777        // Verify preconditions: same schema_name but different Expr
2778        assert_eq!(
2779            expr_a.schema_name().to_string(),
2780            expr_b.schema_name().to_string(),
2781            "Both expressions should have the same schema_name"
2782        );
2783        assert_ne!(
2784            expr_a, expr_b,
2785            "Expressions should NOT be equal (different UDF instances)"
2786        );
2787
2788        let table_scan = test_table_scan_with_struct()?;
2789        let plan = LogicalPlanBuilder::from(table_scan.clone())
2790            .filter(expr_a.clone().eq(lit("a")).and(expr_b.clone().eq(lit("b"))))?
2791            .select(vec![
2792                table_scan
2793                    .schema()
2794                    .index_of_column_by_name(None, "id")
2795                    .unwrap(),
2796            ])?
2797            .build()?;
2798
2799        assert_stages!(plan, @r#"
2800        ## Original Plan
2801        Projection: test.id
2802          Filter: leaf_udf(test.user, Utf8("field")) = Utf8("a") AND leaf_udf(test.user, Utf8("field")) = Utf8("b")
2803            TableScan: test projection=[id, user]
2804
2805        ## After Extraction
2806        Projection: test.id
2807          Projection: test.id, test.user
2808            Filter: __datafusion_extracted_1 = Utf8("a") AND __datafusion_extracted_2 = Utf8("b")
2809              Projection: leaf_udf(test.user, Utf8("field")) AS __datafusion_extracted_1, leaf_udf(test.user, Utf8("field")) AS __datafusion_extracted_2, test.id, test.user
2810                TableScan: test projection=[id, user]
2811
2812        ## After Pushdown
2813        (same as after extraction)
2814
2815        ## Optimized
2816        Projection: test.id
2817          Filter: __datafusion_extracted_1 = Utf8("a") AND __datafusion_extracted_2 = Utf8("b")
2818            Projection: leaf_udf(test.user, Utf8("field")) AS __datafusion_extracted_1, leaf_udf(test.user, Utf8("field")) AS __datafusion_extracted_2, test.id
2819              TableScan: test projection=[id, user]
2820        "#)
2821    }
2822
2823    // =========================================================================
2824    // Filter pushdown interaction tests
2825    // =========================================================================
2826
2827    /// Extraction pushdown through a filter that already had its own
2828    /// `leaf_udf` extracted.
2829    #[test]
2830    fn test_extraction_pushdown_through_filter_with_extracted_predicate() -> Result<()> {
2831        let table_scan = test_table_scan_with_struct()?;
2832        let plan = LogicalPlanBuilder::from(table_scan)
2833            .filter(leaf_udf(col("user"), "status").eq(lit("active")))?
2834            .project(vec![col("id"), leaf_udf(col("user"), "name")])?
2835            .build()?;
2836
2837        assert_stages!(plan, @r#"
2838        ## Original Plan
2839        Projection: test.id, leaf_udf(test.user, Utf8("name"))
2840          Filter: leaf_udf(test.user, Utf8("status")) = Utf8("active")
2841            TableScan: test projection=[id, user]
2842
2843        ## After Extraction
2844        Projection: test.id, leaf_udf(test.user, Utf8("name"))
2845          Projection: test.id, test.user
2846            Filter: __datafusion_extracted_1 = Utf8("active")
2847              Projection: leaf_udf(test.user, Utf8("status")) AS __datafusion_extracted_1, test.id, test.user
2848                TableScan: test projection=[id, user]
2849
2850        ## After Pushdown
2851        Projection: test.id, __datafusion_extracted_2 AS leaf_udf(test.user,Utf8("name"))
2852          Filter: __datafusion_extracted_1 = Utf8("active")
2853            Projection: leaf_udf(test.user, Utf8("status")) AS __datafusion_extracted_1, test.id, test.user, leaf_udf(test.user, Utf8("name")) AS __datafusion_extracted_2
2854              TableScan: test projection=[id, user]
2855
2856        ## Optimized
2857        Projection: test.id, __datafusion_extracted_2 AS leaf_udf(test.user,Utf8("name"))
2858          Filter: __datafusion_extracted_1 = Utf8("active")
2859            Projection: leaf_udf(test.user, Utf8("status")) AS __datafusion_extracted_1, test.id, leaf_udf(test.user, Utf8("name")) AS __datafusion_extracted_2
2860              TableScan: test projection=[id, user]
2861        "#)
2862    }
2863
2864    /// Same expression in filter predicate and projection output.
2865    #[test]
2866    fn test_extraction_pushdown_same_expr_in_filter_and_projection() -> Result<()> {
2867        let table_scan = test_table_scan_with_struct()?;
2868        let field_expr = leaf_udf(col("user"), "status");
2869        let plan = LogicalPlanBuilder::from(table_scan)
2870            .filter(field_expr.clone().gt(lit(5)))?
2871            .project(vec![col("id"), field_expr])?
2872            .build()?;
2873
2874        assert_stages!(plan, @r#"
2875        ## Original Plan
2876        Projection: test.id, leaf_udf(test.user, Utf8("status"))
2877          Filter: leaf_udf(test.user, Utf8("status")) > Int32(5)
2878            TableScan: test projection=[id, user]
2879
2880        ## After Extraction
2881        Projection: test.id, leaf_udf(test.user, Utf8("status"))
2882          Projection: test.id, test.user
2883            Filter: __datafusion_extracted_1 > Int32(5)
2884              Projection: leaf_udf(test.user, Utf8("status")) AS __datafusion_extracted_1, test.id, test.user
2885                TableScan: test projection=[id, user]
2886
2887        ## After Pushdown
2888        Projection: test.id, __datafusion_extracted_2 AS leaf_udf(test.user,Utf8("status"))
2889          Filter: __datafusion_extracted_1 > Int32(5)
2890            Projection: leaf_udf(test.user, Utf8("status")) AS __datafusion_extracted_1, test.id, test.user, leaf_udf(test.user, Utf8("status")) AS __datafusion_extracted_2
2891              TableScan: test projection=[id, user]
2892
2893        ## Optimized
2894        Projection: test.id, __datafusion_extracted_2 AS leaf_udf(test.user,Utf8("status"))
2895          Filter: __datafusion_extracted_1 > Int32(5)
2896            Projection: leaf_udf(test.user, Utf8("status")) AS __datafusion_extracted_1, test.id, leaf_udf(test.user, Utf8("status")) AS __datafusion_extracted_2
2897              TableScan: test projection=[id, user]
2898        "#)
2899    }
2900
2901    /// Left join with a `leaf_udf` filter on the right side AND
2902    /// the projection also selects `leaf_udf` from the right side.
2903    #[test]
2904    fn test_left_join_with_filter_and_projection_extraction() -> Result<()> {
2905        use datafusion_expr::JoinType;
2906
2907        let left = test_table_scan_with_struct()?;
2908        let right = test_table_scan_with_struct_named("right")?;
2909
2910        let plan = LogicalPlanBuilder::from(left)
2911            .join_on(
2912                right,
2913                JoinType::Left,
2914                vec![
2915                    col("test.id").eq(col("right.id")),
2916                    leaf_udf(col("right.user"), "status").gt(lit(5)),
2917                ],
2918            )?
2919            .project(vec![
2920                col("test.id"),
2921                leaf_udf(col("test.user"), "name"),
2922                leaf_udf(col("right.user"), "status"),
2923            ])?
2924            .build()?;
2925
2926        assert_stages!(plan, @r#"
2927        ## Original Plan
2928        Projection: test.id, leaf_udf(test.user, Utf8("name")), leaf_udf(right.user, Utf8("status"))
2929          Left Join:  Filter: test.id = right.id AND leaf_udf(right.user, Utf8("status")) > Int32(5)
2930            TableScan: test projection=[id, user]
2931            TableScan: right projection=[id, user]
2932
2933        ## After Extraction
2934        Projection: test.id, leaf_udf(test.user, Utf8("name")), leaf_udf(right.user, Utf8("status"))
2935          Projection: test.id, test.user, right.id, right.user
2936            Left Join:  Filter: test.id = right.id AND __datafusion_extracted_1 > Int32(5)
2937              TableScan: test projection=[id, user]
2938              Projection: leaf_udf(right.user, Utf8("status")) AS __datafusion_extracted_1, right.id, right.user
2939                TableScan: right projection=[id, user]
2940
2941        ## After Pushdown
2942        Projection: test.id, __datafusion_extracted_2 AS leaf_udf(test.user,Utf8("name")), __datafusion_extracted_3 AS leaf_udf(right.user,Utf8("status"))
2943          Left Join:  Filter: test.id = right.id AND __datafusion_extracted_1 > Int32(5)
2944            Projection: leaf_udf(test.user, Utf8("name")) AS __datafusion_extracted_2, test.id, test.user
2945              TableScan: test projection=[id, user]
2946            Projection: leaf_udf(right.user, Utf8("status")) AS __datafusion_extracted_1, right.id, right.user, leaf_udf(right.user, Utf8("status")) AS __datafusion_extracted_3
2947              TableScan: right projection=[id, user]
2948
2949        ## Optimized
2950        Projection: test.id, __datafusion_extracted_2 AS leaf_udf(test.user,Utf8("name")), __datafusion_extracted_3 AS leaf_udf(right.user,Utf8("status"))
2951          Left Join:  Filter: test.id = right.id AND __datafusion_extracted_1 > Int32(5)
2952            Projection: leaf_udf(test.user, Utf8("name")) AS __datafusion_extracted_2, test.id
2953              TableScan: test projection=[id, user]
2954            Projection: leaf_udf(right.user, Utf8("status")) AS __datafusion_extracted_1, right.id, leaf_udf(right.user, Utf8("status")) AS __datafusion_extracted_3
2955              TableScan: right projection=[id, user]
2956        "#)
2957    }
2958
2959    /// Extraction projection pushed through a filter whose predicate
2960    /// references a different extracted expression.
2961    #[test]
2962    fn test_pure_extraction_proj_push_through_filter() -> Result<()> {
2963        let table_scan = test_table_scan_with_struct()?;
2964        let plan = LogicalPlanBuilder::from(table_scan)
2965            .filter(leaf_udf(col("user"), "status").gt(lit(5)))?
2966            .project(vec![
2967                col("id"),
2968                leaf_udf(col("user"), "name"),
2969                leaf_udf(col("user"), "status"),
2970            ])?
2971            .build()?;
2972
2973        assert_stages!(plan, @r#"
2974        ## Original Plan
2975        Projection: test.id, leaf_udf(test.user, Utf8("name")), leaf_udf(test.user, Utf8("status"))
2976          Filter: leaf_udf(test.user, Utf8("status")) > Int32(5)
2977            TableScan: test projection=[id, user]
2978
2979        ## After Extraction
2980        Projection: test.id, leaf_udf(test.user, Utf8("name")), leaf_udf(test.user, Utf8("status"))
2981          Projection: test.id, test.user
2982            Filter: __datafusion_extracted_1 > Int32(5)
2983              Projection: leaf_udf(test.user, Utf8("status")) AS __datafusion_extracted_1, test.id, test.user
2984                TableScan: test projection=[id, user]
2985
2986        ## After Pushdown
2987        Projection: test.id, __datafusion_extracted_2 AS leaf_udf(test.user,Utf8("name")), __datafusion_extracted_3 AS leaf_udf(test.user,Utf8("status"))
2988          Filter: __datafusion_extracted_1 > Int32(5)
2989            Projection: leaf_udf(test.user, Utf8("status")) AS __datafusion_extracted_1, test.id, test.user, leaf_udf(test.user, Utf8("name")) AS __datafusion_extracted_2, leaf_udf(test.user, Utf8("status")) AS __datafusion_extracted_3
2990              TableScan: test projection=[id, user]
2991
2992        ## Optimized
2993        Projection: test.id, __datafusion_extracted_2 AS leaf_udf(test.user,Utf8("name")), __datafusion_extracted_3 AS leaf_udf(test.user,Utf8("status"))
2994          Filter: __datafusion_extracted_1 > Int32(5)
2995            Projection: leaf_udf(test.user, Utf8("status")) AS __datafusion_extracted_1, test.id, leaf_udf(test.user, Utf8("name")) AS __datafusion_extracted_2, leaf_udf(test.user, Utf8("status")) AS __datafusion_extracted_3
2996              TableScan: test projection=[id, user]
2997        "#)
2998    }
2999
3000    /// When an extraction projection's __extracted alias references a column
3001    /// (e.g. `user`) that is NOT a standalone expression in the projection,
3002    /// the merge into the inner projection should still succeed.
3003    #[test]
3004    fn test_merge_extraction_into_projection_with_column_ref_inflation() -> Result<()> {
3005        let table_scan = test_table_scan_with_struct()?;
3006
3007        // Inner projection (simulates a trimmed projection)
3008        let inner = LogicalPlanBuilder::from(table_scan)
3009            .project(vec![col("user"), col("id")])?
3010            .build()?;
3011
3012        // Outer projection: __extracted alias + id (but NOT user as standalone).
3013        // The alias references `user` internally, inflating columns_needed.
3014        let plan = LogicalPlanBuilder::from(inner)
3015            .project(vec![
3016                leaf_udf(col("user"), "status")
3017                    .alias(format!("{EXTRACTED_EXPR_PREFIX}_1")),
3018                col("id"),
3019            ])?
3020            .build()?;
3021
3022        // Run only PushDownLeafProjections
3023        let ctx = OptimizerContext::new().with_max_passes(1);
3024        let optimizer =
3025            Optimizer::with_rules(vec![Arc::new(PushDownLeafProjections::new())]);
3026        let result = optimizer.optimize(plan, &ctx, |_, _| {})?;
3027
3028        // With the fix: merge succeeds → extraction merged into inner projection.
3029        // Without the fix: merge rejected → two separate projections remain.
3030        insta::assert_snapshot!(format!("{result}"), @r#"
3031        Projection: __datafusion_extracted_1, test.id
3032          Projection: test.user, test.id, leaf_udf(test.user, Utf8("status")) AS __datafusion_extracted_1
3033            TableScan: test
3034        "#);
3035
3036        Ok(())
3037    }
3038}