Skip to main content

datafusion_optimizer/
extract_leaf_expressions.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! Two-pass optimizer pipeline that pushes cheap expressions (like struct field
19//! access `user['status']`) closer to data sources, enabling early data reduction
20//! and source-level optimizations (e.g., Parquet column pruning). See
21//! [`ExtractLeafExpressions`] (pass 1) and [`PushDownLeafProjections`] (pass 2).
22
23use indexmap::{IndexMap, IndexSet};
24use std::collections::HashMap;
25use std::sync::Arc;
26
27use datafusion_common::alias::AliasGenerator;
28use datafusion_common::tree_node::{Transformed, TreeNode, TreeNodeRecursion};
29use datafusion_common::{Column, DFSchema, Result, qualified_name};
30use datafusion_expr::logical_plan::LogicalPlan;
31use datafusion_expr::{Expr, ExpressionPlacement, Projection};
32
33use crate::optimizer::ApplyOrder;
34use crate::push_down_filter::replace_cols_by_name;
35use crate::utils::has_all_column_refs;
36use crate::{OptimizerConfig, OptimizerRule};
37
38/// Prefix for aliases generated by the extraction optimizer passes.
39///
40/// This prefix is **reserved for internal optimizer use**. User-defined aliases
41/// starting with this prefix may be misidentified as optimizer-generated
42/// extraction aliases, leading to unexpected behavior. Do not use this prefix
43/// in user queries.
44const EXTRACTED_EXPR_PREFIX: &str = "__datafusion_extracted";
45
46/// Returns `true` if any sub-expression in `exprs` has
47/// [`ExpressionPlacement::MoveTowardsLeafNodes`] placement.
48///
49/// This is a lightweight pre-check that short-circuits as soon as one
50/// extractable expression is found, avoiding the expensive allocations
51/// (column HashSets, extractors, expression rewrites) that the full
52/// extraction pipeline requires.
53fn has_extractable_expr(exprs: &[Expr]) -> bool {
54    exprs.iter().any(|expr| {
55        expr.exists(|e| Ok(e.placement() == ExpressionPlacement::MoveTowardsLeafNodes))
56            .unwrap_or(false)
57    })
58}
59
/// Pass 1 of 2: pulls `MoveTowardsLeafNodes` sub-expressions out of
/// non-projection nodes and into **extraction projections**.
///
/// Filter, Sort, Limit, Aggregate, and Join nodes are handled here.
/// Projection nodes are left to [`PushDownLeafProjections`], which performs
/// both extraction and pushdown for them.
///
/// # Key Concepts
///
/// **Extraction projection**: a projection placed *below* a node. It
/// pre-computes a cheap expression and exposes it under a generated alias
/// (`__datafusion_extracted_N`); the parent node is rewritten to reference
/// that alias instead of the original expression.
///
/// **Recovery projection**: a projection placed *above* a node to restore
/// the node's original output schema when extraction changed it.
/// Schema-preserving nodes (Filter, Sort, Limit) pass the extra columns of
/// the extraction projection upwards; the recovery projection selects only
/// the original columns so the extras stay hidden.
///
/// # Example
///
/// Given a filter with a struct field access:
///
/// ```text
/// Filter: user['status'] = 'active'
///   TableScan: t [id, user]
/// ```
///
/// This rule:
/// 1. Inserts an **extraction projection** below the filter.
/// 2. Adds a **recovery projection** above the filter to hide the extra column.
///
/// ```text
/// Projection: id, user                                                        <-- recovery projection
///   Filter: __datafusion_extracted_1 = 'active'
///     Projection: user['status'] AS __datafusion_extracted_1, id, user         <-- extraction projection
///       TableScan: t [id, user]
/// ```
///
/// **Important:** The `PushDownFilter` rule is aware of projections created by this rule
/// and will not push filters through them. It uses `ExpressionPlacement` to detect
/// `MoveTowardsLeafNodes` expressions and skip filter pushdown past them.
#[derive(Default, Debug)]
pub struct ExtractLeafExpressions {}

impl ExtractLeafExpressions {
    /// Creates a new [`ExtractLeafExpressions`] rule. The rule holds no state.
    pub fn new() -> Self {
        Self::default()
    }
}
111
112impl OptimizerRule for ExtractLeafExpressions {
113    fn name(&self) -> &str {
114        "extract_leaf_expressions"
115    }
116
117    fn rewrite(
118        &self,
119        plan: LogicalPlan,
120        config: &dyn OptimizerConfig,
121    ) -> Result<Transformed<LogicalPlan>> {
122        if !config.options().optimizer.enable_leaf_expression_pushdown {
123            return Ok(Transformed::no(plan));
124        }
125        let alias_generator = config.alias_generator();
126
127        // Advance the alias generator past any user-provided __datafusion_extracted_N
128        // aliases to prevent collisions when generating new extraction aliases.
129        advance_generator_past_existing(&plan, alias_generator)?;
130
131        plan.transform_down_with_subqueries(|plan| {
132            extract_from_plan(plan, alias_generator)
133        })
134    }
135}
136
137/// Scans the current plan node's expressions for pre-existing
138/// `__datafusion_extracted_N` aliases and advances the generator
139/// counter past them to avoid collisions with user-provided aliases.
140fn advance_generator_past_existing(
141    plan: &LogicalPlan,
142    alias_generator: &AliasGenerator,
143) -> Result<()> {
144    plan.apply(|plan| {
145        plan.expressions().iter().try_for_each(|expr| {
146            expr.apply(|e| {
147                if let Expr::Alias(alias) = e
148                    && let Some(id) = alias
149                        .name
150                        .strip_prefix(EXTRACTED_EXPR_PREFIX)
151                        .and_then(|s| s.strip_prefix('_'))
152                        .and_then(|s| s.parse().ok())
153                {
154                    alias_generator.update_min_id(id);
155                }
156                Ok(TreeNodeRecursion::Continue)
157            })?;
158            Ok::<(), datafusion_common::error::DataFusionError>(())
159        })?;
160        Ok(TreeNodeRecursion::Continue)
161    })
162    .map(|_| ())
163}
164
165/// Extracts `MoveTowardsLeafNodes` sub-expressions from a plan node.
166///
167/// Works for any number of inputs (0, 1, 2, …N). For multi-input nodes
168/// like Join, each extracted sub-expression is routed to the correct input
169/// by checking which input's schema contains all of the expression's column
170/// references.
171fn extract_from_plan(
172    plan: LogicalPlan,
173    alias_generator: &Arc<AliasGenerator>,
174) -> Result<Transformed<LogicalPlan>> {
175    // Only extract from plan types whose output schema is predictable after
176    // expression rewriting.  Nodes like Window derive column names from
177    // their expressions, so rewriting `get_field` inside a window function
178    // changes the output schema and breaks the recovery projection.
179    if !matches!(
180        &plan,
181        LogicalPlan::Aggregate(_)
182            | LogicalPlan::Filter(_)
183            | LogicalPlan::Sort(_)
184            | LogicalPlan::Limit(_)
185            | LogicalPlan::Join(_)
186    ) {
187        return Ok(Transformed::no(plan));
188    }
189
190    let inputs = plan.inputs();
191    if inputs.is_empty() {
192        return Ok(Transformed::no(plan));
193    }
194
195    // Fast pre-check: skip all allocations if no extractable expressions exist
196    if !has_extractable_expr(&plan.expressions()) {
197        return Ok(Transformed::no(plan));
198    }
199
200    // Save original output schema before any transformation
201    let original_schema = Arc::clone(plan.schema());
202
203    // Build per-input schemas from borrowed inputs (before plan is consumed
204    // by map_expressions). We only need schemas and column sets for routing;
205    // the actual inputs are cloned later only if extraction succeeds.
206    let input_schemas: Vec<Arc<DFSchema>> =
207        inputs.iter().map(|i| Arc::clone(i.schema())).collect();
208
209    // Build per-input extractors
210    let mut extractors: Vec<LeafExpressionExtractor> = input_schemas
211        .iter()
212        .map(|schema| LeafExpressionExtractor::new(schema.as_ref(), alias_generator))
213        .collect();
214
215    // Build per-input column sets for routing expressions to the correct input
216    let input_column_sets: Vec<std::collections::HashSet<Column>> = input_schemas
217        .iter()
218        .map(|schema| schema_columns(schema.as_ref()))
219        .collect();
220
221    // Transform expressions via map_expressions with routing
222    let transformed = plan.map_expressions(|expr| {
223        routing_extract(expr, &mut extractors, &input_column_sets)
224    })?;
225
226    // If no expressions were rewritten, nothing was extracted
227    if !transformed.transformed {
228        return Ok(transformed);
229    }
230
231    // Clone inputs now that we know extraction succeeded. Wrap in Arc
232    // upfront since build_extraction_projection expects &Arc<LogicalPlan>.
233    let owned_inputs: Vec<Arc<LogicalPlan>> = transformed
234        .data
235        .inputs()
236        .into_iter()
237        .map(|i| Arc::new(i.clone()))
238        .collect();
239
240    // Build per-input extraction projections (None means no extractions for that input)
241    let new_inputs: Vec<LogicalPlan> = owned_inputs
242        .into_iter()
243        .zip(extractors.iter())
244        .map(|(input_arc, extractor)| {
245            match extractor.build_extraction_projection(&input_arc)? {
246                Some(plan) => Ok(plan),
247                // No extractions for this input — recover the LogicalPlan
248                // without cloning (refcount is 1 since build returned None).
249                None => {
250                    Ok(Arc::try_unwrap(input_arc).unwrap_or_else(|arc| (*arc).clone()))
251                }
252            }
253        })
254        .collect::<Result<Vec<_>>>()?;
255
256    // Rebuild the plan keeping its rewritten expressions but replacing
257    // inputs with the new extraction projections.
258    let new_plan = transformed
259        .data
260        .with_new_exprs(transformed.data.expressions(), new_inputs)?;
261
262    // Add recovery projection if the output schema changed
263    let recovered = build_recovery_projection(original_schema.as_ref(), new_plan)?;
264
265    Ok(Transformed::yes(recovered))
266}
267
268/// Given an expression, returns the index of the input whose columns fully
269/// cover the expression's column references.
270/// Returns `None` if the expression references columns from multiple inputs
271/// or if multiple inputs match (ambiguous, e.g. unqualified columns present
272/// in both sides of a join).
273fn find_owning_input(
274    expr: &Expr,
275    input_column_sets: &[std::collections::HashSet<Column>],
276) -> Option<usize> {
277    let mut found = None;
278    for (idx, cols) in input_column_sets.iter().enumerate() {
279        if has_all_column_refs(expr, cols) {
280            if found.is_some() {
281                // Ambiguous — multiple inputs match
282                return None;
283            }
284            found = Some(idx);
285        }
286    }
287    found
288}
289
290/// Walks an expression tree top-down, extracting `MoveTowardsLeafNodes`
291/// sub-expressions and routing each to the correct per-input extractor.
292fn routing_extract(
293    expr: Expr,
294    extractors: &mut [LeafExpressionExtractor],
295    input_column_sets: &[std::collections::HashSet<Column>],
296) -> Result<Transformed<Expr>> {
297    expr.transform_down(|e| {
298        // Skip expressions already aliased with extracted expression pattern
299        if let Expr::Alias(alias) = &e
300            && alias.name.starts_with(EXTRACTED_EXPR_PREFIX)
301        {
302            return Ok(Transformed {
303                data: e,
304                transformed: false,
305                tnr: TreeNodeRecursion::Jump,
306            });
307        }
308
309        // Don't extract Alias nodes directly — preserve the alias and let
310        // transform_down recurse into the inner expression
311        if matches!(&e, Expr::Alias(_)) {
312            return Ok(Transformed::no(e));
313        }
314
315        match e.placement() {
316            ExpressionPlacement::MoveTowardsLeafNodes => {
317                if let Some(idx) = find_owning_input(&e, input_column_sets) {
318                    let col_ref = extractors[idx].add_extracted(e)?;
319                    Ok(Transformed::yes(col_ref))
320                } else {
321                    // References columns from multiple inputs — cannot extract
322                    Ok(Transformed::no(e))
323                }
324            }
325            ExpressionPlacement::Column => {
326                // Track columns that the parent node references so the
327                // extraction projection includes them as pass-through.
328                // Without this, the extraction projection would only
329                // contain __datafusion_extracted_N aliases, and the parent couldn't
330                // resolve its other column references.
331                if let Expr::Column(col) = &e
332                    && let Some(idx) = find_owning_input(&e, input_column_sets)
333                {
334                    extractors[idx].columns_needed.insert(col.clone());
335                }
336                Ok(Transformed::no(e))
337            }
338            _ => Ok(Transformed::no(e)),
339        }
340    })
341}
342
343/// Returns all columns in the schema (both qualified and unqualified forms)
344fn schema_columns(schema: &DFSchema) -> std::collections::HashSet<Column> {
345    schema
346        .iter()
347        .flat_map(|(qualifier, field)| {
348            [
349                Column::new(qualifier.cloned(), field.name()),
350                Column::new_unqualified(field.name()),
351            ]
352        })
353        .collect()
354}
355
356/// Rewrites extraction pairs and column references from one qualifier
357/// space to another.
358///
359/// Builds a replacement map by zipping `from_schema` (whose qualifiers
360/// currently appear in `pairs` / `columns`) with `to_schema` (the
361/// qualifiers we want), then applies `replace_cols_by_name`.
362///
363/// Used for SubqueryAlias (alias-space -> input-space) and Union
364/// (union output-space -> per-branch input-space).
365fn remap_pairs_and_columns(
366    pairs: &[(Expr, String)],
367    columns: &IndexSet<Column>,
368    from_schema: &DFSchema,
369    to_schema: &DFSchema,
370) -> Result<ExtractionTarget> {
371    let mut replace_map = HashMap::new();
372    for ((from_q, from_f), (to_q, to_f)) in from_schema.iter().zip(to_schema.iter()) {
373        replace_map.insert(
374            qualified_name(from_q, from_f.name()),
375            Expr::Column(Column::new(to_q.cloned(), to_f.name())),
376        );
377    }
378    let remapped_pairs: Vec<(Expr, String)> = pairs
379        .iter()
380        .map(|(expr, alias)| {
381            Ok((
382                replace_cols_by_name(expr.clone(), &replace_map)?,
383                alias.clone(),
384            ))
385        })
386        .collect::<Result<_>>()?;
387    let remapped_columns: IndexSet<Column> = columns
388        .iter()
389        .filter_map(|col| {
390            let rewritten =
391                replace_cols_by_name(Expr::Column(col.clone()), &replace_map).ok()?;
392            if let Expr::Column(c) = rewritten {
393                Some(c)
394            } else {
395                Some(col.clone())
396            }
397        })
398        .collect();
399    Ok(ExtractionTarget {
400        pairs: remapped_pairs,
401        columns: remapped_columns,
402    })
403}
404
405// =============================================================================
406// Helper Types & Functions for Extraction Targeting
407// =============================================================================
408
409/// A bundle of extraction pairs (expression + alias) and standalone columns
410/// that need to be pushed through a plan node.
411struct ExtractionTarget {
412    /// Extracted expressions paired with their generated aliases.
413    pairs: Vec<(Expr, String)>,
414    /// Standalone column references needed by the parent node.
415    columns: IndexSet<Column>,
416}
417
418/// Build a replacement map from a projection: output_column_name -> underlying_expr.
419///
420/// This is used to resolve column references through a renaming projection.
421/// For example, if a projection has `user AS x`, this maps `x` -> `col("user")`.
422fn build_projection_replace_map(projection: &Projection) -> HashMap<String, Expr> {
423    projection
424        .schema
425        .iter()
426        .zip(projection.expr.iter())
427        .map(|((qualifier, field), expr)| {
428            let key = Column::from((qualifier, field)).flat_name();
429            (key, expr.clone().unalias())
430        })
431        .collect()
432}
433
434/// Build a recovery projection to restore the original output schema.
435///
436/// After extraction, a node's output schema may differ from the original:
437///
438/// - **Schema-preserving nodes** (Filter/Sort/Limit): the extraction projection
439///   below adds extra `__datafusion_extracted_N` columns that bubble up through
440///   the node. Recovery selects only the original columns to hide the extras.
441///   ```text
442///   Original schema: [id, user]
443///   After extraction: [__datafusion_extracted_1, id, user]   ← extra column leaked through
444///   Recovery: SELECT id, user FROM ...                       ← hides __datafusion_extracted_1
445///   ```
446///
447/// - **Schema-defining nodes** (Aggregate): same number of columns but names
448///   may differ because extracted aliases replaced the original expressions.
449///   Recovery maps positionally, aliasing where names changed.
450///   ```text
451///   Original: [SUM(user['balance'])]
452///   After:    [SUM(__datafusion_extracted_1)]                ← name changed
453///   Recovery: SUM(__datafusion_extracted_1) AS "SUM(user['balance'])"
454///   ```
455///
456/// - **Schemas identical** → no recovery projection needed.
457fn build_recovery_projection(
458    original_schema: &DFSchema,
459    input: LogicalPlan,
460) -> Result<LogicalPlan> {
461    let new_schema = input.schema();
462    let orig_len = original_schema.fields().len();
463    let new_len = new_schema.fields().len();
464
465    if orig_len == new_len {
466        // Same number of fields — check if schemas are identical
467        let schemas_match = original_schema.iter().zip(new_schema.iter()).all(
468            |((orig_q, orig_f), (new_q, new_f))| {
469                orig_f.name() == new_f.name() && orig_q == new_q
470            },
471        );
472        if schemas_match {
473            return Ok(input);
474        }
475
476        // Schema-defining nodes (Aggregate, Join): names may differ at some
477        // positions because extracted aliases replaced the original expressions.
478        // Map positionally, aliasing where the name changed.
479        //
480        // Invariant: `with_new_exprs` on all supported node types (Aggregate,
481        // Filter, Sort, Limit, Join) preserves column order, so positional
482        // mapping is safe here.
483        debug_assert!(
484            orig_len == new_len,
485            "build_recovery_projection: positional mapping requires same field count, \
486             got original={orig_len} vs new={new_len}"
487        );
488        let mut proj_exprs = Vec::with_capacity(orig_len);
489        for (i, (orig_qualifier, orig_field)) in original_schema.iter().enumerate() {
490            let (new_qualifier, new_field) = new_schema.qualified_field(i);
491            if orig_field.name() == new_field.name() && orig_qualifier == new_qualifier {
492                proj_exprs.push(Expr::from((orig_qualifier, orig_field)));
493            } else {
494                let new_col = Expr::Column(Column::from((new_qualifier, new_field)));
495                proj_exprs.push(
496                    new_col.alias_qualified(orig_qualifier.cloned(), orig_field.name()),
497                );
498            }
499        }
500        let projection = Projection::try_new(proj_exprs, Arc::new(input))?;
501        Ok(LogicalPlan::Projection(projection))
502    } else {
503        // Schema-preserving nodes: new schema has extra extraction columns.
504        // Original columns still exist by name; select them to hide extras.
505        let col_exprs: Vec<Expr> = original_schema.iter().map(Expr::from).collect();
506        let projection = Projection::try_new(col_exprs, Arc::new(input))?;
507        Ok(LogicalPlan::Projection(projection))
508    }
509}
510
511/// Collects `MoveTowardsLeafNodes` sub-expressions found during expression
512/// tree traversal and can build an extraction projection from them.
513///
514/// # Example
515///
516/// Given `Filter: user['status'] = 'active' AND user['name'] IS NOT NULL`:
517/// - `add_extracted(user['status'])` → stores it, returns `col("__datafusion_extracted_1")`
518/// - `add_extracted(user['name'])`   → stores it, returns `col("__datafusion_extracted_2")`
519/// - `build_extraction_projection()` produces:
520///   `Projection: user['status'] AS __datafusion_extracted_1, user['name'] AS __datafusion_extracted_2, <all input columns>`
521struct LeafExpressionExtractor<'a> {
522    /// Extracted expressions: maps expression -> alias
523    extracted: IndexMap<Expr, String>,
524    /// Columns referenced by extracted expressions or the parent node,
525    /// included as pass-through in the extraction projection.
526    columns_needed: IndexSet<Column>,
527    /// Input schema
528    input_schema: &'a DFSchema,
529    /// Alias generator
530    alias_generator: &'a Arc<AliasGenerator>,
531}
532
533impl<'a> LeafExpressionExtractor<'a> {
534    fn new(input_schema: &'a DFSchema, alias_generator: &'a Arc<AliasGenerator>) -> Self {
535        Self {
536            extracted: IndexMap::new(),
537            columns_needed: IndexSet::new(),
538            input_schema,
539            alias_generator,
540        }
541    }
542
543    /// Adds an expression to extracted set, returns column reference.
544    fn add_extracted(&mut self, expr: Expr) -> Result<Expr> {
545        // Deduplication: reuse existing alias if same expression
546        if let Some(alias) = self.extracted.get(&expr) {
547            return Ok(Expr::Column(Column::new_unqualified(alias)));
548        }
549
550        // Track columns referenced by this expression
551        for col in expr.column_refs() {
552            self.columns_needed.insert(col.clone());
553        }
554
555        // Generate unique alias
556        let alias = self.alias_generator.next(EXTRACTED_EXPR_PREFIX);
557        self.extracted.insert(expr, alias.clone());
558
559        Ok(Expr::Column(Column::new_unqualified(&alias)))
560    }
561
562    /// Builds an extraction projection above the given input, or merges into
563    /// it if the input is already a projection. Delegates to
564    /// [`build_extraction_projection_impl`].
565    ///
566    /// Returns `None` if there are no extractions.
567    fn build_extraction_projection(
568        &self,
569        input: &Arc<LogicalPlan>,
570    ) -> Result<Option<LogicalPlan>> {
571        if self.extracted.is_empty() {
572            return Ok(None);
573        }
574        let pairs: Vec<(Expr, String)> = self
575            .extracted
576            .iter()
577            .map(|(e, a)| (e.clone(), a.clone()))
578            .collect();
579        let proj = build_extraction_projection_impl(
580            &pairs,
581            &self.columns_needed,
582            input,
583            self.input_schema,
584        )?;
585        Ok(Some(LogicalPlan::Projection(proj)))
586    }
587}
588
589/// Build an extraction projection above the target node (shared by both passes).
590///
591/// If the target is an existing projection, merges into it. This requires
592/// resolving column references through the projection's rename mapping:
593/// if the projection has `user AS u`, and an extracted expression references
594/// `u['name']`, we must rewrite it to `user['name']` since the merged
595/// projection reads from the same input as the original.
596///
597/// Deduplicates by resolved expression equality and adds pass-through
598/// columns as needed. Otherwise builds a fresh projection with extracted
599/// expressions + ALL input schema columns.
600fn build_extraction_projection_impl(
601    extracted_exprs: &[(Expr, String)],
602    columns_needed: &IndexSet<Column>,
603    target: &Arc<LogicalPlan>,
604    target_schema: &DFSchema,
605) -> Result<Projection> {
606    if let LogicalPlan::Projection(existing) = target.as_ref() {
607        // Merge into existing projection
608        let mut proj_exprs = existing.expr.clone();
609
610        // Build a map of existing expressions (by Expr equality) to their aliases
611        let existing_extractions: IndexMap<Expr, String> = existing
612            .expr
613            .iter()
614            .filter_map(|e| {
615                if let Expr::Alias(alias) = e
616                    && alias.name.starts_with(EXTRACTED_EXPR_PREFIX)
617                {
618                    return Some((*alias.expr.clone(), alias.name.clone()));
619                }
620                None
621            })
622            .collect();
623
624        // Resolve column references through the projection's rename mapping
625        let replace_map = build_projection_replace_map(existing);
626
627        // Add new extracted expressions, resolving column refs through the projection
628        for (expr, alias) in extracted_exprs {
629            let resolved = replace_cols_by_name(expr.clone().alias(alias), &replace_map)?;
630            let resolved_inner = if let Expr::Alias(a) = &resolved {
631                a.expr.as_ref()
632            } else {
633                &resolved
634            };
635            if let Some(existing_alias) = existing_extractions.get(resolved_inner) {
636                // Same expression already extracted under a different alias —
637                // add the expression with the new alias so both names are
638                // available in the output. We can't reference the existing alias
639                // as a column within the same projection, so we duplicate the
640                // computation.
641                if existing_alias != alias {
642                    proj_exprs.push(resolved);
643                }
644            } else {
645                proj_exprs.push(resolved);
646            }
647        }
648
649        // Add any new pass-through columns that aren't already in the projection.
650        // We check against existing.input.schema() (the projection's source) rather
651        // than target_schema (the projection's output) because columns produced
652        // by alias expressions (e.g., CSE's __common_expr_N) exist in the output but
653        // not the input, and cannot be added as pass-through Column references.
654        let existing_cols: IndexSet<Column> = existing
655            .expr
656            .iter()
657            .filter_map(|e| {
658                if let Expr::Column(c) = e {
659                    Some(c.clone())
660                } else {
661                    None
662                }
663            })
664            .collect();
665
666        let input_schema = existing.input.schema();
667        for col in columns_needed {
668            let col_expr = Expr::Column(col.clone());
669            let resolved = replace_cols_by_name(col_expr, &replace_map)?;
670            if let Expr::Column(resolved_col) = &resolved
671                && !existing_cols.contains(resolved_col)
672                && input_schema.has_column(resolved_col)
673            {
674                proj_exprs.push(Expr::Column(resolved_col.clone()));
675            }
676            // If resolved to non-column expr, it's already computed by existing projection
677        }
678
679        Projection::try_new(proj_exprs, Arc::clone(&existing.input))
680    } else {
681        // Build new projection with extracted expressions + all input columns
682        let mut proj_exprs = Vec::new();
683        for (expr, alias) in extracted_exprs {
684            proj_exprs.push(expr.clone().alias(alias));
685        }
686        for (qualifier, field) in target_schema.iter() {
687            proj_exprs.push(Expr::from((qualifier, field)));
688        }
689        Projection::try_new(proj_exprs, Arc::clone(target))
690    }
691}
692
693// =============================================================================
694// Pass 2: PushDownLeafProjections
695// =============================================================================
696
/// Pass 2 of 2 (runs after [`ExtractLeafExpressions`]): moves extraction
/// projections down through schema-preserving nodes, towards the leaf nodes.
///
/// Two kinds of projections are handled:
/// - **Pure extraction projections** (all `__datafusion_extracted` aliases + columns):
///   pushed through Filter/Sort/Limit, merged into existing projections, or routed
///   into multi-input node inputs (Join, SubqueryAlias, etc.)
/// - **Mixed projections** (user projections containing `MoveTowardsLeafNodes`
///   sub-expressions): split into a recovery projection + extraction projection,
///   after which the extraction projection is pushed down.
///
/// # Example: Pushing through a Filter
///
/// After pass 1, the extraction projection sits directly below the filter:
/// ```text
/// Projection: id, user                                                              <-- recovery
///   Filter: __datafusion_extracted_1 = 'active'
///     Projection: user['status'] AS __datafusion_extracted_1, id, user               <-- extraction
///       TableScan: t [id, user]
/// ```
///
/// Pass 2 pushes the extraction projection through the recovery and filter,
/// and a subsequent `OptimizeProjections` pass removes the (now-redundant)
/// recovery projection:
/// ```text
/// Filter: __datafusion_extracted_1 = 'active'
///   Projection: user['status'] AS __datafusion_extracted_1, id, user                 <-- extraction (pushed down)
///     TableScan: t [id, user]
/// ```
#[derive(Default, Debug)]
pub struct PushDownLeafProjections {}

