Skip to main content

polyglot_sql/
lineage.rs

1//! Column Lineage Tracking
2//!
3//! This module provides functionality to track column lineage through SQL queries,
4//! building a graph of how columns flow from source tables to the result set.
5//! Supports UNION/INTERSECT/EXCEPT, CTEs, derived tables, subqueries, and star expansion.
6//!
7
8use crate::dialects::DialectType;
9use crate::expressions::{Expression, Identifier, Select};
10use crate::optimizer::annotate_types::annotate_types;
11use crate::optimizer::qualify_columns::{qualify_columns, QualifyColumnsOptions};
12use crate::schema::{normalize_name, Schema};
13use crate::scope::{build_scope, Scope};
14use crate::traversal::ExpressionWalk;
15use crate::{Error, Result};
16use serde::{Deserialize, Serialize};
17use std::collections::{HashMap, HashSet};
18
19/// A node in the column lineage graph
20#[derive(Debug, Clone, Serialize, Deserialize)]
21pub struct LineageNode {
22    /// Name of this lineage step (e.g., "table.column")
23    pub name: String,
24    /// The expression at this node
25    pub expression: Expression,
26    /// The source expression (the full query context)
27    pub source: Expression,
28    /// Downstream nodes that depend on this one
29    pub downstream: Vec<LineageNode>,
30    /// Optional source name (e.g., for derived tables)
31    pub source_name: String,
32    /// Optional reference node name (e.g., for CTEs)
33    pub reference_node_name: String,
34}
35
36impl LineageNode {
37    /// Create a new lineage node
38    pub fn new(name: impl Into<String>, expression: Expression, source: Expression) -> Self {
39        Self {
40            name: name.into(),
41            expression,
42            source,
43            downstream: Vec::new(),
44            source_name: String::new(),
45            reference_node_name: String::new(),
46        }
47    }
48
49    /// Iterate over all nodes in the lineage graph using DFS
50    pub fn walk(&self) -> LineageWalker<'_> {
51        LineageWalker { stack: vec![self] }
52    }
53
54    /// Get all downstream column names
55    pub fn downstream_names(&self) -> Vec<String> {
56        self.downstream.iter().map(|n| n.name.clone()).collect()
57    }
58}
59
60/// Iterator for walking the lineage graph
61pub struct LineageWalker<'a> {
62    stack: Vec<&'a LineageNode>,
63}
64
65impl<'a> Iterator for LineageWalker<'a> {
66    type Item = &'a LineageNode;
67
68    fn next(&mut self) -> Option<Self::Item> {
69        if let Some(node) = self.stack.pop() {
70            // Add children in reverse order so they're visited in order
71            for child in node.downstream.iter().rev() {
72                self.stack.push(child);
73            }
74            Some(node)
75        } else {
76            None
77        }
78    }
79}
80
81// ---------------------------------------------------------------------------
82// ColumnRef: name or positional index for column lookup
83// ---------------------------------------------------------------------------
84
/// How a column is identified while tracing lineage.
enum ColumnRef<'a> {
    /// Look the column up by its name.
    Name(&'a str),
    /// Look the column up by its position in the select list.
    Index(usize),
}
90
91// ---------------------------------------------------------------------------
92// Public API
93// ---------------------------------------------------------------------------
94
95/// Build the lineage graph for a column in a SQL query
96///
97/// # Arguments
98/// * `column` - The column name to trace lineage for
99/// * `sql` - The SQL expression (SELECT, UNION, etc.)
100/// * `dialect` - Optional dialect for parsing
101/// * `trim_selects` - If true, trim the source SELECT to only include the target column
102///
103/// # Returns
104/// The root lineage node for the specified column
105///
106/// # Example
107/// ```ignore
108/// use polyglot_sql::lineage::lineage;
109/// use polyglot_sql::parse_one;
110/// use polyglot_sql::DialectType;
111///
112/// let sql = "SELECT a, b + 1 AS c FROM t";
113/// let expr = parse_one(sql, DialectType::Generic).unwrap();
114/// let node = lineage("c", &expr, None, false).unwrap();
115/// ```
116pub fn lineage(
117    column: &str,
118    sql: &Expression,
119    dialect: Option<DialectType>,
120    trim_selects: bool,
121) -> Result<LineageNode> {
122    // Fast path: skip clone when there are no CTEs to expand
123    let effective_sql = lineage_effective_expression(sql);
124    let has_with = matches!(effective_sql, Expression::Select(s) if s.with.is_some());
125    if !has_with {
126        return lineage_from_expression(column, sql, dialect, trim_selects);
127    }
128    let mut owned = sql.clone();
129    expand_cte_stars(&mut owned, None);
130    lineage_from_expression(column, &owned, dialect, trim_selects)
131}
132
133/// Build the lineage graph for a column in a SQL query using optional schema metadata.
134///
135/// When `schema` is provided, the query is first qualified with
136/// `optimizer::qualify_columns`, allowing more accurate lineage for unqualified or
137/// ambiguous column references.
138///
139/// # Arguments
140/// * `column` - The column name to trace lineage for
141/// * `sql` - The SQL expression (SELECT, UNION, etc.)
142/// * `schema` - Optional schema used for qualification
143/// * `dialect` - Optional dialect for qualification and lineage handling
144/// * `trim_selects` - If true, trim the source SELECT to only include the target column
145///
146/// # Returns
147/// The root lineage node for the specified column
148pub fn lineage_with_schema(
149    column: &str,
150    sql: &Expression,
151    schema: Option<&dyn Schema>,
152    dialect: Option<DialectType>,
153    trim_selects: bool,
154) -> Result<LineageNode> {
155    let mut qualified_expression = if let Some(schema) = schema {
156        let options = if let Some(dialect_type) = dialect.or_else(|| schema.dialect()) {
157            QualifyColumnsOptions::new().with_dialect(dialect_type)
158        } else {
159            QualifyColumnsOptions::new()
160        };
161
162        qualify_columns(sql.clone(), schema, &options).map_err(|e| {
163            Error::internal(format!("Lineage qualification failed with schema: {}", e))
164        })?
165    } else {
166        sql.clone()
167    };
168
169    // Annotate types in-place so lineage nodes carry type information
170    annotate_types(&mut qualified_expression, schema, dialect);
171
172    // Expand CTE stars on the already-owned expression (no extra clone).
173    // Pass schema so that stars from external tables can also be resolved.
174    expand_cte_stars(&mut qualified_expression, schema);
175
176    lineage_from_expression(column, &qualified_expression, dialect, trim_selects)
177}
178
179fn lineage_from_expression(
180    column: &str,
181    sql: &Expression,
182    dialect: Option<DialectType>,
183    trim_selects: bool,
184) -> Result<LineageNode> {
185    let sql = lineage_effective_expression(sql);
186    let scope = build_scope(sql);
187    to_node(
188        ColumnRef::Name(column),
189        &scope,
190        dialect,
191        "",
192        "",
193        "",
194        trim_selects,
195    )
196}
197
198fn lineage_effective_expression(sql: &Expression) -> &Expression {
199    match sql {
200        Expression::Prepare(prepare) => &prepare.statement,
201        _ => sql,
202    }
203}
204
205// ---------------------------------------------------------------------------
206// CTE star expansion
207// ---------------------------------------------------------------------------
208
209/// Normalize an identifier for CTE name matching.
210///
211/// Follows SQL semantics: unquoted identifiers are case-insensitive (lowercased),
212/// quoted identifiers preserve their original case. This matches sqlglot's
213/// `normalize_identifiers` behavior.
214fn normalize_cte_name(ident: &Identifier) -> String {
215    if ident.quoted {
216        ident.name.clone()
217    } else {
218        ident.name.to_lowercase()
219    }
220}
221
222/// Expand SELECT * in CTEs by walking CTE definitions in order and propagating
223/// resolved column lists. This handles nested CTEs (e.g., cte2 AS (SELECT * FROM cte1))
224/// which qualify_columns cannot resolve because it processes each SELECT independently.
225///
226/// When `schema` is provided, stars from external tables (not CTEs) are also resolved
227/// by looking up column names in the schema. This enables correct expansion of patterns
228/// like `WITH cte AS (SELECT * FROM external_table) SELECT * FROM cte`.
229///
230/// CTE name matching follows SQL identifier semantics: unquoted names are compared
231/// case-insensitively (lowercased), while quoted names preserve their original case.
232/// This matches sqlglot's `normalize_identifiers` behavior.
233pub fn expand_cte_stars(expr: &mut Expression, schema: Option<&dyn Schema>) {
234    if let Expression::Prepare(prepare) = expr {
235        expand_cte_stars(&mut prepare.statement, schema);
236        return;
237    }
238
239    let select = match expr {
240        Expression::Select(s) => s,
241        _ => return,
242    };
243
244    let with = match &mut select.with {
245        Some(w) => w,
246        None => return,
247    };
248
249    let mut resolved_cte_columns: HashMap<String, Vec<String>> = HashMap::new();
250
251    for cte in &mut with.ctes {
252        let cte_name = normalize_cte_name(&cte.alias);
253
254        // If CTE has explicit column list (e.g., cte(a, b) AS (...)), use that
255        if !cte.columns.is_empty() {
256            let cols: Vec<String> = cte.columns.iter().map(|c| c.name.clone()).collect();
257            resolved_cte_columns.insert(cte_name, cols);
258            continue;
259        }
260
261        // Skip recursive CTEs (self-referencing) — their column resolution is complex.
262        // A CTE is recursive if the WITH block is marked recursive AND the CTE body
263        // references itself. We detect this conservatively: if the CTE name appears as
264        // a source in its own body, skip it. Non-recursive CTEs in a recursive WITH
265        // block are still expanded.
266        if with.recursive {
267            let is_self_referencing =
268                if let Some(body_select) = get_leftmost_select_mut(&mut cte.this) {
269                    let body_sources = get_select_sources(body_select);
270                    body_sources.iter().any(|s| s.normalized == cte_name)
271                } else {
272                    false
273                };
274            if is_self_referencing {
275                continue;
276            }
277        }
278
279        // Get the SELECT from the CTE body (handle UNION by taking left branch)
280        let body_select = match get_leftmost_select_mut(&mut cte.this) {
281            Some(s) => s,
282            None => continue,
283        };
284
285        let columns = rewrite_stars_in_select(body_select, &resolved_cte_columns, schema);
286        resolved_cte_columns.insert(cte_name, columns);
287    }
288
289    // Also expand stars in the outer SELECT itself
290    rewrite_stars_in_select(select, &resolved_cte_columns, schema);
291}
292
293/// Get the leftmost SELECT from an expression, drilling through UNION/INTERSECT/EXCEPT.
294///
295/// Per the SQL standard, the column names of a set operation (UNION, INTERSECT, EXCEPT)
296/// are determined by the left branch. This matches sqlglot's behavior.
297fn get_leftmost_select_mut(expr: &mut Expression) -> Option<&mut Select> {
298    let mut current = expr;
299    for _ in 0..MAX_LINEAGE_DEPTH {
300        match current {
301            Expression::Select(s) => return Some(s),
302            Expression::Union(u) => current = &mut u.left,
303            Expression::Intersect(i) => current = &mut i.left,
304            Expression::Except(e) => current = &mut e.left,
305            Expression::Paren(p) => current = &mut p.this,
306            _ => return None,
307        }
308    }
309    None
310}
311
312/// Rewrite star expressions in a SELECT using resolved CTE column lists.
313/// Falls back to `schema` for external table column lookup.
314/// Returns the list of output column names after expansion.
315fn rewrite_stars_in_select(
316    select: &mut Select,
317    resolved_ctes: &HashMap<String, Vec<String>>,
318    schema: Option<&dyn Schema>,
319) -> Vec<String> {
320    // The AST represents star expressions in two forms depending on syntax:
321    //   - `SELECT *`      → Expression::Star (unqualified star)
322    //   - `SELECT table.*` → Expression::Column { name: "*", table: Some(...) } (qualified star)
323    // Both must be checked to handle all star patterns.
324    let has_star = select
325        .expressions
326        .iter()
327        .any(|e| matches!(e, Expression::Star(_)));
328    let has_qualified_star = select
329        .expressions
330        .iter()
331        .any(|e| matches!(e, Expression::Column(c) if c.name.name == "*"));
332
333    if !has_star && !has_qualified_star {
334        // No stars — just extract column names without rewriting
335        return select
336            .expressions
337            .iter()
338            .filter_map(get_expression_output_name)
339            .collect();
340    }
341
342    let sources = get_select_sources(select);
343    let mut new_expressions = Vec::new();
344    let mut result_columns = Vec::new();
345
346    for expr in &select.expressions {
347        match expr {
348            Expression::Star(star) => {
349                let qual = star.table.as_ref();
350                if let Some(expanded) =
351                    expand_star_from_sources(qual, &sources, resolved_ctes, schema)
352                {
353                    for (src_alias, col_name) in &expanded {
354                        let table_id = Identifier::new(src_alias);
355                        new_expressions.push(make_column_expr(col_name, Some(&table_id)));
356                        result_columns.push(col_name.clone());
357                    }
358                } else {
359                    new_expressions.push(expr.clone());
360                    result_columns.push("*".to_string());
361                }
362            }
363            Expression::Column(c) if c.name.name == "*" => {
364                let qual = c.table.as_ref();
365                if let Some(expanded) =
366                    expand_star_from_sources(qual, &sources, resolved_ctes, schema)
367                {
368                    for (_src_alias, col_name) in &expanded {
369                        // Keep the original table qualifier for qualified stars (table.*)
370                        new_expressions.push(make_column_expr(col_name, c.table.as_ref()));
371                        result_columns.push(col_name.clone());
372                    }
373                } else {
374                    new_expressions.push(expr.clone());
375                    result_columns.push("*".to_string());
376                }
377            }
378            _ => {
379                new_expressions.push(expr.clone());
380                if let Some(name) = get_expression_output_name(expr) {
381                    result_columns.push(name);
382                }
383            }
384        }
385    }
386
387    select.expressions = new_expressions;
388    result_columns
389}
390
391/// Try to expand a star expression by looking up source columns from resolved CTEs,
392/// falling back to the schema for external tables.
393/// Returns (source_alias, column_name) pairs so the caller can set table qualifiers.
394/// `qualifier`: Optional table qualifier (for `table.*`). If None, expand all sources.
395fn expand_star_from_sources(
396    qualifier: Option<&Identifier>,
397    sources: &[SourceInfo],
398    resolved_ctes: &HashMap<String, Vec<String>>,
399    schema: Option<&dyn Schema>,
400) -> Option<Vec<(String, String)>> {
401    let mut expanded = Vec::new();
402
403    if let Some(qual) = qualifier {
404        // Qualified star: table.*
405        let qual_normalized = normalize_cte_name(qual);
406        for src in sources {
407            if src.normalized == qual_normalized || src.alias.to_lowercase() == qual_normalized {
408                // Try CTE first
409                if let Some(cols) = resolved_ctes.get(&src.normalized) {
410                    expanded.extend(cols.iter().map(|c| (src.alias.clone(), c.clone())));
411                    return Some(expanded);
412                }
413                // Fall back to schema
414                if let Some(cols) = lookup_schema_columns(schema, &src.fq_name) {
415                    expanded.extend(cols.into_iter().map(|c| (src.alias.clone(), c)));
416                    return Some(expanded);
417                }
418            }
419        }
420        None
421    } else {
422        // Unqualified star: expand all sources.
423        // Intentionally conservative: if any source can't be resolved, the entire
424        // expansion is aborted. Partial expansion would produce an incomplete column
425        // list, causing downstream lineage resolution to silently omit columns.
426        // This matches sqlglot's behavior (raises SqlglotError when schema is missing).
427        let mut any_expanded = false;
428        for src in sources {
429            if let Some(cols) = resolved_ctes.get(&src.normalized) {
430                expanded.extend(cols.iter().map(|c| (src.alias.clone(), c.clone())));
431                any_expanded = true;
432            } else if let Some(cols) = lookup_schema_columns(schema, &src.fq_name) {
433                expanded.extend(cols.into_iter().map(|c| (src.alias.clone(), c)));
434                any_expanded = true;
435            } else {
436                return None;
437            }
438        }
439        if any_expanded {
440            Some(expanded)
441        } else {
442            None
443        }
444    }
445}
446
447/// Look up column names for a table from the schema.
448fn lookup_schema_columns(schema: Option<&dyn Schema>, fq_name: &str) -> Option<Vec<String>> {
449    let schema = schema?;
450    if fq_name.is_empty() {
451        return None;
452    }
453    schema
454        .column_names(fq_name)
455        .ok()
456        .filter(|cols| !cols.is_empty() && !cols.contains(&"*".to_string()))
457}
458
459/// Create a Column expression with the given name and optional table qualifier.
460fn make_column_expr(name: &str, table: Option<&Identifier>) -> Expression {
461    Expression::Column(Box::new(crate::expressions::Column {
462        name: Identifier::new(name),
463        table: table.cloned(),
464        join_mark: false,
465        trailing_comments: Vec::new(),
466        span: None,
467        inferred_type: None,
468    }))
469}
470
471/// Extract the output name of a SELECT expression.
472fn get_expression_output_name(expr: &Expression) -> Option<String> {
473    match expr {
474        Expression::Alias(a) => Some(a.alias.name.clone()),
475        Expression::Column(c) => Some(c.name.name.clone()),
476        Expression::Identifier(id) => Some(id.name.clone()),
477        Expression::Star(_) => Some("*".to_string()),
478        _ => None,
479    }
480}
481
/// One FROM/JOIN source of a SELECT, as collected by `get_select_sources`.
struct SourceInfo {
    /// Name the source is referred to by: its alias when present, else its own name.
    alias: String,
    /// Key used for CTE lookup. For tables: unquoted → lowercased, quoted → as-is.
    normalized: String,
    /// Fully-qualified name used for schema lookup (e.g., "db.schema.table").
    fq_name: String,
}
490
491/// Extract source info (alias, normalized CTE name, fully-qualified name) from a
492/// SELECT's FROM and JOIN clauses in a single pass.
493fn get_select_sources(select: &Select) -> Vec<SourceInfo> {
494    let mut sources = Vec::new();
495
496    fn extract_source(expr: &Expression) -> Option<SourceInfo> {
497        fn virtual_source_info(alias: &Identifier) -> SourceInfo {
498            SourceInfo {
499                alias: alias.name.clone(),
500                normalized: normalize_cte_name(alias),
501                fq_name: alias.name.clone(),
502            }
503        }
504
505        match expr {
506            Expression::Table(t) => {
507                let normalized = normalize_cte_name(&t.name);
508                let alias = t
509                    .alias
510                    .as_ref()
511                    .map(|a| a.name.clone())
512                    .unwrap_or_else(|| t.name.name.clone());
513                let mut parts = Vec::new();
514                if let Some(catalog) = &t.catalog {
515                    parts.push(catalog.name.clone());
516                }
517                if let Some(schema) = &t.schema {
518                    parts.push(schema.name.clone());
519                }
520                parts.push(t.name.name.clone());
521                let fq_name = parts.join(".");
522                Some(SourceInfo {
523                    alias,
524                    normalized,
525                    fq_name,
526                })
527            }
528            Expression::Subquery(s) => {
529                let alias = s.alias.as_ref()?.name.clone();
530                let normalized = alias.to_lowercase();
531                let fq_name = alias.clone();
532                Some(SourceInfo {
533                    alias,
534                    normalized,
535                    fq_name,
536                })
537            }
538            Expression::Unnest(u) => u.alias.as_ref().map(virtual_source_info),
539            Expression::Alias(a) if matches!(&a.this, Expression::Unnest(_)) => {
540                Some(virtual_source_info(&a.alias))
541            }
542            Expression::Paren(p) => extract_source(&p.this),
543            _ => None,
544        }
545    }
546
547    if let Some(from) = &select.from {
548        for expr in &from.expressions {
549            if let Some(info) = extract_source(expr) {
550                sources.push(info);
551            }
552        }
553    }
554    for join in &select.joins {
555        if let Some(info) = extract_source(&join.this) {
556            sources.push(info);
557        }
558    }
559    sources
560}
561
562/// Get all source tables from a lineage graph
563pub fn get_source_tables(node: &LineageNode) -> HashSet<String> {
564    let mut tables = HashSet::new();
565    collect_source_tables(node, &mut tables);
566    tables
567}
568
569/// Recursively collect source table names from lineage graph
570pub fn collect_source_tables(node: &LineageNode, tables: &mut HashSet<String>) {
571    if let Expression::Table(table) = &node.source {
572        tables.insert(table.name.name.clone());
573    }
574    for child in &node.downstream {
575        collect_source_tables(child, tables);
576    }
577}
578
579// ---------------------------------------------------------------------------
580// Core recursive lineage builder
581// ---------------------------------------------------------------------------
582
/// Upper bound on lineage recursion depth, guarding against stack overflow on
/// circular or deeply nested CTE chains. Tracing fails with an error once the
/// depth goes past this value.
const MAX_LINEAGE_DEPTH: usize = 64;
586
587/// Recursively build a lineage node for a column in a scope.
588fn to_node(
589    column: ColumnRef<'_>,
590    scope: &Scope,
591    dialect: Option<DialectType>,
592    scope_name: &str,
593    source_name: &str,
594    reference_node_name: &str,
595    trim_selects: bool,
596) -> Result<LineageNode> {
597    to_node_inner(
598        column,
599        scope,
600        dialect,
601        scope_name,
602        source_name,
603        reference_node_name,
604        trim_selects,
605        &[],
606        0,
607    )
608}
609
610fn to_node_inner(
611    column: ColumnRef<'_>,
612    scope: &Scope,
613    dialect: Option<DialectType>,
614    scope_name: &str,
615    source_name: &str,
616    reference_node_name: &str,
617    trim_selects: bool,
618    ancestor_cte_scopes: &[Scope],
619    depth: usize,
620) -> Result<LineageNode> {
621    if depth > MAX_LINEAGE_DEPTH {
622        return Err(Error::internal(format!(
623            "lineage recursion depth exceeded (>{MAX_LINEAGE_DEPTH}) — possible circular CTE reference for scope '{scope_name}'"
624        )));
625    }
626    let scope_expr = &scope.expression;
627
628    // Build combined CTE scopes: current scope's cte_scopes + ancestors
629    let mut all_cte_scopes: Vec<&Scope> = scope.cte_scopes.iter().collect();
630    for s in ancestor_cte_scopes {
631        all_cte_scopes.push(s);
632    }
633
634    // 0. Unwrap CTE scope — CTE scope expressions are Expression::Cte(...)
635    //    but we need the inner query (SELECT/UNION) for column lookup.
636    let effective_expr = match scope_expr {
637        Expression::Cte(cte) => &cte.this,
638        other => other,
639    };
640
641    // 1. Set operations (UNION / INTERSECT / EXCEPT)
642    if matches!(
643        effective_expr,
644        Expression::Union(_) | Expression::Intersect(_) | Expression::Except(_)
645    ) {
646        // For CTE wrapping a set op, create a temporary scope with the inner expression
647        if matches!(scope_expr, Expression::Cte(_)) {
648            let mut inner_scope = Scope::new(effective_expr.clone());
649            inner_scope.union_scopes = scope.union_scopes.clone();
650            inner_scope.sources = scope.sources.clone();
651            inner_scope.cte_sources = scope.cte_sources.clone();
652            inner_scope.cte_scopes = scope.cte_scopes.clone();
653            inner_scope.derived_table_scopes = scope.derived_table_scopes.clone();
654            inner_scope.subquery_scopes = scope.subquery_scopes.clone();
655            return handle_set_operation(
656                &column,
657                &inner_scope,
658                dialect,
659                scope_name,
660                source_name,
661                reference_node_name,
662                trim_selects,
663                ancestor_cte_scopes,
664                depth,
665            );
666        }
667        return handle_set_operation(
668            &column,
669            scope,
670            dialect,
671            scope_name,
672            source_name,
673            reference_node_name,
674            trim_selects,
675            ancestor_cte_scopes,
676            depth,
677        );
678    }
679
680    // 2. Find the select expression for this column
681    let select_expr = find_select_expr(effective_expr, &column, dialect)?;
682    let column_name = resolve_column_name(&column, &select_expr);
683
684    // 3. Trim source if requested
685    let node_source = if trim_selects {
686        trim_source(effective_expr, &select_expr)
687    } else {
688        effective_expr.clone()
689    };
690
691    // 4. Create the lineage node
692    let mut node = LineageNode::new(&column_name, select_expr.clone(), node_source);
693    node.source_name = source_name.to_string();
694    node.reference_node_name = reference_node_name.to_string();
695
696    // 5. Star handling — add downstream for each source
697    if matches!(&select_expr, Expression::Star(_)) {
698        for (name, source_info) in &scope.sources {
699            let child = LineageNode::new(
700                format!("{}.*", name),
701                Expression::Star(crate::expressions::Star {
702                    table: None,
703                    except: None,
704                    replace: None,
705                    rename: None,
706                    trailing_comments: vec![],
707                    span: None,
708                }),
709                source_info.expression.clone(),
710            );
711            node.downstream.push(child);
712        }
713        return Ok(node);
714    }
715
716    // 6. Subqueries in select — trace through scalar subqueries
717    let subqueries: Vec<&Expression> =
718        select_expr.find_all(|e| matches!(e, Expression::Subquery(sq) if sq.alias.is_none()));
719    for sq_expr in subqueries {
720        if let Expression::Subquery(sq) = sq_expr {
721            for sq_scope in &scope.subquery_scopes {
722                if sq_scope.expression == sq.this {
723                    if let Ok(child) = to_node_inner(
724                        ColumnRef::Index(0),
725                        sq_scope,
726                        dialect,
727                        &column_name,
728                        "",
729                        "",
730                        trim_selects,
731                        ancestor_cte_scopes,
732                        depth + 1,
733                    ) {
734                        node.downstream.push(child);
735                    }
736                    break;
737                }
738            }
739        }
740    }
741
742    // 7. Column references — trace each column to its source
743    let col_refs = find_column_refs_in_expr(&select_expr, dialect);
744    for col_ref in col_refs {
745        let col_name = &col_ref.column;
746        if let Some(ref table_id) = col_ref.table {
747            let tbl = &table_id.name;
748            resolve_qualified_column(
749                &mut node,
750                scope,
751                dialect,
752                tbl,
753                col_name,
754                &column_name,
755                trim_selects,
756                &all_cte_scopes,
757                depth,
758            );
759        } else {
760            resolve_unqualified_column(
761                &mut node,
762                scope,
763                dialect,
764                col_name,
765                &column_name,
766                trim_selects,
767                &all_cte_scopes,
768                depth,
769            );
770        }
771    }
772
773    Ok(node)
774}
775
776// ---------------------------------------------------------------------------
777// Set operation handling
778// ---------------------------------------------------------------------------
779
780fn handle_set_operation(
781    column: &ColumnRef<'_>,
782    scope: &Scope,
783    dialect: Option<DialectType>,
784    scope_name: &str,
785    source_name: &str,
786    reference_node_name: &str,
787    trim_selects: bool,
788    ancestor_cte_scopes: &[Scope],
789    depth: usize,
790) -> Result<LineageNode> {
791    let scope_expr = &scope.expression;
792
793    // Determine column index
794    let col_index = match column {
795        ColumnRef::Name(name) => column_to_index(scope_expr, name, dialect)?,
796        ColumnRef::Index(i) => *i,
797    };
798
799    let col_name = match column {
800        ColumnRef::Name(name) => name.to_string(),
801        ColumnRef::Index(_) => format!("_{col_index}"),
802    };
803
804    let mut node = LineageNode::new(&col_name, scope_expr.clone(), scope_expr.clone());
805    node.source_name = source_name.to_string();
806    node.reference_node_name = reference_node_name.to_string();
807
808    // Recurse into each union branch
809    for branch_scope in &scope.union_scopes {
810        if let Ok(child) = to_node_inner(
811            ColumnRef::Index(col_index),
812            branch_scope,
813            dialect,
814            scope_name,
815            "",
816            "",
817            trim_selects,
818            ancestor_cte_scopes,
819            depth + 1,
820        ) {
821            node.downstream.push(child);
822        }
823    }
824
825    Ok(node)
826}
827
828// ---------------------------------------------------------------------------
829// Column resolution helpers
830// ---------------------------------------------------------------------------
831
832fn resolve_qualified_column(
833    node: &mut LineageNode,
834    scope: &Scope,
835    dialect: Option<DialectType>,
836    table: &str,
837    col_name: &str,
838    parent_name: &str,
839    trim_selects: bool,
840    all_cte_scopes: &[&Scope],
841    depth: usize,
842) {
843    // Resolve CTE alias: if `table` is a FROM alias for a CTE (e.g., `FROM my_cte AS t`),
844    // resolve it to the actual CTE name so the CTE scope lookup succeeds.
845    let resolved_cte_name = resolve_cte_alias(scope, table);
846    let effective_table = resolved_cte_name.as_deref().unwrap_or(table);
847
848    // Check if table is a CTE reference — check both the current scope's cte_sources
849    // and ancestor CTE scopes (for sibling CTEs in parent WITH clauses).
850    let is_cte = scope.cte_sources.contains_key(effective_table)
851        || all_cte_scopes.iter().any(
852            |s| matches!(&s.expression, Expression::Cte(cte) if cte.alias.name == effective_table),
853        );
854    if is_cte {
855        if let Some(child_scope) = find_child_scope_in(all_cte_scopes, scope, effective_table) {
856            // Build ancestor CTE scopes from all_cte_scopes for the recursive call
857            let ancestors: Vec<Scope> = all_cte_scopes.iter().map(|s| (*s).clone()).collect();
858            if let Ok(child) = to_node_inner(
859                ColumnRef::Name(col_name),
860                child_scope,
861                dialect,
862                parent_name,
863                effective_table,
864                parent_name,
865                trim_selects,
866                &ancestors,
867                depth + 1,
868            ) {
869                node.downstream.push(child);
870                return;
871            }
872        }
873    }
874
875    // Check if table is a derived table (is_scope = true in sources)
876    if let Some(source_info) = scope.sources.get(table) {
877        if source_info.is_scope {
878            if let Some(child_scope) = find_child_scope(scope, table) {
879                let ancestors: Vec<Scope> = all_cte_scopes.iter().map(|s| (*s).clone()).collect();
880                if let Ok(child) = to_node_inner(
881                    ColumnRef::Name(col_name),
882                    child_scope,
883                    dialect,
884                    parent_name,
885                    table,
886                    parent_name,
887                    trim_selects,
888                    &ancestors,
889                    depth + 1,
890                ) {
891                    node.downstream.push(child);
892                    return;
893                }
894            }
895        }
896    }
897
898    // Base table source found in current scope: preserve alias in the display name
899    // but store the resolved table expression and name for downstream consumers.
900    if let Some(source_info) = scope.sources.get(table) {
901        if !source_info.is_scope {
902            node.downstream.push(make_table_column_node_from_source(
903                table,
904                col_name,
905                &source_info.expression,
906            ));
907            return;
908        }
909    }
910
911    // Base table or unresolved — terminal node
912    node.downstream
913        .push(make_table_column_node(table, col_name));
914}
915
916/// Resolve a FROM alias to the original CTE name.
917///
918/// When a query uses `FROM my_cte AS alias`, the scope's `sources` map contains
919/// `"alias"` → CTE expression, but `cte_sources` only contains `"my_cte"`.
920/// This function checks if `name` is such an alias and returns the CTE name.
921fn resolve_cte_alias(scope: &Scope, name: &str) -> Option<String> {
922    // If it's already a known CTE name, no resolution needed
923    if scope.cte_sources.contains_key(name) {
924        return None;
925    }
926    // Check if the source's expression is a CTE — if so, extract the CTE name
927    if let Some(source_info) = scope.sources.get(name) {
928        if source_info.is_scope {
929            if let Expression::Cte(cte) = &source_info.expression {
930                let cte_name = &cte.alias.name;
931                if scope.cte_sources.contains_key(cte_name) {
932                    return Some(cte_name.clone());
933                }
934            }
935        }
936    }
937    None
938}
939
940fn resolve_unqualified_column(
941    node: &mut LineageNode,
942    scope: &Scope,
943    dialect: Option<DialectType>,
944    col_name: &str,
945    parent_name: &str,
946    trim_selects: bool,
947    all_cte_scopes: &[&Scope],
948    depth: usize,
949) {
950    // Try to find which source this column belongs to.
951    // Build the source list from the actual FROM/JOIN clauses to avoid
952    // mixing in CTE definitions that are in scope but not referenced.
953    let from_source_names = source_names_from_from_join(scope);
954
955    if from_source_names.len() == 1 {
956        let tbl = &from_source_names[0];
957        resolve_qualified_column(
958            node,
959            scope,
960            dialect,
961            tbl,
962            col_name,
963            parent_name,
964            trim_selects,
965            all_cte_scopes,
966            depth,
967        );
968        return;
969    }
970
971    // Multiple sources — can't resolve without schema info, add unqualified node
972    let child = LineageNode::new(
973        col_name.to_string(),
974        Expression::Column(Box::new(crate::expressions::Column {
975            name: crate::expressions::Identifier::new(col_name.to_string()),
976            table: None,
977            join_mark: false,
978            trailing_comments: vec![],
979            span: None,
980            inferred_type: None,
981        })),
982        node.source.clone(),
983    );
984    node.downstream.push(child);
985}
986
987fn source_names_from_from_join(scope: &Scope) -> Vec<String> {
988    fn source_name(expr: &Expression) -> Option<String> {
989        match expr {
990            Expression::Table(table) => Some(
991                table
992                    .alias
993                    .as_ref()
994                    .map(|a| a.name.clone())
995                    .unwrap_or_else(|| table.name.name.clone()),
996            ),
997            Expression::Subquery(subquery) => {
998                subquery.alias.as_ref().map(|alias| alias.name.clone())
999            }
1000            Expression::Unnest(unnest) => unnest.alias.as_ref().map(|alias| alias.name.clone()),
1001            Expression::Alias(alias) if matches!(&alias.this, Expression::Unnest(_)) => {
1002                Some(alias.alias.name.clone())
1003            }
1004            Expression::Paren(paren) => source_name(&paren.this),
1005            _ => None,
1006        }
1007    }
1008
1009    let effective_expr = match &scope.expression {
1010        Expression::Cte(cte) => &cte.this,
1011        expr => expr,
1012    };
1013
1014    let mut names = Vec::new();
1015    let mut seen = std::collections::HashSet::new();
1016
1017    if let Expression::Select(select) = effective_expr {
1018        if let Some(from) = &select.from {
1019            for expr in &from.expressions {
1020                if let Some(name) = source_name(expr) {
1021                    if !name.is_empty() && seen.insert(name.clone()) {
1022                        names.push(name);
1023                    }
1024                }
1025            }
1026        }
1027        for join in &select.joins {
1028            if let Some(name) = source_name(&join.this) {
1029                if !name.is_empty() && seen.insert(name.clone()) {
1030                    names.push(name);
1031                }
1032            }
1033        }
1034    }
1035
1036    names
1037}
1038
1039// ---------------------------------------------------------------------------
1040// Helper functions
1041// ---------------------------------------------------------------------------
1042
1043/// Get the alias or name of an expression
1044fn get_alias_or_name(expr: &Expression) -> Option<String> {
1045    match expr {
1046        Expression::Alias(alias) => Some(alias.alias.name.clone()),
1047        Expression::Column(col) => Some(col.name.name.clone()),
1048        Expression::Identifier(id) => Some(id.name.clone()),
1049        Expression::Star(_) => Some("*".to_string()),
1050        // Annotated wraps an expression with trailing comments (e.g. `SELECT\n-- comment\na`).
1051        // Unwrap to get the actual column/alias name from the inner expression.
1052        Expression::Annotated(a) => get_alias_or_name(&a.this),
1053        _ => None,
1054    }
1055}
1056
1057/// Resolve the display name for a column reference.
1058fn resolve_column_name(column: &ColumnRef<'_>, select_expr: &Expression) -> String {
1059    match column {
1060        ColumnRef::Name(n) => n.to_string(),
1061        ColumnRef::Index(_) => get_alias_or_name(select_expr).unwrap_or_else(|| "?".to_string()),
1062    }
1063}
1064
1065/// Find the select expression matching a column reference.
1066fn find_select_expr(
1067    scope_expr: &Expression,
1068    column: &ColumnRef<'_>,
1069    dialect: Option<DialectType>,
1070) -> Result<Expression> {
1071    if let Expression::Select(ref select) = scope_expr {
1072        match column {
1073            ColumnRef::Name(name) => {
1074                let normalized_name = normalize_column_name(name, dialect);
1075                for expr in &select.expressions {
1076                    if let Some(alias_or_name) = get_alias_or_name(expr) {
1077                        if normalize_column_name(&alias_or_name, dialect) == normalized_name {
1078                            return Ok(expr.clone());
1079                        }
1080                    }
1081                }
1082                Err(crate::error::Error::parse(
1083                    format!("Cannot find column '{}' in query", name),
1084                    0,
1085                    0,
1086                    0,
1087                    0,
1088                ))
1089            }
1090            ColumnRef::Index(idx) => select.expressions.get(*idx).cloned().ok_or_else(|| {
1091                crate::error::Error::parse(format!("Column index {} out of range", idx), 0, 0, 0, 0)
1092            }),
1093        }
1094    } else {
1095        Err(crate::error::Error::parse(
1096            "Expected SELECT expression for column lookup",
1097            0,
1098            0,
1099            0,
1100            0,
1101        ))
1102    }
1103}
1104
1105/// Find the positional index of a column name in a set operation's first SELECT branch.
1106fn column_to_index(
1107    set_op_expr: &Expression,
1108    name: &str,
1109    dialect: Option<DialectType>,
1110) -> Result<usize> {
1111    let normalized_name = normalize_column_name(name, dialect);
1112    let mut expr = set_op_expr;
1113    loop {
1114        match expr {
1115            Expression::Union(u) => expr = &u.left,
1116            Expression::Intersect(i) => expr = &i.left,
1117            Expression::Except(e) => expr = &e.left,
1118            Expression::Select(select) => {
1119                for (i, e) in select.expressions.iter().enumerate() {
1120                    if let Some(alias_or_name) = get_alias_or_name(e) {
1121                        if normalize_column_name(&alias_or_name, dialect) == normalized_name {
1122                            return Ok(i);
1123                        }
1124                    }
1125                }
1126                return Err(crate::error::Error::parse(
1127                    format!("Cannot find column '{}' in set operation", name),
1128                    0,
1129                    0,
1130                    0,
1131                    0,
1132                ));
1133            }
1134            _ => {
1135                return Err(crate::error::Error::parse(
1136                    "Expected SELECT or set operation",
1137                    0,
1138                    0,
1139                    0,
1140                    0,
1141                ))
1142            }
1143        }
1144    }
1145}
1146
1147fn normalize_column_name(name: &str, dialect: Option<DialectType>) -> String {
1148    normalize_name(name, dialect, false, true)
1149}
1150
1151/// If trim_selects is enabled, return a copy of the SELECT with only the target column.
1152fn trim_source(select_expr: &Expression, target_expr: &Expression) -> Expression {
1153    if let Expression::Select(select) = select_expr {
1154        let mut trimmed = select.as_ref().clone();
1155        trimmed.expressions = vec![target_expr.clone()];
1156        Expression::Select(Box::new(trimmed))
1157    } else {
1158        select_expr.clone()
1159    }
1160}
1161
1162/// Find the child scope (CTE or derived table) for a given source name.
1163fn find_child_scope<'a>(scope: &'a Scope, source_name: &str) -> Option<&'a Scope> {
1164    // Check CTE scopes
1165    if scope.cte_sources.contains_key(source_name) {
1166        for cte_scope in &scope.cte_scopes {
1167            if let Expression::Cte(cte) = &cte_scope.expression {
1168                if cte.alias.name == source_name {
1169                    return Some(cte_scope);
1170                }
1171            }
1172        }
1173    }
1174
1175    // Check derived table scopes
1176    if let Some(source_info) = scope.sources.get(source_name) {
1177        if source_info.is_scope && !scope.cte_sources.contains_key(source_name) {
1178            if let Expression::Subquery(sq) = &source_info.expression {
1179                for dt_scope in &scope.derived_table_scopes {
1180                    if dt_scope.expression == sq.this {
1181                        return Some(dt_scope);
1182                    }
1183                }
1184            }
1185        }
1186    }
1187
1188    None
1189}
1190
1191/// Find a CTE scope by name, searching through a combined list of CTE scopes.
1192/// This handles nested CTEs where the current scope doesn't have the CTE scope
1193/// as a direct child but knows about it via cte_sources.
1194fn find_child_scope_in<'a>(
1195    all_cte_scopes: &[&'a Scope],
1196    scope: &'a Scope,
1197    source_name: &str,
1198) -> Option<&'a Scope> {
1199    // First try the scope's own cte_scopes
1200    for cte_scope in &scope.cte_scopes {
1201        if let Expression::Cte(cte) = &cte_scope.expression {
1202            if cte.alias.name == source_name {
1203                return Some(cte_scope);
1204            }
1205        }
1206    }
1207
1208    // Then search through all ancestor CTE scopes
1209    for cte_scope in all_cte_scopes {
1210        if let Expression::Cte(cte) = &cte_scope.expression {
1211            if cte.alias.name == source_name {
1212                return Some(cte_scope);
1213            }
1214        }
1215    }
1216
1217    // Fall back to derived table scopes
1218    if let Some(source_info) = scope.sources.get(source_name) {
1219        if source_info.is_scope {
1220            if let Expression::Subquery(sq) = &source_info.expression {
1221                for dt_scope in &scope.derived_table_scopes {
1222                    if dt_scope.expression == sq.this {
1223                        return Some(dt_scope);
1224                    }
1225                }
1226            }
1227        }
1228    }
1229
1230    None
1231}
1232
1233/// Create a terminal lineage node for a table.column reference.
1234fn make_table_column_node(table: &str, column: &str) -> LineageNode {
1235    let mut node = LineageNode::new(
1236        format!("{}.{}", table, column),
1237        Expression::Column(Box::new(crate::expressions::Column {
1238            name: crate::expressions::Identifier::new(column.to_string()),
1239            table: Some(crate::expressions::Identifier::new(table.to_string())),
1240            join_mark: false,
1241            trailing_comments: vec![],
1242            span: None,
1243            inferred_type: None,
1244        })),
1245        Expression::Table(Box::new(crate::expressions::TableRef::new(table))),
1246    );
1247    node.source_name = table.to_string();
1248    node
1249}
1250
1251fn table_name_from_table_ref(table_ref: &crate::expressions::TableRef) -> String {
1252    let mut parts: Vec<String> = Vec::new();
1253    if let Some(catalog) = &table_ref.catalog {
1254        parts.push(catalog.name.clone());
1255    }
1256    if let Some(schema) = &table_ref.schema {
1257        parts.push(schema.name.clone());
1258    }
1259    parts.push(table_ref.name.name.clone());
1260    parts.join(".")
1261}
1262
1263fn make_table_column_node_from_source(
1264    table_alias: &str,
1265    column: &str,
1266    source: &Expression,
1267) -> LineageNode {
1268    let mut node = LineageNode::new(
1269        format!("{}.{}", table_alias, column),
1270        Expression::Column(Box::new(crate::expressions::Column {
1271            name: crate::expressions::Identifier::new(column.to_string()),
1272            table: Some(crate::expressions::Identifier::new(table_alias.to_string())),
1273            join_mark: false,
1274            trailing_comments: vec![],
1275            span: None,
1276            inferred_type: None,
1277        })),
1278        source.clone(),
1279    );
1280
1281    if let Expression::Table(table_ref) = source {
1282        node.source_name = table_name_from_table_ref(table_ref);
1283    } else {
1284        node.source_name = table_alias.to_string();
1285    }
1286
1287    node
1288}
1289
1290/// Simple column reference extracted from an expression
1291#[derive(Debug, Clone)]
1292struct SimpleColumnRef {
1293    table: Option<crate::expressions::Identifier>,
1294    column: String,
1295}
1296
1297/// Find all column references in an expression (does not recurse into subqueries).
1298fn find_column_refs_in_expr(
1299    expr: &Expression,
1300    dialect: Option<DialectType>,
1301) -> Vec<SimpleColumnRef> {
1302    let mut refs = Vec::new();
1303    collect_column_refs(expr, dialect, &mut refs);
1304    refs
1305}
1306
1307fn is_bigquery_safe_namespace_receiver(expr: &Expression) -> bool {
1308    match expr {
1309        Expression::Column(col) => {
1310            col.table.is_none() && !col.name.quoted && col.name.name.eq_ignore_ascii_case("SAFE")
1311        }
1312        Expression::Identifier(id) => !id.quoted && id.name.eq_ignore_ascii_case("SAFE"),
1313        _ => false,
1314    }
1315}
1316
1317fn collect_column_refs(
1318    expr: &Expression,
1319    dialect: Option<DialectType>,
1320    refs: &mut Vec<SimpleColumnRef>,
1321) {
1322    let mut stack: Vec<&Expression> = vec![expr];
1323
1324    while let Some(current) = stack.pop() {
1325        match current {
1326            // === Leaf: collect Column references ===
1327            Expression::Column(col) => {
1328                refs.push(SimpleColumnRef {
1329                    table: col.table.clone(),
1330                    column: col.name.name.clone(),
1331                });
1332            }
1333
1334            // === Boundary: don't recurse into subqueries (handled separately) ===
1335            Expression::Subquery(_) | Expression::Exists(_) => {}
1336
1337            // === BinaryOp variants: left, right ===
1338            Expression::And(op)
1339            | Expression::Or(op)
1340            | Expression::Eq(op)
1341            | Expression::Neq(op)
1342            | Expression::Lt(op)
1343            | Expression::Lte(op)
1344            | Expression::Gt(op)
1345            | Expression::Gte(op)
1346            | Expression::Add(op)
1347            | Expression::Sub(op)
1348            | Expression::Mul(op)
1349            | Expression::Div(op)
1350            | Expression::Mod(op)
1351            | Expression::BitwiseAnd(op)
1352            | Expression::BitwiseOr(op)
1353            | Expression::BitwiseXor(op)
1354            | Expression::BitwiseLeftShift(op)
1355            | Expression::BitwiseRightShift(op)
1356            | Expression::Concat(op)
1357            | Expression::Adjacent(op)
1358            | Expression::TsMatch(op)
1359            | Expression::PropertyEQ(op)
1360            | Expression::ArrayContainsAll(op)
1361            | Expression::ArrayContainedBy(op)
1362            | Expression::ArrayOverlaps(op)
1363            | Expression::JSONBContainsAllTopKeys(op)
1364            | Expression::JSONBContainsAnyTopKeys(op)
1365            | Expression::JSONBDeleteAtPath(op)
1366            | Expression::ExtendsLeft(op)
1367            | Expression::ExtendsRight(op)
1368            | Expression::Is(op)
1369            | Expression::MemberOf(op)
1370            | Expression::NullSafeEq(op)
1371            | Expression::NullSafeNeq(op)
1372            | Expression::Glob(op)
1373            | Expression::Match(op) => {
1374                stack.push(&op.left);
1375                stack.push(&op.right);
1376            }
1377
1378            // === UnaryOp variants: this ===
1379            Expression::Not(u) | Expression::Neg(u) | Expression::BitwiseNot(u) => {
1380                stack.push(&u.this);
1381            }
1382
1383            // === UnaryFunc variants: this ===
1384            Expression::Upper(f)
1385            | Expression::Lower(f)
1386            | Expression::Length(f)
1387            | Expression::LTrim(f)
1388            | Expression::RTrim(f)
1389            | Expression::Reverse(f)
1390            | Expression::Abs(f)
1391            | Expression::Sqrt(f)
1392            | Expression::Cbrt(f)
1393            | Expression::Ln(f)
1394            | Expression::Exp(f)
1395            | Expression::Sign(f)
1396            | Expression::Date(f)
1397            | Expression::Time(f)
1398            | Expression::DateFromUnixDate(f)
1399            | Expression::UnixDate(f)
1400            | Expression::UnixSeconds(f)
1401            | Expression::UnixMillis(f)
1402            | Expression::UnixMicros(f)
1403            | Expression::TimeStrToDate(f)
1404            | Expression::DateToDi(f)
1405            | Expression::DiToDate(f)
1406            | Expression::TsOrDiToDi(f)
1407            | Expression::TsOrDsToDatetime(f)
1408            | Expression::TsOrDsToTimestamp(f)
1409            | Expression::YearOfWeek(f)
1410            | Expression::YearOfWeekIso(f)
1411            | Expression::Initcap(f)
1412            | Expression::Ascii(f)
1413            | Expression::Chr(f)
1414            | Expression::Soundex(f)
1415            | Expression::ByteLength(f)
1416            | Expression::Hex(f)
1417            | Expression::LowerHex(f)
1418            | Expression::Unicode(f)
1419            | Expression::Radians(f)
1420            | Expression::Degrees(f)
1421            | Expression::Sin(f)
1422            | Expression::Cos(f)
1423            | Expression::Tan(f)
1424            | Expression::Asin(f)
1425            | Expression::Acos(f)
1426            | Expression::Atan(f)
1427            | Expression::IsNan(f)
1428            | Expression::IsInf(f)
1429            | Expression::ArrayLength(f)
1430            | Expression::ArraySize(f)
1431            | Expression::Cardinality(f)
1432            | Expression::ArrayReverse(f)
1433            | Expression::ArrayDistinct(f)
1434            | Expression::ArrayFlatten(f)
1435            | Expression::ArrayCompact(f)
1436            | Expression::Explode(f)
1437            | Expression::ExplodeOuter(f)
1438            | Expression::ToArray(f)
1439            | Expression::MapFromEntries(f)
1440            | Expression::MapKeys(f)
1441            | Expression::MapValues(f)
1442            | Expression::JsonArrayLength(f)
1443            | Expression::JsonKeys(f)
1444            | Expression::JsonType(f)
1445            | Expression::ParseJson(f)
1446            | Expression::ToJson(f)
1447            | Expression::Typeof(f)
1448            | Expression::BitwiseCount(f)
1449            | Expression::Year(f)
1450            | Expression::Month(f)
1451            | Expression::Day(f)
1452            | Expression::Hour(f)
1453            | Expression::Minute(f)
1454            | Expression::Second(f)
1455            | Expression::DayOfWeek(f)
1456            | Expression::DayOfWeekIso(f)
1457            | Expression::DayOfMonth(f)
1458            | Expression::DayOfYear(f)
1459            | Expression::WeekOfYear(f)
1460            | Expression::Quarter(f)
1461            | Expression::Epoch(f)
1462            | Expression::EpochMs(f)
1463            | Expression::TimeStrToUnix(f)
1464            | Expression::SHA(f)
1465            | Expression::SHA1Digest(f)
1466            | Expression::TimeToUnix(f)
1467            | Expression::JSONBool(f)
1468            | Expression::Int64(f)
1469            | Expression::MD5NumberLower64(f)
1470            | Expression::MD5NumberUpper64(f)
1471            | Expression::DateStrToDate(f)
1472            | Expression::DateToDateStr(f) => {
1473                stack.push(&f.this);
1474            }
1475
1476            // === BinaryFunc variants: this, expression ===
1477            Expression::Power(f)
1478            | Expression::NullIf(f)
1479            | Expression::IfNull(f)
1480            | Expression::Nvl(f)
1481            | Expression::UnixToTimeStr(f)
1482            | Expression::Contains(f)
1483            | Expression::StartsWith(f)
1484            | Expression::EndsWith(f)
1485            | Expression::Levenshtein(f)
1486            | Expression::ModFunc(f)
1487            | Expression::Atan2(f)
1488            | Expression::IntDiv(f)
1489            | Expression::AddMonths(f)
1490            | Expression::MonthsBetween(f)
1491            | Expression::NextDay(f)
1492            | Expression::ArrayContains(f)
1493            | Expression::ArrayPosition(f)
1494            | Expression::ArrayAppend(f)
1495            | Expression::ArrayPrepend(f)
1496            | Expression::ArrayUnion(f)
1497            | Expression::ArrayExcept(f)
1498            | Expression::ArrayRemove(f)
1499            | Expression::StarMap(f)
1500            | Expression::MapFromArrays(f)
1501            | Expression::MapContainsKey(f)
1502            | Expression::ElementAt(f)
1503            | Expression::JsonMergePatch(f)
1504            | Expression::JSONBContains(f)
1505            | Expression::JSONBExtract(f) => {
1506                stack.push(&f.this);
1507                stack.push(&f.expression);
1508            }
1509
1510            // === VarArgFunc variants: expressions ===
1511            Expression::Greatest(f)
1512            | Expression::Least(f)
1513            | Expression::Coalesce(f)
1514            | Expression::ArrayConcat(f)
1515            | Expression::ArrayIntersect(f)
1516            | Expression::ArrayZip(f)
1517            | Expression::MapConcat(f)
1518            | Expression::JsonArray(f) => {
1519                for e in &f.expressions {
1520                    stack.push(e);
1521                }
1522            }
1523
1524            // === AggFunc variants: this, filter, having_max, limit ===
1525            Expression::Sum(f)
1526            | Expression::Avg(f)
1527            | Expression::Min(f)
1528            | Expression::Max(f)
1529            | Expression::ArrayAgg(f)
1530            | Expression::CountIf(f)
1531            | Expression::Stddev(f)
1532            | Expression::StddevPop(f)
1533            | Expression::StddevSamp(f)
1534            | Expression::Variance(f)
1535            | Expression::VarPop(f)
1536            | Expression::VarSamp(f)
1537            | Expression::Median(f)
1538            | Expression::Mode(f)
1539            | Expression::First(f)
1540            | Expression::Last(f)
1541            | Expression::AnyValue(f)
1542            | Expression::ApproxDistinct(f)
1543            | Expression::ApproxCountDistinct(f)
1544            | Expression::LogicalAnd(f)
1545            | Expression::LogicalOr(f)
1546            | Expression::Skewness(f)
1547            | Expression::ArrayConcatAgg(f)
1548            | Expression::ArrayUniqueAgg(f)
1549            | Expression::BoolXorAgg(f)
1550            | Expression::BitwiseAndAgg(f)
1551            | Expression::BitwiseOrAgg(f)
1552            | Expression::BitwiseXorAgg(f) => {
1553                stack.push(&f.this);
1554                if let Some(ref filter) = f.filter {
1555                    stack.push(filter);
1556                }
1557                if let Some((ref expr, _)) = f.having_max {
1558                    stack.push(expr);
1559                }
1560                if let Some(ref limit) = f.limit {
1561                    stack.push(limit);
1562                }
1563            }
1564
1565            // === Generic Function / AggregateFunction: args ===
1566            Expression::Function(func) => {
1567                for arg in &func.args {
1568                    stack.push(arg);
1569                }
1570            }
1571            Expression::AggregateFunction(func) => {
1572                for arg in &func.args {
1573                    stack.push(arg);
1574                }
1575                if let Some(ref filter) = func.filter {
1576                    stack.push(filter);
1577                }
1578                if let Some(ref limit) = func.limit {
1579                    stack.push(limit);
1580                }
1581            }
1582
1583            // === WindowFunction: this (skip Over for lineage purposes) ===
1584            Expression::WindowFunction(wf) => {
1585                stack.push(&wf.this);
1586            }
1587
1588            // === Containers and special expressions ===
1589            Expression::Alias(a) => {
1590                stack.push(&a.this);
1591            }
1592            Expression::Cast(c) | Expression::TryCast(c) | Expression::SafeCast(c) => {
1593                stack.push(&c.this);
1594                if let Some(ref fmt) = c.format {
1595                    stack.push(fmt);
1596                }
1597                if let Some(ref def) = c.default {
1598                    stack.push(def);
1599                }
1600            }
1601            Expression::Paren(p) => {
1602                stack.push(&p.this);
1603            }
1604            Expression::Annotated(a) => {
1605                stack.push(&a.this);
1606            }
1607            Expression::Case(case) => {
1608                if let Some(ref operand) = case.operand {
1609                    stack.push(operand);
1610                }
1611                for (cond, result) in &case.whens {
1612                    stack.push(cond);
1613                    stack.push(result);
1614                }
1615                if let Some(ref else_expr) = case.else_ {
1616                    stack.push(else_expr);
1617                }
1618            }
1619            Expression::Collation(c) => {
1620                stack.push(&c.this);
1621            }
1622            Expression::In(i) => {
1623                stack.push(&i.this);
1624                for e in &i.expressions {
1625                    stack.push(e);
1626                }
1627                if let Some(ref q) = i.query {
1628                    stack.push(q);
1629                }
1630                if let Some(ref u) = i.unnest {
1631                    stack.push(u);
1632                }
1633            }
1634            Expression::Between(b) => {
1635                stack.push(&b.this);
1636                stack.push(&b.low);
1637                stack.push(&b.high);
1638            }
1639            Expression::IsNull(n) => {
1640                stack.push(&n.this);
1641            }
1642            Expression::IsTrue(t) | Expression::IsFalse(t) => {
1643                stack.push(&t.this);
1644            }
1645            Expression::IsJson(j) => {
1646                stack.push(&j.this);
1647            }
1648            Expression::Like(l) | Expression::ILike(l) => {
1649                stack.push(&l.left);
1650                stack.push(&l.right);
1651                if let Some(ref esc) = l.escape {
1652                    stack.push(esc);
1653                }
1654            }
1655            Expression::SimilarTo(s) => {
1656                stack.push(&s.this);
1657                stack.push(&s.pattern);
1658                if let Some(ref esc) = s.escape {
1659                    stack.push(esc);
1660                }
1661            }
1662            Expression::Ordered(o) => {
1663                stack.push(&o.this);
1664            }
1665            Expression::Array(a) => {
1666                for e in &a.expressions {
1667                    stack.push(e);
1668                }
1669            }
1670            Expression::Tuple(t) => {
1671                for e in &t.expressions {
1672                    stack.push(e);
1673                }
1674            }
1675            Expression::Struct(s) => {
1676                for (_, e) in &s.fields {
1677                    stack.push(e);
1678                }
1679            }
1680            Expression::Subscript(s) => {
1681                stack.push(&s.this);
1682                stack.push(&s.index);
1683            }
1684            Expression::Dot(d) => {
1685                stack.push(&d.this);
1686            }
1687            Expression::MethodCall(m) => {
1688                if !matches!(dialect, Some(DialectType::BigQuery))
1689                    || !is_bigquery_safe_namespace_receiver(&m.this)
1690                {
1691                    stack.push(&m.this);
1692                }
1693                for arg in &m.args {
1694                    stack.push(arg);
1695                }
1696            }
1697            Expression::ArraySlice(s) => {
1698                stack.push(&s.this);
1699                if let Some(ref start) = s.start {
1700                    stack.push(start);
1701                }
1702                if let Some(ref end) = s.end {
1703                    stack.push(end);
1704                }
1705            }
1706            Expression::Lambda(l) => {
1707                stack.push(&l.body);
1708            }
1709            Expression::NamedArgument(n) => {
1710                stack.push(&n.value);
1711            }
1712            Expression::TryCatch(t) => {
1713                for stmt in &t.try_body {
1714                    stack.push(stmt);
1715                }
1716                if let Some(catch_body) = &t.catch_body {
1717                    for stmt in catch_body {
1718                        stack.push(stmt);
1719                    }
1720                }
1721            }
1722            Expression::BracedWildcard(e) | Expression::ReturnStmt(e) => {
1723                stack.push(e);
1724            }
1725
1726            // === Custom function structs ===
1727            Expression::Substring(f) => {
1728                stack.push(&f.this);
1729                stack.push(&f.start);
1730                if let Some(ref len) = f.length {
1731                    stack.push(len);
1732                }
1733            }
1734            Expression::Trim(f) => {
1735                stack.push(&f.this);
1736                if let Some(ref chars) = f.characters {
1737                    stack.push(chars);
1738                }
1739            }
1740            Expression::Replace(f) => {
1741                stack.push(&f.this);
1742                stack.push(&f.old);
1743                stack.push(&f.new);
1744            }
1745            Expression::IfFunc(f) => {
1746                stack.push(&f.condition);
1747                stack.push(&f.true_value);
1748                if let Some(ref fv) = f.false_value {
1749                    stack.push(fv);
1750                }
1751            }
1752            Expression::Nvl2(f) => {
1753                stack.push(&f.this);
1754                stack.push(&f.true_value);
1755                stack.push(&f.false_value);
1756            }
1757            Expression::ConcatWs(f) => {
1758                stack.push(&f.separator);
1759                for e in &f.expressions {
1760                    stack.push(e);
1761                }
1762            }
1763            Expression::Count(f) => {
1764                if let Some(ref this) = f.this {
1765                    stack.push(this);
1766                }
1767                if let Some(ref filter) = f.filter {
1768                    stack.push(filter);
1769                }
1770            }
1771            Expression::GroupConcat(f) => {
1772                stack.push(&f.this);
1773                if let Some(ref sep) = f.separator {
1774                    stack.push(sep);
1775                }
1776                if let Some(ref filter) = f.filter {
1777                    stack.push(filter);
1778                }
1779            }
1780            Expression::StringAgg(f) => {
1781                stack.push(&f.this);
1782                if let Some(ref sep) = f.separator {
1783                    stack.push(sep);
1784                }
1785                if let Some(ref filter) = f.filter {
1786                    stack.push(filter);
1787                }
1788                if let Some(ref limit) = f.limit {
1789                    stack.push(limit);
1790                }
1791            }
1792            Expression::ListAgg(f) => {
1793                stack.push(&f.this);
1794                if let Some(ref sep) = f.separator {
1795                    stack.push(sep);
1796                }
1797                if let Some(ref filter) = f.filter {
1798                    stack.push(filter);
1799                }
1800            }
1801            Expression::SumIf(f) => {
1802                stack.push(&f.this);
1803                stack.push(&f.condition);
1804                if let Some(ref filter) = f.filter {
1805                    stack.push(filter);
1806                }
1807            }
1808            Expression::DateAdd(f) | Expression::DateSub(f) => {
1809                stack.push(&f.this);
1810                stack.push(&f.interval);
1811            }
1812            Expression::DateDiff(f) => {
1813                stack.push(&f.this);
1814                stack.push(&f.expression);
1815            }
1816            Expression::DateTrunc(f) | Expression::TimestampTrunc(f) => {
1817                stack.push(&f.this);
1818            }
1819            Expression::Extract(f) => {
1820                stack.push(&f.this);
1821            }
1822            Expression::Round(f) => {
1823                stack.push(&f.this);
1824                if let Some(ref d) = f.decimals {
1825                    stack.push(d);
1826                }
1827            }
1828            Expression::Floor(f) => {
1829                stack.push(&f.this);
1830                if let Some(ref s) = f.scale {
1831                    stack.push(s);
1832                }
1833                if let Some(ref t) = f.to {
1834                    stack.push(t);
1835                }
1836            }
1837            Expression::Ceil(f) => {
1838                stack.push(&f.this);
1839                if let Some(ref d) = f.decimals {
1840                    stack.push(d);
1841                }
1842                if let Some(ref t) = f.to {
1843                    stack.push(t);
1844                }
1845            }
1846            Expression::Log(f) => {
1847                stack.push(&f.this);
1848                if let Some(ref b) = f.base {
1849                    stack.push(b);
1850                }
1851            }
1852            Expression::AtTimeZone(f) => {
1853                stack.push(&f.this);
1854                stack.push(&f.zone);
1855            }
1856            Expression::Lead(f) | Expression::Lag(f) => {
1857                stack.push(&f.this);
1858                if let Some(ref off) = f.offset {
1859                    stack.push(off);
1860                }
1861                if let Some(ref def) = f.default {
1862                    stack.push(def);
1863                }
1864            }
1865            Expression::FirstValue(f) | Expression::LastValue(f) => {
1866                stack.push(&f.this);
1867            }
1868            Expression::NthValue(f) => {
1869                stack.push(&f.this);
1870                stack.push(&f.offset);
1871            }
1872            Expression::Position(f) => {
1873                stack.push(&f.substring);
1874                stack.push(&f.string);
1875                if let Some(ref start) = f.start {
1876                    stack.push(start);
1877                }
1878            }
1879            Expression::Decode(f) => {
1880                stack.push(&f.this);
1881                for (search, result) in &f.search_results {
1882                    stack.push(search);
1883                    stack.push(result);
1884                }
1885                if let Some(ref def) = f.default {
1886                    stack.push(def);
1887                }
1888            }
1889            Expression::CharFunc(f) => {
1890                for arg in &f.args {
1891                    stack.push(arg);
1892                }
1893            }
1894            Expression::ArraySort(f) => {
1895                stack.push(&f.this);
1896                if let Some(ref cmp) = f.comparator {
1897                    stack.push(cmp);
1898                }
1899            }
1900            Expression::ArrayJoin(f) | Expression::ArrayToString(f) => {
1901                stack.push(&f.this);
1902                stack.push(&f.separator);
1903                if let Some(ref nr) = f.null_replacement {
1904                    stack.push(nr);
1905                }
1906            }
1907            Expression::ArrayFilter(f) => {
1908                stack.push(&f.this);
1909                stack.push(&f.filter);
1910            }
1911            Expression::ArrayTransform(f) => {
1912                stack.push(&f.this);
1913                stack.push(&f.transform);
1914            }
1915            Expression::Sequence(f)
1916            | Expression::Generate(f)
1917            | Expression::ExplodingGenerateSeries(f) => {
1918                stack.push(&f.start);
1919                stack.push(&f.stop);
1920                if let Some(ref step) = f.step {
1921                    stack.push(step);
1922                }
1923            }
1924            Expression::JsonExtract(f)
1925            | Expression::JsonExtractScalar(f)
1926            | Expression::JsonQuery(f)
1927            | Expression::JsonValue(f) => {
1928                stack.push(&f.this);
1929                stack.push(&f.path);
1930            }
1931            Expression::JsonExtractPath(f) | Expression::JsonRemove(f) => {
1932                stack.push(&f.this);
1933                for p in &f.paths {
1934                    stack.push(p);
1935                }
1936            }
1937            Expression::JsonObject(f) => {
1938                for (k, v) in &f.pairs {
1939                    stack.push(k);
1940                    stack.push(v);
1941                }
1942            }
1943            Expression::JsonSet(f) | Expression::JsonInsert(f) => {
1944                stack.push(&f.this);
1945                for (path, val) in &f.path_values {
1946                    stack.push(path);
1947                    stack.push(val);
1948                }
1949            }
1950            Expression::Overlay(f) => {
1951                stack.push(&f.this);
1952                stack.push(&f.replacement);
1953                stack.push(&f.from);
1954                if let Some(ref len) = f.length {
1955                    stack.push(len);
1956                }
1957            }
1958            Expression::Convert(f) => {
1959                stack.push(&f.this);
1960                if let Some(ref style) = f.style {
1961                    stack.push(style);
1962                }
1963            }
1964            Expression::ApproxPercentile(f) => {
1965                stack.push(&f.this);
1966                stack.push(&f.percentile);
1967                if let Some(ref acc) = f.accuracy {
1968                    stack.push(acc);
1969                }
1970                if let Some(ref filter) = f.filter {
1971                    stack.push(filter);
1972                }
1973            }
1974            Expression::Percentile(f)
1975            | Expression::PercentileCont(f)
1976            | Expression::PercentileDisc(f) => {
1977                stack.push(&f.this);
1978                stack.push(&f.percentile);
1979                if let Some(ref filter) = f.filter {
1980                    stack.push(filter);
1981                }
1982            }
1983            Expression::WithinGroup(f) => {
1984                stack.push(&f.this);
1985            }
1986            Expression::Left(f) | Expression::Right(f) => {
1987                stack.push(&f.this);
1988                stack.push(&f.length);
1989            }
1990            Expression::Repeat(f) => {
1991                stack.push(&f.this);
1992                stack.push(&f.times);
1993            }
1994            Expression::Lpad(f) | Expression::Rpad(f) => {
1995                stack.push(&f.this);
1996                stack.push(&f.length);
1997                if let Some(ref fill) = f.fill {
1998                    stack.push(fill);
1999                }
2000            }
2001            Expression::Split(f) => {
2002                stack.push(&f.this);
2003                stack.push(&f.delimiter);
2004            }
2005            Expression::RegexpLike(f) => {
2006                stack.push(&f.this);
2007                stack.push(&f.pattern);
2008                if let Some(ref flags) = f.flags {
2009                    stack.push(flags);
2010                }
2011            }
2012            Expression::RegexpReplace(f) => {
2013                stack.push(&f.this);
2014                stack.push(&f.pattern);
2015                stack.push(&f.replacement);
2016                if let Some(ref flags) = f.flags {
2017                    stack.push(flags);
2018                }
2019            }
2020            Expression::RegexpExtract(f) => {
2021                stack.push(&f.this);
2022                stack.push(&f.pattern);
2023                if let Some(ref group) = f.group {
2024                    stack.push(group);
2025                }
2026            }
2027            Expression::ToDate(f) => {
2028                stack.push(&f.this);
2029                if let Some(ref fmt) = f.format {
2030                    stack.push(fmt);
2031                }
2032            }
2033            Expression::ToTimestamp(f) => {
2034                stack.push(&f.this);
2035                if let Some(ref fmt) = f.format {
2036                    stack.push(fmt);
2037                }
2038            }
2039            Expression::DateFormat(f) | Expression::FormatDate(f) => {
2040                stack.push(&f.this);
2041                stack.push(&f.format);
2042            }
2043            Expression::LastDay(f) => {
2044                stack.push(&f.this);
2045            }
2046            Expression::FromUnixtime(f) => {
2047                stack.push(&f.this);
2048                if let Some(ref fmt) = f.format {
2049                    stack.push(fmt);
2050                }
2051            }
2052            Expression::UnixTimestamp(f) => {
2053                if let Some(ref this) = f.this {
2054                    stack.push(this);
2055                }
2056                if let Some(ref fmt) = f.format {
2057                    stack.push(fmt);
2058                }
2059            }
2060            Expression::MakeDate(f) => {
2061                stack.push(&f.year);
2062                stack.push(&f.month);
2063                stack.push(&f.day);
2064            }
2065            Expression::MakeTimestamp(f) => {
2066                stack.push(&f.year);
2067                stack.push(&f.month);
2068                stack.push(&f.day);
2069                stack.push(&f.hour);
2070                stack.push(&f.minute);
2071                stack.push(&f.second);
2072                if let Some(ref tz) = f.timezone {
2073                    stack.push(tz);
2074                }
2075            }
2076            Expression::TruncFunc(f) => {
2077                stack.push(&f.this);
2078                if let Some(ref d) = f.decimals {
2079                    stack.push(d);
2080                }
2081            }
2082            Expression::ArrayFunc(f) => {
2083                for e in &f.expressions {
2084                    stack.push(e);
2085                }
2086            }
2087            Expression::Unnest(f) => {
2088                stack.push(&f.this);
2089                for e in &f.expressions {
2090                    stack.push(e);
2091                }
2092            }
2093            Expression::StructFunc(f) => {
2094                for (_, e) in &f.fields {
2095                    stack.push(e);
2096                }
2097            }
2098            Expression::StructExtract(f) => {
2099                stack.push(&f.this);
2100            }
2101            Expression::NamedStruct(f) => {
2102                for (k, v) in &f.pairs {
2103                    stack.push(k);
2104                    stack.push(v);
2105                }
2106            }
2107            Expression::MapFunc(f) => {
2108                for k in &f.keys {
2109                    stack.push(k);
2110                }
2111                for v in &f.values {
2112                    stack.push(v);
2113                }
2114            }
2115            Expression::TransformKeys(f) | Expression::TransformValues(f) => {
2116                stack.push(&f.this);
2117                stack.push(&f.transform);
2118            }
2119            Expression::JsonArrayAgg(f) => {
2120                stack.push(&f.this);
2121                if let Some(ref filter) = f.filter {
2122                    stack.push(filter);
2123                }
2124            }
2125            Expression::JsonObjectAgg(f) => {
2126                stack.push(&f.key);
2127                stack.push(&f.value);
2128                if let Some(ref filter) = f.filter {
2129                    stack.push(filter);
2130                }
2131            }
2132            Expression::NTile(f) => {
2133                if let Some(ref n) = f.num_buckets {
2134                    stack.push(n);
2135                }
2136            }
2137            Expression::Rand(f) => {
2138                if let Some(ref s) = f.seed {
2139                    stack.push(s);
2140                }
2141                if let Some(ref lo) = f.lower {
2142                    stack.push(lo);
2143                }
2144                if let Some(ref hi) = f.upper {
2145                    stack.push(hi);
2146                }
2147            }
2148            Expression::Any(q) | Expression::All(q) => {
2149                stack.push(&q.this);
2150                stack.push(&q.subquery);
2151            }
2152            Expression::Overlaps(o) => {
2153                if let Some(ref this) = o.this {
2154                    stack.push(this);
2155                }
2156                if let Some(ref expr) = o.expression {
2157                    stack.push(expr);
2158                }
2159                if let Some(ref ls) = o.left_start {
2160                    stack.push(ls);
2161                }
2162                if let Some(ref le) = o.left_end {
2163                    stack.push(le);
2164                }
2165                if let Some(ref rs) = o.right_start {
2166                    stack.push(rs);
2167                }
2168                if let Some(ref re) = o.right_end {
2169                    stack.push(re);
2170                }
2171            }
2172            Expression::Interval(i) => {
2173                if let Some(ref this) = i.this {
2174                    stack.push(this);
2175                }
2176            }
2177            Expression::TimeStrToTime(f) => {
2178                stack.push(&f.this);
2179                if let Some(ref zone) = f.zone {
2180                    stack.push(zone);
2181                }
2182            }
2183            Expression::JSONBExtractScalar(f) => {
2184                stack.push(&f.this);
2185                stack.push(&f.expression);
2186                if let Some(ref jt) = f.json_type {
2187                    stack.push(jt);
2188                }
2189            }
2190
2191            // === True leaves and non-expression-bearing nodes ===
2192            // Literals, Identifier, Star, DataType, Placeholder, Boolean, Null,
2193            // CurrentDate/Time/Timestamp, RowNumber, Rank, DenseRank, PercentRank,
2194            // CumeDist, Random, Pi, SessionUser, DDL statements, clauses, etc.
2195            _ => {}
2196        }
2197    }
2198}
2199
2200// ---------------------------------------------------------------------------
2201// Tests
2202// ---------------------------------------------------------------------------
2203
2204#[cfg(test)]
2205mod tests {
2206    use super::*;
2207    use crate::dialects::{Dialect, DialectType};
2208    use crate::expressions::DataType;
2209    use crate::optimizer::annotate_types::annotate_types;
2210    use crate::parse_one;
2211    use crate::schema::{MappingSchema, Schema};
2212
2213    fn parse(sql: &str) -> Expression {
2214        let dialect = Dialect::get(DialectType::Generic);
2215        let ast = dialect.parse(sql).unwrap();
2216        ast.into_iter().next().unwrap()
2217    }
2218
2219    #[test]
2220    fn test_simple_lineage() {
2221        let expr = parse("SELECT a FROM t");
2222        let node = lineage("a", &expr, None, false).unwrap();
2223
2224        assert_eq!(node.name, "a");
2225        assert!(!node.downstream.is_empty(), "Should have downstream nodes");
2226        // Should trace to t.a
2227        let names = node.downstream_names();
2228        assert!(
2229            names.iter().any(|n| n == "t.a"),
2230            "Expected t.a in downstream, got: {:?}",
2231            names
2232        );
2233    }
2234
2235    #[test]
2236    fn test_lineage_walk() {
2237        let root = LineageNode {
2238            name: "col_a".to_string(),
2239            expression: Expression::Null(crate::expressions::Null),
2240            source: Expression::Null(crate::expressions::Null),
2241            downstream: vec![LineageNode::new(
2242                "t.a",
2243                Expression::Null(crate::expressions::Null),
2244                Expression::Null(crate::expressions::Null),
2245            )],
2246            source_name: String::new(),
2247            reference_node_name: String::new(),
2248        };
2249
2250        let names: Vec<_> = root.walk().map(|n| n.name.clone()).collect();
2251        assert_eq!(names.len(), 2);
2252        assert_eq!(names[0], "col_a");
2253        assert_eq!(names[1], "t.a");
2254    }
2255
2256    #[test]
2257    fn test_aliased_column() {
2258        let expr = parse("SELECT a + 1 AS b FROM t");
2259        let node = lineage("b", &expr, None, false).unwrap();
2260
2261        assert_eq!(node.name, "b");
2262        // Should trace through the expression to t.a
2263        let all_names: Vec<_> = node.walk().map(|n| n.name.clone()).collect();
2264        assert!(
2265            all_names.iter().any(|n| n.contains("a")),
2266            "Expected to trace to column a, got: {:?}",
2267            all_names
2268        );
2269    }
2270
2271    #[test]
2272    fn test_qualified_column() {
2273        let expr = parse("SELECT t.a FROM t");
2274        let node = lineage("a", &expr, None, false).unwrap();
2275
2276        assert_eq!(node.name, "a");
2277        let names = node.downstream_names();
2278        assert!(
2279            names.iter().any(|n| n == "t.a"),
2280            "Expected t.a, got: {:?}",
2281            names
2282        );
2283    }
2284
2285    #[test]
2286    fn test_unqualified_column() {
2287        let expr = parse("SELECT a FROM t");
2288        let node = lineage("a", &expr, None, false).unwrap();
2289
2290        // Unqualified but single source → resolved to t.a
2291        let names = node.downstream_names();
2292        assert!(
2293            names.iter().any(|n| n == "t.a"),
2294            "Expected t.a, got: {:?}",
2295            names
2296        );
2297    }
2298
2299    #[test]
2300    fn test_lineage_with_schema_qualifies_root_expression_issue_40() {
2301        let query = "SELECT name FROM users";
2302        let dialect = Dialect::get(DialectType::BigQuery);
2303        let expr = dialect
2304            .parse(query)
2305            .unwrap()
2306            .into_iter()
2307            .next()
2308            .expect("expected one expression");
2309
2310        let mut schema = MappingSchema::with_dialect(DialectType::BigQuery);
2311        schema
2312            .add_table("users", &[("name".into(), DataType::Text)], None)
2313            .expect("schema setup");
2314
2315        let node_without_schema = lineage("name", &expr, Some(DialectType::BigQuery), false)
2316            .expect("lineage without schema");
2317        let mut expr_without = node_without_schema.expression.clone();
2318        annotate_types(
2319            &mut expr_without,
2320            Some(&schema),
2321            Some(DialectType::BigQuery),
2322        );
2323        assert_eq!(
2324            expr_without.inferred_type(),
2325            None,
2326            "Expected unresolved root type without schema-aware lineage qualification"
2327        );
2328
2329        let node_with_schema = lineage_with_schema(
2330            "name",
2331            &expr,
2332            Some(&schema),
2333            Some(DialectType::BigQuery),
2334            false,
2335        )
2336        .expect("lineage with schema");
2337        let mut expr_with = node_with_schema.expression.clone();
2338        annotate_types(&mut expr_with, Some(&schema), Some(DialectType::BigQuery));
2339
2340        assert_eq!(expr_with.inferred_type(), Some(&DataType::Text));
2341    }
2342
2343    #[test]
2344    fn test_lineage_with_schema_correlated_scalar_subquery() {
2345        let query = "SELECT id, (SELECT AVG(val) FROM t2 WHERE t2.id = t1.id) AS avg_val FROM t1";
2346        let dialect = Dialect::get(DialectType::BigQuery);
2347        let expr = dialect
2348            .parse(query)
2349            .unwrap()
2350            .into_iter()
2351            .next()
2352            .expect("expected one expression");
2353
2354        let mut schema = MappingSchema::with_dialect(DialectType::BigQuery);
2355        schema
2356            .add_table(
2357                "t1",
2358                &[("id".into(), DataType::BigInt { length: None })],
2359                None,
2360            )
2361            .expect("schema setup");
2362        schema
2363            .add_table(
2364                "t2",
2365                &[
2366                    ("id".into(), DataType::BigInt { length: None }),
2367                    ("val".into(), DataType::BigInt { length: None }),
2368                ],
2369                None,
2370            )
2371            .expect("schema setup");
2372
2373        let node = lineage_with_schema(
2374            "id",
2375            &expr,
2376            Some(&schema),
2377            Some(DialectType::BigQuery),
2378            false,
2379        )
2380        .expect("lineage_with_schema should handle correlated scalar subqueries");
2381
2382        assert_eq!(node.name, "id");
2383    }
2384
2385    #[test]
2386    fn test_lineage_with_schema_join_using() {
2387        let query = "SELECT a FROM t1 JOIN t2 USING(a)";
2388        let dialect = Dialect::get(DialectType::BigQuery);
2389        let expr = dialect
2390            .parse(query)
2391            .unwrap()
2392            .into_iter()
2393            .next()
2394            .expect("expected one expression");
2395
2396        let mut schema = MappingSchema::with_dialect(DialectType::BigQuery);
2397        schema
2398            .add_table(
2399                "t1",
2400                &[("a".into(), DataType::BigInt { length: None })],
2401                None,
2402            )
2403            .expect("schema setup");
2404        schema
2405            .add_table(
2406                "t2",
2407                &[("a".into(), DataType::BigInt { length: None })],
2408                None,
2409            )
2410            .expect("schema setup");
2411
2412        let node = lineage_with_schema(
2413            "a",
2414            &expr,
2415            Some(&schema),
2416            Some(DialectType::BigQuery),
2417            false,
2418        )
2419        .expect("lineage_with_schema should handle JOIN USING");
2420
2421        assert_eq!(node.name, "a");
2422    }
2423
2424    #[test]
2425    fn test_lineage_with_schema_qualified_table_name() {
2426        let query = "SELECT a FROM raw.t1";
2427        let dialect = Dialect::get(DialectType::BigQuery);
2428        let expr = dialect
2429            .parse(query)
2430            .unwrap()
2431            .into_iter()
2432            .next()
2433            .expect("expected one expression");
2434
2435        let mut schema = MappingSchema::with_dialect(DialectType::BigQuery);
2436        schema
2437            .add_table(
2438                "raw.t1",
2439                &[("a".into(), DataType::BigInt { length: None })],
2440                None,
2441            )
2442            .expect("schema setup");
2443
2444        let node = lineage_with_schema(
2445            "a",
2446            &expr,
2447            Some(&schema),
2448            Some(DialectType::BigQuery),
2449            false,
2450        )
2451        .expect("lineage_with_schema should handle dotted schema.table names");
2452
2453        assert_eq!(node.name, "a");
2454    }
2455
2456    #[test]
2457    fn test_lineage_with_schema_none_matches_lineage() {
2458        let expr = parse("SELECT a FROM t");
2459        let baseline = lineage("a", &expr, None, false).expect("lineage baseline");
2460        let with_none =
2461            lineage_with_schema("a", &expr, None, None, false).expect("lineage_with_schema");
2462
2463        assert_eq!(with_none.name, baseline.name);
2464        assert_eq!(with_none.downstream_names(), baseline.downstream_names());
2465    }
2466
2467    #[test]
2468    fn test_lineage_with_schema_bigquery_mixed_case_column_names_issue_60() {
2469        let dialect = Dialect::get(DialectType::BigQuery);
2470        let expr = dialect
2471            .parse("SELECT Name AS name FROM teams")
2472            .unwrap()
2473            .into_iter()
2474            .next()
2475            .expect("expected one expression");
2476
2477        let mut schema = MappingSchema::with_dialect(DialectType::BigQuery);
2478        schema
2479            .add_table(
2480                "teams",
2481                &[("Name".into(), DataType::String { length: None })],
2482                None,
2483            )
2484            .expect("schema setup");
2485
2486        let node = lineage_with_schema(
2487            "name",
2488            &expr,
2489            Some(&schema),
2490            Some(DialectType::BigQuery),
2491            false,
2492        )
2493        .expect("lineage_with_schema should resolve mixed-case BigQuery columns");
2494
2495        let names = node.downstream_names();
2496        assert!(
2497            names.iter().any(|n| n == "teams.Name"),
2498            "Expected teams.Name in downstream, got: {:?}",
2499            names
2500        );
2501    }
2502
2503    #[test]
2504    fn test_lineage_bigquery_mixed_case_alias_lookup() {
2505        let dialect = Dialect::get(DialectType::BigQuery);
2506        let expr = dialect
2507            .parse("SELECT Name AS Name FROM teams")
2508            .unwrap()
2509            .into_iter()
2510            .next()
2511            .expect("expected one expression");
2512
2513        let node = lineage("name", &expr, Some(DialectType::BigQuery), false)
2514            .expect("lineage should resolve mixed-case aliases in BigQuery");
2515
2516        assert_eq!(node.name, "name");
2517    }
2518
2519    #[test]
2520    fn test_lineage_bigquery_unnest_alias_source_issue_209() {
2521        let expr = parse_one(
2522            r#"
2523SELECT date_val AS week_start
2524FROM UNNEST(GENERATE_DATE_ARRAY('2024-01-01', '2024-12-31', INTERVAL 1 WEEK)) AS date_val
2525"#,
2526            DialectType::BigQuery,
2527        )
2528        .expect("parse");
2529
2530        let node = lineage("week_start", &expr, Some(DialectType::BigQuery), false)
2531            .expect("lineage should resolve UNNEST alias as a source");
2532        let child = node
2533            .downstream
2534            .first()
2535            .expect("week_start should have downstream lineage");
2536
2537        assert_eq!(child.name, "date_val.date_val");
2538        assert_eq!(child.source_name, "date_val");
2539
2540        let Expression::Column(column) = &child.expression else {
2541            panic!(
2542                "expected downstream column expression, got {:?}",
2543                child.expression
2544            );
2545        };
2546        assert_eq!(column.name.name, "date_val");
2547        assert_eq!(
2548            column.table.as_ref().map(|table| table.name.as_str()),
2549            Some("date_val")
2550        );
2551        assert!(
2552            matches!(&child.source, Expression::Alias(alias) if matches!(&alias.this, Expression::Unnest(_)) && alias.alias.name == "date_val"),
2553            "expected UNNEST source expression, got {:?}",
2554            child.source
2555        );
2556    }
2557
2558    #[test]
2559    fn test_lineage_with_schema_snowflake_datediff_date_part_issue_61() {
2560        let expr = parse_one(
2561            "SELECT DATEDIFF(day, date_utc, CURRENT_DATE()) AS recency FROM fact.some_daily_metrics",
2562            DialectType::Snowflake,
2563        )
2564        .expect("parse");
2565
2566        let mut schema = MappingSchema::with_dialect(DialectType::Snowflake);
2567        schema
2568            .add_table(
2569                "fact.some_daily_metrics",
2570                &[("date_utc".to_string(), DataType::Date)],
2571                None,
2572            )
2573            .expect("schema setup");
2574
2575        let node = lineage_with_schema(
2576            "recency",
2577            &expr,
2578            Some(&schema),
2579            Some(DialectType::Snowflake),
2580            false,
2581        )
2582        .expect("lineage_with_schema should not treat date part as a column");
2583
2584        let names = node.downstream_names();
2585        assert!(
2586            names.iter().any(|n| n == "some_daily_metrics.date_utc"),
2587            "Expected some_daily_metrics.date_utc in downstream, got: {:?}",
2588            names
2589        );
2590        assert!(
2591            !names.iter().any(|n| n.ends_with(".day") || n == "day"),
2592            "Did not expect date part to appear as lineage column, got: {:?}",
2593            names
2594        );
2595    }
2596
2597    #[test]
2598    fn test_snowflake_datediff_parses_to_typed_ast() {
2599        let expr = parse_one(
2600            "SELECT DATEDIFF(day, date_utc, CURRENT_DATE()) AS recency FROM fact.some_daily_metrics",
2601            DialectType::Snowflake,
2602        )
2603        .expect("parse");
2604
2605        match expr {
2606            Expression::Select(select) => match &select.expressions[0] {
2607                Expression::Alias(alias) => match &alias.this {
2608                    Expression::DateDiff(f) => {
2609                        assert_eq!(f.unit, Some(crate::expressions::IntervalUnit::Day));
2610                    }
2611                    other => panic!("expected DateDiff, got {other:?}"),
2612                },
2613                other => panic!("expected Alias, got {other:?}"),
2614            },
2615            other => panic!("expected Select, got {other:?}"),
2616        }
2617    }
2618
2619    #[test]
2620    fn test_lineage_with_schema_snowflake_dateadd_date_part_issue_followup() {
2621        let expr = parse_one(
2622            "SELECT DATEADD(day, 1, date_utc) AS next_day FROM fact.some_daily_metrics",
2623            DialectType::Snowflake,
2624        )
2625        .expect("parse");
2626
2627        let mut schema = MappingSchema::with_dialect(DialectType::Snowflake);
2628        schema
2629            .add_table(
2630                "fact.some_daily_metrics",
2631                &[("date_utc".to_string(), DataType::Date)],
2632                None,
2633            )
2634            .expect("schema setup");
2635
2636        let node = lineage_with_schema(
2637            "next_day",
2638            &expr,
2639            Some(&schema),
2640            Some(DialectType::Snowflake),
2641            false,
2642        )
2643        .expect("lineage_with_schema should not treat DATEADD date part as a column");
2644
2645        let names = node.downstream_names();
2646        assert!(
2647            names.iter().any(|n| n == "some_daily_metrics.date_utc"),
2648            "Expected some_daily_metrics.date_utc in downstream, got: {:?}",
2649            names
2650        );
2651        assert!(
2652            !names.iter().any(|n| n.ends_with(".day") || n == "day"),
2653            "Did not expect date part to appear as lineage column, got: {:?}",
2654            names
2655        );
2656    }
2657
2658    #[test]
2659    fn test_lineage_with_schema_snowflake_date_part_identifier_issue_followup() {
2660        let expr = parse_one(
2661            "SELECT DATE_PART(day, date_utc) AS day_part FROM fact.some_daily_metrics",
2662            DialectType::Snowflake,
2663        )
2664        .expect("parse");
2665
2666        let mut schema = MappingSchema::with_dialect(DialectType::Snowflake);
2667        schema
2668            .add_table(
2669                "fact.some_daily_metrics",
2670                &[("date_utc".to_string(), DataType::Date)],
2671                None,
2672            )
2673            .expect("schema setup");
2674
2675        let node = lineage_with_schema(
2676            "day_part",
2677            &expr,
2678            Some(&schema),
2679            Some(DialectType::Snowflake),
2680            false,
2681        )
2682        .expect("lineage_with_schema should not treat DATE_PART identifier as a column");
2683
2684        let names = node.downstream_names();
2685        assert!(
2686            names.iter().any(|n| n == "some_daily_metrics.date_utc"),
2687            "Expected some_daily_metrics.date_utc in downstream, got: {:?}",
2688            names
2689        );
2690        assert!(
2691            !names.iter().any(|n| n.ends_with(".day") || n == "day"),
2692            "Did not expect date part to appear as lineage column, got: {:?}",
2693            names
2694        );
2695    }
2696
2697    #[test]
2698    fn test_lineage_with_schema_snowflake_date_part_string_literal_control() {
2699        let expr = parse_one(
2700            "SELECT DATE_PART('day', date_utc) AS day_part FROM fact.some_daily_metrics",
2701            DialectType::Snowflake,
2702        )
2703        .expect("parse");
2704
2705        let mut schema = MappingSchema::with_dialect(DialectType::Snowflake);
2706        schema
2707            .add_table(
2708                "fact.some_daily_metrics",
2709                &[("date_utc".to_string(), DataType::Date)],
2710                None,
2711            )
2712            .expect("schema setup");
2713
2714        let node = lineage_with_schema(
2715            "day_part",
2716            &expr,
2717            Some(&schema),
2718            Some(DialectType::Snowflake),
2719            false,
2720        )
2721        .expect("quoted DATE_PART should continue to work");
2722
2723        let names = node.downstream_names();
2724        assert!(
2725            names.iter().any(|n| n == "some_daily_metrics.date_utc"),
2726            "Expected some_daily_metrics.date_utc in downstream, got: {:?}",
2727            names
2728        );
2729    }
2730
2731    #[test]
2732    fn test_snowflake_dateadd_date_part_identifier_stays_generic_function() {
2733        let expr = parse_one(
2734            "SELECT DATEADD(day, 1, date_utc) AS next_day FROM fact.some_daily_metrics",
2735            DialectType::Snowflake,
2736        )
2737        .expect("parse");
2738
2739        match expr {
2740            Expression::Select(select) => match &select.expressions[0] {
2741                Expression::Alias(alias) => match &alias.this {
2742                    Expression::Function(f) => {
2743                        assert_eq!(f.name.to_uppercase(), "DATEADD");
2744                        assert!(matches!(&f.args[0], Expression::Var(v) if v.this == "day"));
2745                    }
2746                    other => panic!("expected generic DATEADD function, got {other:?}"),
2747                },
2748                other => panic!("expected Alias, got {other:?}"),
2749            },
2750            other => panic!("expected Select, got {other:?}"),
2751        }
2752    }
2753
2754    #[test]
2755    fn test_snowflake_date_part_identifier_stays_generic_function_with_var_arg() {
2756        let expr = parse_one(
2757            "SELECT DATE_PART(day, date_utc) AS day_part FROM fact.some_daily_metrics",
2758            DialectType::Snowflake,
2759        )
2760        .expect("parse");
2761
2762        match expr {
2763            Expression::Select(select) => match &select.expressions[0] {
2764                Expression::Alias(alias) => match &alias.this {
2765                    Expression::Function(f) => {
2766                        assert_eq!(f.name.to_uppercase(), "DATE_PART");
2767                        assert!(matches!(&f.args[0], Expression::Var(v) if v.this == "day"));
2768                    }
2769                    other => panic!("expected generic DATE_PART function, got {other:?}"),
2770                },
2771                other => panic!("expected Alias, got {other:?}"),
2772            },
2773            other => panic!("expected Select, got {other:?}"),
2774        }
2775    }
2776
2777    #[test]
2778    fn test_snowflake_date_part_string_literal_stays_generic_function() {
2779        let expr = parse_one(
2780            "SELECT DATE_PART('day', date_utc) AS day_part FROM fact.some_daily_metrics",
2781            DialectType::Snowflake,
2782        )
2783        .expect("parse");
2784
2785        match expr {
2786            Expression::Select(select) => match &select.expressions[0] {
2787                Expression::Alias(alias) => match &alias.this {
2788                    Expression::Function(f) => {
2789                        assert_eq!(f.name.to_uppercase(), "DATE_PART");
2790                    }
2791                    other => panic!("expected generic DATE_PART function, got {other:?}"),
2792                },
2793                other => panic!("expected Alias, got {other:?}"),
2794            },
2795            other => panic!("expected Select, got {other:?}"),
2796        }
2797    }
2798
2799    #[test]
2800    fn test_lineage_join() {
2801        let expr = parse("SELECT t.a, s.b FROM t JOIN s ON t.id = s.id");
2802
2803        let node_a = lineage("a", &expr, None, false).unwrap();
2804        let names_a = node_a.downstream_names();
2805        assert!(
2806            names_a.iter().any(|n| n == "t.a"),
2807            "Expected t.a, got: {:?}",
2808            names_a
2809        );
2810
2811        let node_b = lineage("b", &expr, None, false).unwrap();
2812        let names_b = node_b.downstream_names();
2813        assert!(
2814            names_b.iter().any(|n| n == "s.b"),
2815            "Expected s.b, got: {:?}",
2816            names_b
2817        );
2818    }
2819
2820    #[test]
2821    fn test_lineage_alias_leaf_has_resolved_source_name() {
2822        let expr = parse("SELECT t1.col1 FROM table1 t1 JOIN table2 t2 ON t1.id = t2.id");
2823        let node = lineage("col1", &expr, None, false).unwrap();
2824
2825        // Keep alias in the display lineage edge.
2826        let names = node.downstream_names();
2827        assert!(
2828            names.iter().any(|n| n == "t1.col1"),
2829            "Expected aliased column edge t1.col1, got: {:?}",
2830            names
2831        );
2832
2833        // Leaf should expose the resolved base table for consumers.
2834        let leaf = node
2835            .downstream
2836            .iter()
2837            .find(|n| n.name == "t1.col1")
2838            .expect("Expected t1.col1 leaf");
2839        assert_eq!(leaf.source_name, "table1");
2840        match &leaf.source {
2841            Expression::Table(table) => assert_eq!(table.name.name, "table1"),
2842            _ => panic!("Expected leaf source to be a table expression"),
2843        }
2844    }
2845
2846    #[test]
2847    fn test_lineage_derived_table() {
2848        let expr = parse("SELECT x.a FROM (SELECT a FROM t) AS x");
2849        let node = lineage("a", &expr, None, false).unwrap();
2850
2851        assert_eq!(node.name, "a");
2852        // Should trace through the derived table to t.a
2853        let all_names: Vec<_> = node.walk().map(|n| n.name.clone()).collect();
2854        assert!(
2855            all_names.iter().any(|n| n == "t.a"),
2856            "Expected to trace through derived table to t.a, got: {:?}",
2857            all_names
2858        );
2859    }
2860
2861    #[test]
2862    fn test_lineage_cte() {
2863        let expr = parse("WITH cte AS (SELECT a FROM t) SELECT a FROM cte");
2864        let node = lineage("a", &expr, None, false).unwrap();
2865
2866        assert_eq!(node.name, "a");
2867        let all_names: Vec<_> = node.walk().map(|n| n.name.clone()).collect();
2868        assert!(
2869            all_names.iter().any(|n| n == "t.a"),
2870            "Expected to trace through CTE to t.a, got: {:?}",
2871            all_names
2872        );
2873    }
2874
2875    #[test]
2876    fn test_lineage_union() {
2877        let expr = parse("SELECT a FROM t1 UNION SELECT a FROM t2");
2878        let node = lineage("a", &expr, None, false).unwrap();
2879
2880        assert_eq!(node.name, "a");
2881        // Should have 2 downstream branches
2882        assert_eq!(
2883            node.downstream.len(),
2884            2,
2885            "Expected 2 branches for UNION, got {}",
2886            node.downstream.len()
2887        );
2888    }
2889
2890    #[test]
2891    fn test_lineage_cte_union() {
2892        let expr = parse("WITH cte AS (SELECT a FROM t1 UNION SELECT a FROM t2) SELECT a FROM cte");
2893        let node = lineage("a", &expr, None, false).unwrap();
2894
2895        // Should trace through CTE into both UNION branches
2896        let all_names: Vec<_> = node.walk().map(|n| n.name.clone()).collect();
2897        assert!(
2898            all_names.len() >= 3,
2899            "Expected at least 3 nodes for CTE with UNION, got: {:?}",
2900            all_names
2901        );
2902    }
2903
2904    #[test]
2905    fn test_lineage_star() {
2906        let expr = parse("SELECT * FROM t");
2907        let node = lineage("*", &expr, None, false).unwrap();
2908
2909        assert_eq!(node.name, "*");
2910        // Should have downstream for table t
2911        assert!(
2912            !node.downstream.is_empty(),
2913            "Star should produce downstream nodes"
2914        );
2915    }
2916
2917    #[test]
2918    fn test_lineage_subquery_in_select() {
2919        let expr = parse("SELECT (SELECT MAX(b) FROM s) AS x FROM t");
2920        let node = lineage("x", &expr, None, false).unwrap();
2921
2922        assert_eq!(node.name, "x");
2923        // Should have traced into the scalar subquery
2924        let all_names: Vec<_> = node.walk().map(|n| n.name.clone()).collect();
2925        assert!(
2926            all_names.len() >= 2,
2927            "Expected tracing into scalar subquery, got: {:?}",
2928            all_names
2929        );
2930    }
2931
2932    #[test]
2933    fn test_lineage_multiple_columns() {
2934        let expr = parse("SELECT a, b FROM t");
2935
2936        let node_a = lineage("a", &expr, None, false).unwrap();
2937        let node_b = lineage("b", &expr, None, false).unwrap();
2938
2939        assert_eq!(node_a.name, "a");
2940        assert_eq!(node_b.name, "b");
2941
2942        // Each should trace independently
2943        let names_a = node_a.downstream_names();
2944        let names_b = node_b.downstream_names();
2945        assert!(names_a.iter().any(|n| n == "t.a"));
2946        assert!(names_b.iter().any(|n| n == "t.b"));
2947    }
2948
2949    #[test]
2950    fn test_get_source_tables() {
2951        let expr = parse("SELECT t.a, s.b FROM t JOIN s ON t.id = s.id");
2952        let node = lineage("a", &expr, None, false).unwrap();
2953
2954        let tables = get_source_tables(&node);
2955        assert!(
2956            tables.contains("t"),
2957            "Expected source table 't', got: {:?}",
2958            tables
2959        );
2960    }
2961
2962    #[test]
2963    fn test_lineage_column_not_found() {
2964        let expr = parse("SELECT a FROM t");
2965        let result = lineage("nonexistent", &expr, None, false);
2966        assert!(result.is_err());
2967    }
2968
2969    #[test]
2970    fn test_lineage_nested_cte() {
2971        let expr = parse(
2972            "WITH cte1 AS (SELECT a FROM t), \
2973             cte2 AS (SELECT a FROM cte1) \
2974             SELECT a FROM cte2",
2975        );
2976        let node = lineage("a", &expr, None, false).unwrap();
2977
2978        // Should trace through cte2 → cte1 → t
2979        let all_names: Vec<_> = node.walk().map(|n| n.name.clone()).collect();
2980        assert!(
2981            all_names.len() >= 3,
2982            "Expected to trace through nested CTEs, got: {:?}",
2983            all_names
2984        );
2985    }
2986
2987    #[test]
2988    fn test_trim_selects_true() {
2989        let expr = parse("SELECT a, b, c FROM t");
2990        let node = lineage("a", &expr, None, true).unwrap();
2991
2992        // The source should be trimmed to only include 'a'
2993        if let Expression::Select(select) = &node.source {
2994            assert_eq!(
2995                select.expressions.len(),
2996                1,
2997                "Trimmed source should have 1 expression, got {}",
2998                select.expressions.len()
2999            );
3000        } else {
3001            panic!("Expected Select source");
3002        }
3003    }
3004
3005    #[test]
3006    fn test_trim_selects_false() {
3007        let expr = parse("SELECT a, b, c FROM t");
3008        let node = lineage("a", &expr, None, false).unwrap();
3009
3010        // The source should keep all columns
3011        if let Expression::Select(select) = &node.source {
3012            assert_eq!(
3013                select.expressions.len(),
3014                3,
3015                "Untrimmed source should have 3 expressions"
3016            );
3017        } else {
3018            panic!("Expected Select source");
3019        }
3020    }
3021
3022    #[test]
3023    fn test_lineage_expression_in_select() {
3024        let expr = parse("SELECT a + b AS c FROM t");
3025        let node = lineage("c", &expr, None, false).unwrap();
3026
3027        // Should trace to both a and b from t
3028        let all_names: Vec<_> = node.walk().map(|n| n.name.clone()).collect();
3029        assert!(
3030            all_names.len() >= 3,
3031            "Expected to trace a + b to both columns, got: {:?}",
3032            all_names
3033        );
3034    }
3035
3036    #[test]
3037    fn test_set_operation_by_index() {
3038        let expr = parse("SELECT a FROM t1 UNION SELECT b FROM t2");
3039
3040        // Trace column "a" which is at index 0
3041        let node = lineage("a", &expr, None, false).unwrap();
3042
3043        // UNION branches should be traced by index
3044        assert_eq!(node.downstream.len(), 2);
3045    }
3046
3047    // --- Tests for column lineage inside function calls (issue #18) ---
3048
3049    fn print_node(node: &LineageNode, indent: usize) {
3050        let pad = "  ".repeat(indent);
3051        println!(
3052            "{pad}name={:?} source_name={:?}",
3053            node.name, node.source_name
3054        );
3055        for child in &node.downstream {
3056            print_node(child, indent + 1);
3057        }
3058    }
3059
3060    #[test]
3061    fn test_issue18_repro() {
3062        // Exact scenario from the issue
3063        let query = "SELECT UPPER(name) as upper_name FROM users";
3064        println!("Query: {query}\n");
3065
3066        let dialect = crate::dialects::Dialect::get(DialectType::BigQuery);
3067        let exprs = dialect.parse(query).unwrap();
3068        let expr = &exprs[0];
3069
3070        let node = lineage("upper_name", expr, Some(DialectType::BigQuery), false).unwrap();
3071        println!("lineage(\"upper_name\"):");
3072        print_node(&node, 1);
3073
3074        let names = node.downstream_names();
3075        assert!(
3076            names.iter().any(|n| n == "users.name"),
3077            "Expected users.name in downstream, got: {:?}",
3078            names
3079        );
3080    }
3081
3082    #[test]
3083    fn test_lineage_bigquery_safe_namespace_issue207() {
3084        let query = r#"
3085WITH import_cte AS (
3086  SELECT timestamp, data, operation
3087  FROM `project`.`dataset`.`source_table`
3088),
3089transform_cte AS (
3090  SELECT
3091    timestamp,
3092    SAFE.PARSE_JSON(data) AS json_data
3093  FROM import_cte
3094)
3095SELECT json_data FROM transform_cte
3096"#;
3097        let expr = parse_one(query, DialectType::BigQuery).expect("parse");
3098        let node = lineage("json_data", &expr, Some(DialectType::BigQuery), false)
3099            .expect("lineage should resolve SAFE.PARSE_JSON arguments");
3100        let names: Vec<_> = node.walk().map(|n| n.name.clone()).collect();
3101
3102        assert!(
3103            names.iter().any(|name| name == "source_table.data"),
3104            "expected source_table.data in lineage, got {names:?}"
3105        );
3106        assert!(
3107            !names
3108                .iter()
3109                .any(|name| name.eq_ignore_ascii_case("import_cte.safe")),
3110            "did not expect SAFE namespace receiver in lineage, got {names:?}"
3111        );
3112    }
3113
3114    #[test]
3115    fn test_lineage_bigquery_safe_namespace_method_call_guard() {
3116        let expr = parse("SELECT SAFE.PARSE_JSON(data) AS json_data FROM t");
3117        let node = lineage("json_data", &expr, Some(DialectType::BigQuery), false)
3118            .expect("lineage should resolve SAFE.PARSE_JSON arguments");
3119        let names: Vec<_> = node.walk().map(|n| n.name.clone()).collect();
3120
3121        assert!(
3122            names.iter().any(|name| name == "t.data"),
3123            "expected t.data in lineage, got {names:?}"
3124        );
3125        assert!(
3126            !names.iter().any(|name| name.eq_ignore_ascii_case("t.safe")),
3127            "did not expect SAFE namespace receiver in lineage, got {names:?}"
3128        );
3129    }
3130
3131    #[test]
3132    fn test_lineage_method_call_receiver_control() {
3133        let expr = parse("SELECT obj.METHOD(arg) AS out FROM t");
3134        let node = lineage("out", &expr, None, false).expect("lineage");
3135        let names: Vec<_> = node.walk().map(|n| n.name.clone()).collect();
3136
3137        assert!(
3138            names.iter().any(|name| name == "t.obj"),
3139            "expected ordinary method receiver to remain in lineage, got {names:?}"
3140        );
3141        assert!(
3142            names.iter().any(|name| name == "t.arg"),
3143            "expected method argument in lineage, got {names:?}"
3144        );
3145    }
3146
3147    #[test]
3148    fn test_lineage_upper_function() {
3149        let expr = parse("SELECT UPPER(name) AS upper_name FROM users");
3150        let node = lineage("upper_name", &expr, None, false).unwrap();
3151
3152        let names = node.downstream_names();
3153        assert!(
3154            names.iter().any(|n| n == "users.name"),
3155            "Expected users.name in downstream, got: {:?}",
3156            names
3157        );
3158    }
3159
3160    #[test]
3161    fn test_lineage_round_function() {
3162        let expr = parse("SELECT ROUND(price, 2) AS rounded FROM products");
3163        let node = lineage("rounded", &expr, None, false).unwrap();
3164
3165        let names = node.downstream_names();
3166        assert!(
3167            names.iter().any(|n| n == "products.price"),
3168            "Expected products.price in downstream, got: {:?}",
3169            names
3170        );
3171    }
3172
3173    #[test]
3174    fn test_lineage_coalesce_function() {
3175        let expr = parse("SELECT COALESCE(a, b) AS val FROM t");
3176        let node = lineage("val", &expr, None, false).unwrap();
3177
3178        let names = node.downstream_names();
3179        assert!(
3180            names.iter().any(|n| n == "t.a"),
3181            "Expected t.a in downstream, got: {:?}",
3182            names
3183        );
3184        assert!(
3185            names.iter().any(|n| n == "t.b"),
3186            "Expected t.b in downstream, got: {:?}",
3187            names
3188        );
3189    }
3190
3191    #[test]
3192    fn test_lineage_count_function() {
3193        let expr = parse("SELECT COUNT(id) AS cnt FROM t");
3194        let node = lineage("cnt", &expr, None, false).unwrap();
3195
3196        let names = node.downstream_names();
3197        assert!(
3198            names.iter().any(|n| n == "t.id"),
3199            "Expected t.id in downstream, got: {:?}",
3200            names
3201        );
3202    }
3203
3204    #[test]
3205    fn test_lineage_sum_function() {
3206        let expr = parse("SELECT SUM(amount) AS total FROM t");
3207        let node = lineage("total", &expr, None, false).unwrap();
3208
3209        let names = node.downstream_names();
3210        assert!(
3211            names.iter().any(|n| n == "t.amount"),
3212            "Expected t.amount in downstream, got: {:?}",
3213            names
3214        );
3215    }
3216
3217    #[test]
3218    fn test_lineage_case_with_nested_functions() {
3219        let expr =
3220            parse("SELECT CASE WHEN x > 0 THEN UPPER(name) ELSE LOWER(name) END AS result FROM t");
3221        let node = lineage("result", &expr, None, false).unwrap();
3222
3223        let names = node.downstream_names();
3224        assert!(
3225            names.iter().any(|n| n == "t.x"),
3226            "Expected t.x in downstream, got: {:?}",
3227            names
3228        );
3229        assert!(
3230            names.iter().any(|n| n == "t.name"),
3231            "Expected t.name in downstream, got: {:?}",
3232            names
3233        );
3234    }
3235
3236    #[test]
3237    fn test_lineage_substring_function() {
3238        let expr = parse("SELECT SUBSTRING(name, 1, 3) AS short FROM t");
3239        let node = lineage("short", &expr, None, false).unwrap();
3240
3241        let names = node.downstream_names();
3242        assert!(
3243            names.iter().any(|n| n == "t.name"),
3244            "Expected t.name in downstream, got: {:?}",
3245            names
3246        );
3247    }
3248
3249    // --- CTE + SELECT * tests (ported from sqlglot test_lineage.py) ---
3250
3251    #[test]
3252    fn test_lineage_cte_select_star() {
3253        // Ported from sqlglot: test_lineage_source_with_star
3254        // WITH y AS (SELECT * FROM x) SELECT a FROM y
3255        // After star expansion: SELECT y.a AS a FROM y
3256        let expr = parse("WITH y AS (SELECT * FROM x) SELECT a FROM y");
3257        let node = lineage("a", &expr, None, false).unwrap();
3258
3259        assert_eq!(node.name, "a");
3260        // Should successfully resolve column 'a' through the CTE
3261        // (previously failed with "Cannot find column 'a' in query")
3262        assert!(
3263            !node.downstream.is_empty(),
3264            "Expected downstream nodes tracing through CTE, got none"
3265        );
3266    }
3267
3268    #[test]
3269    fn test_lineage_cte_select_star_renamed_column() {
3270        // dbt standard pattern: CTE with column rename + outer SELECT *
3271        // This is the primary use case for dbt projects (jaffle-shop etc.)
3272        let expr =
3273            parse("WITH renamed AS (SELECT id AS customer_id FROM source) SELECT * FROM renamed");
3274        let node = lineage("customer_id", &expr, None, false).unwrap();
3275
3276        assert_eq!(node.name, "customer_id");
3277        // Should trace customer_id → renamed CTE → source.id
3278        let all_names: Vec<_> = node.walk().map(|n| n.name.clone()).collect();
3279        assert!(
3280            all_names.len() >= 2,
3281            "Expected at least 2 nodes (customer_id → source), got: {:?}",
3282            all_names
3283        );
3284    }
3285
3286    #[test]
3287    fn test_lineage_cte_select_star_multiple_columns() {
3288        // CTE exposes multiple columns, outer SELECT * should resolve each
3289        let expr = parse("WITH cte AS (SELECT a, b, c FROM t) SELECT * FROM cte");
3290
3291        for col in &["a", "b", "c"] {
3292            let node = lineage(col, &expr, None, false).unwrap();
3293            assert_eq!(node.name, *col);
3294            // Verify lineage resolves without error (star expanded to explicit columns)
3295            let all_names: Vec<_> = node.walk().map(|n| n.name.clone()).collect();
3296            assert!(
3297                all_names.len() >= 2,
3298                "Expected at least 2 nodes for column {}, got: {:?}",
3299                col,
3300                all_names
3301            );
3302        }
3303    }
3304
3305    #[test]
3306    fn test_lineage_nested_cte_select_star() {
3307        // Nested CTE star expansion: cte2 references cte1 via SELECT *
3308        let expr = parse(
3309            "WITH cte1 AS (SELECT a FROM t), \
3310             cte2 AS (SELECT * FROM cte1) \
3311             SELECT * FROM cte2",
3312        );
3313        let node = lineage("a", &expr, None, false).unwrap();
3314
3315        assert_eq!(node.name, "a");
3316        let all_names: Vec<_> = node.walk().map(|n| n.name.clone()).collect();
3317        assert!(
3318            all_names.len() >= 3,
3319            "Expected at least 3 nodes (a → cte2 → cte1 → t.a), got: {:?}",
3320            all_names
3321        );
3322    }
3323
3324    #[test]
3325    fn test_lineage_three_level_nested_cte_star() {
3326        // Three-level nested CTE: cte3 → cte2 → cte1 → t
3327        let expr = parse(
3328            "WITH cte1 AS (SELECT x FROM t), \
3329             cte2 AS (SELECT * FROM cte1), \
3330             cte3 AS (SELECT * FROM cte2) \
3331             SELECT * FROM cte3",
3332        );
3333        let node = lineage("x", &expr, None, false).unwrap();
3334
3335        assert_eq!(node.name, "x");
3336        let all_names: Vec<_> = node.walk().map(|n| n.name.clone()).collect();
3337        assert!(
3338            all_names.len() >= 4,
3339            "Expected at least 4 nodes through 3-level CTE chain, got: {:?}",
3340            all_names
3341        );
3342    }
3343
3344    #[test]
3345    fn test_lineage_cte_union_star() {
3346        // CTE with UNION body, outer SELECT * should resolve from left branch
3347        let expr = parse(
3348            "WITH cte AS (SELECT a, b FROM t1 UNION ALL SELECT a, b FROM t2) \
3349             SELECT * FROM cte",
3350        );
3351        let node = lineage("a", &expr, None, false).unwrap();
3352
3353        assert_eq!(node.name, "a");
3354        let all_names: Vec<_> = node.walk().map(|n| n.name.clone()).collect();
3355        assert!(
3356            all_names.len() >= 2,
3357            "Expected at least 2 nodes for CTE union star, got: {:?}",
3358            all_names
3359        );
3360    }
3361
3362    #[test]
3363    fn test_lineage_cte_star_unknown_table() {
3364        // When CTE references an unknown table, star expansion is skipped gracefully
3365        // and lineage falls back to normal resolution (which may fail)
3366        let expr = parse(
3367            "WITH cte AS (SELECT * FROM unknown_table) \
3368             SELECT * FROM cte",
3369        );
3370        // This should not panic — it may succeed or fail depending on resolution,
3371        // but should not crash
3372        let _result = lineage("x", &expr, None, false);
3373    }
3374
3375    #[test]
3376    fn test_lineage_cte_explicit_columns() {
3377        // CTE with explicit column list: cte(x, y) AS (SELECT a, b FROM t)
3378        let expr = parse(
3379            "WITH cte(x, y) AS (SELECT a, b FROM t) \
3380             SELECT * FROM cte",
3381        );
3382        let node = lineage("x", &expr, None, false).unwrap();
3383        assert_eq!(node.name, "x");
3384    }
3385
3386    #[test]
3387    fn test_lineage_cte_qualified_star() {
3388        // Qualified star: SELECT cte.* FROM cte
3389        let expr = parse(
3390            "WITH cte AS (SELECT a, b FROM t) \
3391             SELECT cte.* FROM cte",
3392        );
3393        for col in &["a", "b"] {
3394            let node = lineage(col, &expr, None, false).unwrap();
3395            assert_eq!(node.name, *col);
3396            let all_names: Vec<_> = node.walk().map(|n| n.name.clone()).collect();
3397            assert!(
3398                all_names.len() >= 2,
3399                "Expected at least 2 nodes for qualified star column {}, got: {:?}",
3400                col,
3401                all_names
3402            );
3403        }
3404    }
3405
3406    #[test]
3407    fn test_lineage_subquery_select_star() {
3408        // Ported from sqlglot: test_select_star
3409        // SELECT x FROM (SELECT * FROM table_a)
3410        let expr = parse("SELECT x FROM (SELECT * FROM table_a)");
3411        let node = lineage("x", &expr, None, false).unwrap();
3412
3413        assert_eq!(node.name, "x");
3414        assert!(
3415            !node.downstream.is_empty(),
3416            "Expected downstream nodes for subquery with SELECT *, got none"
3417        );
3418    }
3419
3420    #[test]
3421    fn test_lineage_cte_star_with_schema_external_table() {
3422        // CTE references an external table via SELECT * — schema enables expansion
3423        let sql = r#"WITH orders AS (SELECT * FROM stg_orders)
3424SELECT * FROM orders"#;
3425        let expr = parse(sql);
3426
3427        let mut schema = MappingSchema::new();
3428        let cols = vec![
3429            ("order_id".to_string(), DataType::Unknown),
3430            ("customer_id".to_string(), DataType::Unknown),
3431            ("amount".to_string(), DataType::Unknown),
3432        ];
3433        schema.add_table("stg_orders", &cols, None).unwrap();
3434
3435        let node =
3436            lineage_with_schema("order_id", &expr, Some(&schema as &dyn Schema), None, false)
3437                .unwrap();
3438        assert_eq!(node.name, "order_id");
3439    }
3440
3441    #[test]
3442    fn test_lineage_cte_star_with_schema_three_part_name() {
3443        // CTE references an external table with fully-qualified 3-part name
3444        let sql = r#"WITH orders AS (SELECT * FROM "db"."schema"."stg_orders")
3445SELECT * FROM orders"#;
3446        let expr = parse(sql);
3447
3448        let mut schema = MappingSchema::new();
3449        let cols = vec![
3450            ("order_id".to_string(), DataType::Unknown),
3451            ("customer_id".to_string(), DataType::Unknown),
3452        ];
3453        schema
3454            .add_table("db.schema.stg_orders", &cols, None)
3455            .unwrap();
3456
3457        let node = lineage_with_schema(
3458            "customer_id",
3459            &expr,
3460            Some(&schema as &dyn Schema),
3461            None,
3462            false,
3463        )
3464        .unwrap();
3465        assert_eq!(node.name, "customer_id");
3466    }
3467
3468    #[test]
3469    fn test_lineage_cte_star_with_schema_nested() {
3470        // Nested CTEs: outer CTE references inner CTE with SELECT *,
3471        // inner CTE references external table with SELECT *
3472        let sql = r#"WITH
3473            raw AS (SELECT * FROM external_table),
3474            enriched AS (SELECT * FROM raw)
3475        SELECT * FROM enriched"#;
3476        let expr = parse(sql);
3477
3478        let mut schema = MappingSchema::new();
3479        let cols = vec![
3480            ("id".to_string(), DataType::Unknown),
3481            ("name".to_string(), DataType::Unknown),
3482        ];
3483        schema.add_table("external_table", &cols, None).unwrap();
3484
3485        let node =
3486            lineage_with_schema("name", &expr, Some(&schema as &dyn Schema), None, false).unwrap();
3487        assert_eq!(node.name, "name");
3488    }
3489
3490    #[test]
3491    fn test_lineage_cte_qualified_star_with_schema() {
3492        // CTE uses qualified star (orders.*) from a CTE whose columns
3493        // come from an external table via SELECT *
3494        let sql = r#"WITH
3495            orders AS (SELECT * FROM stg_orders),
3496            enriched AS (
3497                SELECT orders.*, 'extra' AS extra
3498                FROM orders
3499            )
3500        SELECT * FROM enriched"#;
3501        let expr = parse(sql);
3502
3503        let mut schema = MappingSchema::new();
3504        let cols = vec![
3505            ("order_id".to_string(), DataType::Unknown),
3506            ("total".to_string(), DataType::Unknown),
3507        ];
3508        schema.add_table("stg_orders", &cols, None).unwrap();
3509
3510        let node =
3511            lineage_with_schema("order_id", &expr, Some(&schema as &dyn Schema), None, false)
3512                .unwrap();
3513        assert_eq!(node.name, "order_id");
3514
3515        // Also verify the extra column works
3516        let extra =
3517            lineage_with_schema("extra", &expr, Some(&schema as &dyn Schema), None, false).unwrap();
3518        assert_eq!(extra.name, "extra");
3519    }
3520
3521    #[test]
3522    fn test_lineage_cte_star_without_schema_still_works() {
3523        // Without schema, CTE-to-CTE star expansion still works
3524        let sql = r#"WITH
3525            cte1 AS (SELECT id, name FROM raw_table),
3526            cte2 AS (SELECT * FROM cte1)
3527        SELECT * FROM cte2"#;
3528        let expr = parse(sql);
3529
3530        // No schema — should still resolve through CTE chain
3531        let node = lineage("id", &expr, None, false).unwrap();
3532        assert_eq!(node.name, "id");
3533    }
3534
3535    #[test]
3536    fn test_lineage_nested_cte_star_with_join_and_schema() {
3537        // Reproduces dbt pattern: CTE chain with qualified star and JOIN
3538        // base_orders -> with_payments (JOIN) -> final -> outer SELECT
3539        let sql = r#"WITH
3540base_orders AS (
3541    SELECT * FROM stg_orders
3542),
3543with_payments AS (
3544    SELECT
3545        base_orders.*,
3546        p.amount
3547    FROM base_orders
3548    LEFT JOIN stg_payments p ON base_orders.order_id = p.order_id
3549),
3550final_cte AS (
3551    SELECT * FROM with_payments
3552)
3553SELECT * FROM final_cte"#;
3554        let expr = parse(sql);
3555
3556        let mut schema = MappingSchema::new();
3557        let order_cols = vec![
3558            (
3559                "order_id".to_string(),
3560                crate::expressions::DataType::Unknown,
3561            ),
3562            (
3563                "customer_id".to_string(),
3564                crate::expressions::DataType::Unknown,
3565            ),
3566            ("status".to_string(), crate::expressions::DataType::Unknown),
3567        ];
3568        let pay_cols = vec![
3569            (
3570                "payment_id".to_string(),
3571                crate::expressions::DataType::Unknown,
3572            ),
3573            (
3574                "order_id".to_string(),
3575                crate::expressions::DataType::Unknown,
3576            ),
3577            ("amount".to_string(), crate::expressions::DataType::Unknown),
3578        ];
3579        schema.add_table("stg_orders", &order_cols, None).unwrap();
3580        schema.add_table("stg_payments", &pay_cols, None).unwrap();
3581
3582        // order_id should trace back to stg_orders
3583        let node =
3584            lineage_with_schema("order_id", &expr, Some(&schema as &dyn Schema), None, false)
3585                .unwrap();
3586        let all_names: Vec<_> = node.walk().map(|n| n.name.clone()).collect();
3587
3588        // The leaf should be "stg_orders.order_id" (not just "order_id")
3589        let has_table_qualified = all_names
3590            .iter()
3591            .any(|n| n.contains('.') && n.contains("order_id"));
3592        assert!(
3593            has_table_qualified,
3594            "Expected table-qualified leaf like 'stg_orders.order_id', got: {:?}",
3595            all_names
3596        );
3597
3598        // amount should trace back to stg_payments
3599        let node = lineage_with_schema("amount", &expr, Some(&schema as &dyn Schema), None, false)
3600            .unwrap();
3601        let all_names: Vec<_> = node.walk().map(|n| n.name.clone()).collect();
3602
3603        let has_table_qualified = all_names
3604            .iter()
3605            .any(|n| n.contains('.') && n.contains("amount"));
3606        assert!(
3607            has_table_qualified,
3608            "Expected table-qualified leaf like 'stg_payments.amount', got: {:?}",
3609            all_names
3610        );
3611    }
3612
3613    #[test]
3614    fn test_lineage_cte_alias_resolution() {
3615        // FROM cte_name AS alias pattern: alias should resolve through CTE to source table
3616        let sql = r#"WITH import_stg_items AS (
3617    SELECT item_id, name, status FROM stg_items
3618)
3619SELECT base.item_id, base.status
3620FROM import_stg_items AS base"#;
3621        let expr = parse(sql);
3622
3623        let node = lineage("item_id", &expr, None, false).unwrap();
3624        let all_names: Vec<_> = node.walk().map(|n| n.name.clone()).collect();
3625        // Should trace through alias "base" → CTE "import_stg_items" → "stg_items.item_id"
3626        assert!(
3627            all_names.iter().any(|n| n == "stg_items.item_id"),
3628            "Expected leaf 'stg_items.item_id', got: {:?}",
3629            all_names
3630        );
3631    }
3632
3633    #[test]
3634    fn test_lineage_cte_alias_with_schema_and_star() {
3635        // CTE alias + SELECT * expansion: FROM cte AS alias with star in CTE body
3636        let sql = r#"WITH import_stg AS (
3637    SELECT * FROM stg_items
3638)
3639SELECT base.item_id, base.status
3640FROM import_stg AS base"#;
3641        let expr = parse(sql);
3642
3643        let mut schema = MappingSchema::new();
3644        schema
3645            .add_table(
3646                "stg_items",
3647                &[
3648                    ("item_id".to_string(), DataType::Unknown),
3649                    ("name".to_string(), DataType::Unknown),
3650                    ("status".to_string(), DataType::Unknown),
3651                ],
3652                None,
3653            )
3654            .unwrap();
3655
3656        let node = lineage_with_schema("item_id", &expr, Some(&schema as &dyn Schema), None, false)
3657            .unwrap();
3658        let all_names: Vec<_> = node.walk().map(|n| n.name.clone()).collect();
3659        assert!(
3660            all_names.iter().any(|n| n == "stg_items.item_id"),
3661            "Expected leaf 'stg_items.item_id', got: {:?}",
3662            all_names
3663        );
3664    }
3665
3666    #[test]
3667    fn test_lineage_cte_alias_with_join() {
3668        // Multiple CTE aliases in a JOIN: each should resolve independently
3669        let sql = r#"WITH
3670    import_users AS (SELECT id, name FROM users),
3671    import_orders AS (SELECT id, user_id, amount FROM orders)
3672SELECT u.name, o.amount
3673FROM import_users AS u
3674LEFT JOIN import_orders AS o ON u.id = o.user_id"#;
3675        let expr = parse(sql);
3676
3677        let node = lineage("name", &expr, None, false).unwrap();
3678        let all_names: Vec<_> = node.walk().map(|n| n.name.clone()).collect();
3679        assert!(
3680            all_names.iter().any(|n| n == "users.name"),
3681            "Expected leaf 'users.name', got: {:?}",
3682            all_names
3683        );
3684
3685        let node = lineage("amount", &expr, None, false).unwrap();
3686        let all_names: Vec<_> = node.walk().map(|n| n.name.clone()).collect();
3687        assert!(
3688            all_names.iter().any(|n| n == "orders.amount"),
3689            "Expected leaf 'orders.amount', got: {:?}",
3690            all_names
3691        );
3692    }
3693
3694    // -----------------------------------------------------------------------
3695    // Quoted CTE name tests — verifying SQL identifier case semantics
3696    // -----------------------------------------------------------------------
3697
3698    #[test]
3699    fn test_lineage_unquoted_cte_case_insensitive() {
3700        // Unquoted CTE names are case-insensitive (both normalized to lowercase).
3701        // MyCte and MYCTE should match.
3702        let expr = parse("WITH MyCte AS (SELECT id AS col FROM source) SELECT * FROM MYCTE");
3703        let node = lineage("col", &expr, None, false).unwrap();
3704        assert_eq!(node.name, "col");
3705        assert!(
3706            !node.downstream.is_empty(),
3707            "Unquoted CTE should resolve case-insensitively"
3708        );
3709    }
3710
3711    #[test]
3712    fn test_lineage_quoted_cte_case_preserved() {
3713        // Quoted CTE name preserves case. "MyCte" referenced as "MyCte" should match.
3714        let expr = parse(r#"WITH "MyCte" AS (SELECT id AS col FROM source) SELECT * FROM "MyCte""#);
3715        let node = lineage("col", &expr, None, false).unwrap();
3716        assert_eq!(node.name, "col");
3717        assert!(
3718            !node.downstream.is_empty(),
3719            "Quoted CTE with matching case should resolve"
3720        );
3721    }
3722
3723    #[test]
3724    fn test_lineage_quoted_cte_case_mismatch_no_expansion() {
3725        // Quoted CTE "MyCte" referenced as "mycte" — case mismatch.
3726        // sqlglot treats this as a table reference, not a CTE match.
3727        // Star expansion should NOT resolve through the CTE.
3728        let expr = parse(r#"WITH "MyCte" AS (SELECT id AS col FROM source) SELECT * FROM "mycte""#);
3729        // lineage("col", ...) should fail because "mycte" is treated as an external
3730        // table (not matching CTE "MyCte"), and SELECT * cannot be expanded.
3731        let result = lineage("col", &expr, None, false);
3732        assert!(
3733            result.is_err(),
3734            "Quoted CTE with case mismatch should not expand star: {:?}",
3735            result
3736        );
3737    }
3738
3739    #[test]
3740    fn test_lineage_mixed_quoted_unquoted_cte() {
3741        // Mix of unquoted and quoted CTEs in a nested chain.
3742        let expr = parse(
3743            r#"WITH unquoted AS (SELECT 1 AS a FROM t), "Quoted" AS (SELECT a FROM unquoted) SELECT * FROM "Quoted""#,
3744        );
3745        let node = lineage("a", &expr, None, false).unwrap();
3746        assert_eq!(node.name, "a");
3747        assert!(
3748            !node.downstream.is_empty(),
3749            "Mixed quoted/unquoted CTE chain should resolve"
3750        );
3751    }
3752
3753    // -----------------------------------------------------------------------
3754    // Known bugs: quoted CTE case sensitivity in scope/lineage tracing paths
3755    // -----------------------------------------------------------------------
3756    //
3757    // expand_cte_stars correctly handles quoted vs unquoted CTE names via
3758    // normalize_cte_name(). However, the scope system (scope.rs add_table_to_scope)
3759    // and the lineage tracing path (to_node_inner) use eq_ignore_ascii_case or
3760    // direct string comparison for CTE name matching, ignoring the quoted status.
3761    //
3762    // sqlglot's normalize_identifiers treats quoted identifiers as case-sensitive
3763    // and unquoted as case-insensitive. The scope system should do the same.
3764    //
3765    // Fixing these requires changes across scope.rs and lineage.rs CTE resolution,
3766    // which is broader than the star expansion scope of this PR.
3767
3768    #[test]
3769    fn test_lineage_quoted_cte_case_mismatch_non_star_known_bug() {
3770        // Known bug: scope.rs add_table_to_scope uses eq_ignore_ascii_case for
3771        // all identifiers including quoted ones, so quoted CTE "MyCte" referenced
3772        // as "mycte" incorrectly resolves to the CTE.
3773        //
3774        // Per SQL semantics (and sqlglot behavior), quoted identifiers are
3775        // case-sensitive: "mycte" should NOT match CTE "MyCte".
3776        //
3777        // This test asserts the CURRENT BUGGY behavior. When the bug is fixed,
3778        // this test should fail — update the assertion to match correct behavior:
3779        //   child.source_name should be "" (table ref), not "MyCte" (CTE ref).
3780        let expr = parse(r#"WITH "MyCte" AS (SELECT 1 AS col) SELECT col FROM "mycte""#);
3781        let node = lineage("col", &expr, None, false).unwrap();
3782        assert!(!node.downstream.is_empty());
3783        let child = &node.downstream[0];
3784        // BUG: "mycte" incorrectly resolves to CTE "MyCte"
3785        assert_eq!(
3786            child.source_name, "MyCte",
3787            "Known bug: quoted CTE case mismatch should NOT resolve, but currently does. \
3788             If this fails, the bug may be fixed — update to assert source_name != \"MyCte\""
3789        );
3790    }
3791
3792    #[test]
3793    fn test_lineage_quoted_cte_case_mismatch_qualified_col_known_bug() {
3794        // Known bug: same as above but with qualified column reference ("mycte".col).
3795        // scope.rs resolves "mycte" to CTE "MyCte" case-insensitively even for
3796        // quoted identifiers, so "mycte".col incorrectly traces through CTE "MyCte".
3797        //
3798        // This test asserts the CURRENT BUGGY behavior. When the bug is fixed,
3799        // this test should fail — update to assert source_name != "MyCte".
3800        let expr = parse(r#"WITH "MyCte" AS (SELECT 1 AS col) SELECT "mycte".col FROM "mycte""#);
3801        let node = lineage("col", &expr, None, false).unwrap();
3802        assert!(!node.downstream.is_empty());
3803        let child = &node.downstream[0];
3804        // BUG: "mycte".col incorrectly resolves through CTE "MyCte"
3805        assert_eq!(
3806            child.source_name, "MyCte",
3807            "Known bug: quoted CTE case mismatch should NOT resolve, but currently does. \
3808             If this fails, the bug may be fixed — update to assert source_name != \"MyCte\""
3809        );
3810    }
3811
3812    // --- Comment handling tests (ported from sqlglot test_lineage.py) ---
3813
3814    /// sqlglot: test_node_name_doesnt_contain_comment
3815    /// Comments in column expressions should not affect lineage resolution.
3816    /// NOTE: This test uses SELECT * from a derived table, which is a separate
3817    /// known limitation in polyglot-sql (star expansion in subqueries).
3818    #[test]
3819    #[ignore = "requires derived table star expansion (separate issue)"]
3820    fn test_node_name_doesnt_contain_comment() {
3821        let expr = parse("SELECT * FROM (SELECT x /* c */ FROM t1) AS t2");
3822        let node = lineage("x", &expr, None, false).unwrap();
3823
3824        assert_eq!(node.name, "x");
3825        assert!(!node.downstream.is_empty());
3826    }
3827
3828    /// A line comment between SELECT and the first column wraps the column
3829    /// in an Annotated node. Lineage must unwrap it to find the column name.
3830    /// Verify that commented and uncommented queries produce identical lineage.
3831    #[test]
3832    fn test_comment_before_first_column_in_cte() {
3833        let sql_with_comment = "with t as (select 1 as a) select\n  -- comment\n  a from t";
3834        let sql_without_comment = "with t as (select 1 as a) select a from t";
3835
3836        // Without comment — baseline
3837        let expr_ok = parse(sql_without_comment);
3838        let node_ok = lineage("a", &expr_ok, None, false).expect("without comment should succeed");
3839
3840        // With comment — should produce identical lineage
3841        let expr_comment = parse(sql_with_comment);
3842        let node_comment = lineage("a", &expr_comment, None, false)
3843            .expect("with comment before first column should succeed");
3844
3845        assert_eq!(node_ok.name, node_comment.name, "node names should match");
3846        assert_eq!(
3847            node_ok.downstream_names(),
3848            node_comment.downstream_names(),
3849            "downstream lineage should be identical with or without comment"
3850        );
3851    }
3852
3853    /// Block comment between SELECT and first column.
3854    #[test]
3855    fn test_block_comment_before_first_column() {
3856        let sql = "with t as (select 1 as a) select /* section */ a from t";
3857        let expr = parse(sql);
3858        let node = lineage("a", &expr, None, false)
3859            .expect("block comment before first column should succeed");
3860        assert_eq!(node.name, "a");
3861        assert!(
3862            !node.downstream.is_empty(),
3863            "should have downstream lineage"
3864        );
3865    }
3866
3867    /// Comment before first column should not affect second column resolution.
3868    #[test]
3869    fn test_comment_before_first_column_second_col_ok() {
3870        let sql = "with t as (select 1 as a, 2 as b) select\n  -- comment\n  a, b from t";
3871        let expr = parse(sql);
3872
3873        let node_a =
3874            lineage("a", &expr, None, false).expect("column a with comment should succeed");
3875        assert_eq!(node_a.name, "a");
3876
3877        let node_b =
3878            lineage("b", &expr, None, false).expect("column b with comment should succeed");
3879        assert_eq!(node_b.name, "b");
3880    }
3881
3882    /// Aliased column with preceding comment.
3883    #[test]
3884    fn test_comment_before_aliased_column() {
3885        let sql = "with t as (select 1 as x) select\n  -- renamed\n  x as y from t";
3886        let expr = parse(sql);
3887        let node =
3888            lineage("y", &expr, None, false).expect("aliased column with comment should succeed");
3889        assert_eq!(node.name, "y");
3890        assert!(
3891            !node.downstream.is_empty(),
3892            "aliased column should have downstream lineage"
3893        );
3894    }
3895}