impl PushDownLeafProjections {
    /// Creates a new [`PushDownLeafProjections`] rule. The rule holds no state.
    pub fn new() -> Self {
        Self::default()
    }
}
734
735impl OptimizerRule for PushDownLeafProjections {
736    fn name(&self) -> &str {
737        "push_down_leaf_projections"
738    }
739
740    fn apply_order(&self) -> Option<ApplyOrder> {
741        Some(ApplyOrder::TopDown)
742    }
743
744    fn rewrite(
745        &self,
746        plan: LogicalPlan,
747        config: &dyn OptimizerConfig,
748    ) -> Result<Transformed<LogicalPlan>> {
749        if !config.options().optimizer.enable_leaf_expression_pushdown {
750            return Ok(Transformed::no(plan));
751        }
752        let alias_generator = config.alias_generator();
753        match try_push_input(&plan, alias_generator)? {
754            Some(new_plan) => Ok(Transformed::yes(new_plan)),
755            None => Ok(Transformed::no(plan)),
756        }
757    }
758}
759
760/// Attempts to push a projection's extractable expressions further down.
761///
762/// Returns `Some(new_subtree)` if the projection was pushed down or merged,
763/// `None` if there is nothing to push or the projection sits above a barrier.
764fn try_push_input(
765    input: &LogicalPlan,
766    alias_generator: &Arc<AliasGenerator>,
767) -> Result<Option<LogicalPlan>> {
768    let LogicalPlan::Projection(proj) = input else {
769        return Ok(None);
770    };
771    split_and_push_projection(proj, alias_generator)
772}
773
774/// Splits a projection into extractable pieces, pushes them towards leaf
775/// nodes, and adds a recovery projection if needed.
776///
777/// Handles both:
778/// - **Pure extraction projections** (all `__datafusion_extracted` aliases + columns)
779/// - **Mixed projections** (containing `MoveTowardsLeafNodes` sub-expressions)
780///
781/// Returns `Some(new_subtree)` if extractions were pushed down,
782/// `None` if there is nothing to extract or push.
783///
784/// # Example: Mixed Projection
785///
786/// ```text
787/// Input plan:
788///   Projection: user['name'] IS NOT NULL AS has_name, id
789///     Filter: ...
790///       TableScan
791///
792/// Phase 1 (Split):
793///   extraction_pairs: [(user['name'], "__datafusion_extracted_1")]
794///   recovery_exprs:   [__datafusion_extracted_1 IS NOT NULL AS has_name, id]
795///
796/// Phase 2 (Push):
797///   Push extraction projection through Filter toward TableScan
798///
799/// Phase 3 (Recovery):
800///   Projection: __datafusion_extracted_1 IS NOT NULL AS has_name, id       <-- recovery
801///     Filter: ...
802///       Projection: user['name'] AS __datafusion_extracted_1, id           <-- extraction (pushed)
803///         TableScan
804/// ```
805fn split_and_push_projection(
806    proj: &Projection,
807    alias_generator: &Arc<AliasGenerator>,
808) -> Result<Option<LogicalPlan>> {
809    // Fast pre-check: skip if there are no pre-existing extracted aliases
810    // and no new extractable expressions.
811    let has_existing_extracted = proj.expr.iter().any(|e| {
812        matches!(e, Expr::Alias(alias) if alias.name.starts_with(EXTRACTED_EXPR_PREFIX))
813    });
814    if !has_existing_extracted && !has_extractable_expr(&proj.expr) {
815        return Ok(None);
816    }
817
818    let input = &proj.input;
819    let input_schema = input.schema();
820
821    // ── Phase 1: Split ──────────────────────────────────────────────────
822    // For each projection expression, collect extraction pairs and build
823    // recovery expressions.
824    //
825    // Pre-existing `__datafusion_extracted` aliases are inserted into the
826    // extractor's `IndexMap` with the **full** `Expr::Alias(…)` as the key,
827    // so the alias name participates in equality. This prevents collisions
828    // when CSE rewrites produce the same inner expression under different
829    // alias names (e.g. `__common_expr_4 AS __datafusion_extracted_1` and
830    // `__common_expr_4 AS __datafusion_extracted_3`). New extractions from
831    // `routing_extract` use bare (non-Alias) keys and get normal dedup.
832    //
833    // When building the final `extraction_pairs`, the Alias wrapper is
834    // stripped so consumers see the usual `(inner_expr, alias_name)` tuples.
835
836    let mut extractors = vec![LeafExpressionExtractor::new(
837        input_schema.as_ref(),
838        alias_generator,
839    )];
840    let input_column_sets = vec![schema_columns(input_schema.as_ref())];
841
842    let original_schema = proj.schema.as_ref();
843    let mut recovery_exprs: Vec<Expr> = Vec::with_capacity(proj.expr.len());
844    let mut needs_recovery = false;
845    let mut has_new_extractions = false;
846    let mut proj_exprs_captured: usize = 0;
847    // Track standalone column expressions (Case B) to detect column refs
848    // from extracted aliases (Case A) that aren't also standalone expressions.
849    let mut standalone_columns: IndexSet<Column> = IndexSet::new();
850
851    for (expr, (qualifier, field)) in proj.expr.iter().zip(original_schema.iter()) {
852        if let Expr::Alias(alias) = expr
853            && alias.name.starts_with(EXTRACTED_EXPR_PREFIX)
854        {
855            // Insert the full Alias expression as the key so that
856            // distinct alias names don't collide in the IndexMap.
857            let alias_name = alias.name.clone();
858
859            for col_ref in alias.expr.column_refs() {
860                extractors[0].columns_needed.insert(col_ref.clone());
861            }
862
863            extractors[0]
864                .extracted
865                .insert(expr.clone(), alias_name.clone());
866            recovery_exprs.push(Expr::Column(Column::new_unqualified(&alias_name)));
867            proj_exprs_captured += 1;
868        } else if let Expr::Column(col) = expr {
869            // Plain column pass-through — track it in the extractor
870            extractors[0].columns_needed.insert(col.clone());
871            standalone_columns.insert(col.clone());
872            recovery_exprs.push(expr.clone());
873            proj_exprs_captured += 1;
874        } else {
875            // Everything else: run through routing_extract
876            let transformed =
877                routing_extract(expr.clone(), &mut extractors, &input_column_sets)?;
878            if transformed.transformed {
879                has_new_extractions = true;
880            }
881            let transformed_expr = transformed.data;
882
883            // Build recovery expression, aliasing back to original name if needed
884            let original_name = field.name();
885            let needs_alias = if let Expr::Column(col) = &transformed_expr {
886                col.name.as_str() != original_name
887            } else {
888                let expr_name = transformed_expr.schema_name().to_string();
889                original_name != &expr_name
890            };
891            let recovery_expr = if needs_alias {
892                needs_recovery = true;
893                transformed_expr
894                    .clone()
895                    .alias_qualified(qualifier.cloned(), original_name)
896            } else {
897                transformed_expr.clone()
898            };
899
900            // If the expression was transformed (i.e., has extracted sub-parts),
901            // it differs from what the pushed projection outputs → needs recovery.
902            // Also, any non-column, non-__datafusion_extracted expression needs recovery
903            // because the pushed extraction projection won't output it directly.
904            if transformed.transformed || !matches!(expr, Expr::Column(_)) {
905                needs_recovery = true;
906            }
907
908            recovery_exprs.push(recovery_expr);
909        }
910    }
911
912    // Build extraction_pairs, stripping the Alias wrapper from pre-existing
913    // entries (they used the full Alias as the map key to avoid dedup).
914    let extractor = &extractors[0];
915    let extraction_pairs: Vec<(Expr, String)> = extractor
916        .extracted
917        .iter()
918        .map(|(e, a)| match e {
919            Expr::Alias(alias) => (*alias.expr.clone(), a.clone()),
920            _ => (e.clone(), a.clone()),
921        })
922        .collect();
923    let columns_needed = &extractor.columns_needed;
924
925    // If no extractions found, nothing to do
926    if extraction_pairs.is_empty() {
927        return Ok(None);
928    }
929
930    // If columns_needed has entries that aren't standalone projection columns
931    // (i.e., they came from column refs inside extracted aliases), a merge
932    // into an inner projection will widen the schema with those extra columns,
933    // requiring a recovery projection to restore the original schema.
934    if columns_needed
935        .iter()
936        .any(|c| !standalone_columns.contains(c))
937    {
938        needs_recovery = true;
939    }
940
941    // ── Phase 2: Push down ──────────────────────────────────────────────
942    let proj_input = Arc::clone(&proj.input);
943    let pushed = push_extraction_pairs(
944        &extraction_pairs,
945        columns_needed,
946        proj,
947        &proj_input,
948        alias_generator,
949        proj_exprs_captured,
950    )?;
951
952    // ── Phase 3: Recovery ───────────────────────────────────────────────
953    // Determine the base plan: either the pushed result or an in-place extraction.
954    let base_plan = match pushed {
955        Some(plan) => plan,
956        None => {
957            if !has_new_extractions {
958                // Only pre-existing __datafusion_extracted aliases and columns, no new
959                // extractions from routing_extract. The original projection is
960                // already an extraction projection that couldn't be pushed
961                // further. Return None.
962                return Ok(None);
963            }
964            // Build extraction projection in-place (couldn't push down)
965            let input_arc = Arc::clone(input);
966            let extraction = build_extraction_projection_impl(
967                &extraction_pairs,
968                columns_needed,
969                &input_arc,
970                input_schema.as_ref(),
971            )?;
972            LogicalPlan::Projection(extraction)
973        }
974    };
975
976    // Wrap with recovery projection if the output schema changed
977    if needs_recovery {
978        let recovery = LogicalPlan::Projection(Projection::try_new(
979            recovery_exprs,
980            Arc::new(base_plan),
981        )?);
982        Ok(Some(recovery))
983    } else {
984        Ok(Some(base_plan))
985    }
986}
987
988/// Returns true if the plan is a Projection where ALL expressions are either
989/// `Alias(EXTRACTED_EXPR_PREFIX, ...)` or `Column`, with at least one extraction.
990/// Such projections can safely be pushed further without re-extraction.
991fn is_pure_extraction_projection(plan: &LogicalPlan) -> bool {
992    let LogicalPlan::Projection(proj) = plan else {
993        return false;
994    };
995    let mut has_extraction = false;
996    for expr in &proj.expr {
997        match expr {
998            Expr::Alias(alias) if alias.name.starts_with(EXTRACTED_EXPR_PREFIX) => {
999                has_extraction = true;
1000            }
1001            Expr::Column(_) => {}
1002            _ => return false,
1003        }
1004    }
1005    has_extraction
1006}
1007
1008/// Pushes extraction pairs down through the projection's input node,
1009/// dispatching to the appropriate handler based on the input node type.
1010fn push_extraction_pairs(
1011    pairs: &[(Expr, String)],
1012    columns_needed: &IndexSet<Column>,
1013    proj: &Projection,
1014    proj_input: &Arc<LogicalPlan>,
1015    alias_generator: &Arc<AliasGenerator>,
1016    proj_exprs_captured: usize,
1017) -> Result<Option<LogicalPlan>> {
1018    match proj_input.as_ref() {
1019        // Merge into existing projection, then try to push the result further down.
1020        // Only merge when every expression in the outer projection is fully
1021        // captured as either an extraction pair (Case A: __datafusion_extracted
1022        // alias) or a plain column (Case B). Uncaptured expressions (e.g.
1023        // `col AS __common_expr_1` from CSE, or complex expressions with
1024        // extracted sub-parts) would be lost during the merge.
1025        LogicalPlan::Projection(_) if proj_exprs_captured == proj.expr.len() => {
1026            let target_schema = Arc::clone(proj_input.schema());
1027            let merged = build_extraction_projection_impl(
1028                pairs,
1029                columns_needed,
1030                proj_input,
1031                target_schema.as_ref(),
1032            )?;
1033            let merged_plan = LogicalPlan::Projection(merged);
1034
1035            // After merging, try to push the result further down, but ONLY
1036            // if the merged result is still a pure extraction projection
1037            // (all __datafusion_extracted aliases + columns). If the merge inherited
1038            // bare MoveTowardsLeafNodes expressions from the inner projection,
1039            // pushing would re-extract them into new aliases and fail when
1040            // the (None, true) fallback can't find the original aliases.
1041            // This handles: Extraction → Recovery(cols) → Filter → ... → TableScan
1042            // by pushing through the recovery projection AND the filter in one pass.
1043            if is_pure_extraction_projection(&merged_plan)
1044                && let Some(pushed) = try_push_input(&merged_plan, alias_generator)?
1045            {
1046                return Ok(Some(pushed));
1047            }
1048            Ok(Some(merged_plan))
1049        }
1050        // Generic: handles Filter/Sort/Limit (via recursion),
1051        // SubqueryAlias (with qualifier remap in try_push_into_inputs),
1052        // Join, and anything else.
1053        // Safely bails out for nodes that don't pass through extracted
1054        // columns (Aggregate, Window) via the output schema check.
1055        _ => try_push_into_inputs(
1056            pairs,
1057            columns_needed,
1058            proj_input.as_ref(),
1059            alias_generator,
1060        ),
1061    }
1062}
1063
1064/// Routes extraction pairs and columns to the appropriate inputs.
1065///
1066/// - **Union**: broadcasts to every input via [`remap_pairs_and_columns`].
1067/// - **Other nodes**: routes each expression to the one input that owns
1068///   all of its column references (via [`find_owning_input`]).
1069///
1070/// Returns `None` if any expression can't be routed or no input has pairs.
1071fn route_to_inputs(
1072    pairs: &[(Expr, String)],
1073    columns: &IndexSet<Column>,
1074    node: &LogicalPlan,
1075    input_column_sets: &[std::collections::HashSet<Column>],
1076    input_schemas: &[Arc<DFSchema>],
1077) -> Result<Option<Vec<ExtractionTarget>>> {
1078    let num_inputs = input_schemas.len();
1079    let mut per_input: Vec<ExtractionTarget> = (0..num_inputs)
1080        .map(|_| ExtractionTarget {
1081            pairs: vec![],
1082            columns: IndexSet::new(),
1083        })
1084        .collect();
1085
1086    if matches!(node, LogicalPlan::Union(_)) {
1087        // Union output schema and each input schema have the same fields by
1088        // index but may differ in qualifiers (e.g. output `s` vs input
1089        // `simple_struct.s`). Remap pairs/columns to each input's space.
1090        let union_schema = node.schema();
1091        for (idx, input_schema) in input_schemas.iter().enumerate() {
1092            per_input[idx] =
1093                remap_pairs_and_columns(pairs, columns, union_schema, input_schema)?;
1094        }
1095    } else {
1096        for (expr, alias) in pairs {
1097            match find_owning_input(expr, input_column_sets) {
1098                Some(idx) => per_input[idx].pairs.push((expr.clone(), alias.clone())),
1099                None => return Ok(None), // Cross-input expression — bail out
1100            }
1101        }
1102        for col in columns {
1103            let col_expr = Expr::Column(col.clone());
1104            match find_owning_input(&col_expr, input_column_sets) {
1105                Some(idx) => {
1106                    per_input[idx].columns.insert(col.clone());
1107                }
1108                None => return Ok(None), // Ambiguous column — bail out
1109            }
1110        }
1111    }
1112
1113    // Check at least one input has extractions to push
1114    if per_input.iter().all(|t| t.pairs.is_empty()) {
1115        return Ok(None);
1116    }
1117
1118    Ok(Some(per_input))
1119}
1120
1121/// Pushes extraction expressions into a node's inputs by routing each
1122/// expression to the input that owns all of its column references.
1123///
1124/// Works for any number of inputs (1, 2, …N). For single-input nodes,
1125/// all expressions trivially route to that input. For multi-input nodes
1126/// (Join, etc.), each expression is routed to the side that owns its columns.
1127///
1128/// Returns `Some(new_node)` if all expressions could be routed AND the
1129/// rebuilt node's output schema contains all extracted aliases.
1130/// Returns `None` if any expression references columns from multiple inputs
1131/// or the node doesn't pass through the extracted columns.
1132///
1133/// # Example: Join with expressions from both sides
1134///
1135/// ```text
1136/// Extraction projection above a Join:
1137///   Projection: left.user['name'] AS __datafusion_extracted_1, right.order['total'] AS __datafusion_extracted_2, ...
1138///     Join: left.id = right.user_id
1139///       TableScan: left [id, user]
1140///       TableScan: right [user_id, order]
1141///
1142/// After routing each expression to its owning input:
1143///   Join: left.id = right.user_id
1144///     Projection: user['name'] AS __datafusion_extracted_1, id, user              <-- left-side extraction
1145///       TableScan: left [id, user]
1146///     Projection: order['total'] AS __datafusion_extracted_2, user_id, order      <-- right-side extraction
1147///       TableScan: right [user_id, order]
1148/// ```
1149fn try_push_into_inputs(
1150    pairs: &[(Expr, String)],
1151    columns_needed: &IndexSet<Column>,
1152    node: &LogicalPlan,
1153    alias_generator: &Arc<AliasGenerator>,
1154) -> Result<Option<LogicalPlan>> {
1155    let inputs = node.inputs();
1156    if inputs.is_empty() {
1157        return Ok(None);
1158    }
1159
1160    // SubqueryAlias remaps qualifiers between input and output.
1161    // Rewrite pairs/columns from alias-space to input-space before routing.
1162    let remapped = if let LogicalPlan::SubqueryAlias(sa) = node {
1163        remap_pairs_and_columns(pairs, columns_needed, &sa.schema, sa.input.schema())?
1164    } else {
1165        ExtractionTarget {
1166            pairs: pairs.to_vec(),
1167            columns: columns_needed.clone(),
1168        }
1169    };
1170    let pairs = &remapped.pairs[..];
1171    let columns_needed = &remapped.columns;
1172
1173    // Build per-input schemas and column sets for routing
1174    let input_schemas: Vec<Arc<DFSchema>> =
1175        inputs.iter().map(|i| Arc::clone(i.schema())).collect();
1176    let input_column_sets: Vec<std::collections::HashSet<Column>> =
1177        input_schemas.iter().map(|s| schema_columns(s)).collect();
1178
1179    // Route pairs and columns to the appropriate inputs
1180    let per_input = match route_to_inputs(
1181        pairs,
1182        columns_needed,
1183        node,
1184        &input_column_sets,
1185        &input_schemas,
1186    )? {
1187        Some(routed) => routed,
1188        None => return Ok(None),
1189    };
1190
1191    let num_inputs = inputs.len();
1192
1193    // Build per-input extraction projections and push them as far as possible
1194    // immediately. This is critical because map_children preserves cached schemas,
1195    // so if the TopDown pass later pushes a child further (changing its output
1196    // schema), the parent node's schema becomes stale.
1197    let mut new_inputs: Vec<LogicalPlan> = Vec::with_capacity(num_inputs);
1198    for (idx, input) in inputs.into_iter().enumerate() {
1199        if per_input[idx].pairs.is_empty() {
1200            new_inputs.push(input.clone());
1201        } else {
1202            let input_arc = Arc::new(input.clone());
1203            let target_schema = Arc::clone(input.schema());
1204            let proj = build_extraction_projection_impl(
1205                &per_input[idx].pairs,
1206                &per_input[idx].columns,
1207                &input_arc,
1208                target_schema.as_ref(),
1209            )?;
1210            // Verify all requested aliases appear in the projection's output.
1211            // A merge may deduplicate if the same expression already exists
1212            // under a different alias, leaving the requested alias missing.
1213            let proj_schema = proj.schema.as_ref();
1214            for (_expr, alias) in &per_input[idx].pairs {
1215                if !proj_schema.fields().iter().any(|f| f.name() == alias) {
1216                    return Ok(None);
1217                }
1218            }
1219            let proj_plan = LogicalPlan::Projection(proj);
1220            // Try to push the extraction projection further down within
1221            // this input (e.g., through Filter → existing extraction projection).
1222            // This ensures the input's output schema is stable and won't change
1223            // when the TopDown pass later visits children.
1224            match try_push_input(&proj_plan, alias_generator)? {
1225                Some(pushed) => new_inputs.push(pushed),
1226                None => new_inputs.push(proj_plan),
1227            }
1228        }
1229    }
1230
1231    // Rebuild the node with new inputs
1232    let new_node = node.with_new_exprs(node.expressions(), new_inputs)?;
1233
1234    // Safety check: verify all extracted aliases appear in the rebuilt
1235    // node's output schema. Nodes like Aggregate define their own output
1236    // and won't pass through extracted columns — bail out for those.
1237    let output_schema = new_node.schema();
1238    for (_expr, alias) in pairs {
1239        if !output_schema.fields().iter().any(|f| f.name() == alias) {
1240            return Ok(None);
1241        }
1242    }
1243
1244    Ok(Some(new_node))
1245}
1246
1247#[cfg(test)]
1248mod tests {
1249    use std::sync::Arc;
1250
1251    use super::*;
1252    use crate::optimize_projections::OptimizeProjections;
1253    use crate::test::udfs::PlacementTestUDF;
1254    use crate::test::*;
1255    use crate::{Optimizer, OptimizerContext};
1256    use datafusion_common::Result;
1257    use datafusion_expr::expr::ScalarFunction;
1258    use datafusion_expr::{Expr, ExpressionPlacement};
1259    use datafusion_expr::{
1260        ScalarUDF, col, lit, logical_plan::builder::LogicalPlanBuilder,
1261    };
1262
1263    fn leaf_udf(expr: Expr, name: &str) -> Expr {
1264        Expr::ScalarFunction(ScalarFunction::new_udf(
1265            Arc::new(ScalarUDF::new_from_impl(
1266                PlacementTestUDF::new()
1267                    .with_placement(ExpressionPlacement::MoveTowardsLeafNodes),
1268            )),
1269            vec![expr, lit(name)],
1270        ))
1271    }
1272
1273    // =========================================================================
1274    // Combined optimization stage formatter
1275    // =========================================================================
1276
1277    /// Runs all 4 optimization stages and returns a single formatted string.
1278    /// Stages that produce the same plan as the previous stage show
1279    /// "(same as <previous>)" to reduce noise.
1280    ///
1281    /// Stages:
1282    /// 1. **Original** - OptimizeProjections only (baseline)
1283    /// 2. **After Extraction** - + ExtractLeafExpressions
1284    /// 3. **After Pushdown** - + PushDownLeafProjections
1285    /// 4. **Optimized** - + final OptimizeProjections
1286    fn format_optimization_stages(plan: &LogicalPlan) -> Result<String> {
1287        let run = |rules: Vec<Arc<dyn OptimizerRule + Send + Sync>>| -> Result<String> {
1288            let ctx = OptimizerContext::new().with_max_passes(1);
1289            let optimizer = Optimizer::with_rules(rules);
1290            let optimized = optimizer.optimize(plan.clone(), &ctx, |_, _| {})?;
1291            Ok(format!("{optimized}"))
1292        };
1293
1294        let original = run(vec![Arc::new(OptimizeProjections::new())])?;
1295
1296        let after_extract = run(vec![
1297            Arc::new(OptimizeProjections::new()),
1298            Arc::new(ExtractLeafExpressions::new()),
1299        ])?;
1300
1301        let after_pushdown = run(vec![
1302            Arc::new(OptimizeProjections::new()),
1303            Arc::new(ExtractLeafExpressions::new()),
1304            Arc::new(PushDownLeafProjections::new()),
1305        ])?;
1306
1307        let optimized = run(vec![
1308            Arc::new(OptimizeProjections::new()),
1309            Arc::new(ExtractLeafExpressions::new()),
1310            Arc::new(PushDownLeafProjections::new()),
1311            Arc::new(OptimizeProjections::new()),
1312        ])?;
1313
1314        let mut out = format!("## Original Plan\n{original}");
1315
1316        out.push_str("\n\n## After Extraction\n");
1317        if after_extract == original {
1318            out.push_str("(same as original)");
1319        } else {
1320            out.push_str(&after_extract);
1321        }
1322
1323        out.push_str("\n\n## After Pushdown\n");
1324        if after_pushdown == after_extract {
1325            out.push_str("(same as after extraction)");
1326        } else {
1327            out.push_str(&after_pushdown);
1328        }
1329
1330        out.push_str("\n\n## Optimized\n");
1331        if optimized == after_pushdown {
1332            out.push_str("(same as after pushdown)");
1333        } else {
1334            out.push_str(&optimized);
1335        }
1336
1337        Ok(out)
1338    }
1339
    /// Assert all optimization stages for a plan in a single insta snapshot.
    ///
    /// Takes a plan expression and an inline snapshot literal introduced by
    /// `@`; a trailing comma is accepted. The expansion uses `?` and evaluates
    /// to `Ok(())`, so it must be invoked inside a function returning a
    /// compatible `Result` and can serve as that function's tail expression.
    macro_rules! assert_stages {
        ($plan:expr, @ $expected:literal $(,)?) => {{
            // Render every stage into one string; `?` propagates a failure
            // to the enclosing function.
            let result = format_optimization_stages(&$plan)?;
            insta::assert_snapshot!(result, @ $expected);
            // Evaluate to `Ok(())` so the macro can end a `Result`-returning test.
            Ok::<(), datafusion_common::DataFusionError>(())
        }};
    }
1348
1349    #[test]
1350    fn test_extract_from_filter() -> Result<()> {
1351        let table_scan = test_table_scan_with_struct()?;
1352        let plan = LogicalPlanBuilder::from(table_scan.clone())
1353            .filter(leaf_udf(col("user"), "status").eq(lit("active")))?
1354            .select(vec![
1355                table_scan
1356                    .schema()
1357                    .index_of_column_by_name(None, "id")
1358                    .unwrap(),
1359            ])?
1360            .build()?;
1361
1362        assert_stages!(plan, @r#"
1363        ## Original Plan
1364        Projection: test.id
1365          Filter: leaf_udf(test.user, Utf8("status")) = Utf8("active")
1366            TableScan: test projection=[id, user]
1367
1368        ## After Extraction
1369        Projection: test.id
1370          Projection: test.id, test.user
1371            Filter: __datafusion_extracted_1 = Utf8("active")
1372              Projection: leaf_udf(test.user, Utf8("status")) AS __datafusion_extracted_1, test.id, test.user
1373                TableScan: test projection=[id, user]
1374
1375        ## After Pushdown
1376        (same as after extraction)
1377
1378        ## Optimized
1379        Projection: test.id
1380          Filter: __datafusion_extracted_1 = Utf8("active")
1381            Projection: leaf_udf(test.user, Utf8("status")) AS __datafusion_extracted_1, test.id
1382              TableScan: test projection=[id, user]
1383        "#)
1384    }
1385
1386    #[test]
1387    fn test_no_extraction_for_column() -> Result<()> {
1388        let table_scan = test_table_scan()?;
1389        let plan = LogicalPlanBuilder::from(table_scan)
1390            .filter(col("a").eq(lit(1)))?
1391            .build()?;
1392
1393        assert_stages!(plan, @"
1394        ## Original Plan
1395        Filter: test.a = Int32(1)
1396          TableScan: test projection=[a, b, c]
1397
1398        ## After Extraction
1399        (same as original)
1400
1401        ## After Pushdown
1402        (same as after extraction)
1403
1404        ## Optimized
1405        (same as after pushdown)
1406        ")
1407    }
1408
1409    #[test]
1410    fn test_extract_from_projection() -> Result<()> {
1411        let table_scan = test_table_scan_with_struct()?;
1412        let plan = LogicalPlanBuilder::from(table_scan)
1413            .project(vec![leaf_udf(col("user"), "name")])?
1414            .build()?;
1415
1416        assert_stages!(plan, @r#"
1417        ## Original Plan
1418        Projection: leaf_udf(test.user, Utf8("name"))
1419          TableScan: test projection=[user]
1420
1421        ## After Extraction
1422        (same as original)
1423
1424        ## After Pushdown
1425        Projection: __datafusion_extracted_1 AS leaf_udf(test.user,Utf8("name"))
1426          Projection: leaf_udf(test.user, Utf8("name")) AS __datafusion_extracted_1, test.user
1427            TableScan: test projection=[user]
1428
1429        ## Optimized
1430        Projection: leaf_udf(test.user, Utf8("name"))
1431          TableScan: test projection=[user]
1432        "#)
1433    }
1434
1435    #[test]
1436    fn test_extract_from_projection_with_subexpression() -> Result<()> {
1437        let table_scan = test_table_scan_with_struct()?;
1438        let plan = LogicalPlanBuilder::from(table_scan)
1439            .project(vec![
1440                leaf_udf(col("user"), "name")
1441                    .is_not_null()
1442                    .alias("has_name"),
1443            ])?
1444            .build()?;
1445
1446        assert_stages!(plan, @r#"
1447        ## Original Plan
1448        Projection: leaf_udf(test.user, Utf8("name")) IS NOT NULL AS has_name
1449          TableScan: test projection=[user]
1450
1451        ## After Extraction
1452        (same as original)
1453
1454        ## After Pushdown
1455        Projection: __datafusion_extracted_1 IS NOT NULL AS has_name
1456          Projection: leaf_udf(test.user, Utf8("name")) AS __datafusion_extracted_1, test.user
1457            TableScan: test projection=[user]
1458
1459        ## Optimized
1460        Projection: leaf_udf(test.user, Utf8("name")) IS NOT NULL AS has_name
1461          TableScan: test projection=[user]
1462        "#)
1463    }
1464
1465    #[test]
1466    fn test_projection_no_extraction_for_column() -> Result<()> {
1467        let table_scan = test_table_scan()?;
1468        let plan = LogicalPlanBuilder::from(table_scan)
1469            .project(vec![col("a"), col("b")])?
1470            .build()?;
1471
1472        assert_stages!(plan, @"
1473        ## Original Plan
1474        TableScan: test projection=[a, b]
1475
1476        ## After Extraction
1477        (same as original)
1478
1479        ## After Pushdown
1480        (same as after extraction)
1481
1482        ## Optimized
1483        (same as after pushdown)
1484        ")
1485    }
1486
1487    #[test]
1488    fn test_filter_with_deduplication() -> Result<()> {
1489        let table_scan = test_table_scan_with_struct()?;
1490        let field_access = leaf_udf(col("user"), "name");
1491        // Filter with the same expression used twice
1492        let plan = LogicalPlanBuilder::from(table_scan)
1493            .filter(
1494                field_access
1495                    .clone()
1496                    .is_not_null()
1497                    .and(field_access.is_null()),
1498            )?
1499            .build()?;
1500
1501        assert_stages!(plan, @r#"
1502        ## Original Plan
1503        Filter: leaf_udf(test.user, Utf8("name")) IS NOT NULL AND leaf_udf(test.user, Utf8("name")) IS NULL
1504          TableScan: test projection=[id, user]
1505
1506        ## After Extraction
1507        Projection: test.id, test.user
1508          Filter: __datafusion_extracted_1 IS NOT NULL AND __datafusion_extracted_1 IS NULL
1509            Projection: leaf_udf(test.user, Utf8("name")) AS __datafusion_extracted_1, test.id, test.user
1510              TableScan: test projection=[id, user]
1511
1512        ## After Pushdown
1513        (same as after extraction)
1514
1515        ## Optimized
1516        (same as after pushdown)
1517        "#)
1518    }
1519
1520    #[test]
1521    fn test_already_leaf_expression_in_filter() -> Result<()> {
1522        let table_scan = test_table_scan_with_struct()?;
1523        let plan = LogicalPlanBuilder::from(table_scan)
1524            .filter(leaf_udf(col("user"), "name").eq(lit("test")))?
1525            .build()?;
1526
1527        assert_stages!(plan, @r#"
1528        ## Original Plan
1529        Filter: leaf_udf(test.user, Utf8("name")) = Utf8("test")
1530          TableScan: test projection=[id, user]
1531
1532        ## After Extraction
1533        Projection: test.id, test.user
1534          Filter: __datafusion_extracted_1 = Utf8("test")
1535            Projection: leaf_udf(test.user, Utf8("name")) AS __datafusion_extracted_1, test.id, test.user
1536              TableScan: test projection=[id, user]
1537
1538        ## After Pushdown
1539        (same as after extraction)
1540
1541        ## Optimized
1542        (same as after pushdown)
1543        "#)
1544    }
1545
1546    #[test]
1547    fn test_extract_from_aggregate_group_by() -> Result<()> {
1548        use datafusion_expr::test::function_stub::count;
1549
1550        let table_scan = test_table_scan_with_struct()?;
1551        let plan = LogicalPlanBuilder::from(table_scan)
1552            .aggregate(vec![leaf_udf(col("user"), "status")], vec![count(lit(1))])?
1553            .build()?;
1554
1555        assert_stages!(plan, @r#"
1556        ## Original Plan
1557        Aggregate: groupBy=[[leaf_udf(test.user, Utf8("status"))]], aggr=[[COUNT(Int32(1))]]
1558          TableScan: test projection=[user]
1559
1560        ## After Extraction
1561        Projection: __datafusion_extracted_1 AS leaf_udf(test.user,Utf8("status")), COUNT(Int32(1))
1562          Aggregate: groupBy=[[__datafusion_extracted_1]], aggr=[[COUNT(Int32(1))]]
1563            Projection: leaf_udf(test.user, Utf8("status")) AS __datafusion_extracted_1, test.user
1564              TableScan: test projection=[user]
1565
1566        ## After Pushdown
1567        (same as after extraction)
1568
1569        ## Optimized
1570        Projection: __datafusion_extracted_1 AS leaf_udf(test.user,Utf8("status")), COUNT(Int32(1))
1571          Aggregate: groupBy=[[__datafusion_extracted_1]], aggr=[[COUNT(Int32(1))]]
1572            Projection: leaf_udf(test.user, Utf8("status")) AS __datafusion_extracted_1
1573              TableScan: test projection=[user]
1574        "#)
1575    }
1576
1577    #[test]
1578    fn test_extract_from_aggregate_args() -> Result<()> {
1579        use datafusion_expr::test::function_stub::count;
1580
1581        let table_scan = test_table_scan_with_struct()?;
1582        let plan = LogicalPlanBuilder::from(table_scan)
1583            .aggregate(
1584                vec![col("user")],
1585                vec![count(leaf_udf(col("user"), "value"))],
1586            )?
1587            .build()?;
1588
1589        assert_stages!(plan, @r#"
1590        ## Original Plan
1591        Aggregate: groupBy=[[test.user]], aggr=[[COUNT(leaf_udf(test.user, Utf8("value")))]]
1592          TableScan: test projection=[user]
1593
1594        ## After Extraction
1595        Projection: test.user, COUNT(__datafusion_extracted_1) AS COUNT(leaf_udf(test.user,Utf8("value")))
1596          Aggregate: groupBy=[[test.user]], aggr=[[COUNT(__datafusion_extracted_1)]]
1597            Projection: leaf_udf(test.user, Utf8("value")) AS __datafusion_extracted_1, test.user
1598              TableScan: test projection=[user]
1599
1600        ## After Pushdown
1601        (same as after extraction)
1602
1603        ## Optimized
1604        (same as after pushdown)
1605        "#)
1606    }
1607
1608    #[test]
1609    fn test_projection_with_filter_combined() -> Result<()> {
1610        let table_scan = test_table_scan_with_struct()?;
1611        let plan = LogicalPlanBuilder::from(table_scan)
1612            .filter(leaf_udf(col("user"), "status").eq(lit("active")))?
1613            .project(vec![leaf_udf(col("user"), "name")])?
1614            .build()?;
1615
1616        assert_stages!(plan, @r#"
1617        ## Original Plan
1618        Projection: leaf_udf(test.user, Utf8("name"))
1619          Filter: leaf_udf(test.user, Utf8("status")) = Utf8("active")
1620            TableScan: test projection=[user]
1621
1622        ## After Extraction
1623        Projection: leaf_udf(test.user, Utf8("name"))
1624          Projection: test.user
1625            Filter: __datafusion_extracted_1 = Utf8("active")
1626              Projection: leaf_udf(test.user, Utf8("status")) AS __datafusion_extracted_1, test.user
1627                TableScan: test projection=[user]
1628
1629        ## After Pushdown
1630        Projection: __datafusion_extracted_2 AS leaf_udf(test.user,Utf8("name"))
1631          Filter: __datafusion_extracted_1 = Utf8("active")
1632            Projection: leaf_udf(test.user, Utf8("status")) AS __datafusion_extracted_1, test.user, leaf_udf(test.user, Utf8("name")) AS __datafusion_extracted_2
1633              TableScan: test projection=[user]
1634
1635        ## Optimized
1636        Projection: __datafusion_extracted_2 AS leaf_udf(test.user,Utf8("name"))
1637          Filter: __datafusion_extracted_1 = Utf8("active")
1638            Projection: leaf_udf(test.user, Utf8("status")) AS __datafusion_extracted_1, leaf_udf(test.user, Utf8("name")) AS __datafusion_extracted_2
1639              TableScan: test projection=[user]
1640        "#)
1641    }
1642
1643    #[test]
1644    fn test_projection_preserves_alias() -> Result<()> {
1645        let table_scan = test_table_scan_with_struct()?;
1646        let plan = LogicalPlanBuilder::from(table_scan)
1647            .project(vec![leaf_udf(col("user"), "name").alias("username")])?
1648            .build()?;
1649
1650        assert_stages!(plan, @r#"
1651        ## Original Plan
1652        Projection: leaf_udf(test.user, Utf8("name")) AS username
1653          TableScan: test projection=[user]
1654
1655        ## After Extraction
1656        (same as original)
1657
1658        ## After Pushdown
1659        Projection: __datafusion_extracted_1 AS username
1660          Projection: leaf_udf(test.user, Utf8("name")) AS __datafusion_extracted_1, test.user
1661            TableScan: test projection=[user]
1662
1663        ## Optimized
1664        Projection: leaf_udf(test.user, Utf8("name")) AS username
1665          TableScan: test projection=[user]
1666        "#)
1667    }
1668
1669    /// Test: Projection with different field than Filter
1670    /// SELECT id, s['label'] FROM t WHERE s['value'] > 150
1671    /// Both s['label'] and s['value'] should be in a single extraction projection.
1672    #[test]
1673    fn test_projection_different_field_from_filter() -> Result<()> {
1674        let table_scan = test_table_scan_with_struct()?;
1675        let plan = LogicalPlanBuilder::from(table_scan)
1676            .filter(leaf_udf(col("user"), "value").gt(lit(150)))?
1677            .project(vec![col("user"), leaf_udf(col("user"), "label")])?
1678            .build()?;
1679
1680        assert_stages!(plan, @r#"
1681        ## Original Plan
1682        Projection: test.user, leaf_udf(test.user, Utf8("label"))
1683          Filter: leaf_udf(test.user, Utf8("value")) > Int32(150)
1684            TableScan: test projection=[user]
1685
1686        ## After Extraction
1687        Projection: test.user, leaf_udf(test.user, Utf8("label"))
1688          Projection: test.user
1689            Filter: __datafusion_extracted_1 > Int32(150)
1690              Projection: leaf_udf(test.user, Utf8("value")) AS __datafusion_extracted_1, test.user
1691                TableScan: test projection=[user]
1692
1693        ## After Pushdown
1694        Projection: test.user, __datafusion_extracted_2 AS leaf_udf(test.user,Utf8("label"))
1695          Filter: __datafusion_extracted_1 > Int32(150)
1696            Projection: leaf_udf(test.user, Utf8("value")) AS __datafusion_extracted_1, test.user, leaf_udf(test.user, Utf8("label")) AS __datafusion_extracted_2
1697              TableScan: test projection=[user]
1698
1699        ## Optimized
1700        (same as after pushdown)
1701        "#)
1702    }
1703
1704    #[test]
1705    fn test_projection_deduplication() -> Result<()> {
1706        let table_scan = test_table_scan_with_struct()?;
1707        let field = leaf_udf(col("user"), "name");
1708        let plan = LogicalPlanBuilder::from(table_scan)
1709            .project(vec![field.clone(), field.clone().alias("name2")])?
1710            .build()?;
1711
1712        assert_stages!(plan, @r#"
1713        ## Original Plan
1714        Projection: leaf_udf(test.user, Utf8("name")), leaf_udf(test.user, Utf8("name")) AS name2
1715          TableScan: test projection=[user]
1716
1717        ## After Extraction
1718        (same as original)
1719
1720        ## After Pushdown
1721        Projection: __datafusion_extracted_1 AS leaf_udf(test.user,Utf8("name")), __datafusion_extracted_1 AS name2
1722          Projection: leaf_udf(test.user, Utf8("name")) AS __datafusion_extracted_1, test.user
1723            TableScan: test projection=[user]
1724
1725        ## Optimized
1726        Projection: leaf_udf(test.user, Utf8("name")), leaf_udf(test.user, Utf8("name")) AS name2
1727          TableScan: test projection=[user]
1728        "#)
1729    }
1730
1731    // =========================================================================
1732    // Additional tests for code coverage
1733    // =========================================================================
1734
1735    /// Extractions push through Sort nodes to reach the TableScan.
1736    #[test]
1737    fn test_extract_through_sort() -> Result<()> {
1738        let table_scan = test_table_scan_with_struct()?;
1739        let plan = LogicalPlanBuilder::from(table_scan)
1740            .sort(vec![col("user").sort(true, true)])?
1741            .project(vec![leaf_udf(col("user"), "name")])?
1742            .build()?;
1743
1744        assert_stages!(plan, @r#"
1745        ## Original Plan
1746        Projection: leaf_udf(test.user, Utf8("name"))
1747          Sort: test.user ASC NULLS FIRST
1748            TableScan: test projection=[user]
1749
1750        ## After Extraction
1751        (same as original)
1752
1753        ## After Pushdown
1754        Projection: __datafusion_extracted_1 AS leaf_udf(test.user,Utf8("name"))
1755          Sort: test.user ASC NULLS FIRST
1756            Projection: leaf_udf(test.user, Utf8("name")) AS __datafusion_extracted_1, test.user
1757              TableScan: test projection=[user]
1758
1759        ## Optimized
1760        (same as after pushdown)
1761        "#)
1762    }
1763
1764    /// Extractions push through Limit nodes to reach the TableScan.
1765    #[test]
1766    fn test_extract_through_limit() -> Result<()> {
1767        let table_scan = test_table_scan_with_struct()?;
1768        let plan = LogicalPlanBuilder::from(table_scan)
1769            .limit(0, Some(10))?
1770            .project(vec![leaf_udf(col("user"), "name")])?
1771            .build()?;
1772
1773        assert_stages!(plan, @r#"
1774        ## Original Plan
1775        Projection: leaf_udf(test.user, Utf8("name"))
1776          Limit: skip=0, fetch=10
1777            TableScan: test projection=[user]
1778
1779        ## After Extraction
1780        (same as original)
1781
1782        ## After Pushdown
1783        Projection: __datafusion_extracted_1 AS leaf_udf(test.user,Utf8("name"))
1784          Limit: skip=0, fetch=10
1785            Projection: leaf_udf(test.user, Utf8("name")) AS __datafusion_extracted_1, test.user
1786              TableScan: test projection=[user]
1787
1788        ## Optimized
1789        Projection: __datafusion_extracted_1 AS leaf_udf(test.user,Utf8("name"))
1790          Limit: skip=0, fetch=10
1791            Projection: leaf_udf(test.user, Utf8("name")) AS __datafusion_extracted_1
1792              TableScan: test projection=[user]
1793        "#)
1794    }
1795
1796    /// Aliased aggregate functions like count(...).alias("cnt") are handled.
1797    #[test]
1798    fn test_extract_from_aliased_aggregate() -> Result<()> {
1799        use datafusion_expr::test::function_stub::count;
1800
1801        let table_scan = test_table_scan_with_struct()?;
1802        let plan = LogicalPlanBuilder::from(table_scan)
1803            .aggregate(
1804                vec![col("user")],
1805                vec![count(leaf_udf(col("user"), "value")).alias("cnt")],
1806            )?
1807            .build()?;
1808
1809        assert_stages!(plan, @r#"
1810        ## Original Plan
1811        Aggregate: groupBy=[[test.user]], aggr=[[COUNT(leaf_udf(test.user, Utf8("value"))) AS cnt]]
1812          TableScan: test projection=[user]
1813
1814        ## After Extraction
1815        Aggregate: groupBy=[[test.user]], aggr=[[COUNT(__datafusion_extracted_1) AS cnt]]
1816          Projection: leaf_udf(test.user, Utf8("value")) AS __datafusion_extracted_1, test.user
1817            TableScan: test projection=[user]
1818
1819        ## After Pushdown
1820        (same as after extraction)
1821
1822        ## Optimized
1823        (same as after pushdown)
1824        "#)
1825    }
1826
1827    /// Aggregates with no MoveTowardsLeafNodes expressions return unchanged.
1828    #[test]
1829    fn test_aggregate_no_extraction() -> Result<()> {
1830        use datafusion_expr::test::function_stub::count;
1831
1832        let table_scan = test_table_scan()?;
1833        let plan = LogicalPlanBuilder::from(table_scan)
1834            .aggregate(vec![col("a")], vec![count(col("b"))])?
1835            .build()?;
1836
1837        assert_stages!(plan, @"
1838        ## Original Plan
1839        Aggregate: groupBy=[[test.a]], aggr=[[COUNT(test.b)]]
1840          TableScan: test projection=[a, b]
1841
1842        ## After Extraction
1843        (same as original)
1844
1845        ## After Pushdown
1846        (same as after extraction)
1847
1848        ## Optimized
1849        (same as after pushdown)
1850        ")
1851    }
1852
1853    /// Projections containing extracted expression aliases are skipped (already extracted).
1854    #[test]
1855    fn test_skip_extracted_projection() -> Result<()> {
1856        let table_scan = test_table_scan_with_struct()?;
1857        let plan = LogicalPlanBuilder::from(table_scan)
1858            .project(vec![
1859                leaf_udf(col("user"), "name").alias("__datafusion_extracted_manual"),
1860                col("user"),
1861            ])?
1862            .build()?;
1863
1864        assert_stages!(plan, @r#"
1865        ## Original Plan
1866        Projection: leaf_udf(test.user, Utf8("name")) AS __datafusion_extracted_manual, test.user
1867          TableScan: test projection=[user]
1868
1869        ## After Extraction
1870        (same as original)
1871
1872        ## After Pushdown
1873        (same as after extraction)
1874
1875        ## Optimized
1876        (same as after pushdown)
1877        "#)
1878    }
1879
1880    /// Multiple extractions merge into a single extracted expression projection.
1881    #[test]
1882    fn test_merge_into_existing_extracted_projection() -> Result<()> {
1883        let table_scan = test_table_scan_with_struct()?;
1884        let plan = LogicalPlanBuilder::from(table_scan)
1885            .filter(leaf_udf(col("user"), "status").eq(lit("active")))?
1886            .filter(leaf_udf(col("user"), "name").is_not_null())?
1887            .build()?;
1888
1889        assert_stages!(plan, @r#"
1890        ## Original Plan
1891        Filter: leaf_udf(test.user, Utf8("name")) IS NOT NULL
1892          Filter: leaf_udf(test.user, Utf8("status")) = Utf8("active")
1893            TableScan: test projection=[id, user]
1894
1895        ## After Extraction
1896        Projection: test.id, test.user
1897          Filter: __datafusion_extracted_1 IS NOT NULL
1898            Projection: leaf_udf(test.user, Utf8("name")) AS __datafusion_extracted_1, test.id, test.user
1899              Projection: test.id, test.user
1900                Filter: __datafusion_extracted_2 = Utf8("active")
1901                  Projection: leaf_udf(test.user, Utf8("status")) AS __datafusion_extracted_2, test.id, test.user
1902                    TableScan: test projection=[id, user]
1903
1904        ## After Pushdown
1905        Projection: test.id, test.user
1906          Filter: __datafusion_extracted_1 IS NOT NULL
1907            Filter: __datafusion_extracted_2 = Utf8("active")
1908              Projection: leaf_udf(test.user, Utf8("status")) AS __datafusion_extracted_2, test.id, test.user, leaf_udf(test.user, Utf8("name")) AS __datafusion_extracted_1
1909                TableScan: test projection=[id, user]
1910
1911        ## Optimized
1912        Projection: test.id, test.user
1913          Filter: __datafusion_extracted_1 IS NOT NULL
1914            Projection: test.id, test.user, __datafusion_extracted_1
1915              Filter: __datafusion_extracted_2 = Utf8("active")
1916                Projection: leaf_udf(test.user, Utf8("status")) AS __datafusion_extracted_2, test.id, test.user, leaf_udf(test.user, Utf8("name")) AS __datafusion_extracted_1
1917                  TableScan: test projection=[id, user]
1918        "#)
1919    }
1920
1921    /// Extractions push through passthrough projections (columns only).
1922    #[test]
1923    fn test_extract_through_passthrough_projection() -> Result<()> {
1924        let table_scan = test_table_scan_with_struct()?;
1925        let plan = LogicalPlanBuilder::from(table_scan)
1926            .project(vec![col("user")])?
1927            .project(vec![leaf_udf(col("user"), "name")])?
1928            .build()?;
1929
1930        assert_stages!(plan, @r#"
1931        ## Original Plan
1932        Projection: leaf_udf(test.user, Utf8("name"))
1933          TableScan: test projection=[user]
1934
1935        ## After Extraction
1936        (same as original)
1937
1938        ## After Pushdown
1939        Projection: __datafusion_extracted_1 AS leaf_udf(test.user,Utf8("name"))
1940          Projection: leaf_udf(test.user, Utf8("name")) AS __datafusion_extracted_1, test.user
1941            TableScan: test projection=[user]
1942
1943        ## Optimized
1944        Projection: leaf_udf(test.user, Utf8("name"))
1945          TableScan: test projection=[user]
1946        "#)
1947    }
1948
1949    /// Projections with aliased columns (nothing to extract) return unchanged.
1950    #[test]
1951    fn test_projection_early_return_no_extraction() -> Result<()> {
1952        let table_scan = test_table_scan()?;
1953        let plan = LogicalPlanBuilder::from(table_scan)
1954            .project(vec![col("a").alias("x"), col("b")])?
1955            .build()?;
1956
1957        assert_stages!(plan, @"
1958        ## Original Plan
1959        Projection: test.a AS x, test.b
1960          TableScan: test projection=[a, b]
1961
1962        ## After Extraction
1963        (same as original)
1964
1965        ## After Pushdown
1966        (same as after extraction)
1967
1968        ## Optimized
1969        (same as after pushdown)
1970        ")
1971    }
1972
1973    /// Projections with arithmetic expressions but no MoveTowardsLeafNodes return unchanged.
1974    #[test]
1975    fn test_projection_with_arithmetic_no_extraction() -> Result<()> {
1976        let table_scan = test_table_scan()?;
1977        let plan = LogicalPlanBuilder::from(table_scan)
1978            .project(vec![(col("a") + col("b")).alias("sum")])?
1979            .build()?;
1980
1981        assert_stages!(plan, @"
1982        ## Original Plan
1983        Projection: test.a + test.b AS sum
1984          TableScan: test projection=[a, b]
1985
1986        ## After Extraction
1987        (same as original)
1988
1989        ## After Pushdown
1990        (same as after extraction)
1991
1992        ## Optimized
1993        (same as after pushdown)
1994        ")
1995    }
1996
1997    /// Aggregate extractions merge into existing extracted projection created by Filter.
1998    #[test]
1999    fn test_aggregate_merge_into_extracted_projection() -> Result<()> {
2000        use datafusion_expr::test::function_stub::count;
2001
2002        let table_scan = test_table_scan_with_struct()?;
2003        let plan = LogicalPlanBuilder::from(table_scan)
2004            .filter(leaf_udf(col("user"), "status").eq(lit("active")))?
2005            .aggregate(vec![leaf_udf(col("user"), "name")], vec![count(lit(1))])?
2006            .build()?;
2007
2008        assert_stages!(plan, @r#"
2009        ## Original Plan
2010        Aggregate: groupBy=[[leaf_udf(test.user, Utf8("name"))]], aggr=[[COUNT(Int32(1))]]
2011          Filter: leaf_udf(test.user, Utf8("status")) = Utf8("active")
2012            TableScan: test projection=[user]
2013
2014        ## After Extraction
2015        Projection: __datafusion_extracted_1 AS leaf_udf(test.user,Utf8("name")), COUNT(Int32(1))
2016          Aggregate: groupBy=[[__datafusion_extracted_1]], aggr=[[COUNT(Int32(1))]]
2017            Projection: leaf_udf(test.user, Utf8("name")) AS __datafusion_extracted_1, test.user
2018              Projection: test.user
2019                Filter: __datafusion_extracted_2 = Utf8("active")
2020                  Projection: leaf_udf(test.user, Utf8("status")) AS __datafusion_extracted_2, test.user
2021                    TableScan: test projection=[user]
2022
2023        ## After Pushdown
2024        Projection: __datafusion_extracted_1 AS leaf_udf(test.user,Utf8("name")), COUNT(Int32(1))
2025          Aggregate: groupBy=[[__datafusion_extracted_1]], aggr=[[COUNT(Int32(1))]]
2026            Filter: __datafusion_extracted_2 = Utf8("active")
2027              Projection: leaf_udf(test.user, Utf8("status")) AS __datafusion_extracted_2, test.user, leaf_udf(test.user, Utf8("name")) AS __datafusion_extracted_1
2028                TableScan: test projection=[user]
2029
2030        ## Optimized
2031        Projection: __datafusion_extracted_1 AS leaf_udf(test.user,Utf8("name")), COUNT(Int32(1))
2032          Aggregate: groupBy=[[__datafusion_extracted_1]], aggr=[[COUNT(Int32(1))]]
2033            Projection: __datafusion_extracted_1
2034              Filter: __datafusion_extracted_2 = Utf8("active")
2035                Projection: leaf_udf(test.user, Utf8("status")) AS __datafusion_extracted_2, leaf_udf(test.user, Utf8("name")) AS __datafusion_extracted_1
2036                  TableScan: test projection=[user]
2037        "#)
2038    }
2039
2040    /// Projection containing a MoveTowardsLeafNodes sub-expression above an
2041    /// Aggregate. Aggregate blocks pushdown, so the (None, true) recovery
2042    /// fallback path fires: in-place extraction + recovery projection.
2043    #[test]
2044    fn test_projection_with_leaf_expr_above_aggregate() -> Result<()> {
2045        use datafusion_expr::test::function_stub::count;
2046
2047        let table_scan = test_table_scan_with_struct()?;
2048        let plan = LogicalPlanBuilder::from(table_scan)
2049            .aggregate(vec![col("user")], vec![count(lit(1))])?
2050            .project(vec![
2051                leaf_udf(col("user"), "name")
2052                    .is_not_null()
2053                    .alias("has_name"),
2054                col("COUNT(Int32(1))"),
2055            ])?
2056            .build()?;
2057
2058        assert_stages!(plan, @r#"
2059        ## Original Plan
2060        Projection: leaf_udf(test.user, Utf8("name")) IS NOT NULL AS has_name, COUNT(Int32(1))
2061          Aggregate: groupBy=[[test.user]], aggr=[[COUNT(Int32(1))]]
2062            TableScan: test projection=[user]
2063
2064        ## After Extraction
2065        (same as original)
2066
2067        ## After Pushdown
2068        Projection: __datafusion_extracted_1 IS NOT NULL AS has_name, COUNT(Int32(1))
2069          Projection: leaf_udf(test.user, Utf8("name")) AS __datafusion_extracted_1, test.user, COUNT(Int32(1))
2070            Aggregate: groupBy=[[test.user]], aggr=[[COUNT(Int32(1))]]
2071              TableScan: test projection=[user]
2072
2073        ## Optimized
2074        Projection: leaf_udf(test.user, Utf8("name")) IS NOT NULL AS has_name, COUNT(Int32(1))
2075          Aggregate: groupBy=[[test.user]], aggr=[[COUNT(Int32(1))]]
2076            TableScan: test projection=[user]
2077        "#)
2078    }
2079
2080    /// Merging adds new pass-through columns not in the existing extracted projection.
2081    #[test]
2082    fn test_merge_with_new_columns() -> Result<()> {
2083        let table_scan = test_table_scan()?;
2084        let plan = LogicalPlanBuilder::from(table_scan)
2085            .filter(leaf_udf(col("a"), "x").eq(lit(1)))?
2086            .filter(leaf_udf(col("b"), "y").eq(lit(2)))?
2087            .build()?;
2088
2089        assert_stages!(plan, @r#"
2090        ## Original Plan
2091        Filter: leaf_udf(test.b, Utf8("y")) = Int32(2)
2092          Filter: leaf_udf(test.a, Utf8("x")) = Int32(1)
2093            TableScan: test projection=[a, b, c]
2094
2095        ## After Extraction
2096        Projection: test.a, test.b, test.c
2097          Filter: __datafusion_extracted_1 = Int32(2)
2098            Projection: leaf_udf(test.b, Utf8("y")) AS __datafusion_extracted_1, test.a, test.b, test.c
2099              Projection: test.a, test.b, test.c
2100                Filter: __datafusion_extracted_2 = Int32(1)
2101                  Projection: leaf_udf(test.a, Utf8("x")) AS __datafusion_extracted_2, test.a, test.b, test.c
2102                    TableScan: test projection=[a, b, c]
2103
2104        ## After Pushdown
2105        Projection: test.a, test.b, test.c
2106          Filter: __datafusion_extracted_1 = Int32(2)
2107            Filter: __datafusion_extracted_2 = Int32(1)
2108              Projection: leaf_udf(test.a, Utf8("x")) AS __datafusion_extracted_2, test.a, test.b, test.c, leaf_udf(test.b, Utf8("y")) AS __datafusion_extracted_1
2109                TableScan: test projection=[a, b, c]
2110
2111        ## Optimized
2112        Projection: test.a, test.b, test.c
2113          Filter: __datafusion_extracted_1 = Int32(2)
2114            Projection: test.a, test.b, test.c, __datafusion_extracted_1
2115              Filter: __datafusion_extracted_2 = Int32(1)
2116                Projection: leaf_udf(test.a, Utf8("x")) AS __datafusion_extracted_2, test.a, test.b, test.c, leaf_udf(test.b, Utf8("y")) AS __datafusion_extracted_1
2117                  TableScan: test projection=[a, b, c]
2118        "#)
2119    }
2120
2121    // =========================================================================
2122    // Join extraction tests
2123    // =========================================================================
2124
2125    /// Create a second table scan with struct field for join tests
2126    fn test_table_scan_with_struct_named(name: &str) -> Result<LogicalPlan> {
2127        use arrow::datatypes::Schema;
2128        let schema = Schema::new(test_table_scan_with_struct_fields());
2129        datafusion_expr::logical_plan::table_scan(Some(name), &schema, None)?.build()
2130    }
2131
2132    /// Extraction from equijoin keys (`on` expressions).
2133    #[test]
2134    fn test_extract_from_join_on() -> Result<()> {
2135        use datafusion_expr::JoinType;
2136
2137        let left = test_table_scan_with_struct()?;
2138        let right = test_table_scan_with_struct_named("right")?;
2139
2140        let plan = LogicalPlanBuilder::from(left)
2141            .join_with_expr_keys(
2142                right,
2143                JoinType::Inner,
2144                (
2145                    vec![leaf_udf(col("user"), "id")],
2146                    vec![leaf_udf(col("user"), "id")],
2147                ),
2148                None,
2149            )?
2150            .build()?;
2151
2152        assert_stages!(plan, @r#"
2153        ## Original Plan
2154        Inner Join: leaf_udf(test.user, Utf8("id")) = leaf_udf(right.user, Utf8("id"))
2155          TableScan: test projection=[id, user]
2156          TableScan: right projection=[id, user]
2157
2158        ## After Extraction
2159        Projection: test.id, test.user, right.id, right.user
2160          Inner Join: __datafusion_extracted_1 = __datafusion_extracted_2
2161            Projection: leaf_udf(test.user, Utf8("id")) AS __datafusion_extracted_1, test.id, test.user
2162              TableScan: test projection=[id, user]
2163            Projection: leaf_udf(right.user, Utf8("id")) AS __datafusion_extracted_2, right.id, right.user
2164              TableScan: right projection=[id, user]
2165
2166        ## After Pushdown
2167        (same as after extraction)
2168
2169        ## Optimized
2170        (same as after pushdown)
2171        "#)
2172    }
2173
2174    /// Extraction from non-equi join filter.
2175    #[test]
2176    fn test_extract_from_join_filter() -> Result<()> {
2177        use datafusion_expr::JoinType;
2178
2179        let left = test_table_scan_with_struct()?;
2180        let right = test_table_scan_with_struct_named("right")?;
2181
2182        let plan = LogicalPlanBuilder::from(left)
2183            .join_on(
2184                right,
2185                JoinType::Inner,
2186                vec![
2187                    col("test.user").eq(col("right.user")),
2188                    leaf_udf(col("test.user"), "status").eq(lit("active")),
2189                ],
2190            )?
2191            .build()?;
2192
2193        assert_stages!(plan, @r#"
2194        ## Original Plan
2195        Inner Join:  Filter: test.user = right.user AND leaf_udf(test.user, Utf8("status")) = Utf8("active")
2196          TableScan: test projection=[id, user]
2197          TableScan: right projection=[id, user]
2198
2199        ## After Extraction
2200        Projection: test.id, test.user, right.id, right.user
2201          Inner Join:  Filter: test.user = right.user AND __datafusion_extracted_1 = Utf8("active")
2202            Projection: leaf_udf(test.user, Utf8("status")) AS __datafusion_extracted_1, test.id, test.user
2203              TableScan: test projection=[id, user]
2204            TableScan: right projection=[id, user]
2205
2206        ## After Pushdown
2207        (same as after extraction)
2208
2209        ## Optimized
2210        (same as after pushdown)
2211        "#)
2212    }
2213
2214    /// Extraction from both left and right sides of a join.
2215    #[test]
2216    fn test_extract_from_join_both_sides() -> Result<()> {
2217        use datafusion_expr::JoinType;
2218
2219        let left = test_table_scan_with_struct()?;
2220        let right = test_table_scan_with_struct_named("right")?;
2221
2222        let plan = LogicalPlanBuilder::from(left)
2223            .join_on(
2224                right,
2225                JoinType::Inner,
2226                vec![
2227                    col("test.user").eq(col("right.user")),
2228                    leaf_udf(col("test.user"), "status").eq(lit("active")),
2229                    leaf_udf(col("right.user"), "role").eq(lit("admin")),
2230                ],
2231            )?
2232            .build()?;
2233
2234        assert_stages!(plan, @r#"
2235        ## Original Plan
2236        Inner Join:  Filter: test.user = right.user AND leaf_udf(test.user, Utf8("status")) = Utf8("active") AND leaf_udf(right.user, Utf8("role")) = Utf8("admin")
2237          TableScan: test projection=[id, user]
2238          TableScan: right projection=[id, user]
2239
2240        ## After Extraction
2241        Projection: test.id, test.user, right.id, right.user
2242          Inner Join:  Filter: test.user = right.user AND __datafusion_extracted_1 = Utf8("active") AND __datafusion_extracted_2 = Utf8("admin")
2243            Projection: leaf_udf(test.user, Utf8("status")) AS __datafusion_extracted_1, test.id, test.user
2244              TableScan: test projection=[id, user]
2245            Projection: leaf_udf(right.user, Utf8("role")) AS __datafusion_extracted_2, right.id, right.user
2246              TableScan: right projection=[id, user]
2247
2248        ## After Pushdown
2249        (same as after extraction)
2250
2251        ## Optimized
2252        (same as after pushdown)
2253        "#)
2254    }
2255
2256    /// Join with no MoveTowardsLeafNodes expressions returns unchanged.
2257    #[test]
2258    fn test_extract_from_join_no_extraction() -> Result<()> {
2259        use datafusion_expr::JoinType;
2260
2261        let left = test_table_scan()?;
2262        let right = test_table_scan_with_name("right")?;
2263
2264        let plan = LogicalPlanBuilder::from(left)
2265            .join(right, JoinType::Inner, (vec!["a"], vec!["a"]), None)?
2266            .build()?;
2267
2268        assert_stages!(plan, @"
2269        ## Original Plan
2270        Inner Join: test.a = right.a
2271          TableScan: test projection=[a, b, c]
2272          TableScan: right projection=[a, b, c]
2273
2274        ## After Extraction
2275        (same as original)
2276
2277        ## After Pushdown
2278        (same as after extraction)
2279
2280        ## Optimized
2281        (same as after pushdown)
2282        ")
2283    }
2284
2285    /// Join followed by filter with extraction.
2286    #[test]
2287    fn test_extract_from_filter_above_join() -> Result<()> {
2288        use datafusion_expr::JoinType;
2289
2290        let left = test_table_scan_with_struct()?;
2291        let right = test_table_scan_with_struct_named("right")?;
2292
2293        let plan = LogicalPlanBuilder::from(left)
2294            .join_with_expr_keys(
2295                right,
2296                JoinType::Inner,
2297                (
2298                    vec![leaf_udf(col("user"), "id")],
2299                    vec![leaf_udf(col("user"), "id")],
2300                ),
2301                None,
2302            )?
2303            .filter(leaf_udf(col("test.user"), "status").eq(lit("active")))?
2304            .build()?;
2305
2306        assert_stages!(plan, @r#"
2307        ## Original Plan
2308        Filter: leaf_udf(test.user, Utf8("status")) = Utf8("active")
2309          Inner Join: leaf_udf(test.user, Utf8("id")) = leaf_udf(right.user, Utf8("id"))
2310            TableScan: test projection=[id, user]
2311            TableScan: right projection=[id, user]
2312
2313        ## After Extraction
2314        Projection: test.id, test.user, right.id, right.user
2315          Filter: __datafusion_extracted_1 = Utf8("active")
2316            Projection: leaf_udf(test.user, Utf8("status")) AS __datafusion_extracted_1, test.id, test.user, right.id, right.user
2317              Projection: test.id, test.user, right.id, right.user
2318                Inner Join: __datafusion_extracted_2 = __datafusion_extracted_3
2319                  Projection: leaf_udf(test.user, Utf8("id")) AS __datafusion_extracted_2, test.id, test.user
2320                    TableScan: test projection=[id, user]
2321                  Projection: leaf_udf(right.user, Utf8("id")) AS __datafusion_extracted_3, right.id, right.user
2322                    TableScan: right projection=[id, user]
2323
2324        ## After Pushdown
2325        Projection: test.id, test.user, right.id, right.user
2326          Filter: __datafusion_extracted_1 = Utf8("active")
2327            Inner Join: __datafusion_extracted_2 = __datafusion_extracted_3
2328              Projection: leaf_udf(test.user, Utf8("id")) AS __datafusion_extracted_2, test.id, test.user, leaf_udf(test.user, Utf8("status")) AS __datafusion_extracted_1
2329                TableScan: test projection=[id, user]
2330              Projection: leaf_udf(right.user, Utf8("id")) AS __datafusion_extracted_3, right.id, right.user
2331                TableScan: right projection=[id, user]
2332
2333        ## Optimized
2334        Projection: test.id, test.user, right.id, right.user
2335          Filter: __datafusion_extracted_1 = Utf8("active")
2336            Projection: test.id, test.user, __datafusion_extracted_1, right.id, right.user
2337              Inner Join: __datafusion_extracted_2 = __datafusion_extracted_3
2338                Projection: leaf_udf(test.user, Utf8("id")) AS __datafusion_extracted_2, test.id, test.user, leaf_udf(test.user, Utf8("status")) AS __datafusion_extracted_1
2339                  TableScan: test projection=[id, user]
2340                Projection: leaf_udf(right.user, Utf8("id")) AS __datafusion_extracted_3, right.id, right.user
2341                  TableScan: right projection=[id, user]
2342        "#)
2343    }
2344
2345    /// Extraction projection (get_field in SELECT) above a Join pushes into
2346    /// the correct input side.
2347    #[test]
2348    fn test_extract_projection_above_join() -> Result<()> {
2349        use datafusion_expr::JoinType;
2350
2351        let left = test_table_scan_with_struct()?;
2352        let right = test_table_scan_with_struct_named("right")?;
2353
2354        let plan = LogicalPlanBuilder::from(left)
2355            .join(right, JoinType::Inner, (vec!["id"], vec!["id"]), None)?
2356            .project(vec![
2357                leaf_udf(col("test.user"), "status"),
2358                leaf_udf(col("right.user"), "role"),
2359            ])?
2360            .build()?;
2361
2362        assert_stages!(plan, @r#"
2363        ## Original Plan
2364        Projection: leaf_udf(test.user, Utf8("status")), leaf_udf(right.user, Utf8("role"))
2365          Inner Join: test.id = right.id
2366            TableScan: test projection=[id, user]
2367            TableScan: right projection=[id, user]
2368
2369        ## After Extraction
2370        (same as original)
2371
2372        ## After Pushdown
2373        Projection: __datafusion_extracted_1 AS leaf_udf(test.user,Utf8("status")), __datafusion_extracted_2 AS leaf_udf(right.user,Utf8("role"))
2374          Inner Join: test.id = right.id
2375            Projection: leaf_udf(test.user, Utf8("status")) AS __datafusion_extracted_1, test.id, test.user
2376              TableScan: test projection=[id, user]
2377            Projection: leaf_udf(right.user, Utf8("role")) AS __datafusion_extracted_2, right.id, right.user
2378              TableScan: right projection=[id, user]
2379
2380        ## Optimized
2381        Projection: __datafusion_extracted_1 AS leaf_udf(test.user,Utf8("status")), __datafusion_extracted_2 AS leaf_udf(right.user,Utf8("role"))
2382          Inner Join: test.id = right.id
2383            Projection: leaf_udf(test.user, Utf8("status")) AS __datafusion_extracted_1, test.id
2384              TableScan: test projection=[id, user]
2385            Projection: leaf_udf(right.user, Utf8("role")) AS __datafusion_extracted_2, right.id
2386              TableScan: right projection=[id, user]
2387        "#)
2388    }
2389
2390    /// Join where both sides have same-named columns: a qualified reference
2391    /// to the right side must be routed to the right input, not the left.
2392    #[test]
2393    fn test_extract_from_join_qualified_right_side() -> Result<()> {
2394        use datafusion_expr::JoinType;
2395
2396        let left = test_table_scan_with_struct()?;
2397        let right = test_table_scan_with_struct_named("right")?;
2398
2399        // Filter references right.user explicitly — must route to right side
2400        let plan = LogicalPlanBuilder::from(left)
2401            .join_on(
2402                right,
2403                JoinType::Inner,
2404                vec![
2405                    col("test.id").eq(col("right.id")),
2406                    leaf_udf(col("right.user"), "status").eq(lit("active")),
2407                ],
2408            )?
2409            .build()?;
2410
2411        assert_stages!(plan, @r#"
2412        ## Original Plan
2413        Inner Join:  Filter: test.id = right.id AND leaf_udf(right.user, Utf8("status")) = Utf8("active")
2414          TableScan: test projection=[id, user]
2415          TableScan: right projection=[id, user]
2416
2417        ## After Extraction
2418        Projection: test.id, test.user, right.id, right.user
2419          Inner Join:  Filter: test.id = right.id AND __datafusion_extracted_1 = Utf8("active")
2420            TableScan: test projection=[id, user]
2421            Projection: leaf_udf(right.user, Utf8("status")) AS __datafusion_extracted_1, right.id, right.user
2422              TableScan: right projection=[id, user]
2423
2424        ## After Pushdown
2425        (same as after extraction)
2426
2427        ## Optimized
2428        (same as after pushdown)
2429        "#)
2430    }
2431
2432    /// When both inputs contain the same unqualified column, an unqualified
2433    /// column reference is ambiguous and `find_owning_input` must return
2434    /// `None` rather than always returning 0 (the left side).
2435    #[test]
2436    fn test_find_owning_input_ambiguous_unqualified_column() {
2437        use std::collections::HashSet;
2438
2439        // Simulate schema_columns output for two sides of a join where both
2440        // have a "user" column — each set contains the qualified and
2441        // unqualified form.
2442        let left_cols: HashSet<Column> = [
2443            Column::new(Some("test"), "user"),
2444            Column::new_unqualified("user"),
2445        ]
2446        .into_iter()
2447        .collect();
2448
2449        let right_cols: HashSet<Column> = [
2450            Column::new(Some("right"), "user"),
2451            Column::new_unqualified("user"),
2452        ]
2453        .into_iter()
2454        .collect();
2455
2456        let input_column_sets = vec![left_cols, right_cols];
2457
2458        // Unqualified "user" matches both sets — must return None (ambiguous)
2459        let unqualified = Expr::Column(Column::new_unqualified("user"));
2460        assert_eq!(find_owning_input(&unqualified, &input_column_sets), None);
2461
2462        // Qualified "right.user" matches only the right set — must return Some(1)
2463        let qualified_right = Expr::Column(Column::new(Some("right"), "user"));
2464        assert_eq!(
2465            find_owning_input(&qualified_right, &input_column_sets),
2466            Some(1)
2467        );
2468
2469        // Qualified "test.user" matches only the left set — must return Some(0)
2470        let qualified_left = Expr::Column(Column::new(Some("test"), "user"));
2471        assert_eq!(
2472            find_owning_input(&qualified_left, &input_column_sets),
2473            Some(0)
2474        );
2475    }
2476
2477    /// Two leaf_udf expressions from different sides of a Join in a Filter.
2478    /// Each is routed to its respective input side independently.
2479    #[test]
2480    fn test_extract_from_join_cross_input_expression() -> Result<()> {
2481        let left = test_table_scan_with_struct()?;
2482        let right = test_table_scan_with_struct_named("right")?;
2483
2484        let plan = LogicalPlanBuilder::from(left)
2485            .join_on(
2486                right,
2487                datafusion_expr::JoinType::Inner,
2488                vec![col("test.id").eq(col("right.id"))],
2489            )?
2490            .filter(
2491                leaf_udf(col("test.user"), "status")
2492                    .eq(leaf_udf(col("right.user"), "status")),
2493            )?
2494            .build()?;
2495
2496        assert_stages!(plan, @r#"
2497        ## Original Plan
2498        Filter: leaf_udf(test.user, Utf8("status")) = leaf_udf(right.user, Utf8("status"))
2499          Inner Join:  Filter: test.id = right.id
2500            TableScan: test projection=[id, user]
2501            TableScan: right projection=[id, user]
2502
2503        ## After Extraction
2504        Projection: test.id, test.user, right.id, right.user
2505          Filter: __datafusion_extracted_1 = __datafusion_extracted_2
2506            Projection: leaf_udf(test.user, Utf8("status")) AS __datafusion_extracted_1, leaf_udf(right.user, Utf8("status")) AS __datafusion_extracted_2, test.id, test.user, right.id, right.user
2507              Inner Join:  Filter: test.id = right.id
2508                TableScan: test projection=[id, user]
2509                TableScan: right projection=[id, user]
2510
2511        ## After Pushdown
2512        Projection: test.id, test.user, right.id, right.user
2513          Filter: __datafusion_extracted_1 = __datafusion_extracted_2
2514            Inner Join:  Filter: test.id = right.id
2515              Projection: leaf_udf(test.user, Utf8("status")) AS __datafusion_extracted_1, test.id, test.user
2516                TableScan: test projection=[id, user]
2517              Projection: leaf_udf(right.user, Utf8("status")) AS __datafusion_extracted_2, right.id, right.user
2518                TableScan: right projection=[id, user]
2519
2520        ## Optimized
2521        (same as after pushdown)
2522        "#)
2523    }
2524
2525    // =========================================================================
2526    // Column-rename through intermediate node tests
2527    // =========================================================================
2528
2529    /// Projection with leaf expr above Filter above renaming Projection.
2530    #[test]
2531    fn test_extract_through_filter_with_column_rename() -> Result<()> {
2532        let table_scan = test_table_scan_with_struct()?;
2533        let plan = LogicalPlanBuilder::from(table_scan)
2534            .project(vec![col("user").alias("x")])?
2535            .filter(col("x").is_not_null())?
2536            .project(vec![leaf_udf(col("x"), "a")])?
2537            .build()?;
2538
2539        assert_stages!(plan, @r#"
2540        ## Original Plan
2541        Projection: leaf_udf(x, Utf8("a"))
2542          Filter: x IS NOT NULL
2543            Projection: test.user AS x
2544              TableScan: test projection=[user]
2545
2546        ## After Extraction
2547        (same as original)
2548
2549        ## After Pushdown
2550        Projection: __datafusion_extracted_1 AS leaf_udf(x,Utf8("a"))
2551          Filter: x IS NOT NULL
2552            Projection: test.user AS x, leaf_udf(test.user, Utf8("a")) AS __datafusion_extracted_1, test.user
2553              TableScan: test projection=[user]
2554
2555        ## Optimized
2556        Projection: __datafusion_extracted_1 AS leaf_udf(x,Utf8("a"))
2557          Filter: x IS NOT NULL
2558            Projection: test.user AS x, leaf_udf(test.user, Utf8("a")) AS __datafusion_extracted_1
2559              TableScan: test projection=[user]
2560        "#)
2561    }
2562
2563    /// Same as above but with a partial extraction (leaf + arithmetic).
2564    #[test]
2565    fn test_extract_partial_through_filter_with_column_rename() -> Result<()> {
2566        let table_scan = test_table_scan_with_struct()?;
2567        let plan = LogicalPlanBuilder::from(table_scan)
2568            .project(vec![col("user").alias("x")])?
2569            .filter(col("x").is_not_null())?
2570            .project(vec![leaf_udf(col("x"), "a").is_not_null()])?
2571            .build()?;
2572
2573        assert_stages!(plan, @r#"
2574        ## Original Plan
2575        Projection: leaf_udf(x, Utf8("a")) IS NOT NULL
2576          Filter: x IS NOT NULL
2577            Projection: test.user AS x
2578              TableScan: test projection=[user]
2579
2580        ## After Extraction
2581        (same as original)
2582
2583        ## After Pushdown
2584        Projection: __datafusion_extracted_1 IS NOT NULL AS leaf_udf(x,Utf8("a")) IS NOT NULL
2585          Filter: x IS NOT NULL
2586            Projection: test.user AS x, leaf_udf(test.user, Utf8("a")) AS __datafusion_extracted_1, test.user
2587              TableScan: test projection=[user]
2588
2589        ## Optimized
2590        Projection: __datafusion_extracted_1 IS NOT NULL AS leaf_udf(x,Utf8("a")) IS NOT NULL
2591          Filter: x IS NOT NULL
2592            Projection: test.user AS x, leaf_udf(test.user, Utf8("a")) AS __datafusion_extracted_1
2593              TableScan: test projection=[user]
2594        "#)
2595    }
2596
2597    /// Tests merge_into_extracted_projection path through a renaming projection.
2598    #[test]
2599    fn test_extract_from_filter_above_renaming_projection() -> Result<()> {
2600        let table_scan = test_table_scan_with_struct()?;
2601        let plan = LogicalPlanBuilder::from(table_scan)
2602            .project(vec![col("user").alias("x")])?
2603            .filter(leaf_udf(col("x"), "a").eq(lit("active")))?
2604            .build()?;
2605
2606        assert_stages!(plan, @r#"
2607        ## Original Plan
2608        Filter: leaf_udf(x, Utf8("a")) = Utf8("active")
2609          Projection: test.user AS x
2610            TableScan: test projection=[user]
2611
2612        ## After Extraction
2613        Projection: x
2614          Filter: __datafusion_extracted_1 = Utf8("active")
2615            Projection: test.user AS x, leaf_udf(test.user, Utf8("a")) AS __datafusion_extracted_1, test.user
2616              TableScan: test projection=[user]
2617
2618        ## After Pushdown
2619        (same as after extraction)
2620
2621        ## Optimized
2622        Projection: x
2623          Filter: __datafusion_extracted_1 = Utf8("active")
2624            Projection: test.user AS x, leaf_udf(test.user, Utf8("a")) AS __datafusion_extracted_1
2625              TableScan: test projection=[user]
2626        "#)
2627    }
2628
2629    // =========================================================================
2630    // SubqueryAlias extraction tests
2631    // =========================================================================
2632
2633    /// Extraction projection pushes through SubqueryAlias.
2634    #[test]
2635    fn test_extract_through_subquery_alias() -> Result<()> {
2636        let table_scan = test_table_scan_with_struct()?;
2637        let plan = LogicalPlanBuilder::from(table_scan)
2638            .alias("sub")?
2639            .project(vec![leaf_udf(col("sub.user"), "name")])?
2640            .build()?;
2641
2642        assert_stages!(plan, @r#"
2643        ## Original Plan
2644        Projection: leaf_udf(sub.user, Utf8("name"))
2645          SubqueryAlias: sub
2646            TableScan: test projection=[user]
2647
2648        ## After Extraction
2649        (same as original)
2650
2651        ## After Pushdown
2652        Projection: __datafusion_extracted_1 AS leaf_udf(sub.user,Utf8("name"))
2653          SubqueryAlias: sub
2654            Projection: leaf_udf(test.user, Utf8("name")) AS __datafusion_extracted_1, test.user
2655              TableScan: test projection=[user]
2656
2657        ## Optimized
2658        Projection: __datafusion_extracted_1 AS leaf_udf(sub.user,Utf8("name"))
2659          SubqueryAlias: sub
2660            Projection: leaf_udf(test.user, Utf8("name")) AS __datafusion_extracted_1
2661              TableScan: test projection=[user]
2662        "#)
2663    }
2664
2665    /// Extraction projection pushes through SubqueryAlias + Filter.
2666    #[test]
2667    fn test_extract_through_subquery_alias_with_filter() -> Result<()> {
2668        let table_scan = test_table_scan_with_struct()?;
2669        let plan = LogicalPlanBuilder::from(table_scan)
2670            .alias("sub")?
2671            .filter(leaf_udf(col("sub.user"), "status").eq(lit("active")))?
2672            .project(vec![leaf_udf(col("sub.user"), "name")])?
2673            .build()?;
2674
2675        assert_stages!(plan, @r#"
2676        ## Original Plan
2677        Projection: leaf_udf(sub.user, Utf8("name"))
2678          Filter: leaf_udf(sub.user, Utf8("status")) = Utf8("active")
2679            SubqueryAlias: sub
2680              TableScan: test projection=[user]
2681
2682        ## After Extraction
2683        Projection: leaf_udf(sub.user, Utf8("name"))
2684          Projection: sub.user
2685            Filter: __datafusion_extracted_1 = Utf8("active")
2686              Projection: leaf_udf(sub.user, Utf8("status")) AS __datafusion_extracted_1, sub.user
2687                SubqueryAlias: sub
2688                  TableScan: test projection=[user]
2689
2690        ## After Pushdown
2691        Projection: __datafusion_extracted_2 AS leaf_udf(sub.user,Utf8("name"))
2692          Filter: __datafusion_extracted_1 = Utf8("active")
2693            SubqueryAlias: sub
2694              Projection: leaf_udf(test.user, Utf8("status")) AS __datafusion_extracted_1, leaf_udf(test.user, Utf8("name")) AS __datafusion_extracted_2, test.user
2695                TableScan: test projection=[user]
2696
2697        ## Optimized
2698        Projection: __datafusion_extracted_2 AS leaf_udf(sub.user,Utf8("name"))
2699          Filter: __datafusion_extracted_1 = Utf8("active")
2700            SubqueryAlias: sub
2701              Projection: leaf_udf(test.user, Utf8("status")) AS __datafusion_extracted_1, leaf_udf(test.user, Utf8("name")) AS __datafusion_extracted_2
2702                TableScan: test projection=[user]
2703        "#)
2704    }
2705
2706    /// Two layers of SubqueryAlias: extraction pushes through both.
2707    #[test]
2708    fn test_extract_through_nested_subquery_alias() -> Result<()> {
2709        let table_scan = test_table_scan_with_struct()?;
2710        let plan = LogicalPlanBuilder::from(table_scan)
2711            .alias("inner_sub")?
2712            .alias("outer_sub")?
2713            .project(vec![leaf_udf(col("outer_sub.user"), "name")])?
2714            .build()?;
2715
2716        assert_stages!(plan, @r#"
2717        ## Original Plan
2718        Projection: leaf_udf(outer_sub.user, Utf8("name"))
2719          SubqueryAlias: outer_sub
2720            SubqueryAlias: inner_sub
2721              TableScan: test projection=[user]
2722
2723        ## After Extraction
2724        (same as original)
2725
2726        ## After Pushdown
2727        Projection: __datafusion_extracted_1 AS leaf_udf(outer_sub.user,Utf8("name"))
2728          SubqueryAlias: outer_sub
2729            SubqueryAlias: inner_sub
2730              Projection: leaf_udf(test.user, Utf8("name")) AS __datafusion_extracted_1, test.user
2731                TableScan: test projection=[user]
2732
2733        ## Optimized
2734        Projection: __datafusion_extracted_1 AS leaf_udf(outer_sub.user,Utf8("name"))
2735          SubqueryAlias: outer_sub
2736            SubqueryAlias: inner_sub
2737              Projection: leaf_udf(test.user, Utf8("name")) AS __datafusion_extracted_1
2738                TableScan: test projection=[user]
2739        "#)
2740    }
2741
2742    /// Plain columns through SubqueryAlias -- no extraction needed.
2743    #[test]
2744    fn test_subquery_alias_no_extraction() -> Result<()> {
2745        let table_scan = test_table_scan()?;
2746        let plan = LogicalPlanBuilder::from(table_scan)
2747            .alias("sub")?
2748            .project(vec![col("sub.a"), col("sub.b")])?
2749            .build()?;
2750
2751        assert_stages!(plan, @"
2752        ## Original Plan
2753        SubqueryAlias: sub
2754          TableScan: test projection=[a, b]
2755
2756        ## After Extraction
2757        (same as original)
2758
2759        ## After Pushdown
2760        (same as after extraction)
2761
2762        ## Optimized
2763        (same as after pushdown)
2764        ")
2765    }
2766
2767    /// Two UDFs with the same `name()` but different concrete types should NOT be
2768    /// deduplicated -- they are semantically different expressions that happen to
2769    /// collide on `schema_name()`.
2770    #[test]
2771    fn test_different_udfs_same_schema_name_not_deduplicated() -> Result<()> {
2772        let udf_a = Arc::new(ScalarUDF::new_from_impl(
2773            PlacementTestUDF::new()
2774                .with_placement(ExpressionPlacement::MoveTowardsLeafNodes)
2775                .with_id(1),
2776        ));
2777        let udf_b = Arc::new(ScalarUDF::new_from_impl(
2778            PlacementTestUDF::new()
2779                .with_placement(ExpressionPlacement::MoveTowardsLeafNodes)
2780                .with_id(2),
2781        ));
2782
2783        let expr_a = Expr::ScalarFunction(ScalarFunction::new_udf(
2784            udf_a,
2785            vec![col("user"), lit("field")],
2786        ));
2787        let expr_b = Expr::ScalarFunction(ScalarFunction::new_udf(
2788            udf_b,
2789            vec![col("user"), lit("field")],
2790        ));
2791
2792        // Verify preconditions: same schema_name but different Expr
2793        assert_eq!(
2794            expr_a.schema_name().to_string(),
2795            expr_b.schema_name().to_string(),
2796            "Both expressions should have the same schema_name"
2797        );
2798        assert_ne!(
2799            expr_a, expr_b,
2800            "Expressions should NOT be equal (different UDF instances)"
2801        );
2802
2803        let table_scan = test_table_scan_with_struct()?;
2804        let plan = LogicalPlanBuilder::from(table_scan.clone())
2805            .filter(expr_a.clone().eq(lit("a")).and(expr_b.clone().eq(lit("b"))))?
2806            .select(vec![
2807                table_scan
2808                    .schema()
2809                    .index_of_column_by_name(None, "id")
2810                    .unwrap(),
2811            ])?
2812            .build()?;
2813
2814        assert_stages!(plan, @r#"
2815        ## Original Plan
2816        Projection: test.id
2817          Filter: leaf_udf(test.user, Utf8("field")) = Utf8("a") AND leaf_udf(test.user, Utf8("field")) = Utf8("b")
2818            TableScan: test projection=[id, user]
2819
2820        ## After Extraction
2821        Projection: test.id
2822          Projection: test.id, test.user
2823            Filter: __datafusion_extracted_1 = Utf8("a") AND __datafusion_extracted_2 = Utf8("b")
2824              Projection: leaf_udf(test.user, Utf8("field")) AS __datafusion_extracted_1, leaf_udf(test.user, Utf8("field")) AS __datafusion_extracted_2, test.id, test.user
2825                TableScan: test projection=[id, user]
2826
2827        ## After Pushdown
2828        (same as after extraction)
2829
2830        ## Optimized
2831        Projection: test.id
2832          Filter: __datafusion_extracted_1 = Utf8("a") AND __datafusion_extracted_2 = Utf8("b")
2833            Projection: leaf_udf(test.user, Utf8("field")) AS __datafusion_extracted_1, leaf_udf(test.user, Utf8("field")) AS __datafusion_extracted_2, test.id
2834              TableScan: test projection=[id, user]
2835        "#)
2836    }
2837
2838    // =========================================================================
2839    // Filter pushdown interaction tests
2840    // =========================================================================
2841
2842    /// Extraction pushdown through a filter that already had its own
2843    /// `leaf_udf` extracted.
2844    #[test]
2845    fn test_extraction_pushdown_through_filter_with_extracted_predicate() -> Result<()> {
2846        let table_scan = test_table_scan_with_struct()?;
2847        let plan = LogicalPlanBuilder::from(table_scan)
2848            .filter(leaf_udf(col("user"), "status").eq(lit("active")))?
2849            .project(vec![col("id"), leaf_udf(col("user"), "name")])?
2850            .build()?;
2851
2852        assert_stages!(plan, @r#"
2853        ## Original Plan
2854        Projection: test.id, leaf_udf(test.user, Utf8("name"))
2855          Filter: leaf_udf(test.user, Utf8("status")) = Utf8("active")
2856            TableScan: test projection=[id, user]
2857
2858        ## After Extraction
2859        Projection: test.id, leaf_udf(test.user, Utf8("name"))
2860          Projection: test.id, test.user
2861            Filter: __datafusion_extracted_1 = Utf8("active")
2862              Projection: leaf_udf(test.user, Utf8("status")) AS __datafusion_extracted_1, test.id, test.user
2863                TableScan: test projection=[id, user]
2864
2865        ## After Pushdown
2866        Projection: test.id, __datafusion_extracted_2 AS leaf_udf(test.user,Utf8("name"))
2867          Filter: __datafusion_extracted_1 = Utf8("active")
2868            Projection: leaf_udf(test.user, Utf8("status")) AS __datafusion_extracted_1, test.id, test.user, leaf_udf(test.user, Utf8("name")) AS __datafusion_extracted_2
2869              TableScan: test projection=[id, user]
2870
2871        ## Optimized
2872        Projection: test.id, __datafusion_extracted_2 AS leaf_udf(test.user,Utf8("name"))
2873          Filter: __datafusion_extracted_1 = Utf8("active")
2874            Projection: leaf_udf(test.user, Utf8("status")) AS __datafusion_extracted_1, test.id, leaf_udf(test.user, Utf8("name")) AS __datafusion_extracted_2
2875              TableScan: test projection=[id, user]
2876        "#)
2877    }
2878
2879    /// Same expression in filter predicate and projection output.
2880    #[test]
2881    fn test_extraction_pushdown_same_expr_in_filter_and_projection() -> Result<()> {
2882        let table_scan = test_table_scan_with_struct()?;
2883        let field_expr = leaf_udf(col("user"), "status");
2884        let plan = LogicalPlanBuilder::from(table_scan)
2885            .filter(field_expr.clone().gt(lit(5)))?
2886            .project(vec![col("id"), field_expr])?
2887            .build()?;
2888
2889        assert_stages!(plan, @r#"
2890        ## Original Plan
2891        Projection: test.id, leaf_udf(test.user, Utf8("status"))
2892          Filter: leaf_udf(test.user, Utf8("status")) > Int32(5)
2893            TableScan: test projection=[id, user]
2894
2895        ## After Extraction
2896        Projection: test.id, leaf_udf(test.user, Utf8("status"))
2897          Projection: test.id, test.user
2898            Filter: __datafusion_extracted_1 > Int32(5)
2899              Projection: leaf_udf(test.user, Utf8("status")) AS __datafusion_extracted_1, test.id, test.user
2900                TableScan: test projection=[id, user]
2901
2902        ## After Pushdown
2903        Projection: test.id, __datafusion_extracted_2 AS leaf_udf(test.user,Utf8("status"))
2904          Filter: __datafusion_extracted_1 > Int32(5)
2905            Projection: leaf_udf(test.user, Utf8("status")) AS __datafusion_extracted_1, test.id, test.user, leaf_udf(test.user, Utf8("status")) AS __datafusion_extracted_2
2906              TableScan: test projection=[id, user]
2907
2908        ## Optimized
2909        Projection: test.id, __datafusion_extracted_2 AS leaf_udf(test.user,Utf8("status"))
2910          Filter: __datafusion_extracted_1 > Int32(5)
2911            Projection: leaf_udf(test.user, Utf8("status")) AS __datafusion_extracted_1, test.id, leaf_udf(test.user, Utf8("status")) AS __datafusion_extracted_2
2912              TableScan: test projection=[id, user]
2913        "#)
2914    }
2915
2916    /// Left join with a `leaf_udf` filter on the right side AND
2917    /// the projection also selects `leaf_udf` from the right side.
2918    #[test]
2919    fn test_left_join_with_filter_and_projection_extraction() -> Result<()> {
2920        use datafusion_expr::JoinType;
2921
2922        let left = test_table_scan_with_struct()?;
2923        let right = test_table_scan_with_struct_named("right")?;
2924
2925        let plan = LogicalPlanBuilder::from(left)
2926            .join_on(
2927                right,
2928                JoinType::Left,
2929                vec![
2930                    col("test.id").eq(col("right.id")),
2931                    leaf_udf(col("right.user"), "status").gt(lit(5)),
2932                ],
2933            )?
2934            .project(vec![
2935                col("test.id"),
2936                leaf_udf(col("test.user"), "name"),
2937                leaf_udf(col("right.user"), "status"),
2938            ])?
2939            .build()?;
2940
2941        assert_stages!(plan, @r#"
2942        ## Original Plan
2943        Projection: test.id, leaf_udf(test.user, Utf8("name")), leaf_udf(right.user, Utf8("status"))
2944          Left Join:  Filter: test.id = right.id AND leaf_udf(right.user, Utf8("status")) > Int32(5)
2945            TableScan: test projection=[id, user]
2946            TableScan: right projection=[id, user]
2947
2948        ## After Extraction
2949        Projection: test.id, leaf_udf(test.user, Utf8("name")), leaf_udf(right.user, Utf8("status"))
2950          Projection: test.id, test.user, right.id, right.user
2951            Left Join:  Filter: test.id = right.id AND __datafusion_extracted_1 > Int32(5)
2952              TableScan: test projection=[id, user]
2953              Projection: leaf_udf(right.user, Utf8("status")) AS __datafusion_extracted_1, right.id, right.user
2954                TableScan: right projection=[id, user]
2955
2956        ## After Pushdown
2957        Projection: test.id, __datafusion_extracted_2 AS leaf_udf(test.user,Utf8("name")), __datafusion_extracted_3 AS leaf_udf(right.user,Utf8("status"))
2958          Left Join:  Filter: test.id = right.id AND __datafusion_extracted_1 > Int32(5)
2959            Projection: leaf_udf(test.user, Utf8("name")) AS __datafusion_extracted_2, test.id, test.user
2960              TableScan: test projection=[id, user]
2961            Projection: leaf_udf(right.user, Utf8("status")) AS __datafusion_extracted_1, right.id, right.user, leaf_udf(right.user, Utf8("status")) AS __datafusion_extracted_3
2962              TableScan: right projection=[id, user]
2963
2964        ## Optimized
2965        Projection: test.id, __datafusion_extracted_2 AS leaf_udf(test.user,Utf8("name")), __datafusion_extracted_3 AS leaf_udf(right.user,Utf8("status"))
2966          Left Join:  Filter: test.id = right.id AND __datafusion_extracted_1 > Int32(5)
2967            Projection: leaf_udf(test.user, Utf8("name")) AS __datafusion_extracted_2, test.id
2968              TableScan: test projection=[id, user]
2969            Projection: leaf_udf(right.user, Utf8("status")) AS __datafusion_extracted_1, right.id, leaf_udf(right.user, Utf8("status")) AS __datafusion_extracted_3
2970              TableScan: right projection=[id, user]
2971        "#)
2972    }
2973
2974    /// Extraction projection pushed through a filter whose predicate
2975    /// references a different extracted expression.
2976    #[test]
2977    fn test_pure_extraction_proj_push_through_filter() -> Result<()> {
2978        let table_scan = test_table_scan_with_struct()?;
2979        let plan = LogicalPlanBuilder::from(table_scan)
2980            .filter(leaf_udf(col("user"), "status").gt(lit(5)))?
2981            .project(vec![
2982                col("id"),
2983                leaf_udf(col("user"), "name"),
2984                leaf_udf(col("user"), "status"),
2985            ])?
2986            .build()?;
2987
2988        assert_stages!(plan, @r#"
2989        ## Original Plan
2990        Projection: test.id, leaf_udf(test.user, Utf8("name")), leaf_udf(test.user, Utf8("status"))
2991          Filter: leaf_udf(test.user, Utf8("status")) > Int32(5)
2992            TableScan: test projection=[id, user]
2993
2994        ## After Extraction
2995        Projection: test.id, leaf_udf(test.user, Utf8("name")), leaf_udf(test.user, Utf8("status"))
2996          Projection: test.id, test.user
2997            Filter: __datafusion_extracted_1 > Int32(5)
2998              Projection: leaf_udf(test.user, Utf8("status")) AS __datafusion_extracted_1, test.id, test.user
2999                TableScan: test projection=[id, user]
3000
3001        ## After Pushdown
3002        Projection: test.id, __datafusion_extracted_2 AS leaf_udf(test.user,Utf8("name")), __datafusion_extracted_3 AS leaf_udf(test.user,Utf8("status"))
3003          Filter: __datafusion_extracted_1 > Int32(5)
3004            Projection: leaf_udf(test.user, Utf8("status")) AS __datafusion_extracted_1, test.id, test.user, leaf_udf(test.user, Utf8("name")) AS __datafusion_extracted_2, leaf_udf(test.user, Utf8("status")) AS __datafusion_extracted_3
3005              TableScan: test projection=[id, user]
3006
3007        ## Optimized
3008        Projection: test.id, __datafusion_extracted_2 AS leaf_udf(test.user,Utf8("name")), __datafusion_extracted_3 AS leaf_udf(test.user,Utf8("status"))
3009          Filter: __datafusion_extracted_1 > Int32(5)
3010            Projection: leaf_udf(test.user, Utf8("status")) AS __datafusion_extracted_1, test.id, leaf_udf(test.user, Utf8("name")) AS __datafusion_extracted_2, leaf_udf(test.user, Utf8("status")) AS __datafusion_extracted_3
3011              TableScan: test projection=[id, user]
3012        "#)
3013    }
3014
3015    /// When an extraction projection's __extracted alias references a column
3016    /// (e.g. `user`) that is NOT a standalone expression in the projection,
3017    /// the merge into the inner projection should still succeed.
3018    #[test]
3019    fn test_merge_extraction_into_projection_with_column_ref_inflation() -> Result<()> {
3020        let table_scan = test_table_scan_with_struct()?;
3021
3022        // Inner projection (simulates a trimmed projection)
3023        let inner = LogicalPlanBuilder::from(table_scan)
3024            .project(vec![col("user"), col("id")])?
3025            .build()?;
3026
3027        // Outer projection: __extracted alias + id (but NOT user as standalone).
3028        // The alias references `user` internally, inflating columns_needed.
3029        let plan = LogicalPlanBuilder::from(inner)
3030            .project(vec![
3031                leaf_udf(col("user"), "status")
3032                    .alias(format!("{EXTRACTED_EXPR_PREFIX}_1")),
3033                col("id"),
3034            ])?
3035            .build()?;
3036
3037        // Run only PushDownLeafProjections
3038        let ctx = OptimizerContext::new().with_max_passes(1);
3039        let optimizer =
3040            Optimizer::with_rules(vec![Arc::new(PushDownLeafProjections::new())]);
3041        let result = optimizer.optimize(plan, &ctx, |_, _| {})?;
3042
3043        // With the fix: merge succeeds → extraction merged into inner projection.
3044        // Without the fix: merge rejected → two separate projections remain.
3045        insta::assert_snapshot!(format!("{result}"), @r#"
3046        Projection: __datafusion_extracted_1, test.id
3047          Projection: test.user, test.id, leaf_udf(test.user, Utf8("status")) AS __datafusion_extracted_1
3048            TableScan: test
3049        "#);
3050
3051        Ok(())
3052    }
3053}